How to Parse XML Log Files with Multiple Entries in Logstash for Elasticsearch


XML log files often contain multiple entries in a nested structure like:

<root>
    <entry>
        <fieldx>...</fieldx>
        <fieldy>...</fieldy>
        <fieldarray>
            <fielda>...</fielda>
            <fielda>...</fielda>
        </fieldarray>
    </entry>
    <entry>...</entry>
</root>

The challenge is to properly parse each <entry> as a separate log event in Logstash.

One option is Logstash's XML filter with xpath extraction:

filter {
  xml {
    source => "message"
    target => "parsed_xml"
    xpath => [
      "/root/entry/fieldx/text()", "fieldx",
      "/root/entry/fieldy/text()", "fieldy",
      "/root/entry/fieldarray/fielda/text()", "fielda"
    ]
    store_xml => false
    remove_namespaces => true
  }
}

Note that with store_xml => false the parsed document is not kept, so there is no parsed_xml field to split on. Instead, each xpath expression writes a top-level field (fieldx, fieldy, fielda) containing an array of the matching values from every <entry>. That is convenient for flat extraction, but to emit one event per <entry> you need store_xml => true plus the split filter, as shown in the full configuration below.

With the xpath approach, fielda holds every <fielda> value from all entries as a single array. Logstash's sprintf field references cannot compute an array's length, so counting the values takes a short ruby filter:

filter {
  if [fielda] {
    ruby {
      code => "
        values = event.get('fielda')
        event.set('fielda_count', values.is_a?(Array) ? values.size : 1)
      "
    }
  }
}

Here's a full configuration that parses the XML, splits it into one event per <entry>, and indexes the results:

input {
  file {
    path => "/path/to/your/logfile.xml"
    start_position => "beginning"
    # The xml filter needs the whole document in a single event, so join lines
    # up to and including the closing </root> tag instead of using the plain codec
    codec => multiline {
      pattern => "</root>"
      negate => true
      what => "next"
    }
  }
}

filter {
  # First parse the complete XML structure
  xml {
    source => "message"
    target => "parsed_xml"
    store_xml => true
    # Keep single values as plain strings instead of one-element arrays
    force_array => false
  }

  # Split into individual entries
  split {
    field => "[parsed_xml][entry]"
  }

  # Extract fields from each entry
  mutate {
    add_field => {
      "fieldx" => "%{[parsed_xml][entry][fieldx]}"
      "fieldy" => "%{[parsed_xml][entry][fieldy]}"
    }
  }

  # Handle array fields
  if [parsed_xml][entry][fieldarray] {
    ruby {
      code => "
        field_a = event.get('[parsed_xml][entry][fieldarray][fielda]')
        if field_a.is_a?(Array)
          event.set('fielda_array', field_a)
          event.set('fielda_count', field_a.size)
        else
          event.set('fielda_array', [field_a])
          event.set('fielda_count', 1)
        end
      "
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "xml-logs-%{+YYYY.MM.dd}"
  }
}
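
The events produced above still carry the raw message and the full parsed_xml structure. Adding a final mutate at the end of the filter block keeps the indexed documents small:

  # Add at the end of the filter block, after the fields have been extracted
  mutate {
    remove_field => ["message", "parsed_xml"]
  }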

When processing large XML files:

  • The XML filter has no streaming mode; it parses each document in memory, so keep events small by removing the raw message and the parsed_xml structure once the fields are extracted
  • Consider preprocessing very large files to split them into smaller chunks (sketched below)
  • Monitor heap usage in your Logstash instance
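
As a rough illustration of the chunking idea, here is a minimal pre-processing sketch. The file names and chunk size are hypothetical, and it assumes each <entry> opens and closes on its own lines and that entries do not nest:

#!/usr/bin/env ruby
# Split a large <root> export into smaller XML files of at most
# ENTRIES_PER_CHUNK entries each, wrapping every chunk in its own <root>.
ENTRIES_PER_CHUNK = 1000
source = ARGV[0] || "logfile.xml"

chunk_index = 0
entries = []
current = nil

write_chunk = lambda do
  File.write(format("chunk-%04d.xml", chunk_index), "<root>\n#{entries.join}</root>\n")
  chunk_index += 1
  entries.clear
end

File.foreach(source) do |line|
  current = [] if line.include?("<entry>")
  current << line if current
  if line.include?("</entry>")
    entries << current.join
    current = nil
    write_chunk.call if entries.size >= ENTRIES_PER_CHUNK
  end
end

write_chunk.call unless entries.empty?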

If the XML filter proves insufficient:

  1. Pre-process the file with a script to split into individual entry files
  2. Use the exec input to run XML conversion tools
  3. Consider Logstash's Ruby filter for custom parsing logic (see the sketch below)
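
For the last option, a ruby filter can parse the document directly. This is only a sketch: it assumes the whole XML document arrives in the message field (for example via the multiline codec used in the input sections of this post) and relies on REXML from the Ruby standard library bundled with Logstash's JRuby:

filter {
  ruby {
    code => "
      require 'rexml/document'
      doc = REXML::Document.new(event.get('message'))
      entries = []
      doc.elements.each('root/entry') do |entry|
        fx = entry.elements['fieldx']
        fy = entry.elements['fieldy']
        entries << {
          'fieldx' => fx ? fx.text : nil,
          'fieldy' => fy ? fy.text : nil,
          'fielda' => entry.get_elements('fieldarray/fielda').map(&:text)
        }
      end
      event.set('entries', entries)
    "
  }

  # Each element of the entries array becomes its own event
  split {
    field => "entries"
  }
}

The ruby filter builds an array with one hash per <entry>, and the split filter then turns each hash into a separate event.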

With concrete values filled in, such a file looks like:

<root>
    <entry>
        <fieldx>value1</fieldx>
        <fieldy>value2</fieldy>
        <fieldarray>
            <fielda>subvalue1</fielda>
            <fielda>subvalue2</fielda>
        </fieldarray>
    </entry>
    <entry>
        ...
    </entry>
</root>

Here's a complete pipeline built around xpath extraction. Note that with store_xml => false each extracted field ends up as an array of values gathered from every <entry> in a single event, rather than one event per entry:

input {
  file {
    path => "/path/to/your/logfile.xml"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    # Join all lines up to and including the closing </root> tag into one event
    codec => multiline {
      pattern => "</root>"
      negate => true
      what => "next"
    }
  }
}

filter {
  xml {
    source => "message"
    target => "parsed_xml"
    xpath => [
      "//entry/fieldx/text()", "fieldx",
      "//entry/fieldy/text()", "fieldy",
      "//entry/fieldarray/fielda/text()", "fielda_array"
    ]
    store_xml => false
    remove_namespaces => true
  }

  mutate {
    remove_field => ["message"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "xml-logs-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

To get one event per <entry> instead of arrays of values, and to handle more complex nested structures, keep the parsed document and split it:

filter {
  xml {
    source => "message"
    target => "parsed_xml"
    remove_namespaces => true
    force_array => false
    store_xml => true
  }

  split {
    field => "[parsed_xml][entry]"
  }

  # Use rename (rather than add_field) so fielda_array stays a real array
  mutate {
    rename => {
      "[parsed_xml][entry][fieldx]" => "fieldx"
      "[parsed_xml][entry][fieldy]" => "fieldy"
      "[parsed_xml][entry][fieldarray][fielda]" => "fielda_array"
    }
  }
}

Note that with force_array => false a file containing only a single <entry> parses to a hash rather than an array at [parsed_xml][entry], so the split filter will fail for that case; keep this in mind if single-entry files are possible.

When processing large XML files:

  • The XML filter has no streaming mode, so prefer xpath extraction with store_xml => false where you can, and drop message and parsed_xml as soon as the fields are extracted
  • Consider breaking large files into smaller chunks before ingestion
  • Use a real sincedb_path (rather than /dev/null) in production so already-processed files are tracked
  • Monitor heap usage during processing

If the XML parsing becomes too complex:

  1. Pre-process the XML with a simple script to convert it to JSON (a minimal sketch follows this list)
  2. If the data comes from JIRA, use the JIRA REST API (which returns JSON) instead of exported XML files
  3. Consider Filebeat with custom processors
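
As an example of the first option, a small standalone Ruby script (hypothetical file names, again using REXML and the json standard library) could turn each <entry> into one line of JSON, which Logstash can then read with the json codec:

#!/usr/bin/env ruby
# Convert <entry> elements from an XML export into newline-delimited JSON.
require 'rexml/document'
require 'json'

source = ARGV[0] || "logfile.xml"
doc = REXML::Document.new(File.read(source))

File.open("logfile.ndjson", "w") do |out|
  doc.elements.each('root/entry') do |entry|
    fx = entry.elements['fieldx']
    fy = entry.elements['fieldy']
    out.puts({
      'fieldx' => fx ? fx.text : nil,
      'fieldy' => fy ? fy.text : nil,
      'fielda' => entry.get_elements('fieldarray/fielda').map(&:text)
    }.to_json)
  end
end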