Python Programming Language and Libraries - usage journal


Usage journals are a loose wiki format to maintain coding patterns. These patterns might not be applicable to use cases other than mine. I collect them as a reference, and hope that this helps someone else as well. I also want to mention that I use Python as a mere tool.


Medium data processing distribution with Pandas and Dask

Medium data processing is a term I use to distinguish an approach from the Big Data world. If you can process the data in memory on a single machine, it’s “Medium data processing”. :wink:

Pandas DataFrame to Dask - there and back again

I have a medium-big Pandas DataFrame df, which I fed with a lot of rows. It’s 12 GB in size. Now I want it in Dask to make use of its distributed processing.

import gc
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

numthreads = 4
# split the Pandas DataFrame into one partition per worker thread
ddf = dd.from_pandas(df, npartitions=numthreads)
del df
gc.collect()

1 logical line of Python is required to do this. Let’s not argue about the del and gc.collect() here, but for the sake of completeness: this is how I get rid of DataFrames I no longer need. The Pandas DataFrame gets split into 4 partitions, which will be worked upon separately.

Let’s utilize these 4 cores and put them on a workout:

pbar = ProgressBar()
with pbar:
    ddf["sip_infos"] = ddf["sip"].map(lambda ip: custom_netmap(ip, mynet),  meta=('ip', str))
    ddf["dip_infos"] = ddf["dip"].map(lambda ip: custom_netmap(ip, mynet),  meta=('ip', str))
    df = ddf.compute()
    del ddf

Essentially what happens is that the mapping operation custom_netmap is applied to 2 columns, in parallel on 4 cores / Dask DataFrame partitions. compute() collects the partitions back into the Pandas DataFrame df. You can also see that the meta argument is used. It tells Dask the name and dtype of the Series that the lambda expression returns, so that Dask does not have to infer them.

Result: from a single-threaded Pandas mapping execution we get to a multi-threaded Dask execution, seamlessly. It’s an anti-pattern, but it works.
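
custom_netmap and mynet are not shown in this journal. As a rough idea of what such a mapping function could look like, here is a minimal sketch that assumes mynet is a dict of label to network and that the function returns a string (which would match meta=('ip', str)). The labels and networks are made up for illustration; the real function may do something quite different.

```python
import ipaddress

# Hypothetical lookup table (label -> network); the real mynet is not shown.
mynet = {
    "office": ipaddress.ip_network("10.0.0.0/8"),
    "dmz": ipaddress.ip_network("192.168.10.0/24"),
}


def custom_netmap(ip, nets):
    """Return the label of the first network containing ip, else 'external'."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # the value is not a parseable IP address
        return "invalid"
    for label, net in nets.items():
        if addr in net:
            return label
    return "external"


print(custom_netmap("10.1.2.3", mynet))
```

Because the function takes a single value and returns a single string, it can be passed to Series.map in Pandas and in Dask alike.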

File extraction from a tar ball, and finding IP addresses via Regular Expressions

I have a tar ball full of IP addresses I want to use, to match them against logs and activity patterns. Python can do this easily. I define a couple of files within a tar ball full of other files, and pass the content of these files to a conversion function. More on that later.

The file can be retrieved from here, with less current information than in the ET Pro variant. The code is not different.

I use the Python tarfile module.

import tarfile

def extract_feedfiles(tarball):
    """Read the tarball and pass the content of the feed files on for formatting."""
    feed_list = ["rules/tor.rules", "rules/compromised.rules", "rules/dshield.rules"]
    try:
        tar = tarfile.open(tarball, "r:gz")
    except (OSError, IOError) as e:
        print("Wrong file or file path?")
        print(e)
        return

    for tarinfo in tar:
        # print(tarinfo)
        if tarinfo.name in feed_list:
            f = tar.extractfile(tarinfo)
            # decode, so that the regular expression gets a str (Python 3)
            content = f.read().decode("utf-8", errors="replace")
            # print(content)
            feed_name = tarinfo.name.split("/")[1]
            parse_feed_to_json(content, feed_name)
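
The tarfile calls involved can be tried without the real feed archive. The following sketch builds a small gzip-compressed tar ball in memory and then picks one member out of it by name. The archive content and the wanted list are made up for illustration.

```python
import io
import tarfile

# Build a tiny gzip-compressed archive in memory; names and contents are made up.
members = {
    "rules/tor.rules": b"alert ip [198.51.100.7,203.0.113.0/24] any -> any any\n",
    "rules/other.rules": b"not interesting\n",
}
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    for name, data in members.items():
        info = tarfile.TarInfo(name=name)
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))
buf.seek(0)

# Read it back and keep only the members we are interested in.
wanted = ["rules/tor.rules"]
extracted = {}
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
    for tarinfo in tar:
        if tarinfo.name in wanted:
            content = tar.extractfile(tarinfo).read().decode("utf-8")
            extracted[tarinfo.name.split("/")[1]] = content

print(extracted)
```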

I can hard-code the / for the .split(), because I know the file names. Now within this content string there are IP addresses I am interested in. How do you get them out of the string?

The easiest way is to use a fitting regular expression with Python.

import os
import re
import pandas as pd

def parse_feed_to_json(etpro_feed_suricata, name="ip"):
    """Quick and simple IP extraction and data frame'ing."""
    target_path = DEST_PATH + os.sep + name + ".json"
    ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}(?:\/[\d]+)?', etpro_feed_suricata)
    # ip = re.findall(r'\b[0-9]+(?:\.[0-9]+){3}(?:\/[\d]+)?\b', file, flag...
    # naming the column here also works when no IP was found
    df = pd.DataFrame(ip, columns=[name])  # .drop_duplicates(keep=False)
    # write the frame to target_path; the to_json defaults are an assumption
    df.to_json(target_path)

DEST_PATH is a constant which holds the file destination.

etpro_feed_suricata is the content of the extracted file, as a string.

The IP addresses are collected into the list ip. The Regular Expression optionally allows a CIDR mask (/24, /16 etc.) with (?:\/[\d]+)?. The RegEx is a bit too wide: it does not check value ranges, so it also matches invalid addresses such as 999.999.999.999.
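
If the invalid matches are a problem, one option is to keep the wide RegEx and filter its matches with the ipaddress module from the standard library. This is a minimal sketch of that idea, not part of the original functions. The helper name valid_ips and the sample string are made up.

```python
import ipaddress
import re

IP_PATTERN = re.compile(r'[0-9]+(?:\.[0-9]+){3}(?:\/[\d]+)?')


def valid_ips(text):
    """Return the regex matches that ipaddress accepts as IPv4 address or network."""
    result = []
    for candidate in IP_PATTERN.findall(text):
        try:
            # strict=False also accepts networks with host bits set, e.g. 10.0.0.1/8
            ipaddress.ip_network(candidate, strict=False)
        except ValueError:
            # octet out of range, bad prefix length, etc.
            continue
        result.append(candidate)
    return result


sample = "drop 203.0.113.5 and 198.51.100.0/24, ignore 999.1.1.1 and 10.0.0.1/33"
print(valid_ips(sample))
```

The RegEx stays simple and readable this way, and the range checks are left to a module that already knows the rules.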

The final lines of this function drop the list into a Pandas DataFrame, which is written out as a JSON file. This file is served via a web root to some other application to import the IP list.

Result: in conjunction with each other these 2 functions grep through a tar ball for IP addresses, and write the result to a JSON file.
You can also do this without Pandas, if that is too much of a dependency. I use Pandas for some later tasks, which include dropping duplicates, merging with other feeds etc. The most useful part is probably the RegEx.
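
A Pandas-free variant could look roughly like the following sketch, which only uses the standard library. The function name feed_to_json and the JSON layout (one key holding a list) are assumptions for illustration; the layout is not identical to what DataFrame.to_json writes, so the importing application would have to expect this format.

```python
import json
import os
import re
import tempfile

IP_PATTERN = re.compile(r'[0-9]+(?:\.[0-9]+){3}(?:\/[\d]+)?')


def feed_to_json(feed_text, target_path, name="ip"):
    """Extract IPs, drop repeats (keeping first-seen order), write them as JSON."""
    ips = list(dict.fromkeys(IP_PATTERN.findall(feed_text)))
    with open(target_path, "w", encoding="utf-8") as fh:
        json.dump({name: ips}, fh)
    return ips


# Usage with a temporary directory instead of a real web root.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "tor.rules.json")
    feed_to_json("198.51.100.7 203.0.113.0/24 198.51.100.7", path, name="tor.rules")
    with open(path, encoding="utf-8") as fh:
        written = json.load(fh)

print(written)
```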