Saturday, October 21, 2017

How to specify document version in elasticsearch pipeline?

I currently use an ingest node pipeline which looks like this:

{
    "my-pipeline": {
        "description": "pipeline for my filebeat",
        "processors": [
            {
                "json": {
                    "field": "message",
                    "add_to_root": true,
                    "on_failure": [
                        {
                            "rename": {
                                "field": "message",
                                "target_field": "originalMessage",
                                "ignore_missing": true
                            }
                        },
                        {
                            "set": {
                                "field": "indexName",
                                "value": "pipeline-errors"
                            }
                        },
                        {
                            "set": {
                                "field": "indexType",
                                "value": "pipeline-error"
                            }
                        },
                        {
                            "rename": {
                                "field": "@timestamp",
                                "target_field": "errorTimestamp",
                                "ignore_missing": true
                            }
                        }
                    ]
                }
            },
            {
                "remove": {
                    "field": "@timestamp",
                    "ignore_failure": true
                }
            },
            {
                "remove": {
                    "field": "message",
                    "ignore_failure": true
                }
            },
            {
                "script": {
                    "inline": "ctx._index = ctx.indexName; ctx._type=ctx.indexType; if (ctx.docVersion != null) {ctx._version = ctx.docVersion; ctx._version_type='external'}"
                }
            },
            {
                "remove": {
                    "field": "indexName",
                    "ignore_failure": true
                }
            },
            {
                "remove": {
                    "field": "indexType",
                    "ignore_failure": true
                }
            }
        ]
    }
}

This pipeline simply unboxes a log forwarded by Filebeat. In the script processor I look for the 'indexName' and 'indexType' fields and assign them to '_index' and '_type' respectively. Since I need to take the version into account, a 'version' field is included in the log (this is optional, as some logs do not contain a version).
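To make the routing step concrete, here is a rough Python model of what the script processor and the two trailing remove processors are meant to do to each document. It is only an illustration under stated assumptions: `route_document` is a hypothetical helper, a plain dict stands in for the ingest `ctx`, the sample field values are made up, and nothing here talks to Elasticsearch.

```python
def route_document(ctx):
    """Model the script processor plus the two trailing 'remove' processors.

    'ctx' is a plain dict standing in for the ingest document (an assumption
    made for illustration; this is not the Elasticsearch API).
    """
    ctx = dict(ctx)  # work on a copy so the caller's dict is untouched

    # ctx._index = ctx.indexName; ctx._type = ctx.indexType
    ctx["_index"] = ctx.get("indexName")
    ctx["_type"] = ctx.get("indexType")

    # The version is optional: only set it when the log carried one.
    if ctx.get("docVersion") is not None:
        ctx["_version"] = ctx["docVersion"]
        ctx["_version_type"] = "external"

    # Equivalent of the final two 'remove' processors with ignore_failure.
    ctx.pop("indexName", None)
    ctx.pop("indexType", None)
    return ctx


# Hypothetical sample document, for illustration only.
sample = {"indexName": "orders", "indexType": "order", "docVersion": 7, "total": 12.5}
routed = route_document(sample)
print(routed)
```

A document without 'docVersion' passes through the same function and simply ends up without the '_version' and '_version_type' keys.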

Using this pipeline triggers:

org.elasticsearch.index.mapper.MapperParsingException: Cannot generate dynamic mappings of type [_version] for [_version]
    at org.elasticsearch.index.mapper.DocumentParser.createBuilderFromFieldType(DocumentParser.java:656) ~[elasticsearch-5.5.0.jar:5.5.0]
    at org.elasticsearch.index.mapper.DocumentParser.parseDynamicValue(DocumentParser.java:805) ~[elasticsearch-5.5.0.jar:5.5.0]

What I've tried so far (updated 09-16):

  • Renamed the field to something like 'docVersion', in case 'version' collides with a keyword. This did not work either.
  • Tried ctx._source.version; this triggers a ScriptException[runtime error], which is not surprising, since the _index and _type values are read directly from ctx.indexName and ctx.indexType respectively.
  • Tried setting 'version_type=external' in the script as well; I still get the MapperParsingException shown above.
  • Tried 'version_type=external_gte', but got the same MapperParsingException.

How do I specify/use external versioning for Elasticsearch documents when using ingest node pipelines? If this is not possible through the pipeline's script processor, what are the options for using an external version with Filebeat-to-Elasticsearch, such that an older version of a document gets rejected?
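For reference, the rejection behaviour being asked for can be sketched in a few lines of Python. This is a simplified model of the comparison Elasticsearch applies for the 'external' and 'external_gte' version types, not its actual implementation; `accepts_write` is a hypothetical name, and deletes and other version types are ignored.

```python
def accepts_write(stored_version, incoming_version, version_type="external"):
    """Return True if a write carrying incoming_version would be accepted.

    stored_version is None when the document does not exist yet.
    Simplified model: only 'external' and 'external_gte' are handled.
    """
    if stored_version is None:
        return True  # nothing stored yet, so there is nothing to conflict with
    if version_type == "external":
        # Strictly newer versions only; equal or older is a version conflict.
        return incoming_version > stored_version
    if version_type == "external_gte":
        # Same as above, but re-sending the same version is also allowed.
        return incoming_version >= stored_version
    raise ValueError("unsupported version_type: " + version_type)


# A stale log line (version 4) arriving after version 5 was indexed.
print(accepts_write(stored_version=5, incoming_version=4))
```

The difference between the two types only shows up when the same version is sent twice: 'external' treats that as a conflict, while 'external_gte' lets it overwrite.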

1 Answer

The following variables are available through the ctx map: _index, _type, _id, _version, _routing, _parent, _now and _source. You can access the original source for a field as ctx._source.field-name.

It looks like the script is trying to access a document field named "version" via ctx.version, but that maps to ctx._version.

The internal doc value should be retrieved as ctx._source.version. Can you try that?
