I'm trying to have logstash output to elasticsearch but I'm not sure how to use the mapping I defined in elasticsearch...
In Kibana, I did this:
Created an index and mapping like this:
PUT /kafkajmx2
{
  "mappings": {
    "kafka_mbeans": {
      "properties": {
        "@timestamp": { "type": "date" },
        "@version": { "type": "integer" },
        "host": { "type": "keyword" },
        "metric_path": { "type": "text" },
        "type": { "type": "keyword" },
        "path": { "type": "text" },
        "metric_value_string": { "type": "keyword" },
        "metric_value_number": { "type": "float" }
      }
    }
  }
}
Can write data to it like this:
POST /kafkajmx2/kafka_mbeans
{
  "metric_value_number": 159.03478490788203,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "@timestamp": "2017-02-12T23:08:40.934Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec.FifteenMinuteRate",
  "type": null
}
Now my Logstash output looks like this:
input {
  kafka {
    kafka details here
  }
}
output {
  elasticsearch {
    hosts => "http://elasticsearch:9050"
    index => "kafkajmx2"
  }
}
It just writes to the kafkajmx2 index but doesn't use the mapping. When I query it like this in Kibana:
get /kafkajmx2/kafka_mbeans/_search?q=* { }
I get this back:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo34xF_j-lM6k7wBavd",
  "_score": 1,
  "_source": {
    "@timestamp": "2017-02-13T14:31:53.337Z",
    "@version": "1",
    "message": """ {"metric_value_number":0,"path":"/home/usrxxx/logstash-5.2.0/bin/jmxconf","@timestamp":"2017-02-13T14:31:52.654Z","@version":"1","host":"localhost","metric_path":"node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count","type":null} """
  }
}
How do I tell it to use the kafka_mbeans mapping in the Logstash output?
-----EDIT-----
I tried my output like this but still get the same results:
output {
  elasticsearch {
    hosts => "http://10.204.93.209:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
    codec => plain { format => "%{message}" }
  }
}
The data in Elasticsearch should look like this:
{
  "@timestamp": "2017-02-13T14:31:52.654Z",
  "@version": "1",
  "host": "localhost",
  "metric_path": "node1.kafka.server:type=SessionExpireListener,name=ZooKeeperAuthFailuresPerSec.Count",
  "metric_value_number": 0,
  "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
  "type": null
}
--------EDIT 2--------------
I at least got the message to parse into JSON by adding a filter like this:
input {
  kafka {
    ...kafka details....
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    template_name => "kafka_mbeans"
  }
}
It still doesn't use the template, but at least the JSON is parsed correctly, so now I get this:
{
  "_index": "kafkajmx2",
  "_type": "logs",
  "_id": "AVo4a2Hzj-lM6k7wBcMS",
  "_score": 1,
  "_source": {
    "metric_value_number": 0.9967205071482902,
    "path": "/home/usrxxx/logstash-5.2.0/bin/jmxconf",
    "@timestamp": "2017-02-13T16:54:16.701Z",
    "@version": "1",
    "host": "localhost",
    "metric_path": "kafka1.kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.Value",
    "type": null
  }
}
2 Answers
Answers 1
What you need to change is very simple. First, use the json codec in your kafka input. There is no need for the json filter, so you can remove it.
kafka {
  ...kafka details....
  codec => "json"
}
Then, in your elasticsearch output, you're missing the mapping type (the document_type parameter below). This is important because otherwise the type defaults to logs (as you can see), and that doesn't match your kafka_mbeans mapping type. Moreover, you don't really need to use a template, since your index already exists. Make the following modification:
elasticsearch {
  hosts => "http://node1:9050"
  index => "kafkajmx2"
  document_type => "kafka_mbeans"
}
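Putting both changes together, the whole pipeline might look roughly like the sketch below. The kafka settings stay elided exactly as in the question, and the host and port are simply copied from it; document_type is the only output option added.

```logstash
input {
  kafka {
    # ...kafka details.... (connection settings, elided in the question)
    codec => "json"   # decode each Kafka message as JSON into event fields
  }
}

output {
  elasticsearch {
    hosts => "http://node1:9050"
    index => "kafkajmx2"
    # Must match the mapping type defined when the index was created
    document_type => "kafka_mbeans"
  }
}
```

After Logstash is restarted with this config, newly indexed documents should show "_type": "kafka_mbeans" rather than "logs" in search results.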
Answers 2
This is defined with the template_name parameter on the elasticsearch output.
elasticsearch {
  hosts => "http://elasticsearch:9050"
  index => "kafkajmx2"
  template_name => "kafka_mbeans"
}
One warning, though. If you want to start creating time-boxed indexes, such as one index per week, you will have to take a few more steps to ensure your mapping is applied to each of them. You have a couple of options there:
- Create an Elasticsearch index template and define it to apply to indexes using a glob, such as kafkajmx2-*
- Use the template parameter on the output, which specifies a JSON file that defines the mapping to be used with all indexes created through that output.
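As a rough sketch of the second option, the output could point at a template file and write to weekly indexes. The file path below is hypothetical, and the weekly index name is only one possible naming scheme. The file itself would hold an index template whose pattern matches kafkajmx2-* and whose mappings section carries the kafka_mbeans properties from the question. Keep in mind that templates are only applied when an index is created, so they do not change the mapping of an index that already exists.

```logstash
output {
  elasticsearch {
    hosts => "http://elasticsearch:9050"
    # Assumed naming scheme: one index per ISO week (week-year and week number)
    index => "kafkajmx2-%{+xxxx.ww}"
    # Mapping type the documents are indexed under; should match the template
    document_type => "kafka_mbeans"
    # Hypothetical path to a JSON index template matching kafkajmx2-*
    template => "/path/to/kafka_mbeans_template.json"
    # Name under which the template is stored in Elasticsearch
    template_name => "kafka_mbeans"
  }
}
```

With this in place, each new weekly index picks up the mapping from the template at creation time, so no per-index PUT is needed.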