Pipe Suricata eve.json to Elasticsearch with fluentd


Suricata can generate gigabytes of logs

Suricata can dissect and log a great deal of network-related information in a logging standard called EVE, a simple JSON data format. A multi-gigabit environment produces a high data volume, and dealing with it via local files that get committed to a DB, with all the disk I/O reads and writes that implies, is not a good solution. Given that there are a lot of versatile middlewares to make this easier, it seems we have a use case for fluentd.
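
To give an idea of the format, a single EVE alert record looks roughly like this. The field names follow the EVE format; the values are invented for illustration:

```json
{
  "timestamp": "2016-03-01T10:15:42.123456+0000",
  "event_type": "alert",
  "src_ip": "10.0.0.1",
  "src_port": 44321,
  "dest_ip": "192.168.1.5",
  "dest_port": 80,
  "proto": "TCP",
  "alert": {
    "signature_id": 2100498,
    "signature": "GPL ATTACK_RESPONSE id check returned root",
    "category": "Potentially Bad Traffic",
    "severity": 2
  }
}
```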

Unlike Logstash, fluentd is written in CRuby. Logstash runs on the much slower JRuby, which drags in a lot of dependencies, including a full Java virtual machine. The last thing that belongs on a network inspection device is Java, quite apart from its architectural difficulties regarding I/O. You will also find that fluentd requires less RAM than Logstash, which might be related to the implementation itself.

Example: Ship Suricata EVE logs via a named pipe to Elasticsearch

  • The easiest thing to do is to let Suricata pipe its output as unix_dgram into a named pipe.
  • fluentd can read the named pipe and forward the JSON maps to another fluentd on another system.
  • There we can have a central collection of IDS sensor data.
  • It’s important that the format remains JSON, so that Elasticsearch can automatically generate key-value pairs for structured searches and queries.

Log-forwarding the easy way: with a named pipe

mkfifo /data/eve.json
chmod 660 /data/eve.json
usermod -a -G suri td-agent   # or: usermod -a -G suri fluentd
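
Before wiring Suricata in, the fifo can be smoke-tested by hand. This is a sketch with throwaway /tmp paths (the real pipe in this setup is /data/eve.json):

```shell
# Smoke test: write one EVE-style JSON line through a fifo and read it back.
PIPE=/tmp/eve-test.fifo
rm -f "$PIPE"
mkfifo "$PIPE"
# Reader side (where fluentd would sit), backgrounded so the writer can open the pipe:
cat "$PIPE" > /tmp/eve-test.out &
# Writer side (Suricata's role): one JSON line, then close the pipe
printf '{"event_type":"alert","src_ip":"10.0.0.1"}\n' > "$PIPE"
wait
RESULT=$(cat /tmp/eve-test.out)
echo "$RESULT"
rm -f "$PIPE" /tmp/eve-test.out
```

If the line comes back out, permissions and pipe semantics are in order, and Suricata can take the writer's seat.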

Change the config for suricata:

outputs:
  - eve-log:
      enabled: yes
      type: unix_dgram #file|syslog|unix_dgram|unix_stream
      # filename: eve.json
      # the following are valid when type: syslog above
      #identity: "suricata"
      #facility: local5
      #level: Info ## possible levels: Emergency, Alert, Critical,
                   ## Error, Warning, Notice, Info, Debug
      types:
        - alert:
            payload: yes
            payload-printable: yes
            packet: yes
            http: yes
            ssh: no               # enable dumping of ssh fields
            smtp: no              # enable dumping of smtp fields
        - http:
            extended: yes     # enable this for extended logging information
        - dns
        - tls:
            extended: yes     # enable this for extended logging information
        # - files:
        #    force-magic: yes   # force logging magic on all logged files
        #    force-md5: yes     # force logging of md5 checksums
        #- drop
        - ssh
        - smtp
        # - flow
        - stats

Give td-agent / fluentd a kick:

<source>
  @type named_pipe
  tag source.eve.json
  path /data/eve.json
  format json
</source>

<filter source.eve.json>
  @type record_transformer
  <record>
    sensor_hostname "#{Socket.gethostname}"
  </record>
</filter>

<match source.eve.json>
  @type forward
  send_timeout 60s
  recover_wait 10s
  heartbeat_interval 1s
  phi_threshold 16
  hard_timeout 60s

  <server>
    name my.cool.collector.local
    host 1.2.3.4
    port 5044
    weight 60
  </server>

</match>

Now the other end needs to accept the JSON maps within fluentd’s forwarding protocol. The heartbeat is UDP based, in case you want to sniff the traffic to verify that there are actual packets on the wire. Note that fluentd uses this heartbeat to check whether the remote server is alive, and it will attempt retransmission in case of connectivity failures.
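
Retransmission only helps if the pending events survive until the collector is back. A hedged sketch of the same forward match with a file-backed buffer; the buffer_path is an assumption, adjust it to your install:

```
<match source.eve.json>
  @type forward
  # file-backed buffer so events survive collector outages and daemon restarts
  buffer_type file
  buffer_path /var/log/td-agent/buffer/eve
  flush_interval 5s
  retry_wait 10s
  retry_limit 17

  <server>
    name my.cool.collector.local
    host 1.2.3.4
    port 5044
  </server>
</match>
```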

One thing you will probably notice is that I have a filter block where I inline an additional JSON key with the client’s hostname. I enrich the original event to be able to track down its source. For more complex scenarios I recommend the record_transformer plugin. If you want to perform DNS lookups, though, consider a DNS cache on a dedicated forwarder for high-traffic sensors (10 Gbps+).

For this config to work I make use of a fluent plugin, which you can install:

fluent-gem install fluent-plugin-named_pipe

Just make sure you use the matching *-gem install command, otherwise you will get gray hair trying to load your plugins.
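
A quick way to pick the right wrapper is to check which one is on the PATH; td-agent-gem belongs to the td-agent package (which bundles its own Ruby), fluent-gem to a plain fluentd install:

```shell
# Pick the gem wrapper that matches the installed fluentd flavour.
# Plugins installed with the system's plain `gem` will not be found by td-agent.
if command -v td-agent-gem >/dev/null 2>&1; then
  GEM=td-agent-gem
else
  GEM=fluent-gem
fi
echo "$GEM install fluent-plugin-named_pipe"
```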

Receive and aggregate into Elasticsearch

This is straightforward:

<source>
  @type forward
  port 5044
  bind 0.0.0.0
</source>

<match *source.eve.json>
  @type file
  format json
  append true
  buffer_type memory
  # time_slice_wait 10s
  flush_interval 1s
  path /var/log/suricata/eve-remote.json
</match>

Now, why do I write this to a file? I could feed it straight into Elasticsearch. The reason is that, for now, I use the config from Stamus Networks’ SELKS, which reads from a file. I will change this at a later point in time, but the SELKS Kibana 4 dashboards are useful for a start. They require certain fields to be present in order to aggregate the Elasticsearch records into the visualizations. I can work around retention issues with local files by using hourly logrotate triggers.
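
As a sketch of such a trigger, a hypothetical /etc/logrotate.d/eve-remote could look like this. Note that the hourly directive only takes effect if logrotate itself is invoked hourly (e.g. from cron.hourly) and needs logrotate >= 3.8.5:

```
/var/log/suricata/eve-remote.json {
    hourly
    rotate 24
    compress
    missingok
    notifempty
    copytruncate
}
```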

This is not ideal, but I have had encoding issues when I fed fluentd into a local Logstash TCP input.
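
For reference, skipping the intermediate file would look roughly like this with the fluent-plugin-elasticsearch output (install it with the matching *-gem first; host, port and the index prefix here are assumptions):

```
<match *source.eve.json>
  @type elasticsearch
  host localhost
  port 9200
  logstash_format true      # time-based indices in the logstash-* style
  logstash_prefix suricata  # index names become suricata-YYYY.MM.DD
  flush_interval 5s
</match>
```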

Update

It’s possible to use the fluentd named pipe plugin and connect it to Logstash in a somewhat hacky way:

input {

  pipe {
    command => "cat /var/log/suricata/eve-remote.json"
    codec => json
  }

}

In my tests this is much more reliable than reading from a local file when the number of lines per second is high. In this case eve-remote.json is not a file but a named pipe.

Check out /proc/sys/fs/pipe-max-size for the maximum buffer size of such a pipe.
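
A quick check, with an optional (commented) sysctl to raise the ceiling; the 4 MiB value is just an example:

```shell
# The kernel caps how large a single pipe buffer may grow (in bytes):
MAX=$(cat /proc/sys/fs/pipe-max-size)
echo "pipe-max-size: $MAX bytes"
# To raise it for high-volume sensors (root required):
# sysctl -w fs.pipe-max-size=4194304
```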

Results

Suricata, fluentd, Elasticsearch and Kibana generate searchable:

  • IDS / IPS logs
  • HTTP logs
  • TLS logs
  • DNS logs

This can be correlated, but there is some work to do in order to get SIEM-style insights from this data. I have integrated parts of the EVE format with IBM QRadar to add it to the correlation and event analysis, which can be done with the Snort DSM and regular expressions. I pasted my example config in the post; it can basically be applied to other SIEMs like ArcSight or Prelude.

Incident Response works with data

Usual incident response tasks involve both the network and the host perspective. If you want to hunt down an intruder, you rely on activity patterns. You can get host activity patterns from tools like Carbon Black, which is like Sysinternals on threat feeds.

Suricata can add to the network perspective. The bad thing about Suricata is that the available professional rule feeds, Emerging Threats Pro in particular, are bad. If you check out the SQLi, XSS and port scan rules, it’s rather difficult to advance with an open-source IDS. Keep that in mind when you roll one out: rule definition and generation can become a daily task, and not only for incident response :wink: