Network analysis with Pandas, memSQL and Python - is it BigData or just fast?

Tags: #<Tag:0x00007febf40f2048> #<Tag:0x00007febf40f1f80> #<Tag:0x00007febf40f1eb8> #<Tag:0x00007febf40f1df0>

Data time

We are continuing the blog-series of data, data, data, and more data. Today it’s about data. Network data. Which actually is timeline data; and timeline analysis.

Much of the tech-data we have ends up being deleted without purpose. Because no one bridged the output into analytical systems. Until now. The following tasks can be done in Bash, old school and simple. However the results of Bash commands end put in a terminal buffer and not in a trend analysis (“technical dept is a trend…”). One outcome of using a data analysis framework for tech data is that you will look at the behavior much more. That is something we aren’t really good at in IT. Especially when it comes to networks many network engineers believe ACLs will define the network behavior. That is not the case.

The following paragraphs contain information about:

  • how to use Pandas for Netflow v9 or PCAP analysis with Python 2.7
  • how to store large DataFrames in memSQL; and how not to do that
  • how to apply domain specific maps to DataFrame columns to enrich the data-set
  • how to use Pandas in conjunction with JavaScript (Google Charts) for impressive visualizations
  • we also take a look at Wireshark’s tshark to convert a PCAP into a DataFrame, and to parse the statistical output of the tool. We use the option to convert a PCAP to a CSV, or to a JSON file via Pandas.
  • we setup short term and long term retention with memSQL, to store stats long-term and tech-details short

Even if you are not interested in network analysis, it’s interesting, because of the workflow and because of the applied usage of Pandas.

Did someone say Python and performance in one sentence?

I don’t think that Python is the right language for data analysis tasks, which require high performance. I think it’s the simplest language to prototype analysis workflows, because it’s orchestration focused. My recommendation for performance focused (BigData) workflows is to use C++ (or Python) with Intel’s DAAL; for example.

Machine Learning first - domain expertise next

In this blog series we aren’t at a point where I want to be to appy machine learning on network activity. This is another step in this direction.

Analysis workflow

  1. We retrieve the Netflow data via SiLK or PCAP headers via tshark.
  2. We normalize, sanitize and filter the data-set
  3. We add calculations, like the median. We handle our metrics, like bytes, meters, or what ever we have.
  4. We enrich the filtered data-set intelligently
  5. We commit the filtered data-set to a fast RDBMS, and make sure we can retend data
  6. We sort the data-set (DBs are good at that)
  7. We visualize the top 10, top 50 etc. or some hundred points on a line plot (interpolated etc.)

Recap: data-retrieval

Let’s take a step back first and see how the data gets loaded.

Wireshark has a C based command-line utility called tshark. Let’s use it from Python.

import paramiko
import zipfile

from datetime import datetime, timedelta
import socket

Believe me: soon timedelta is your new best friend. No one wants to see math functions applied to date strings.

def build_tshark_cmd(filepath):
    cmd = "/usr/bin/tshark -r " + filepath + " "
    cmd+= "-T fields "
    cmd+= "-e frame.number -e frame.time -e frame.time_epoch -e ip.len  -e eth.src -e eth.dst -e ip.src -e ip.dst -e ip.proto "
    cmd+= "-E header=y -E separator=, -E quote=d -E occurrence=f "
    cmd+= "> /tmp/ringbuffer.csv"
    # cmd+= ">> /tmp/ringbuffer.csv"
    return cmd

We give this function a file path, and it will concatenate the string cmd. This is a tshark command to parse frame and IP headers from a PCAP stored at filepath. The resulting data is a CSV in /tmp. We can download it via SFTP with paramiko. SFTP - we remember - is a secure channel. paramiko, yes… that was that library which allows us to use key based authentication for OpenSSH.

Usually (for a Network IDS ring buffer) we have multiple small (rotating) PCAPs and we need to generate a file list in order to get the data for a relevant time frame.

We build this command for each file, and run the tshark conversion. In order to select the correct files, we need to check if the meta-data fits our criteria (creation time for example). Or we simply use the filenames, which might follow a useful schema. Our schema is Suricata"s, which is log.pcap.$epoch_timestamp. Having the epoch timestamp is very useful, because our criteria can be formulated in a Python one-liner. For our friend, we remember… timedelta.

import paramiko
import datetime
import calendar
import time
import unicodedata

def get_file_list(epoch_timestamp):
    file_list = []
    ids_server_ip = "" # 
    ssh = paramiko.SSHClient()
    ssh.connect(ids_server_ip, username=user_name, key_filename=keyfile)
    command = "ls /data/log.pcap.*"
    stdin, stdout, stderr = ssh.exec_command(command)

    for line in stderr.readlines():
        exit_status = 

    for line in stdout.readlines():
        # print line
        file_path_fields = line.split(".")
        time_stamp = file_path_fields[2]
        if time_stamp > epoch_timestamp:
            line = unicodedata.normalize('NFKD', line).encode('ascii','ignore')
        exit_status =  # Blocking call
    if exit_status == 0:
        print str( + ": Command finished successfully. File list is present."
        print("Error", exit_status)

We run a Bash command remotely, put the lines it prints to stdout in a list, and compare the epoch timestamp with our desired start date. If the file time-stamp is too small, we skip it.

Then we issue the tshark command:

import datetime

def run_tshark(cmd):
    verbose = False # dissector bugs messages
    ssh.connect(ids_server_ip, username=user_name, key_filename=keyfile)
    command = cmd
    exit_status = 0
    stdin, stdout, stderr = ssh.exec_command(command)

    if exit_status == 0:
        print str( + ": Command finished successfully. CSV has been parsed."
        print(str( + ": Error ", exit_status)

Not much magic here. tshark directs the stdout to the CSV in /tmp, we zip it and download it now. Something like that is 3 lines in Bash. Easy… note that tshark uses the Wireshark dissectors.

If you run this on code, which contains attack data, you might end up executing an exploit. Wireshark is not perfectly secure. My approach: make sure you have the PCAPs on a system, which is GrSec enabled, run this as a low-privilege user, and maybe even sandbox it. That is a specific topic, which has no direct relationship with our data analysis workflow example.

Check this command:

`command = "/usr/bin/zip /tmp/ /tmp/ringbuffer.csv && rm -rf /tmp/ringbuffer.csv"`

And the download can be scripted via Python as well:

   sftp = paramiko.SFTPClient.from_transport(transport)

    filepath = '/tmp/'
    localpath = '/tmp/'
    sftp.get(filepath, localpath)

PCAP to Pandas

import pandas

def parse_ringuffer_records():
    df = pandas.read_csv('/tmp/', compression="zip")
    return df

So far:

  • we run a couple of Bash commands on a remote system
  • we get tshark to generate CSVs for files based on certain criteria - to get network data
  • we download the zipped CSV via SFTP
  • we read it into a Pandas DataFrame

This isn’t rocket science :wink:

But it allows us to read a PCAP from a remote system. We can then export it as JSON or CSV via Pandas. This is useful, if we know what we want to do with the data.

Data massage

Now we need to sanitize the data-set:

# data massage
from import to_datetime

cols = ringbuffer_headers_df.columns
cols = x: x.replace('.', '_') if isinstance(x, (str, unicode)) else x)
ringbuffer_headers_df.columns = cols

We remove the dots from the DataFrame columns, because Pandas has got a problem with this character.

Now we need to convert the PCAP timestamp in the DataFrame.

    def date_convert():
        # ringbuffer_headers_df["frame_time"] = pandas.to_datetime(ringbuffer_headers_df['frame_time'])
        pcap_headers_df["frame_time_epoch"] = pandas.to_datetime(ringbuffer_headers_df['frame_time_epoch'], unit='s')

Try to convert frame.time / frame_time here. You will see that this takes a lot of computation time. And that the epoch timestamp is much better suited our tasks.

PCAP stats with tshark and Pandas

Now this is another ssh -t style Python function.

def build_tshark_stats_cmd(filepath):
    cmd = "/usr/bin/tshark -r " + filepath + " "
    cmd+= "-q -z conv,ip > /tmp/stats.txt" + " && "
    cmd+= "echo 'src, dest, bytes' > /tmp/stats.csv" + " && "
    cmd+= "awk -F \" \" 'NR>5 { print $1\", \" $3 \", \" $9 }' /tmp/stats.txt >> /tmp/stats.csv" + " && "
    cmd+= "sed -i '$ d' /tmp/stats.csv"
    return cmd

It’s a awk / sed hack, because tshark -q (the statistical functions of tshark) does not have a CSV output.

I create the CSV header, let tshark dump the data into the file, and that’s it here. I select the columns with awk. This is old school admin style Linux Kung Fu. I know that this isn’t very popular in the fancy BigData world, but it’s very efficient. Ehh… okay it’s a single-core application, so we don’t have to worry about the CPU utilization… Did I say something about Kung Fu?

Following my earlier approach I just re-use the code:

ringbuffer_stats_df = pandas.read_csv('/tmp/stats.csv')

Now I sanitize the columns (I know there is a flag now, which does that)

cols = ringbuffer_stats_df.columns
cols = x: x.replace(' ', '') if isinstance(x, (str, unicode)) else x)
ringbuffer_stats_df.columns = cols

And voila - PCAP statistics in a DataFrame. Just as good as Netflow. Not really. but it’s nice to have.

Get the top talkers by source IP

ringbuffer_stats_df["bytes"] = ringbuffer_stats_df.bytes.astype(int)
ringbuffer_stats_df["mbytes"] = ringbuffer_stats_df.bytes.div(1024).div(1024)
sip_grp = ringbuffer_stats_df.groupby("src", as_index=False).sum().sort_values(by="mbytes", ascending=False)

For a 5 GB PCAP this takes about 10 seconds. Yes, someone said performance and Python in one sentence. But actually it has very little to do with Python, because of Numpy.

This is also an example of a metrical conversion. We divide by 1024 because we want to get from bytes to kilo bytes, and from kb to mb.

Advanced mapping techniques for data enrichment

Let’s say you have lots of networks and you want to know to which /24 / assigned network belongs now. How would you determine this? - For a column of a DataFrame? For millions of rows?

I will show you how not to do it as well. Evolution works through one major tool: mistakes. We need to make our’s and share them.

import numpy
from collections import defaultdict
from pandas.util.testing import test_parallel
import struct
from tqdm import tqdm

In order to look up where an IP belongs to we need to create a lookup table. Our’s is a dict from the collections framework.

my_net = defaultdict(str)
my_net[""] = "13, web"
my_net[""] = "14, wifi"
my_net[""] = "601, janitor willy"
my_net[""] = "25, qa dudes"
# my_cidrs = list(my_net.keys())

I have a web network, a janitor network, a wifi network… and Python can tell me whether our IP belongs to the janitor or to the qa dudes.

Here is how not to do it:

def netmap(ip, network_lookup_dict):
    This is horribly slow, and I keep it for as an example how not to do it.
    for key, value in network_lookup_dict.iteritems(): 
            if ipaddress.ip_address(unicode(ip)) in ipaddress.ip_network(unicode(key)):
                return str(key) + ", " + str(network_lookup_dict.get(key))
        except KeyError:
            print "Lookup error, key data not valid. Data corrupt."
            return numpy.NaN

Note that every lookup is O(n). I thought that ipaddress as a standard lib would be blazing fast and outperform me 100x. Here is why that is not the case.
In order to write faster code I remembered my CCNA lessions… and how sub-netting really works in hardware.

def custom_netmap(ip, network_lookup_dict):
    This is 100x faster and delivers the same results. We could even use the *_cidrs list and
    improve ~ 5% ish. But it's not required for me. 
    for key, value in network_lookup_dict.iteritems(): 
        ipaddr = struct.unpack('>L',socket.inet_aton(ip))[0]
        netaddr,bits = str(key).split('/')
        netmask = struct.unpack('>L',socket.inet_aton(netaddr))[0]
        ipaddr_masked = ipaddr & (4294967295<<(32-int(bits)))   # Logical AND of IP address and mask will equal the network address if it matches
        if netmask == netmask & (4294967295<<(32-int(bits))):   # Validate network address is valid for mask
            if ipaddr_masked == netmask:
                return str(key) + ", " + str(network_lookup_dict.get(key))
            print "***WARNING*** Network",netaddr,"not valid with mask /"+bits
            return ipaddr_masked == netmask

I copied that from somewhere on Stackoverflow. Lots of try and error later I was faster. Note that some network admins tend to document wrong subnets for static egress points. Because some ISPs tend to reserve the first IP.

Let’s say I have many of these dicts I use as lookup tables. And depending on the location I want to use the correct table.

def apply_netmap(df, location="MY"):
    We add documentation specific information to the columns.
    Multi threading this (released GIL from Pandas) does not make this faster (for me). We aren't there yet. 
    # mapping
    mapped = False
    if location == "MY":
        tqdm.pandas(desc="Corr Network Zones - SIP")
        df["sip_infos"] = df["sip"].progress_map(lambda ip: custom_netmap(ip, my_net))
        tqdm.pandas(desc="Corr Network Zones - DIP")
        df["dip_infos"] = df["dip"].progress_map(lambda ip: custom_netmap(ip, my_net))
        mapped = True
    if location == "Datacenter":
    if not mapped:
        return None
    # post processing
    df["location"] = location
    df["sip_infos"] = df["sip_infos"].astype(str)
    df["sip_infos"] = df["sip_infos"].replace('None', numpy.NaN)
    df["dip_infos"] = df["dip_infos"].astype(str)
    df["dip_infos"] = df["dip_infos"].replace('None', numpy.NaN)
    return df

I use the lambda expression in the .map() to call my netmap() function. This function allows Python to tell me into which category / subnet my IP belongs.
Each row in the column (this contains sip (source IPs) or dip (destination IPs)) gets passed to the function. The return value is very important. For Pandas you should write functions, which always return something. Even if it’s NaN. This way we make sure that we add something into the row of the new column.

This takes time

I use the tqdm package to add progress bars for the mapping operation. Here is how this looks single-threaded for a large DataFrame:

Corr Network Zones - SIP: 100%|██████████| 7037079/7037079 [06:19<00:00, 18554.00it/s]

memSQL and Pandas are very good friends

It’s just that there is a third guy. Named SQLalchemy. I have no idea why the Pandas developers chose to rely on an ORM to commit massive data into a DBMS. That does not work for performance related tasks, and ORMs have never been known for their ability to handle large amounts of data. There are legit use cases for ORMs. Comitting DataFrames to a DB is not one.

Isn't memSQL just a marketing DBMS?

No. memSQL is optimized for aggregation performance from the start. I don’t think anyone gets comparable write speeds with a similar MySQL or PostgreSQL setup.

MySQL has got an in-memory storage engine. Which does not work with JSON blobs - sadly. We don’t need that here, but generally I don’t see much use for MySQL in memory DBs due to this.

PostgreSQL does not have an in-memory storage engine afaik. But of course there are ramdisks. :wink:

My use cases are:

  • write large DataFrames with network data.
  • perform fast queries
  • access the data from Tableau 10
  • use simple ODBCs
  • make sure the data is normalized and usable for ML algorithms

My use cases are not:

  • to dump the data into a MongoDB, which is single-threaded and performs Map Reduce in the backend
  • to provide a DBMS for multiple users and applications
  • web development
  • eh…

I use it, because it super convenient and it performs like a charm for me. I can commit 80000 rows per second to a DB in a single Docker container on an old workstation. With MariaDB (MySQL) I get 8000 - 12000 on the same host, with a similar config from what I can see.

memSQL and Pandas

We have a large DataFrame with Netflow or PCAP infos now, and we want to write it:

import MySQLdb

import mysql.connector
from sqlalchemy import create_engine

SQLalchemy is not good to commit large DataFrames.
It's 100x slower in my tests. 
Try it out! I get 8000 rows/s in memSQL with SQLalchemy and 80000 without it.

engine = MySQLdb.connect("","dude","secret","netflow_db")
# engine = create_engine('mysql+mysqlconnector://dude:[email protected]:3306/netflow_db', echo=False)

def commit_flows(df):
    # chose columns
    df = df[["timestamp", "sip", "dip", "bytes", "packets", "location", "sip_infos", "dip_infos"]]
    # write in chunks
    df.to_sql(name='netflow_table', flavor='mysql', con=engine, if_exists = 'append', index=False, chunksize=40000)

I select the columns I commit. In my earlier post I showed that I can parse many fields from the RWrec objects from SiLK. Here I limit myself to what I need.

I made bad experiences with SQLalchemy here, because it’s apparently not possible to perform bulk commits with this ORM through Pandas. Sure, I can dump the SQL into a string and craft my custom query. But that means I have the DataFrame in memory, the SQL string, and the chunks I want to write. That is inefficient.

In short:

  • do not use SQLalchemy with Pandas if you want performance
  • memSQL and Pandas are good friends. It’s possible to commit large DataFrames fast.
  • memSQL and Tableau are a good combination for impatient people like me
  • Pandas and Tableau become connected this way, because the output format of the DataFrame is compatible. A more sophisticated approach is to deploy Spark with memSQL and to convert the DataFrame object into a PySpark RDD. Then you would send the stream to Kafka. Another iteration of this would involve sending the data via a stream. The Linux kernel module I wrote about in the other blog post has got SNMP support to make this possible.


Cronjob for 01:00 a.m. e.g… This deletes every row in the table, which is older than

use netflow_db;
delete from netflow_table where timestamp < (NOW() - INTERVAL 1 DAY)

In an analog fashion you can do this for the PCAP headers. My recommendation is that you name the columns alike. This will make your scripts straight forward. If your persist the tshark statistics in a DB I recommend to use the file timestamp (from the filename in our case here). You can use the relative time column if you want, and just add it to the capture start point. If you really have to, you can use capinfos to get these infos, in case your filenames do not follow a schema.

Enrichment and visualization

I chose to use Google Charts instead of D3 to get this done fast and pretty.

My initial idea was to just reverse DNS lookup the IP columns of a DataFrame with over one million rows. Eh… I got capped at 100 lookups per second, because ISPs cap the amount of requests per second per source IP. I could should use multiple DNS servers and utilize Python multi-processing.

For now I keep it simple with dnspython:

import dns.resolver
import dns.reversename
from pandas.util.testing import test_parallel
from tqdm import tqdm

Yes, I attempted to lookup the IPs from the columns using multiple threads :).

def rdns_lookup(ip):
    This function attemps to reverse DNS lookup an IP. 
    If it fails, it will return the IP. It there is an exception it will return the IP.
    This keeps the data-set dafe. It's slow, so limit it to a couple of thousand lookups.
    Speed it up by using multiple DNS servers, and then use multi-threading.
    results = []
        #if ip is not "NaN":
        for a in dns.resolver.query(dns.reversename.from_address(ip), 'PTR'):
        return ip
    if results:
        return results[0]
        return ip

This is as straight forward as possible: it will under no circumstances corrupt my data-set. It will always return something, even if it runs into an exception.

Here we call these functions via the .map(). tqdm's progress bars visualize nicely how fast your DNS servers let you get.

# @test_parallel(num_threads=8)
def add_rdns_columns(df):
    This function adds the columns to the DataFrame object for the DNS infos.
    tqdm.pandas(desc="Performing source IP DNS lookups")
    df["sip_dns"] = df["sip"].dropna().progress_map(lambda ip: rdns_lookup(ip))
    tqdm.pandas(desc="Performing destination IP DNS lookups")
    df["dip_dns"] = df["dip"].dropna().progress_map(lambda ip: rdns_lookup(ip))
    return df

If you want to go all out with threads and multiple name servers you need to initiate one resolver with one DNS nameserver IP.

