Cloudflare Enterprise Web Application Firewall (WAF) Logs - event enrichment techniques using Python Pandas and tabular correlation

I think it’s been almost a decade, and Cloudflare’s services have always felt like an upgrade from Akamai and others.

Always? – Not always. In recent months the informational value of Cloudflare’s Web Application Firewall (WAF) logs has degraded. For Enterprise customers.

– Degraded?! For the most precious customers? How so? Scandalous! :astonished:

Problem description: proprietary EventID fields cannot be resolved to human-readable event descriptions, leading to investigation gaps

By nature a WAF is there to “firewall” bad web traffic. Sometimes it works 100%, sometimes it doesn’t. It needs to be adjusted, and in order to do that, staff need to monitor it. Preferably without much overhead.

To enable Enterprise customers to run this “business” process for their “enterprise security”, Cloudflare offers its API[1] to programmatically retrieve the required data. In Information Security, many processes today are driven by data. Call it Security Information and Event Management, Data-driven Security, Investigation Theory or Scenario and Trend Analysis… It really depends on the business sector and the background of the people involved. A good API is a good API for all of us.

I did A123, and I won’t tell you what that is

A123! – No, seriously: that is what Cloudflare decided to inform its Enterprise customers about. And nothing more.

Now you may say… that’s a huge business, they can’t be that limited. – And you are right. I am not being entirely fair here, because of course there are dashboards within the many Cloudflare services that you can consume.

If, however, you need to correlate these Web App Firewall logs with other production web services, you may face Enterprise-level issues as an Enterprise-level customer: the logs Cloudflare sends you don’t help (anymore). Split-brain scenarios. Siloed security layers running wild. All that stuff.

Let's dive in: the data

Let’s say I pulled the data using the Logpull API[2]. Looking at the resulting table:

  • the first (half-cut) column is the client IP
  • the Firewall* fields are new; specifically, the rule-ID field doesn’t tell you anything

Are we the first people to have this issue? → No, the issue has been known at least since April ’22.

So what is the problem again? We don’t know what A123 means, or to be specific: in the table of WAF events we have no idea what 981176 etc. mean. And there is not even a user-friendly way to look this up. Now you say: I have heard that a significant percentage of websites run on Cloudflare. How is it possible that close to no one cares? Apparently it’s a non-issue. Wild…

– You could think that many larger security-conscious Enterprises run their magical Log Management solutions, which take care of the problem. Because of course, they need to manage their Web attacks…

But I don’t think so. Neither Splunk nor Elasticsearch can take care of this in an easy fashion. You need to be quite proficient with these solutions to perform tabular correlation. Splunk offers a lookup function to ingest a CSV with an ID-to-WAF-rule mapping (assuming you have one), and Elasticsearch offers an enrich processor for ingest pipelines.
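Conceptually, the tabular correlation these features perform is just a left join between the log table and a rule-ID mapping table. A minimal Pandas sketch of that idea (the mini-tables and the description text are made up):

import pandas as pd

# hypothetical mini-tables, just to show the shape of the join
logs = pd.DataFrame({"ClientIP": ["203.0.113.7"], "rule_id": ["981176"]})
rules = pd.DataFrame({"id": ["981176"], "description": ["<description from your mapping CSV>"]})

# left join: every log row is kept, matching descriptions get attached
enriched = logs.merge(rules, how="left", left_on="rule_id", right_on="id")
print(enriched[["ClientIP", "rule_id", "description"]])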

Let’s be honest: you don’t use that. Likely you weren’t even aware of the issue, and no one cares about Web attacks anymore? – Pfff, web attacks. Security is busy… with security. Threat hunting, baby. A123!


Solutions solutions solutions!

  • I am posting a CSV with the mapping between IDs and WAF events. You may post-process the file, deduplicate the entries, etc. Or update it, using the code below with your personal API credentials.
  • If you use an Enterprise-grade Log Management or SIEM solution, you may find the aforementioned lookup or enrichment features suitable.
  • If you don’t have that, or if you cannot implement the enrichment with these features, you can check out the code below, modify it, and operationalize it with your Continuous Integration of choice. Jenkins, GitHub Actions, … Just run the script like a cronjob (a minimal sketch follows this list). :timer_clock:
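The cronjob part can be as small as one line. A sketch, assuming you saved the update logic as a standalone script (the path, schedule and file name below are placeholders):

# crontab entry: refresh the rule-ID CSV hourly, at 15 minutes past the hour
15 * * * *  cd /opt/cf-waf-enrich && /usr/bin/python3 update_rules_csv.py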

Summary: a workaround exists for the state of Denmark

Judging by the echo through the security industry (SOCs, CERTs, etc.) and the Cloudflare community, this doesn’t seem to be a big deal for most practitioners of the art of Information Security (engineering).

Cloudflare’s API team has deprecated the WAF* fields (inconsistently) without informing customers, impaired Enterprise-level logging, and broken SIEM / Log Management integrations. If you can only find a workaround here in this little blog, written by a dude who doesn’t even do the engineering anymore, something is “rotten in the state of Denmark”.

Working with Cloudflare usually is relatively easy, and its people have always shown a great amount of expertise. I want that back for security-relevant features, please.

I have been informed that the API dev team is going to add the field back in Q4 2022 / Q1 2023. That being said, it has been gone for over 6 months, and few people realized that their logs were useless. Web logs make up a large percentage of the SIEM / Log System invoices, right?

Appendix: Code listing and data-walkthrough

Map an EventID to a description – a solution-agnostic data-science workflow

It’s always comfortable to assume that the implementation of advanced log correlation features works perfectly well, but how do you verify that? You’d need to work on a sample data set and compare, right? Log enrichment isn’t magic, and people make mistakes during the implementation…

Pull Cloudflare WAF events using the Logpull API with Python and Pandas

This is a commented code walkthrough to share InfoSec data-science tricks, with one message: think beyond SIEMs and Log Management tools. Use these tools, but verify them. They are used by humans, created by humans, and have errors like we humans do :slight_smile:

Libs

import json
import pandas as pd
import requests
import swifter  # registers the .swifter accessor used for the parallel lookup later
from datetime import datetime, timedelta

Manage access keys

def read_config():
    """
    Use a JSON file as a config file for the secrets
    :return: none, sets the global api_key and mail variables
    """
    global api_key, mail

    with open('./config.json') as file:
        config = json.load(file)
        api_key = config["api_key"]
        mail = config["mail"]

read_config()

# request headers for every API call below
data = {
    'X-Auth-Key': api_key,
    'X-Auth-Email': mail,
    'Content-Type': 'application/json'
}

# Cloudflare Zone ID (placeholder), with a trailing slash for the URL concatenation
zone = "123" + "/"
  • Cloudflare uses Zones. We need to tell the API: this is what I want to look at.
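For completeness, the config.json that read_config() expects looks like this (the two key names come from the code above; the values are placeholders):

{
    "api_key": "0000aaaabbbbccccdddd",
    "mail": "soc-team@example.com"
}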

Log retrieval

Logs are available via “Logpull”:

def get_cloudflare_logs():
    """
    Use the Logpull API (v4)
    :return: Log DataFrame
    """

    # Logpull wants Unix timestamps; datetime.timestamp() is portable,
    # unlike the undocumented strftime('%s')
    end_time = datetime.now() - timedelta(minutes=5)
    end_time = str(int(end_time.timestamp()))

    start_time = datetime.now() - timedelta(minutes=65)
    start_time = str(int(start_time.timestamp()))

    print("Max 1h of logs can be fetched at once. - Limited by server.")

    readable_starttime = datetime.fromtimestamp(int(start_time)).strftime('%c')
    readable_endtime = datetime.fromtimestamp(int(end_time)).strftime('%c')

    print("Start: " + readable_starttime)
    print("End: " + readable_endtime)

    url="https://api.cloudflare.com/client/v4/zones/"
    path="logs/received?"
    start="start=" + start_time +"&"
    end="end=" + end_time
    timestamps="&timestamps=unixnano"
    # we can be selective about the fields, but we cannot filter further with field=XYZ
    fields="&fields=EdgeStartTimestamp,ClientIP,ClientIPClass,ClientRequestURI,ClientRequestUserAgent,FirewallMatchesActions,FirewallMatchesSources,FirewallMatchesRuleIDs"

    r = requests.get(url + zone + path + start + end + timestamps + fields, headers=data)
    content = r.content.decode("utf-8")

    logs_df = pd.read_json(content, lines=True)
    print("Logs have been pulled.")

    return logs_df
  • the log events are individual JSON records that are fed into a DataFrame (think: an Excel table), line by line
  • the reply is decoded as UTF-8 explicitly, assuming that some level of adversarial data is present
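If you want to sanity-check that parsing step without hitting the API, you can feed pd.read_json() a couple of hand-made NDJSON records. The two records below are invented, but mimic the fields requested above:

import io
import pandas as pd

# two fake log lines in the NDJSON shape that Logpull returns (all values made up)
sample = io.StringIO(
    '{"ClientIP": "203.0.113.7", "FirewallMatchesRuleIDs": ["981176"], "FirewallMatchesSources": ["waf"], "EdgeStartTimestamp": 1665734977000000000}\n'
    '{"ClientIP": "198.51.100.9", "FirewallMatchesRuleIDs": [], "FirewallMatchesSources": [], "EdgeStartTimestamp": 1665734978000000000}\n'
)
df = pd.read_json(sample, lines=True)
print(df[["ClientIP", "FirewallMatchesRuleIDs", "FirewallMatchesSources"]])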

[Screenshot (2022-10-14): the pulled logs as a Pandas DataFrame]

Some data-magic to simplify the analysis:

# turn the nanosecond timestamps into a readable datetime column
logs_df["EventTime"] = pd.to_datetime(logs_df.EdgeStartTimestamp, unit='ns')
# stringify the list column so we can substring-match on it
logs_df["FirewallMatchesSources"] = logs_df["FirewallMatchesSources"].astype(str)
# keep only the rows that the WAF engine flagged
waf_logs_subset = logs_df.loc[logs_df['FirewallMatchesSources'].str.contains("waf", case=False)]

And we are at this point again, just where we started:

  • Some level of esoteric programming and hallucinogenic security engineering and voila: data, data, data. A dream! :zzz:

Nested / distributed complexity: how do I retrieve the descriptions that fit the IDs using the Cloudflare API?

Not sure what to tell you… get a coffee mug and buckle up. The code is commented. The structure of the rules is deeply hierarchical.
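Roughly, this is the hierarchy the code below walks (endpoints taken from the API calls in the function):

Zone
└── WAF Package      GET /zones/{zone_id}/firewall/waf/packages
    └── Rule Group   GET /zones/{zone_id}/firewall/waf/packages/{package_id}/rules  ("group" field)
        └── Rule     GET /zones/{zone_id}/firewall/waf/packages/{package_id}/rules?group_id={group_id}
                     ("id" and "description" per rule, paginated)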

def get_WAF_rules_id_map_for_zone():
    """
    Cloudflare WAF rules reside in Zones → Packages → Rule Groups → Rules with IDs
    The rule list gets returned from the API with pagination
    :return: DataFrame with Rules and corresponding IDs
    """

    def get_package_ids():
        """
        We need to get the enabled packages in the zone
        :return: list of WAF package IDs enabled in the zone
        """

        url="https://api.cloudflare.com/client/v4/zones/"
        path="firewall/waf/packages"

        r = requests.get(url + zone + path, headers=data)

        content = r.content.decode("utf-8")
        waf_packages_reply_json = json.loads(content)["result"]

        # this is what we want: the waf packages from the zone
        waf_packs_list_of_dataframes = list()

        for waf_package in waf_packages_reply_json:
            temp_df = pd.json_normalize(waf_package)
            waf_packs_list_of_dataframes.append(temp_df)

        # combine all entries into one dataframe table
        waf_packs_df = pd.concat(waf_packs_list_of_dataframes, ignore_index=False)
        # reindex the dataframe
        waf_packs_df.reset_index(drop=True, inplace=True)
        # print(tabulate(result_df, tablefmt="simple_grid", showindex=False))
        print("WAF packages listed successfully.")
        # print(rule_group_df)
        return waf_packs_df["id"].tolist()

    def dump_waf_rules_of_package(package_id_list):
        """
        We need to get the enabled groups per pack
        :param package_id_list:
        :return: DataFrame mapping rule IDs to descriptions
        """

        url="https://api.cloudflare.com/client/v4/zones/"
        path="firewall/waf/packages" + "/"

        waf_rules_group_list = list()
        print("The package IDs are:")
        print(package_id_list)

        # go through all packs
        for package_id in package_id_list:

            call = "/rules"
            r = requests.get(url + zone + path + package_id + call, headers=data)
            api_response = r.content.decode("utf-8")

            result = json.loads(api_response)["result"]
            waf_groups_df = pd.DataFrame(result)

            # make a unique list of rule groups across all packs
            if "group" in waf_groups_df.columns:
                # print(waf_groups_df["group"])
                rule_groups = waf_groups_df["group"].tolist()

                for group in rule_groups:
                    if group not in waf_rules_group_list:
                        waf_rules_group_list.append(group)


        waf_rule_group_df = pd.DataFrame(waf_rules_group_list)
        print(waf_rule_group_df)

        # finalize list
        waf_rules_group_list = waf_rule_group_df["id"].tolist()
        print("WAF Rule Groups in Zone: ")
        print(waf_rules_group_list)

        rule_to_id_list = list()

        # curl -X GET "https://api.cloudflare.com/client/v4/zones/$ZONEID/firewall/waf/packages/$PACKAGEID/rules?&group_id=$GROUPID"
        for package_id in package_id_list:
            print("Package ID: " + package_id)

            for group_id in waf_rules_group_list:
                # loop control variables:
                page_nr = 1
                pages_total = 1  # placeholder, overwritten by the first API reply

                print("Group ID: " + group_id)
                url="https://api.cloudflare.com/client/v4/zones/"
                path="firewall/waf/packages" + "/"
                call="/rules?&group_id=" + group_id

                # pagination loop (<= so the last page is fetched as well)
                while page_nr <= pages_total:
                    # adjust request parameter to call next page iteratively
                    params="&page=" + str(page_nr) + "&per_page=20"

                    # print("Page nr: " + str(page_nr))
                    r = requests.get(url + zone + path + package_id + call + params, headers=data)
                    # print(url + zone + path + package_id + call + params)
                    api_response = json.loads(r.content.decode("utf-8"))
                    # print(api_response)

                    # the reply tells us how many pages exist in total
                    pages_total = int(api_response["result_info"]["total_pages"])
                    if pages_total == 0:
                        break

                    rule_id_api_df_reply_segment = pd.DataFrame(api_response["result"])

                    if {"id", "description"}.issubset(rule_id_api_df_reply_segment.columns):
                        # print(rule_id_api_df_reply_segment[["id", "description"]].head(1))
                        rule_to_id_list.append(rule_id_api_df_reply_segment[["id", "description"]])
                    else:
                        # reply segment without usable rule columns (e.g. an empty page)
                        print("X")

                    # next page
                    page_nr = page_nr + 1
                    # print("Page total: " + str(pages_total))
                    print("." * page_nr)


        rule_id_dataframe = pd.concat(rule_to_id_list, ignore_index=False)
        rule_id_dataframe.reset_index(drop=True, inplace=True)

        # print(rule_id_dataframe)
        print("WAF Rule to ID description map generated")
        return rule_id_dataframe


    package_id_list = get_package_ids()
    rule_id_df = dump_waf_rules_of_package(package_id_list)
    return rule_id_df

Call it, write it:

rules_to_ids_df = get_WAF_rules_id_map_for_zone()
rules_to_ids_df.to_csv("./rules_to_ids.csv")
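If you already have the CSV (generated earlier, or downloaded from above), you can skip the API round trips and load it back directly. Note that to_csv() wrote the DataFrame index as an extra first column, hence index_col=0:

rules_to_ids_df = pd.read_csv("./rules_to_ids.csv", index_col=0, dtype={"id": str})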

[Screenshot (2022-10-14): the generated rule-ID-to-description table]

Look up a rule ID (either by running the long Python function above or by loading the CSV):

[Screenshot (2022-10-14): looking up a single rule ID in the DataFrame]
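In code, that lookup boils down to a single .loc filter; 981176 is the example ID from the log sample above:

match = rules_to_ids_df.loc[rules_to_ids_df["id"].astype(str) == "981176", "description"]
print(match.iloc[0] if not match.empty else "No match in WAF rules")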

Finally: tabular correlation for data-enrichment of Cloudflare WAF logs using Pandas!

  • in the following I use a per-row mapping approach (.apply()); on its own that is single-threaded, but given the implementation that’s ok here, and the swifter wrapper in the code can parallelize it. This is just calling a sub-function for each entry in the row with the IDs
  • some events cannot be resolved; the lookup returns a placeholder string for those

def map_ruleid_to_desc(rules_df: pd.DataFrame, logs_df: pd.DataFrame):
    """
    Enrich a data-set: resolve the IDs
    :param rules_df: DataFrame with columns for WAF rule IDs and rule descriptions
    :param logs_df: DataFrame with WAF logs to make lookups per ID within
    :return: DataFrame with additional description column
    """

    # silence Pandas' SettingWithCopyWarning; we deliberately work on a filtered copy
    pd.options.mode.chained_assignment = None

    # get waf events out of the general web logs
    logs_df["FirewallMatchesSources"] = logs_df["FirewallMatchesSources"].astype(str)
    waf_logs_subset = logs_df.loc[logs_df['FirewallMatchesSources'].str.contains("waf", case=False)]

    rules_df["id"] = rules_df["id"].astype(str)

    def lookup_id(id):
        """
        This is the lookup function, which can be parallelized
        :param id: an ID of a WAF event
        :return: a String containing the first available description fitting to the ID
        """
        lookup_df = rules_df.loc[rules_df['id'] == id ]

        if lookup_df.empty:
            return "No match in WAF rules"
        else:
            result = str(lookup_df.iloc[0]["description"])
            return result

    print("> Performing lookup / event enrichment")
    # swifter makes this lookup concurrent
    waf_logs_subset["WAFEvent"] = waf_logs_subset["FirewallMatchesRuleIDs"].swifter.apply(lambda x: lookup_id(str(x[-1])))
    # rules_subset = rules_df.loc[rules_df['id'].str.contains("981176", case=False)]

    # print(waf_logs_subset[["ClientIP", "FirewallMatchesRuleIDs", "WAFEvent"]])
    # print(waf_logs_subset["FirewallMatchesRuleIDs"].tolist())
    # print(rules_subset[["id", "description"]])
    # print(lookup_id("100203"))
    return waf_logs_subset[["ClientIP", "FirewallMatchesRuleIDs", "WAFEvent"]]

# enriched_waf_logs_df = map_ruleid_to_desc(rules_to_ids_df, logs_df)
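Putting it all together, the whole workaround is four calls (this assumes config.json is in place, the zone ID is set, and the zone saw WAF events in the last hour):

read_config()                                      # load the API credentials
logs_df = get_cloudflare_logs()                    # pull the last hour of logs
rules_to_ids_df = get_WAF_rules_id_map_for_zone()  # build / refresh the ID map
enriched_waf_logs_df = map_ruleid_to_desc(rules_to_ids_df, logs_df)
print(enriched_waf_logs_df.head())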


  1. Application Programming Interface ↩︎

  2. Requesting logs · Cloudflare Logs docs ↩︎