Netflow data analysis with SiLK and Pandas

python
netflow
statistics
tableau
machine-learning


Going with the flows...

Welcome to another episode of data, data and more data. Today we are going to make data.

We are going to use Pandas to read SiLK Netflow v9 records into a DataFrame. Then we plot the average amount of kilobytes and packets per second. The plot gets exported as a d3.js animation, and it can also be displayed as an inline object in a Jupyter notebook (Anaconda distribution).

Data in, garbage out

In the previous blog post we explored how to set up a SiLK collector and how to run basic rwfilter queries to get ASCII output. I also pasted a basic JSON serializer for SiLK records, based on PySiLK. I had some minutes now, and enhanced it. A little…

Changes

  • added SSH / SFTP support
  • corrected timestamps for sort-able JSON objects (in Sumo, Splunk…)
  • corrected rwfilter queries with activity period so that the data makes sense

If you ever tried out the nfdump Splunk Add-On… you might have figured out that it’s a very hacky utility. Not exactly what I want. I will not parse CSV data unless I have to. Also, due to limitations in nfdump, this approach generates gigabytes of barely meaningful data for large networks.

Simple is better

rwfilter is a simple utility. I don’t see why we should re-invent it. So I build the command based on its CLI. This is how FlowBAT utilizes it as well.

In order to define an activity time-frame around the current time, I use a timedelta. I want to keep the code as explicit as possible, and as easy to read as I can.

The function returns the activity time frame start and end point, as well as the entire command. The reason is that I want to create a DataFrame object later that only contains the activity period.

I retrieve flow data for -1h and +1h (2 hours). The activity interval is +/-15m (30m). The bi-directional connections within these 30 minutes should be complete (unless someone has a TCP connection which lasts for hours). That is very well possible, but in the case of the network in question only a fraction of connections live that long. Generally, HTTP connections terminate within seconds.

from datetime import datetime, timedelta
import socket
import pandas

def build_rwfilter_cmd(date, saddr):
    
    cstart_date = pandas.to_datetime(str(date)) - timedelta(hours = 1)
    start_date = cstart_date.strftime('%Y/%m/%d:%H:%M') 
    
    cend_date = pandas.to_datetime(str(date)) + timedelta(hours = 1)
    end_date = cend_date.strftime('%Y/%m/%d:%H:%M')
    
    act_time_s = pandas.to_datetime(str(date)) - timedelta(minutes = 15)
    act_time_e = pandas.to_datetime(str(date)) + timedelta(minutes = 15)
    act_time = act_time_s.strftime('%Y/%m/%dT%H:%M') + "-" + act_time_e.strftime('%Y/%m/%dT%H:%M')
    
    ipaddr=""
    try:
        ipaddr = socket.gethostbyname(saddr)
    except:
        return "exit"
    
    cmd = "rwfilter "
    cmd+= "--type=all "
    cmd+= "--start-date=" + start_date + " "
    cmd+= "--end-date=" + end_date + " "
    cmd+= "--active-time=" + act_time + " "
    # cmd+= "--saddress=" + ipaddr + " "
    cmd+= "--protocol=0-255" + " "
    cmd+= "--site-config-file=/data/silk.conf --data-rootdir=/data" + " "
    cmd+= "--pass=stdout > /tmp/query.rwf"
    
    return cmd, act_time_s, act_time_e

It’s just building a string. Hello-world.
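
For example (the host name here is just a placeholder for a resolvable host in your network), calling it looks like this; the resulting string can be handed straight to the SSH runner further below:

# placeholder host name, any resolvable name or IP of a monitored host works
cmd, act_time_s, act_time_e = build_rwfilter_cmd(datetime.today(), "flows.example.local")
print cmd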

Parse SiLK records

I just added PySiLK to Anaconda’s site-packages. This is straightforward, but it adds a dependency to the analysis workflow.

I hard-coded the temporary file path, which contains SiLK’s flow records in a binary format. And I hard-coded the output file, which contains the readable JSON records after the execution. This is because I am just doing this for my analysis and my own little network.

from silk import *
import json
import datetime
import pandas

def parse_records():
    ffile = '/tmp/query.rwf'
    flow = SilkFile(ffile, READ)
    f = open('/tmp/query.json', 'w')

    # serialize every SiLK record as one JSON object per line
    for rec in flow:
        d = {}
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        d['stime'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        d['etime'] = rec.etime.strftime("%Y-%m-%d %H:%M:%S")

        d['icmpcode'] = rec.icmpcode
        d['sip'] = str(rec.sip)
        d['protocol'] = rec.protocol
        d['output'] = rec.output
        d['packets'] = rec.packets
        d['bytes'] = rec.bytes
        d['application'] = rec.application
        d['sensor_id'] = rec.sensor_id
        d['classtype_id'] = rec.classtype_id
        d['nhip'] = str(rec.nhip)
        d['input'] = rec.input
        d['icmptype'] = rec.icmptype
        d['dip'] = str(rec.dip)
        d['sport'] = rec.sport
        d['dport'] = rec.dport
        f.write(json.dumps(d))
        f.write("\n")
    f.close()

    # we stream this file to a log service, and re-read it here
    # to turn the JSON lines into one JSON array for pandas
    with open('/tmp/query.json', 'rb') as f:
        data = f.readlines()
    data = map(lambda x: x.rstrip(), data)
    data_json_str = "[" + ','.join(data) + "]"

    data_df = pandas.read_json(data_json_str)
    return data_df

This function returns a DataFrame object, which is re-serialized from the written JSON records. This looks odd, and it’s a workaround. I use this JSON file to forward it to Sumo via the collector. There I can perform the aggregation for some overview dashboards in real time. But that’s another topic. If you create a cron-job based on this, you can adapt the time-frame as you see fit.
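
As a side note, newer pandas versions (0.19 and later, if I recall correctly) can read newline-delimited JSON directly, which would make the string-concatenation workaround unnecessary. A minimal sketch, using a hypothetical helper name:

import pandas

# alternative: let pandas parse the newline-delimited JSON file directly,
# which replaces the manual "[" + ','.join(...) + "]" concatenation above
def parse_records_lines():
    return pandas.read_json('/tmp/query.json', lines=True)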

But I want to use Tableau... and it doesn't read JSON

Well… I know I need to provide pretty pictures on the internet in order to be relevant. So here we go:

Ehe… why would you cluster source IPs (SIP == source IP) and the sum of packets within a time-frame?

  • the many orange boxes are a workstation network. Some people use the network more than others. :wink:
  • the red box is an egress point
  • the green and yellow numbers are some internal server IPs (Active Directory, DNS)
  • the blue boxes are central online services (AWS…)

Working as intended.

No surprises here. Line charts - gnuplot could do this.

Here I removed the labels for this blog. The bubbles are IPs, and their size is the relative dimension of packets per second. A nice bubble chart. Eye candy.

This is K-means clustering again… it’s 4 distinct clusters if we cluster by the average amount of bytes over time… just loosely here. This isn’t an in-depth Tableau walk-through. I know that this is not how to apply ML on time-series data.

95th percentile, clustered - this makes the traffic clusters more visible. It has worked surprisingly well in this case, but it’s not robust.

What does this mean?

This means that I can apply ML on network statistics with Tableau and already get some success. If I use a better approach, like Dynamic Time Warping, the clusters should make more sense. I could also apply domain-specific information like CIDRs.

Something like this is on my to-do list, with scikit-learn and Python. Just another blog entry. At this point I just want to surface the motivation behind this, and visualize the potential.
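
To make the scikit-learn direction a bit more concrete, here is a minimal sketch of a K-means baseline on per-IP aggregates (the same idea as the Tableau clustering, not the DTW approach). It assumes the DataFrame produced by parse_records() above with its sip, bytes and packets columns, scikit-learn being installed, and an arbitrary cluster count:

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import pandas

def cluster_source_ips(netflow_df, n_clusters=4):
    # aggregate a few per-source-IP features: total bytes, total packets,
    # and the 95th percentile of bytes per flow
    grouped = netflow_df.groupby('sip')
    per_ip = pandas.DataFrame({
        'bytes_sum': grouped['bytes'].sum(),
        'bytes_p95': grouped['bytes'].quantile(0.95),
        'packets_sum': grouped['packets'].sum(),
    })

    # scale the features so raw byte counts don't dominate the distance metric
    features = StandardScaler().fit_transform(per_ip.values)

    # fit K-means and attach the cluster label to each source IP
    per_ip['cluster'] = KMeans(n_clusters=n_clusters, random_state=0).fit_predict(features)
    return per_ip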

The motto is not “know your network”. It’s “know your network activity”. And be prepared for surprises.

Ehe... so we have Netflow v9 in a Pandas DataFrame object now

This doesn’t sound like much, but Netflow (v9) is a really efficient standard for statistical network analysis. Pandas is an extremely powerful library for data analysis.

Running this analysis

  • takes 1 minute,
  • goes over 2 hours of network traffic stats,
  • and uses about 150 MB of memory.
  • On top of that it’s generating pretty results.
  • And it makes statistical network data accessible for “the business”[tm].

Let’s say there is/was a DDoS attack and you need a report. Run this code, copy paste, go. The ipt_netflow / SiLK VM (see the earlier blog post) should be powerful enough to generate the stats during the attack, since it’s essentially a multi-core system, which is just generating flow-data. This can aid the remediation of DDoS attacks in complex networks. Since it also works in a VM you can deploy it to the cloud as well.

It can easily show which network segments are targeted, and provide insights on the attack volume, composition and origin. Distinguishing between an SNMP and DNS reflection attack is possible with Netflow. You don’t need PCAPs for that.
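
As an illustration (not part of the workflow above), here is a rough sketch of how that triage could look on the parsed DataFrame, assuming the reflected traffic shows up as UDP flows with source port 53 (DNS) or 161 (SNMP). It is a quick triage query, not a detection rule:

def reflection_overview(netflow_df):
    # UDP flows whose source port points at DNS (53) or SNMP (161) reflection traffic
    udp = netflow_df[netflow_df['protocol'] == 17]
    suspect = udp[udp['sport'].isin([53, 161])]

    # bytes per reflected service, and the ten most targeted destination IPs
    per_service = suspect.groupby('sport')['bytes'].sum()
    top_targets = suspect.groupby('dip')['bytes'].sum().sort_values(ascending=False).head(10)
    return per_service, top_targets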

Back to the programming.

Run the rwfilter command via SSH

We use Paramiko here and OpenSSH’s key-based authentication. Make sure the keys are exchanged. We want this secure and fast. And no hard-coded passwords.

import paramiko
import datetime
import subprocess # run it locally if you want, use this for Bash commands

def run_netflow_cmd(command):

    rwflow_server_ip = "1.2.3.4" # SiLK box
    user_name = "netflow"
    keyfile = "/home/marius/.ssh/id_rsa"

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(rwflow_server_ip, username=user_name, key_filename=keyfile)
    stdin, stdout, stderr = ssh.exec_command(command + " && echo 'done'")

    for line in stderr.readlines():
        print line

    # drain stdout (the command itself redirects to a file, so this is just
    # the 'done' marker), then fetch the exit code once
    for line in stdout.readlines():
        pass
    exit_status = stdout.channel.recv_exit_status()  # blocking call

    if exit_status == 0:
        print str(datetime.datetime.today()) + ": Command finished successfully."
    else:
        print "Error", exit_status
    ssh.close()

SFTP download the SiLK records

SiLK has a really efficient storage format. These binary files are really small. We are using PySiLK to parse these records locally. The download goes to a hard-coded path as well.

def get_netflow_records():
    rwflow_server_ip = "1.2.3.4" # 
    user_name="netflow"
    keyfile="/home/marius/.ssh/id_rsa"
    
    transport = paramiko.Transport((rwflow_server_ip, 22))
    key=paramiko.RSAKey.from_private_key_file(keyfile)
    
    transport.connect(username = user_name, pkey=key)

    sftp = paramiko.SFTPClient.from_transport(transport)

    filepath = '/tmp/query.rwf'
    localpath = '/tmp/query.rwf'
    sftp.get(filepath, localpath)

    # close the SFTP session and the underlying SSH transport
    sftp.close()
    transport.close()

    return True

One thing after the other...

Here’s how this looks in action:

2016-09-29 13:37:19.625878: Building command to run the SiLK query:

rwfilter --type=all --start-date=2016/09/29:12:37 --end-date=2016/09/29:14:37 --active-time=2016/09/29T13:22-2016/09/29T13:52 --protocol=0-255 --site-config-file=/data/silk.conf --data-rootdir=/data --pass=stdout > /tmp/query.rwf

Using activity time-frame for flow-tracking interval:
Activity time start: 2016-09-29 13:22:19.625972
Activity time end: 2016-09-29 13:52:19.625972

2016-09-29 13:37:19.628114: Running the rwfilter command on the remote host.
2016-09-29 13:37:20.395747: Command finished successfully.
2016-09-29 13:37:20.397152: Command executed. Starting download.

2016-09-29 13:37:21.337295: Download process finished. Starting the parser locally.
2016-09-29 13:38:12.458796: Parsing of the records finished locally.
2016-09-29 13:38:12.459042: Data ready for action.

What we see:

  1. SiLK command gets built, based on the execution time

  2. Command gets executed via SSH

  3. Command finishes in a second

  4. Download finished in 2 seconds

  5. Parsing takes around 60s, because it’s single-threaded and uses file input and output

  6. Data is ready for action[tm]

import datetime

def get_data_process():
    print str(datetime.datetime.today()) + ": Building command to run the SiLK query:"
    cmd, act_time_s, act_time_e = build_rwfilter_cmd(datetime.datetime.today(), "graylog.ccp.ad.local")
    print "\n"
    print cmd
    print "\n"

    print "Using activity time-frame for flow-tracking interval:"
    print "Activity time start:\t " + str(act_time_s)
    print "Activity time end:\t " + str(act_time_e)
    print "\n"

    print str(datetime.datetime.today()) + ": Running the rwfilter command on the remote host."
    run_netflow_cmd(cmd)
    print str(datetime.datetime.today()) + ": Command executed. Starting download."
    print "\n"

    done = get_netflow_records()

    if done:
        print str(datetime.datetime.today()) + ": Download process finished. Starting the parser locally."
        netflow_df = parse_records()
        print str(datetime.datetime.today()) + ": Parsing of the records finished locally."
        return done, netflow_df, act_time_s, act_time_e

def get_data():
    state, netflow_df, act_time_s, act_time_e = get_data_process()
    if state:
        print str(datetime.datetime.today()) + ": Data ready for action."
        return netflow_df, act_time_s, act_time_e

netflow_df, act_time_s, act_time_e = get_data()

Paramiko has an issue with SFTP transfers and blocking. So this code is hacky.

Data analysis with Pandas

print netflow_df[["timestamp", "bytes", "packets"]].tail(5)
                  timestamp  bytes  packets
1086148 2016-09-29 13:40:22    418        2
1086149 2016-09-29 13:40:21    522        4
1086150 2016-09-29 13:40:22    104        2
1086151 2016-09-29 13:40:21   5788       10
1086152 2016-09-29 13:40:21   1816       12

Nice. Data is ready for action. As it said.

Data massage please

We need to apply the correct data-type and ensure this time-series data is indexed by the timestamp.

# massage data
netflow_df["timestamp"] = pandas.to_datetime(netflow_df['timestamp'])
netflow_df["bytes"] = netflow_df["bytes"].astype(int)
netflow_df = netflow_df.sort_values(by='timestamp')
# the timestamp index itself is set right after selecting the active time-frame below

This is just Pandas. No loops required.

Select the active time-frame

Here we select the time-frame we care about. This is related to the rwfilter command and how flows behave in the network. We have to keep in mind that this is time-series data tied to bi-directional communication, and that the values depend on communication state changes.

# select the active time frame
netflow_df2 = netflow_df[(netflow_df['timestamp'] > act_time_s) & (netflow_df['timestamp'] <= act_time_e)]
netflow_df2 = netflow_df2.set_index(netflow_df2['timestamp'])
del netflow_df

We hand the old, big DataFrame object over to the Python garbage collector.

Statistics my friend...

For our statistical analysis we apply a unit conversion from bytes to kilobytes. We drop all 0 values, because they aren’t relevant for this data-set, and calculate the mean without accounting for them. To make the data useful we resample the DataFrame at 1-second intervals to produce a mean for the bytes and packets per second. Then we interpolate and downcast the curve. Pandas makes this very easy.

import numpy

netflow_df2["bytes"] = netflow_df2["bytes"].astype(int)
netflow_df2["packets"] = netflow_df2["packets"].astype(int)
# drop 0s (replace returns a new Series, so assign it back)
netflow_df2["bytes"] = netflow_df2["bytes"].replace(0, numpy.NaN)
netflow_df2["packets"] = netflow_df2["packets"].replace(0, numpy.NaN)
# resample to a per-second mean and smooth the curve
netflow_df2["rbytes"] = netflow_df2["bytes"].resample('1s').mean()
netflow_df2["rbytes"] = netflow_df2["rbytes"].interpolate(method='cubic', downcast='infer')
netflow_df2["packets"] = netflow_df2["packets"].resample('1s').mean()
# convert bytes -> KB
netflow_df2["rbytes"] = netflow_df2["rbytes"].div(1024)

Now I want my d3 chart!

Go for it. We use mpld3 to auto-generate the d3.js chart based on matplotlib, and dump the figure out to an HTML file. If you need a boilerplate to make a nice d3 animation, that’s a good start, since the data is already embedded and serialized.

import matplotlib.pyplot as plt
import matplotlib

# for inline display
%matplotlib inline

import mpld3
mpld3.enable_notebook()

# size of the plot
matplotlib.rcParams['figure.figsize'] = (10.0, 4.0)
# matplotlib.style.use('ggplot')
# matplotlib.style.use('dark_background')
matplotlib.style.use('fivethirtyeight')

# define plot
ax = netflow_df2[["timestamp", "rbytes", "packets"]].set_index('timestamp').resample('1s').mean().plot(
    title='KB/s, Kp/s', lw=1, colormap='jet', marker='.', markersize=3)
ax.set_xlabel("Time UTC")
ax.set_ylabel("KBps or KPps")
ax.grid('on', which='minor', axis='x' )
ax.grid('on', which='major', axis='y' )

# label plot
mylabels = ["KBytes/s", "Kp/s"]
ax.legend(labels=mylabels, loc='best')

mpld3.save_html(ax.get_figure(), "/tmp/test.html")

Straight like an arrow…

Technical Results

print "From " + str(act_time_s) + " to " + str(act_time_e) + " we had " + str(netflow_df2["bytes"].sum() / 1024 / 1024) + " MB in the wire"

From 2016-09-29 13:22:19.625972 to 2016-09-29 13:52:19.625972 we had 40102 MB in the wire

print "The traffic was caused by " + str( len(netflow_df2["sip"].unique()) ) + " source IPs and " + str(netflow_df2["dip"].nunique()) + " dest IPs."

The traffic was caused by 11980 source IPs and 10126 dest IPs.

print "We analyzed " + str( len(netflow_df2["protocol"].unique()) ) + " different protocols."

We analyzed 6 different protocols.
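
A quick follow-up on the same DataFrame shows which protocols those are and how much traffic each one carried (protocol numbers as in the IP header, e.g. 6 = TCP, 17 = UDP):

print netflow_df2.groupby('protocol')['bytes'].sum().sort_values(ascending=False)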

We could feed the data into a DB like Elasticsearch or MongoDB, or into Splunk, Sumo… whatever we can use.

netflow_df2.to_json("/tmp/normalized_netflow.json" , orient='records')

Once we have these JSON records, the data-set is portable.
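
As a sketch of the “feed it into a DB” idea, this is roughly how the records could be bulk-indexed with the official Elasticsearch Python client. The host, index name and mapping type are placeholders, the elasticsearch package is an extra dependency, and it assumes an Elasticsearch version from around the time of this post (where mapping types still exist). Treat it as a starting point, not a production loader:

import json
from elasticsearch import Elasticsearch, helpers  # pip install elasticsearch

def index_netflow_json(path='/tmp/normalized_netflow.json', index_name='netflow'):
    es = Elasticsearch(['http://localhost:9200'])  # placeholder host

    # orient='records' wrote one big JSON array, so json.load gives a list of dicts
    with open(path) as f:
        records = json.load(f)

    # wrap each record in a bulk action and ship everything in one call
    actions = [{'_index': index_name, '_type': 'flow', '_source': rec} for rec in records]
    helpers.bulk(es, actions)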

The analysis is memory-efficient.

netflow_df2.memory_usage()
Index           8490872
application     8490872
bytes           8490872
classtype_id    8490872
dip             8490872
dport           8490872
etime           8490872
icmpcode        8490872
icmptype        8490872
input           8490872
nhip            8490872
output          8490872
packets         8490872
protocol        8490872
sensor_id       8490872
sip             8490872
sport           8490872
stime           8490872
timestamp       8490872
rbytes          8490872
dtype: int64

And here is our data for some quick Tableau visualizations.

netflow_df2.to_csv("/tmp/normalized_netflow.csv")

Results

We have a simple network analysis workflow, based on OpenSource technology, which scales to large networks. This approach is robust, and flexible enough to provide data-sets for all kinds of tools and environments. It’s possible to apply Machine Learning for clustering on network statistics, but the K-means approach is not ideal. It should be possible to develop a better approach now.

In a prior blog entry I pointed out that I am considering Suricata’s EVE.json flow serialization as well. I think we should be able to implement the ML clustering in a solution-agnostic way. There are some security-specific advantages to how Suricata tags flow records, which allow a fast quantification of network traffic for individual IDS alerts.
SiLK’s Netflow data(-sets) can be correlated with an Endpoint Protection solution as well, given that the market nowadays offers flexible and programmable tools for security engineers.

Network statistics are business data for many SaaS companies, which provide their service online to their customers. Keeping an eye on service quality is a worthwhile effort in a market with such heavy competition. Keeping an eye on security is becoming mandatory in many jurisdictions.

The fact that we have capable OpenSource projects to support a data-driven security culture should motivate us, as security professionals, to approach Big Data and Machine Learning. Especially if we can keep our approaches formalized and transparent, to avoid returning to the age of Fear, Uncertainty and Doubt (FUD).

From many infosec professionals in the field there is a lot of loud skepticism regarding Machine Learning. Even though major innovative companies go Machine Learning First. My recommendation is to make security analytics Machine Learning first as well, and to commit to a result driven culture. How many attacks has your traditional signature based SIEM caught? Can it catch more? How relevant are the usual 60% of alerts it generates? How much risk / likelihood do they impose?