my_resolver.nameservers = ['', '',...]

Chances are good that you cannot chose random public name servers in a cooperate network. Therefore I leave this out of my functions for now.

Next: let’s use these new columns with the domains we just looked up and make a Google Chart.

Results here:

The data enrichment wasn’t exactly straight forward. The DNS lookups look doable for every coder, who can run code on a machine with access to multiple DNS servers. But the network mapping with a dict is not easy. The problem here isn’t Pandas or dicts. It’s the massive amount of data. SiLK, if you use Netflow, has a solution for that:

Modify the packing logic C file. […]

Lesson learned: data enrichment is a complex task if you do it too late.

Pretty pics, pretty ticks


Let’s look at the code… for the Sankey diagram in the Google Charts library.

def gen_js_sip_sankey(netflow_df):
    This is a very simple Sankey chart for the Flows.
    We enrich the columns with DNS information.

vis_data = """
  google.charts.load('current', {'packages':['sankey']});

  function drawChart() {
    var data = new google.visualization.DataTable();
    data.addColumn('string', 'From');
    data.addColumn('string', 'To');
    data.addColumn('number', 'Weight');

So far this is all copy and paste boilerplate code for the Sankey diagram from the manual.

Note: Avoid cycles in your data: if A links to itself, or links to B which links to C which links to A, your chart will not render

Ehh, this actually is bad. Why can’t it detect that for me? We can remove the circle with the following one-liner:

netflows = netflow_df[~netflow_df.sip.isin(netflow_df["dip"])]

I’d like to see someone do this in a Bash one-liner actually… :slight_smile:

Then we put the top 25 talkers (by source IP) in the list, reverse lookup their DNS names, and add the list to the JavaScript code.

netflows = netflows.head(25)[["sip", "dip", "bytes"]]
vis_data += str(netflows[["sip_dns", "dip_dns", "bytes"]].groupby(("dip_dns","sip_dns","bytes"), as_index=False).sum()
                .sort_values(by="bytes", ascending=False)

That is a quite useful workflow here. The [-separated structure JavaScript expects is exactly how Pandas’ tolist() outputs the values.

    vis_data += """

        // Sets chart options.
        var options = {
          width: 800,

        // Instantiates and draws our chart, passing in some options.
        var chart = new google.visualization.Sankey(document.getElementById('sankey_basic'));
        chart.draw(data, options);

    f = open('/tmp/data.js','w')

Now let’s just drop this data.js file somewhere and load it as part of the chart. We can do this professionally with beautifulsoup 4, which also prettifies the markup. We just add the script tag to deploy this.

import bs4

def regen_html():
    For the HTML boilerplate with BeautifulSoup

    with open("network_sankey.html") as inf:
        txt =
        soup = bs4.BeautifulSoup(txt)    
    js_tag = soup.new_tag("script", type="text/javascript", src="./data.js")

    html = soup.prettify("utf-8")

    f = open('/tmp/todays_sankey.html','w')

This might be a little bit over the top, but it does the job reliably.

Technical results

  • Google Charts and Pandas work well together
  • memSQL is great to quickly store DataFrames
  • Getting network activity data from Netflow and/or PCAPs is simple
  • Data enrichment is surprisingly complex/slow with large data, and adding domain specific information takes time
  • Filtering and sanitizing the data helps a lot to interact with a DBMS
  • memSQL allows us to get charts in Tableau Desktop (10) with no delays

Further reads:

  • if you want common Goog Charts with Python one-liners, check this out.
  • if you are looking for Bootstrap based dashboard templates, I found these to be simple and straight forward.


We are one step closer to get and manipulate network activity data to cluster network endpoints by activity. It’s important that a data-set doesn’t just contain technical information, but also structural and environmental data. This way it’s much easier to determine whether the Machine Learning approach has resulted into a robust clustering.

It’s possible to centralize network information with a fast database, and to generate insightful information with statistical methods. That is a prerequisite for Machine Learning. The underlying workflow has to be clear and reliable. Otherwise the abstraction, which ML needs, will fail. For any kind of data.

So far we have solidified the technical requirements to continue with this endeavor. We can also answer the question: it’s not BigData and it’s not just fast. BigData isn’t necessarily big information, if you just store it somewhere. But via the means of traffic analysis network data can become big information.