Indexing csv files using Elasticsearch pipelines

By | March 11, 2019

In this tutorial on indexing csv files using Elasticsearch pipelines, we will use a Painless script to ingest a csv file. The script runs inside an Elasticsearch ingest pipeline. The problem of ingesting csv logs shipped by Filebeat directly into Elasticsearch can be solved in many ways. I will discuss the usual method as well as the brute force method. I will also highlight the pitfalls that can trip you up when dealing with application logs that do not conform to any logging standard.


Once your shiny new Elasticsearch cluster is up, you will be approached by someone who handles legacy systems. Their application generates logs which they want to handle via Elasticsearch. Legacy applications will almost always dump csv logs. This is good. But anything and everything can be (and usually is) dumped into those fields. I have seen a field where the error code as well as the sensor reading was dumped. That's text as well as numbers in the same column! And a multiline error stack trace stuffed into the error message field? That too.

A csv filter is present in Logstash, but if you are running a lean operation (read: no budget) you will want to avoid Logstash. It is a heavy application that warrants a separate machine for itself. With Elasticsearch ingest pipelines it is now possible to do some processing on the data before it is indexed.

Complicated data transformations are still best handled by Logstash. But it is always worth a try to see if you can get away with a lightweight Filebeat running on the client machine and sending logs. In Elasticsearch pipelines we can process the csv lines and index them. Easy? No. At the time of writing, pipelines do not ship with a built-in csv processor. One easy way out is to use a csv processor written by someone else, but it is not a part of the official stack. If you want to do it yourself, read on.


There are essentially two ways you can go about this.
The first is to use the Grok processor. I highly recommend this if you are dealing with refined, gentlemanly logs. Something like this: - - [07/Mar/2004:16:31:48 -0800] "GET /twiki/bin/view/TWiki/WebTopicEditTemplate HTTP/1.1" 200 3732

These logs are generated by well-defined, standardized logging containers. The Grok processor documentation explains how to handle such logs.

How about rogue logs?
Here is a sample:

"2018/09/20 14:18:44.754","ERROR","15","SMC.S.Command","272979","Error"," Data Fetching","Save Data","75104","(null)","[Sanity System Error]
Internal Error - Failed to create error for code ""ERR3245 Device Error- Failed to Save  Data""","(null)","(null)","PPT974545","SMC.Errors.SanitySystemSetupException: PPT974545-[Sanity System Error]
Internal Error - Failed to create error for code ""ERR3245 Device Error- Failed to Save  Data""Value cannot be null.
Parameter name: args ---> System.ArgumentNullException: Value cannot be null.
Parameter name: args
   at System.String.Format(String format, Object[] args)
   at SMC.Errors.ErrorCodeFactory.CreateError(Exception exception, String[] messageParameter, String errorCode, String searchCode, String errorCodeString, Boolean isFault) in C:\Machines Spares\SMC\core\src\SMC.Client\Errors\ErrorCodeFactory.cs:line 84
   at SMC.Errors.ErrorCodeFactory.Create(String prefix, String code, Exception exception, Boolean isFault, String[] messageParameter) in C:\Machines Spares\SMC\core\src\SMC.Client\Errors\ErrorCodeFactory.cs:line 34
   --- End of inner exception stack trace ---
   at SMC.Errors.ErrorCodeFactory.Create(String prefix, String code, Exception exception, Boolean isFault, String[] messageParameter) in C:\Machines Spares\SMC\core\src\SMC.Client\Errors\ErrorCodeFactory.cs:line 56
   at SMC.Errors.SMCException..ctor(String type, String code, Exception innerException, Boolean isFault, String[] messageParameter) in C:\Machines Spares\SMC\core\src\SMC.Client\Errors\SMCException.cs:line 12
   at SMC.Errors.DeviceException..ctor(String code, Exception innerException, String[] messageParameter) in C:\Machines Spares\SMC\core\src\SMC.Client\Errors\DeviceException.cs:line 19
   at SMC.Device.PService.OnPServiceData() in C:\Machines Spares\DeviceDrivers\PService\PServiceHook.cs:line 60
   at SMC.Step.CommandStep.OnExecute() in C:\Machines Spares\SMC\core\src\SMC\Step\UserStep.cs:line 82
   at SMC.Step.Step.Execute() in C:\Machines Spares\SMC\core\src\SMC\Step\Step.cs:line 98

Believe it or not, this is a single record from a csv log. I am not even going to try to write a grok pattern for this. It is bad, but such logs exist. Of course someone could pick up a legacy system, improve the logging and release it again. But that does not happen often.

So once grok is ruled out, the only way out is brute force csv splitting and then stuffing whatever we get into fields. Use text fields unless you are very sure that a column will only ever hold one particular data type.

The only problem is that there is no csv processor for Elasticsearch pipelines yet.

So how do we go about indexing csv files using Elasticsearch pipelines?

Here are the main steps:
Use the split processor to split the message string.
Use Elasticsearch Painless scripting to access the elements of the array resulting from the split, and assign the values to user-defined fields.
Test using the simulate facility of Elasticsearch pipelines.

Sounds easy enough and it is easy. The fact that the number of fields in a csv is fixed will save us, no matter how wild and savage the log itself is.
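
To make the steps concrete, here is a minimal Python sketch of what happens to a single event. This is not Elasticsearch code, only an illustration of the logic. The function name apply_pipeline is made up for this example, and the field names are the ones used in the pipeline further down.

```python
def apply_pipeline(doc):
    """Mimic split -> script -> remove on one event (illustration only)."""
    ctx = dict(doc)
    # Step 1: split the raw message on commas into a temporary array.
    ctx["test"] = ctx["message"].split(",")
    # Step 2: copy the array elements into named fields.
    ctx["ApplicationId"] = ctx["test"][0]
    ctx["Level"] = ctx["test"][1]
    ctx["Error"] = ctx["test"][2]
    # Step 3: drop the temporary array, it has done its job.
    del ctx["test"]
    return ctx


result = apply_pipeline({"message": "126723,FATAL,ERR126809"})
print(result)
```

Note that a plain split keeps going past the third comma, so any commas inside the last column produce extra array elements that this sketch simply ignores.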

Suppose your log is something like this:

126723,WARNING, ERR126809
126723, FATAL, SCR.AppContainer.Instance: Error in the application.
   at Sitify.AppContainer.AppContainer.ctor() in C:\AppLocation\Source\AppContainer\AppContainer.cs:line 32
   at Forms.WelcomeScreen.sg_DoWork(DoWorkEventArgs e) in C:\AppLocation\Source\AppContainer\Forms\WelcomeScreen.cs:line 83
   at System.ComponentModel.BackgroundWorker.OnDoWork(DoWorkEventArgs e)
   at System.ComponentModel.BackgroundWorker.WorkerThreadStart(Object argument)

First we configure Filebeat to read the log file and ship it one record at a time.
This is our filebeat.yml:

setup.template.name: "demo"
setup.template.pattern: "demo-*"

filebeat.inputs:
- type: log
  paths:
    - C:\logs\*csv
  exclude_lines: ['^APPLICATIONID']
  multiline.pattern: '^\d'
  multiline.negate: true
  multiline.match: after

output.elasticsearch:
  hosts: ["clusterurl:8888"]
  index: "demo-%{+yyyy.MM.dd}"
  pipeline: test_pipeline

logging.level: info
logging.to_files: true
logging.files:
  path: C:\filebeats_logs
  name: filebeat
  keepfiles: 7
  permissions: 0644

paths: We set up Filebeat to read files with the csv extension from the C:\logs directory.
Input type: We specify that we are dealing with log files.
exclude_lines: This excludes the header row if it exists. In our case we exclude lines starting with APPLICATIONID, i.e. the first line.
multiline settings: What we are saying here is that any line which does not start with a number is part of a stack trace. So rather than emitting it as a separate message, Filebeat appends it to the previous message and keeps doing so until it reaches the end of the stack trace. Then it emits the message with the full stack trace. You have to get a bit creative here and choose what works for you. The Filebeat multiline documentation describes the options available.

hosts and index: We specify the Elasticsearch instance where the data has to be sent and the name of the index it should go into.
pipeline: We specify the name of the pipeline which should work on the data that Filebeat sends.

logging settings: These are optional but recommended. Filebeat will write its own logs here. We seldom get it right on the first go. Logs are a lifesaver in this case.
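
The multiline behaviour is easier to see in code. The Python below is a rough sketch of the grouping rule described above (pattern '^\d', negate: true, match: after). It is not how Filebeat is implemented, and the function name group_multiline is invented for illustration.

```python
import re


def group_multiline(lines, pattern=r"^\d"):
    """Group raw lines into events.

    A line matching `pattern` starts a new event. Any other line is
    appended to the event before it (negate: true, match: after).
    """
    starts_event = re.compile(pattern)
    events = []
    for line in lines:
        if starts_event.search(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events


sample = [
    r"126723,WARNING, ERR126809",
    r"126723, FATAL, SCR.AppContainer.Instance: Error in the application.",
    r"   at Sitify.AppContainer.AppContainer.ctor() in C:\AppLocation\Source\AppContainer\AppContainer.cs:line 32",
    r"   at System.ComponentModel.BackgroundWorker.OnDoWork(DoWorkEventArgs e)",
]
events = group_multiline(sample)
print(len(events))
```

With the sample above, the two stack trace lines end up attached to the FATAL record, so the pipeline receives two events rather than four.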

TIP: If things are not working, the first place to look is the Filebeat logs. Once everything is clear there, look into the Elasticsearch logs.

This is the test_pipeline.

PUT _ingest/pipeline/test_pipeline
{
  "processors": [
    {
      "split": {
        "field": "message",
        "target_field": "test",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.ApplicationId = ctx.test[0];ctx.Level = ctx.test[1];ctx.Error = ctx.test[2]"
      }
    },
    {
      "remove": { "field": ["test"] }
    }
  ]
}

split: We use the split processor on the incoming message.
field: The incoming data is in the message field. We are going to split that.
target_field: Store the result of the split in a field which I am imaginatively going to call test.
separator: The separator is the comma. If you have pipe-separated or space-separated log lines, use that instead.

TIP: Use the gsub processor to clean up the data before the split operation. Some csv fields are enclosed in double quotes and what not.
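
As a rough illustration of that tip, the Python below strips double quotes before splitting, which is roughly the effect of a gsub processor whose pattern matches the quote character and whose replacement is empty. It is a sketch only, and the function name strip_quotes_then_split is hypothetical. It removes every double quote, including escaped ones inside a field, so it is lossy.

```python
import re


def strip_quotes_then_split(message, separator=","):
    """Remove all double quotes, then split on the separator."""
    cleaned = re.sub(r'"', "", message)
    return cleaned.split(separator)


fields = strip_quotes_then_split(
    '"2018/09/20 14:18:44.754","ERROR","15","SMC.S.Command"'
)
print(fields)
```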

script: The script processor.
lang: It is always better to be explicit about what you are using.
source: It all hinges on knowing how many fields will be returned from the split operation. And in the case of csv, you know. Here we access each element of the test array and assign it to the field name we want. If you get array out of range exceptions here, then your logs have some unruly lines which do not yield the expected number of elements when split.
remove: Remove the test field. It has done its job.
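
One way to think about those out of range failures is to check the element count before assigning anything. The Python below sketches that guard. The names EXPECTED_FIELDS and assign_fields are made up for illustration, and the same idea would have to be written in Painless to be used inside the pipeline.

```python
EXPECTED_FIELDS = ("ApplicationId", "Level", "Error")


def assign_fields(parts):
    """Return named fields, or None when the split produced too few elements."""
    if len(parts) < len(EXPECTED_FIELDS):
        return None
    # zip stops at the shortest input, so extra elements are ignored.
    return dict(zip(EXPECTED_FIELDS, parts))


good = assign_fields("126723,FATAL,ERR126809".split(","))
bad = assign_fields("126723,FATAL".split(","))
print(good, bad)
```

Returning None for short lines makes the unruly records easy to count and inspect instead of failing on them.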

Time to test it.

POST _ingest/pipeline/test_pipeline/_simulate
{
  "docs": [ { "_source": { "message": "126723,FATAL,ERR126809" } } ]
}

And the output is:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "Error" : "ERR126809",
          "Level" : "FATAL",
          "message" : "126723,FATAL,ERR126809",
          "ApplicationId" : "126723"
        },
        "_ingest" : {
          "timestamp" : "2019-03-11T04:21:37.613Z"
        }
      }
    }
  ]
}


This is a simple example. You might have to jump through some hoops to make it work. A bit of gsub magic here, a bit of grok there. But you will get there. And if nothing else works, Logstash is there.
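
While working through those hoops, it helps to run several real log lines through the simulate endpoint in one request. The Python below is a small sketch that only builds the request body. The function name build_simulate_body is hypothetical, and sending the body to your cluster is left to whichever HTTP client you prefer.

```python
import json


def build_simulate_body(messages):
    """Wrap raw log lines in the docs/_source shape that _simulate expects."""
    return {"docs": [{"_source": {"message": m}} for m in messages]}


body = build_simulate_body([
    "126723,FATAL,ERR126809",
    "126723,WARNING, ERR126809",
])
print(json.dumps(body, indent=2))
```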

This tutorial on indexing csv files using Elasticsearch pipelines ends with a request to Elasticsearch to provide a built-in csv processor in a future release.
