Elasticsearch Wiki - JSON in, garbage out

Tags: elasticsearch, log_management, graphs, siem, pandas, data_science, jupyter, networkx, python

This is a subjective wiki entry and a reflection on Elasticsearch and how to use it with Python.

Elasticsearch - can't search this!

Please check the Applies to paragraph of each section to determine applicability: Elastic often ships breaking changes.

Using the scroll API

Applies to: Elasticsearch 7.0, elasticsearch (Python 3 package 7.0.0)

The Elasticsearch API is not intuitive. One of its many shortcomings is that a plain search cannot return more than 10,000 rows (the default index.max_result_window), so larger result sets have to be iterated with the Scroll API.

The following code is based upon a blog article at Techoverflow[1]; I only needed to make minor modifications. The Elastic forums are full of deprecated and wrong answers that will not yield results.

import json
from collections import OrderedDict

def es_iterate_all_documents(es, index, pagesize=250, scroll_timeout="1m", query=None):
    """
    Helper to iterate over ALL documents of a single index.
    Yields the _source dict of every document.
    """
    # Avoid the mutable-default-argument trap: build the dict per call
    query = OrderedDict() if query is None else query
    query.update({'size': pagesize})
    query = json.dumps(query)

    is_first = True
    while True:
        # Scroll next
        if is_first:  # Initialize scroll
            result = es.search(index=index, scroll=scroll_timeout, body=query)
            is_first = False
        else:
            result = es.scroll(body={
                "scroll_id": scroll_id,
                "scroll": scroll_timeout
            })
        scroll_id = result["_scroll_id"]
        hits = result["hits"]["hits"]
        # Stop after no more docs
        if not hits:
            break
        # Yield each entry
        yield from (hit['_source'] for hit in hits)
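
One caveat: every scroll keeps a search context open on the cluster until scroll_timeout expires. If you stop iterating early, it is good practice to release the context explicitly; a minimal sketch, assuming you adapt the generator to expose the last scroll_id:

# Hypothetical cleanup step: release the scroll context explicitly
# (scroll_id must be captured from the last es.search()/es.scroll() result)
es.clear_scroll(body={"scroll_id": scroll_id})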

How do I issue an Elasticsearch query to this function?

This function yields the documents as the typical nested JSON-style dicts. By default, 250 rows are fetched per scroll page until the result set is exhausted. The query, which needs to be passed as an OrderedDict() parameter, can be defined in a calling function like this:

esq = OrderedDict({
    "query": {
        "match": {
            "example_field": {"query": "1.2.3.4"}
        }
    }
})

Using an OrderedDict() object here makes this a little more bearable.
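
Putting the pieces together, a minimal call site could look like the following sketch (the index name is a placeholder; es is the client described in the next paragraph):

# Hypothetical usage: iterate all documents matching the query above
for doc in es_iterate_all_documents(es, "myindex-1.0.0-2019.05.07", query=esq):
    print(doc)  # each doc is one _source dict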

How do I pass a client object to this function?

Applies to: Elasticsearch 7.0, elasticsearch (Python 3 package 7.0.0)

The es parameter is supposed to hold an Elasticsearch client.

from elasticsearch import Elasticsearch
es = Elasticsearch(hosts="4.3.2.1")

How do I pass the current index to this function?

Assuming that you have a daily rolling index model, you can use:

index = "myindex-1.0.0-" + str(datetime.datetime.today().year) + "." \
          + str('{:02d}'.format(datetime.datetime.today().month)) + "." \
          + str('{:02d}'.format(datetime.datetime.today().day))

Passing the result set into a Pandas DataFrame

Append to a list per iteration

Applies to: Elasticsearch 7.0, pandas (Python 3 package 0.24)

The simple (though not the most efficient) way is to append the rows to a list, one per iteration.

import pandas as pd

presult = []
# index_ds holds the index name built above; esq is the query from above
for entry in es_iterate_all_documents(es, index_ds, query=esq):
    presult.append(entry["root_field_xyz"])
df = pd.DataFrame(presult)

Handling the JSON result set with pandas (pd) makes the tabular integration less painful, because raw Elasticsearch results are nested and not easy to process directly.
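
If the documents are nested more deeply, flattening them first helps; a minimal sketch using json_normalize (in pandas 0.24 it lives in pandas.io.json, from 1.0 onwards it is exposed as pd.json_normalize):

from pandas.io.json import json_normalize  # pd.json_normalize in pandas >= 1.0

docs = list(es_iterate_all_documents(es, index_ds, query=esq))
# Flatten nested fields into dotted column names, e.g. {"a": {"b": 1}} -> "a.b"
df = json_normalize(docs)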

Using Elasticsearch result sets with Pandas and NetworkX

Let’s assume that we have our data set in an ordered DataFrame:

print(df[["bytes", "packets", "dst_addr", "src_addr"]].drop_duplicates(subset=["dst_addr", "src_addr"]))

   bytes  packets    dst_addr    src_addr
0     46        1  144.76.Y.X  62.231.A.B
1     46        1  144.76.Y.X    37.6.A.B
...
9    152        3  144.76.Y.X  80.169.A.B

.drop_duplicates() is a Pandas function[2] that takes the tuple formed by the subset columns of each row and eliminates duplicate rows. There are other statistical functions for different purposes, which allow us to calculate aggregates, medians and DataFrame-wide statistics; see the sketch below.
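
For example, a per-source traffic summary is a one-liner with groupby (a minimal sketch; the column names follow the DataFrame above):

# Sum bytes and packets per source address and show the top talkers
summary = df.groupby("src_addr").agg({"bytes": "sum", "packets": "sum"})
print(summary.sort_values("bytes", ascending=False).head())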

Now, for a quick demo of the capabilities of applied data science with Pandas and Elasticsearch, consider the following basic example with NetworkX [3] and JupyterLab [4]:

import networkx as nx

# One directed edge per (src_addr, dst_addr) pair, bytes as edge attribute
G = nx.from_pandas_edgelist(df, 'src_addr', 'dst_addr', ['bytes'], create_using=nx.DiGraph())
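
From here the usual NetworkX toolbox applies; for instance, ranking the most connected hosts (a minimal sketch):

# Rank hosts by degree centrality to spot the busiest endpoints
centrality = nx.degree_centrality(G)
print(sorted(centrality.items(), key=lambda kv: kv[1], reverse=True)[:5])

# In JupyterLab, a quick visual check:
# nx.draw(G, with_labels=True)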

  1. https://techoverflow.net/2019/05/07/elasticsearch-how-to-iterate-scroll-through-all-documents-in-index/

  2. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.drop_duplicates.html

  3. https://networkx.github.io/

  4. https://jupyterlab.readthedocs.io/en/stable/