The PutElasticsearch processor does expect the flow file to be in JSON
format. PutElasticsearch uses the Bulk Java API(1), and other examples can
be seen here(2). The processor builds the bulk request structure itself
(as seen in the bulk insert examples), so the incoming flow file should
contain only the field-value JSON mapping for the document. An actual
example can be seen here(3), showing the format of the flow file that was
inserted.
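To make the distinction concrete, here is a rough sketch of the difference between what the flow file should hold and what a bulk insert looks like. It uses the newline-delimited REST bulk format purely as an illustration; it is not the processor's actual code, and the helper `to_bulk_payload`, the index name `products`, the type `default`, and the sample fields are all made up for this example.

```python
import json


def to_bulk_payload(doc, index, doc_type, doc_id):
    """Wrap one flat field-value document in a bulk 'index' action.

    Hypothetical helper for illustration only; the processor does the
    equivalent wrapping for you.
    """
    action = {"index": {"_index": index, "_type": doc_type, "_id": doc_id}}
    # A REST bulk body is newline-delimited JSON: one action line,
    # one source line, and a trailing newline.
    return json.dumps(action) + "\n" + json.dumps(doc) + "\n"


# What the flow file should contain: just the field-value mapping.
flow_file_content = '{"id": 42, "name": "example", "price": 9.99}'

doc = json.loads(flow_file_content)
payload = to_bulk_payload(
    doc, index="products", doc_type="default", doc_id=str(doc["id"])
)
print(payload)
```

In other words, only the `flow_file_content` part needs to be in the flow file; the action line is what the processor adds from its own Index, Type and Identifier settings.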
In addition to Chad's comments, as of NiFi 1.3.0 there is also a
PutElasticsearchHttpRecord processor, which lets you accept
records in any format (you just have to set up the appropriate reader,
such as an AvroReader in this case) and otherwise works like the
PutElasticsearchHttp processor. I see you are using PutElasticsearch
rather than PutElasticsearchHttp; if you have a large number of
records in a flow file (such as a single flow file coming from
QueryDatabaseTable), and you are using PutElasticsearch instead of
PutElasticsearchHttp for performance reasons, you may find that
PutElasticsearchHttpRecord outperforms even the native
Thank you Chad and Matt,
I'm now able to use the PutElasticsearch5 processor to index rows from a database, since I convert each record (Avro format) to JSON first (using the ConvertAvroToJSON processor) before performing the indexing task.
My flow looks like:
Any idea how to do the same, but pulling the data from the filesystem instead of from a database?
My idea is to pull files from a directory using the ListFile processor. After that, I want to convert them to JSON (or another intermediate format) before indexing them with the PutElasticsearch5 processor.