Make Application Security DevOps-friendly, continuous, manageable and keep it simple: GitHub Advanced Enterprise Security to Action(s)

GitHub came out with its “Advanced Enterprise Security” feature package about 2-3 years ago.

Gartner put it into the popular Magic Quadrant as a “Visionary”-level product, but I disagree. I find Veracode, Coverity, and others to be visionaries.

The Application Security sector is full of visions, fantasies, and dreams. If we are honest, few of these fantasies matter to the modern Software Engineer. Big discussions on the why all lead to one of two results: it's “good enough” as it is, or the regulations are made by people who don't write code. The first is an assumption: “good enough”.

From vision to Action

Speaking of Application Security: what is good enough for the service?

Out of the box, GitHub's Advanced Enterprise Security cannot answer that. To do so, it would have to draw a baseline across (validated) findings (of identified relevant services and components) and somehow introduce metrics.

I have done that in the simplest form, taking the good parts of Veracode's vision as an inspiration, at least the way I remember them :slight_smile: Abstraction matters! A score-based metric is such an abstraction, and I find it to be a relevant communication tool as well.

But there is one thing where Veracode (et al.) falls short: Continuous Integration. That means daily reports need to fit into a Slack message (or equivalent): small messages, a small amount of information, delivered incrementally. Veracode (the last time I used it) was too slow, and it also didn't have the reporting that DevOps processes need. Their API was a mixture of XML and JSON, iirc. Not very good.

For some use cases (audits, compliance, management) you may need a corporate “PDF” but engineers don’t need that. – Not if the detailed information is available next to the code in the repo, which is what GH Enterprise Sec already delivers. Code and security are next to each other. That’s the innovation and not the vision. No more silos. Living processes.

A very shallow and lightweight process sketch, simply to understand how the coming parts fit together.

[Screenshot: process sketch]

2100-08-14
                                          Code score    Dependency score
--------------------------------------  ------------  ------------------
TeamShop/WebShop_Front                           100                  64
TeamFrontend/web-libs                            100                  73
TeamAPI/api-gateway                              100                  81
TeamData/middleware-process                      100                  82
  • How good are we? “x / 100.”
  • Are we getting better? “Check yesterday’s Slack. Same.”
  • Any big issues? “Most above 75 points, passing tests. One outlier, a little too much risk. The team is informed.”

Done. Boom. Most CTOs / Dev Team Leads / Engineering managers don’t want more than that. Or even less. So why overdo it? PDFs are for other use cases. Not for quick checkups.

Global multi-org PDF report needed? It’s not hard. Simple language, simple visualizations, clear metrics.

[Screenshot: multi-org PDF report]

Simple enough? – The core logic of this report is so simple, you could do it with other tools as well.

Score    Test Result
95+      A
90-94    B
85-89    C
75-84    D
<75      FAIL
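
In code, the mapping from score to grade is a one-liner per bucket. A minimal hypothetical helper (not part of the tooling below, which only computes the numeric score):

def grade(score: int) -> str:
    """
    Maps a 0-100 score to the test result grades above
    :param score: metric between 0 and 100
    :return: grade string
    """
    if score >= 95:
        return "A"
    if score >= 90:
        return "B"
    if score >= 85:
        return "C"
    if score >= 75:
        return "D"
    return "FAIL"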

You can do that for every GitHub Repository if

  • the features are available and
  • the CodeQL (Code checks) and Dependabot (Dependency checks) features are enabled,
  • and the respective Continuous Integration is setup.

Here is how!

Rather than philosophizing, let's dive into the solution to get that sort of report. GH Enterprise Security offers descriptions, recommendations for fixes, and a lot more. But no simple and consumable reporting with value for cross-functional processes. No baselining. Maybe it's too easy… or better left to a human being. Who knows.

Baseline init: Minimum Viable Product Security

A good starting point is to adopt the Minimum Viable Product Security baseline and to apply a metrical threshold: over 75 = baseline met.

That assumes some level of knowledge on what needs to be in scope etc., but engineers know that. Subject matter experts have an idea of what's “reachable from the outside” or “processing important data”. In my experience security folks make the mistake of throwing “everything” into SAST / CAST tools and are surprised when they get overwhelmed with False Positives. That's not an issue here, which is why this approach scales from day 1.

How you can technically achieve that is documented in the following, part by part. What's out of scope here is how you embed this into your ISMS / Compliance framework and how you cooperate with Devs. Let's assume that's your homework :slight_smile: Or you know how to use the search function of this knowledge base. It's key to success.

Operationalise Application Security Metrics generation with GH Actions and Enterprise Security

First things first: to operationalise means to put a theory into action, into reality. In order to do that, some foundation needs to be established. We have:

  • GitHub Enterprise Security and a Linux Runner (with a user)
    • a small virtual machine suffices
  • Repositories with projects to work with
  • a business that enables people to take part in effective security management and practices
  • optionally: Elastic, Sharepoint Online

The following listing illustrates these requirements in a deeply technical fashion.

Configuration file example
[DEFAULT]
; for debugging
VERBOSE = FALSE

; GitHub Enterprise Server endpoints
WEB = https://githubserver.com
REST = https://githubserver.com/api/v3
GRAPHQL = https://githubserver.com/api/graphql

; GitHub Enterprise access
GH_USER = None
; Token via env variable GH_TOKEN, no user needed

; comma-separated list without spaces, use forward slash between org and repo
PROJECTS : TeamShop/WebShop_Front,TeamFrontend/web-libs,TeamAPI/api-gateway,TeamData/middleware-process

; dependency scan metrics
DEPENDENCIES = TRUE
; code scan metrics
CODE_CHECKS = TRUE

; pdf report generation
PDF_REPORTS = TRUE
;retain n reports, locally and in sharepoint archive
;set to zero to disable deletion
PDF_REPORTS_NUMBER_KEEP = 3

; add history timeline metrics for baseline 
COMPLIANCE_BASELINE_HIST = TRUE

; Slack Oauth token and Webhook via env
SLACK_SUMMARY = TRUE

; logging and uploading feature toggles
SHAREPOINT = FALSE
ELASTICSEARCH = TRUE

; config and access to Elasticsearch DB (REST)
ELASTIC_ENDPOINT = https://elasticsearchserver.com
ELASTIC_USER = svc_user
ELASTIC_INDEX = metrics-appsec.github
; PW via env variable ELASTIC_PW

; config and access to Sharepoint Online (REST)
SP_APPSEC_USER = [email protected]
SP_APPSEC_ORG = https://company.sharepoint.com
SP_APPSEC_SPACE = https://company.sharepoint.com/sites/AppSec/
; PW via env variable SP_APPSEC_PW

APPSEC_BASELINE_DOC_LNK = https://confluence.company.de/display/APPSECBL
Environment bash file example
export SP_APPSEC_PW='longpw'

# svc_user_seccomp_reporting 
export ELASTIC_PW='longerpw'

# [email protected]
export GH_TOKEN='longtoken'
export SLACK_WEBHOOK_URL='https://hooks.slack.com/services/vr/blablabla/'
requirements.txt for Python
elasticsearch==7.12.0
Jinja2==3.0.2
matplotlib==3.5.1
pandas==1.3.5
pandasticsearch==0.6.0
requests==2.27.1
SharePlum==0.5.1
weasyprint==54.2

[Screenshot: GitHub Actions run log]

You can define the degree of verbosity for what gets written into the GH Actions log. Obviously, the respective projects can also be opened directly, and the individual findings with all their information are available there. The data is the same.

Summary

  • Application security metrics with GH Advanced Enterprise Security and a little DataScience
  • Abstraction enables clearer reports on security expert topics
  • Automation (via Continuous Integration) is a powerful enabler for security
  • Less is more

Appendix: implementation with commented code listing and AppSec DataScience walkthrough

The code isn't shared as an open-source project because I won't maintain it. It's there to prove that the concept applies. Nothing more.

From Baseline to Application Security Process and Software Bill of Materials management / supply-chain security for real

GitHub Action integration

Pretty much like a cronjob with exposed logs. I leave the setup of the repository’s secret environment to you. :slight_smile:

name: SSH command for metrics generation and Slack notice

on:
  schedule:
    - cron: "0 7 * * 1-5"

jobs:

  build:
    name: Run analysis of code and dependencies and post results
    environment: REPORT_ENV_INTERN_1
    runs-on: [self-hosted, linux]
    steps:
      - name: executing remote ssh commands using ssh key
        uses: appleboy/[email protected]
        with:
          host: github-actions-runner-01.mgmt.xxx.zone 
          username: svc-ghsec
          key: ${{ secrets.SSH_KEY }}
          port: 22
          script: bash githubsec_bot.sh

This should be enough to understand the core principle: the Action just logs into the Runner and executes SSH commands as a user. This is kept simple for a variety of reasons. One is that PDF generation becomes difficult in Docker if you need pseudo-printers, fonts, etc. Stupid stuff. If your env is Docker-only, fix that :slight_smile:

Think API first - security reports need to be consumable

Less is more. DevOps engineers (or equivalent roles) and managers are busy.

Libs

Mainly Pandas and Weasyprint are used for the “business-style” reporting. Again: less is more. No auditor / biz person needs multi-page AppSec reports.

Apart from the libs within the Python (>= 3.10) standard library:

  • Pandas is used to make data tabular and exportable
  • Weasyprint is used to create PDFs with a simple HTML template (optional import)
    • Jinja2 is used as the templating engine (optional import)
  • SharePlum for Sharepoint Online and report retention (optional import)

Optional libs are imported within the functions that implement the respective features. This way you can copy and paste, piece by piece. I attached the requirements.txt file for the Python AnaConda environment.

Global libs
import os, sys
from pathlib import Path
import configparser
import requests
import pandas as pd
import json
from datetime import date, datetime, timezone
"Helper functions
Helper functions
# global variables 
timestamp_day_epoch = int(datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0).timestamp())

# global helper functions
def get_tempdir():
    """
    Where is /tmp or the equivalent?
    :return:
    """
    import tempfile
    import platform
    from pathlib import Path
    tempdir = Path("/tmp" if platform.system() == "Darwin" else tempfile.gettempdir())
    return tempdir
  • timestamp_day_epoch marks the day in “epoch” time format.
  • the folder for temporary data may not reside in “/tmp” on every OS, but this workflow targets Linux and macOS
# this is just for better commandline output
def set_pandas_output_opts():
    """
    Configures column display of DataFrame objects
    :return:
    """
    pd.set_option('display.max_rows', 500)
    pd.set_option('display.max_columns', 500)
    pd.set_option('display.width', 1000)

The Python code is going to use a match-case statement (structural pattern matching), which is only available since version 3.10.

# ensure at least Python 3.10 is being used
def check_python_version():
    """
    Checks Python version dependency
    :return:
    """
    MIN_PYTHON = (3, 10)
    if sys.version_info < MIN_PYTHON:
        sys.exit("Python %s.%s or later is required.\n" % MIN_PYTHON)

Configuration of the CI workflow

  • This tool gets controlled with config.ini in the same directory (no command-line args)
  • GitHub’s static OAuth token resides in an environment variable
    • the (technical) user needs to be granted access to CodeQL and Dependabot results within the GH orgs
    • the user scope token inherits the privileges
  • Elasticsearch PW needs to be set as env variable
  • Sharepoint PW needs to be set as env variable

These can also be passed via Docker, but this workflow is simpler.

def read_config():
    """
    Config parser
    :return:
    """
    config = configparser.ConfigParser()
    config.read("config.ini")
    return config
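
Following the convention that secrets only enter via the environment, a minimal fail-fast check can guard the startup. This helper is a hypothetical addition, not part of the original script:

def require_env(name: str) -> str:
    """
    Fail fast if a required secret is missing from the environment
    :param name: environment variable name
    :return: the value
    """
    value = os.environ.get(name)
    if not value:
        sys.exit("Missing required environment variable: " + name)
    return value

# hypothetical startup check; GH_TOKEN is always needed,
# ELASTIC_PW / SP_APPSEC_PW only if the respective toggles are on
auth_token = require_env("GH_TOKEN")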

Static Application Security Testing (1st party score metrics)

  • Wikipedia: SAST - code checks
  • Technology: GitHub CodeQL incl. GH advisory or internal severity rating
def get_sast_issues_score(verbose=False, hed=None, scope="", org="", repo="", rest_api_endpoint=""):
    """
    Ask GitHub Sec API for data about the CodeQL findings and analyse it
    :param verbose: boolean, flag
    :param hed: dict, auth data
    :param scope: string, part of REST API call
    :param org: string, part of REST API call
    :param repo: string, part of REST API call
    :param rest_api_endpoint: REST API URL
    :return: code_score (int), code_severity_list (DataFrame statistical object)
    """

    import sys

    req = "/code-scanning/alerts"
    url = rest_api_endpoint + "/" + scope + "/" + org + "/" + repo + req
    print("CodeQL Repo URL: " + url, file=sys.stdout)
   
    response = requests.get(url, headers=hed)

    if response.status_code == 404 or response.status_code == 403:
        # CodeQL support issue for some languages
        status = { "Status" : "CodeQL disabled or unsupported language" }
        status_df = pd.DataFrame([status])
        print("CodeQL Status: " + "disabled or not supported for repo", file=sys.stdout)
        return 0, status_df
    
    parsed = json.loads(response.text)

    if verbose:
        print(json.dumps(parsed, indent=4, sort_keys=True))

    df = pd.DataFrame(parsed)
    
    if len(df) == 0 and response.status_code == 200:
        status = { "Status" : "No findings" }
        status_df = pd.DataFrame([status])
        print("CodeQL Status: " + "no findings for repo", file=sys.stdout)
        return 0, status_df

    if len(df) > 0 and response.status_code == 200:
        print("CodeQL Status: " + "processing findings for repo", file=sys.stdout)

    df = pd.concat([df.drop(['rule'], axis=1), df['rule'].apply(pd.Series)], axis=1)
    df['state'] = df['state'].astype(str)
    df["state"] = df["state"].str.lower()

    if verbose:
        print(df[["number", "name", "state", "severity", "security_severity_level"]])

    # filter out anything that's not "open"
    df_open = df.loc[df['state'] == 'open']
    code_severity_list = df_open["security_severity_level"].value_counts()

    if verbose: 
        print("Code Severity List (open)")
        print(code_severity_list)
        print("Code Severity Score (open)")
    code_score = set_score(severity_df=code_severity_list)
    
    # for better table style
    code_severity_list = code_severity_list.reset_index()
    code_severity_list.columns = ['Risk', 'Code Findings Reported']

    return code_score, code_severity_list

Assumption: closed issues are fixed, and / or not relevant anymore.
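
For illustration, a hypothetical standalone call, using the REST endpoint and the first project from the config example above:

import os

hed = {'Authorization': 'Bearer ' + os.environ["GH_TOKEN"]}
code_score, code_severity_list = get_sast_issues_score(verbose=False,
                                                       hed=hed,
                                                       scope="repos",
                                                       org="TeamShop",
                                                       repo="WebShop_Front",
                                                       rest_api_endpoint="https://githubserver.com/api/v3")
# the function returns the deduction (malus), so the report score is 100 - malus
print("Code score: " + str(100 - code_score))
print(code_severity_list)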

Dependency checks (3rd party score metrics)

  • SBOM stands for Software Bill of Materials and it’s a term used for supply-chain security analysis in Application Security
    • equivalent here to Compositional Analysis Security Testing (CAST)
  • the score is based on GH’s advisories within GH Enterprise Dependabot
def get_sbom_issues_score(hed=None, graphql_url="", verbose=False, repo="", org=""):
    """
    Ask GitHub Sec API for data about the Dependabot findings and analyze it
    :param hed: dict, auth data
    :param graphql_url: GraphQL endpoint
    :param verbose: boolean, flag
    :param repo: string, repository name
    :param org: string, org name
    :return: sbom_score (int), sbom_severity_list (DataFrame statistical object)
    """

    from string import Template
    
    
    # this is the GraphQL query for the API
    query_template = """
    {
    repository(name: "$repo", owner: "$org") {
        vulnerabilityAlerts(first: 100) {
            nodes {
                createdAt
                dismissedAt
                state
              	dismissReason
                securityVulnerability {
                    package {
                        name
                    }
                  	severity
                    advisory {
                        description
                    }
                }
            }
        }
    }
    }
    """
    
    print("Dependabot Repo: " + repo, file=sys.stdout)

    # prevent escaping the literal context of the graphql template
    if "'" not in repo + org and '"' not in repo + org:
        template = Template(query_template)
        query = template.substitute({ 'repo': repo, 'org': org})
    else:
        sys.exit("Quote character in org/repo string: " + org + "/" + repo)

    response_dp = requests.post(graphql_url, headers=hed, json={'query': query})
    parsed_dp = response_dp.json()["data"]
    # print(json.dumps(parsed_dp, indent=4, sort_keys=True))

    # making the data tabular is difficult because the JSON reply is nested multiple times
    df_deps = pd.json_normalize(parsed_dp)

    # we need to rename the columns because dots in table headers cannot be handled correctly
    cols = df_deps.columns.map(lambda x: x.replace('.', '_') if isinstance(x, (str)) else x)
    df_deps.columns = cols
    
    # a sub-section of the flattened JSON gets extracted
    sub_json = df_deps['repository_vulnerabilityAlerts_nodes'][0]

    # needed in case there are 0 issues and the HTTP status code is ok
    if len(sub_json) == 0 and response_dp.status_code == 200:
        status = { "Status" : "No findings" }
        status_df = pd.DataFrame([status])
        print("Dependabot Status: " + "no findings for repo", file=sys.stdout)
        return 0, status_df

    if len(sub_json) > 0 and response_dp.status_code == 200:
        print("Dependabot Status: " + "processing findings for repo", file=sys.stdout)

    # data with the findings needs to be re-framed
    dependabot_data = pd.DataFrame(sub_json)
   
    # data needs to be flattened again
    dependabot_issues = pd.json_normalize(pd.DataFrame.from_records(sub_json)["securityVulnerability"])
   
    # since the data is flattened and framed from JSON we need to normalize the types
    dependabotDf = pd.concat([dependabot_data["state"], dependabot_issues], axis=1)
    dependabotDf["state"] = dependabotDf["state"].astype(str)
    dependabotDf["severity"] = dependabotDf["severity"].str.lower()
    # print(dependabotDf)
    
    # column renamed again for this dataframe
    cols = dependabotDf.columns.map(lambda x: x.replace('.', '_') if isinstance(x, (str)) else x)
    dependabotDf.columns = cols
    
    # keep only untreated findings (anything marked as dismissed in the API is filtered out)
    dependabot_severity_open_list = dependabotDf[dependabotDf['state'] == 'OPEN']

    if verbose:
        print("Software Components Issue List (open)")
        print(dependabot_severity_open_list)

    dependabot_severity_list = dependabot_severity_open_list["severity"].value_counts()
    
    if verbose:
        print(dependabot_severity_list)
        print("Software Components Severity Score (open)")
    
    sbom_score = set_score(severity_df=dependabot_severity_list)

    # for better table style
    dependabot_severity_list = dependabot_severity_list.reset_index()
    dependabot_severity_list.columns = ['Risk', 'Dependency Findings Reported']

    return sbom_score, dependabot_severity_list
  • Sadly this data is only available via GraphQL and not via the (more common) REST endpoint (onprem and cloud).
  • The same assumption applies as for the code checks: not open = not relevant.
  • Note that the query above fetches at most 100 alerts per call (see the paging sketch below).
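
GitHub's GraphQL API caps first at 100 nodes, so repositories with more alerts need cursor-based paging via pageInfo. A hypothetical sketch, not part of the original script:

def fetch_all_alert_nodes(graphql_url="", hed=None, org="", repo=""):
    """
    Pages through all vulnerability alerts via pageInfo cursors
    :return: list of alert nodes
    """
    nodes, cursor = [], None
    while True:
        after = ', after: "' + cursor + '"' if cursor else ""
        query = """
        { repository(name: "%s", owner: "%s") {
            vulnerabilityAlerts(first: 100%s) {
                pageInfo { hasNextPage endCursor }
                nodes { state securityVulnerability { severity } }
            } } }""" % (repo, org, after)
        reply = requests.post(graphql_url, headers=hed, json={'query': query}).json()
        alerts = reply["data"]["repository"]["vulnerabilityAlerts"]
        nodes.extend(alerts["nodes"])
        if not alerts["pageInfo"]["hasNextPage"]:
            return nodes
        cursor = alerts["pageInfo"]["endCursor"]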

Optional: PDF report generation

  • generating PDFs from code can be difficult. Weasyprint requires OS components to do that (Pango)
    • also within a Docker container (which is why this CI workflow doesn’t get executed within Docker)
  • files will get overwritten
  • folders get created if they do not exist (recursively)
def make_pdf_report(reported_app_name="", codeql_table=None, sbom_table=None, code_score=0, sbom_score=0,
                filename="report.pdf", web_endpoint="", pdf_reports_to_keep=0, compliance_baseline_hist=False, appsec_baseline_lnk="https://"):
    """
    Creates a PDF file to report the baseline conformance per project
    :param reported_app_name: Application name in the report
    :param codeql_table: CodeQL issues that were retrieved from GH Enterprise Security
    :param sbom_table: Dependabot issues that were retrieved from GH Enterprise Security
    :param code_score: SAST score metric
    :param sbom_score: SBOM score metric
    :param filename: output filename (convention applies)
    :param web_endpoint: URL to GitHub Enterprise repository
    :param pdf_reports_to_keep: number of PDF files to keep in local folder
    :param compliance_baseline_hist: Boolean, add chart feature toggle
    :param appsec_baseline_lnk: URL to documented AppSec Baseline
    :return:
    """

    from jinja2 import Environment, FileSystemLoader
    from weasyprint import HTML, CSS
    import glob
    import os

    # start score for the baseline conformance measurement
    baseline_start_score = 100
    
    # use template files local directory (should also work on Windows)
    env = Environment(loader=FileSystemLoader('.'))
    template = env.get_template("report.html")
    
    # create reports folder if not exists (untested for Windows, but should work)
    # subfolders according to GH org and repo structure
    output_directory = "./reports/" + reported_app_name + "/"
    Path(output_directory).mkdir(parents=True, exist_ok=True)
    
    # scoring results
    code_score_result = baseline_start_score - code_score
    # minimum for pdf report is 0
    code_score_result = code_score_result if code_score_result > 0 else 0
    sbom_score_result = baseline_start_score - sbom_score
    sbom_score_result = sbom_score_result if sbom_score_result > 0 else 0
    
    marker = ""
    if code_score_result < 75 or sbom_score_result < 75:
        marker = "_fail_"
    else:
        marker = "_pass_"
        
    # adapt filename based on result
    filename_elements = filename.split(".")
    filename = filename_elements[0] + marker + "." + filename_elements[-1]
    
    # supply data to the template
    template_vars = {"reported_app" : reported_app_name,
                     "gh_web_url" : web_endpoint + "/" + reported_app_name + "/security",
                     "codeql_table": codeql_table.to_html(index=False),
                     "sbom_table" : sbom_table.to_html(index=False),
                     "code_score" : code_score_result,
                     "sbom_score" : sbom_score_result,
                     "report_date" : date.today(), # readable date
                     "baseline_plot" : str(get_tempdir()) + os.sep + reported_app_name.replace("/", "_") + "_" "appsec.png",
                     "baseline_doc" : appsec_baseline_lnk
                     }
        
    
    html_out = template.render(template_vars)
    pdf = HTML(string=html_out).write_pdf(stylesheets=[CSS("typography.css"), CSS("pdf.css")])
    f = open(output_directory + filename, 'wb')
    f.write(pdf)
    f.close()
    
    if pdf_reports_to_keep > 0:
        # glob match all pdfs in the sub-directory, newest first
        files = sorted(glob.iglob(output_directory + "*.pdf"), key=os.path.getctime, reverse=True)
        
        # keep only the configured number of pdfs locally and remove the older rest
        if len(files) > pdf_reports_to_keep:
            for file in files[pdf_reports_to_keep:]:
                os.remove(file)
  • There are simpler options with Splunk or Elasticsearch (later). Let’s assume this is what is available at a minimum. It will be enough for ISO 27k audits etc.
Optional: create the baseline plot for the project report

This prepares the plot only; embedding it into the PDF is a separate step.

  • Queries Elasticsearch DB with a “range” type query to retrieve up to 30d of data from the dedicated index for the AppSec metrics
  • uses a technical user
  • data gets converted into a timeline to show the history of the baseline conformance (trend)
  • pivot table per project in the retrieved data set
  • Plot gets exported as PNG into the OS’s temp file system (static)
  • Plot gets optimised for readability (sadly the code must be verbose)
def prepare_baseline_plot_from_elastic_data(username = "", pw = "", index="", elastic_ep = "", project="org/repo"):
    """
    Creates chart / plot from Elasticsearch data for visualization
    :param username: Elasticsearch user (ELASTIC_USER in config.ini)
    :param pw: Elasticsarch password (ELASTIC_PW via env)
    :param index: Elasticsearch index name (ELASTIC_INDEX in config.ini)
    :param elastic_ep: Elasticsearch endpoint
    :param project: GitHub org / repository name from the GH Enterprise server
    :return:
    """
    import pandas as pd
    from elasticsearch import Elasticsearch
    from pandasticsearch import Select
    import matplotlib.pyplot as plt
    import matplotlib.ticker as ticker
    import os
    
    # sub-function for data retrieval from Elastic
    def get_elastic_data_timeline():
        """
        Retrieve stored data from Elasticsearch with timestamps
        :return: data from Elasticsearch as a DataFrame object
        """
        es = Elasticsearch(elastic_ep, 
                       http_auth=(username, pw))

        # hardcoded 30 days time period here using the separate index
        result_dict = es.search(index=index, body={
          "query": {
            "range": {
              "@timestamp": {
                "format": "strict_date_optional_time",
                "gte": "now-1M",
                "lt": "now"
              }
            }
          }
        })


        # (!) if we expect more than 10k hits we have to set the scroll option and loop
        return Select.from_dict(result_dict).to_pandas()
    
    # apply data type conversions, date normalization 
    def massage_elastic_data():
        """
        Data transformation on DataFrame object for timeline analysis
        :return:
        """
        pandas_df["@timestamp"] = pd.to_datetime(pandas_df["@timestamp"])
        pandas_df["code-score"] = pandas_df["code-score"].astype(int) 
        pandas_df["sbom-score"] = pandas_df["sbom-score"].astype(int)
        pandas_df["project"] = pandas_df["project"].astype('category')
        return pandas_df
    
    # make a pivot timeline per project
    def make_pivot_timelines():
        """
        Pivot table per project over indexed DataFrame
        :return:
        """
        pandas_df.set_index("@timestamp", inplace=True) # timestamp becomes the index
        return pandas_df[["sbom-score", "code-score", "project"]].groupby("project") # pivot table with timeline index per project
        
    
    # function to create the baseline plot PNGs 
    def create_baseline_plot_to_tmpdir(grp_df=None, repo_string="org/repo"):
        """
        Creates plot as PNG to embed it in the PDF report
        :param grp_df: grouped DataFrame object with data from multiple projects
        :param repo_string: GitHub org / repository string
        :return: writes PNG to OS temporary file system
        """

        for name, project_df in grp_df:
            
            # get temp files dir
            tempdir = get_tempdir()
            plot_output_path = str(tempdir) + os.sep + name.replace("/", "_") + "_" "appsec.png"
            
            # plot only if enough values exist in the metrics index,
            # otherwise touch an empty placeholder file for the PDF template
            if len(project_df.index) < 7:
                with open(plot_output_path, 'a'):
                    os.utime(plot_output_path, None)
            
            elif name == repo_string:

                # the plot config is verbose
                ax = project_df.plot(kind="bar", ylim=(0,100), 
                                     y=["sbom-score", "code-score"], figsize=(16, 8), 
                                     title=name, edgecolor='white', linewidth=3)

                # x axis config:
                ticklabels = ['']*len(project_df.index)
                # every 3rd ticklabel shows the month and day
                ticklabels[::3] = [item.strftime('%b %d') for item in project_df.index[::3]]
                # every 6th ticklabel includes the year
                ticklabels[::6] = [item.strftime('%b %d\n%Y') for item in project_df.index[::6]]
                ax.xaxis.set_major_formatter(ticker.FixedFormatter(ticklabels))
                plt.gcf().autofmt_xdate()

                # ax.invert_xaxis()

                ax.axhline(y=75, c='red', linestyle='dashed')

                plt.xlabel('Date →')
                plt.ylabel('←worse-     Test score    -better→')
                ax.legend(["Baseline (min 75)", "Code Dependencies", "Inhouse Code"], 
                          loc='center left', bbox_to_anchor=(1, 0.5), fancybox=True, shadow=True)

                # save plot as PNG to embed it in the PDF later
                # avoid /s during file creation
                # overwrite with latest results
                ax.figure.savefig(plot_output_path, bbox_inches = 'tight')

    # data flow
    pandas_df = get_elastic_data_timeline()
    pandas_df = massage_elastic_data()
    pandas_df = make_pivot_timelines()
    create_baseline_plot_to_tmpdir(grp_df = pandas_df, repo_string = project)   
  • ideally, instead of an Elasticsearch DSL range query, you rewrite the code to use the SQL feature; for data streams it became available just recently (see the sketch below). The DSL query assumes that there is a separate index for this data.
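
A hypothetical version of the retrieval using Elasticsearch SQL, assuming X-Pack SQL is enabled on the cluster and the endpoint / index names from the config example:

from elasticsearch import Elasticsearch
import pandas as pd

es = Elasticsearch("https://elasticsearchserver.com", http_auth=("svc_user", "pw"))
result = es.sql.query(body={
    "query": 'SELECT "@timestamp", "project", "code-score", "sbom-score" '
             'FROM "metrics-appsec.github" '
             'WHERE "@timestamp" > NOW() - INTERVAL 30 DAYS'
})
# rows plus column metadata can be framed directly
pandas_df = pd.DataFrame(result["rows"], columns=[col["name"] for col in result["columns"]])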

Logging (Dashboard data for Splunk or Elasticsearch)

  • data gets logged as JSON and forwarded to Elasticsearch with Filebeat
    • this updates the Kibana Dashboard that collects security metrics on the (Micro)Services
    • the server github-actions-runner-01.mgmt.xxx.zone hosts the deployment

→ Our AppSec Dashboard (Kibana) uses these values (changes need to be made in code and in the dashboard widgets)

def make_logs_for_elastic(project_name="none", code_score=0, sbom_score=0, filepath="none"):
    """
    Log writer for Elasticsearch FileBeat forwarder
    :param project_name: identifier for the project ("org/repo")
    :param code_score: integer, metric
    :param sbom_score: integer, metric
    :param filepath: log sink (JSON file, tracked)
    :return:
    """

    # last value per day counts in Dashboard (use maximum score in day bracket)
    # epoch seconds, without the platform-specific strftime("%s")
    timestamp = str(int(datetime.now().timestamp()))

    log_dict = {
        "date" : timestamp,
        "project": project_name,
        "code-score": 100 - code_score,
        "sbom-score": 100 - sbom_score
    }

    # create logs directory if it does not exist
    # hardcoded path for Beats
    Path(str(Path.home()) + "/logs/").mkdir(exist_ok=True)
    
    # append log lines (no rotation given the low data volume for statistics here)
    # file obj gets closed automatically by Python
    with open(filepath, 'a') as log_sink:
        log_sink.write(json.dumps(log_dict) + "\n")
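
A hypothetical log line produced by this function (one JSON object per line, which is what the Filebeat forwarder picks up), using the sample project and scores from above:

{"date": "1665705600", "project": "TeamShop/WebShop_Front", "code-score": 100, "sbom-score": 64}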

Scoring function (metrics generator)

  • the formula is like in a typical exam: you start with 100 points and for each issue there is a deduction

    • it’s the same for code and dependency checks
    • the score is per test method
    Score    Test Result
    95+      A
    90-94    B
    85-89    C
    75-84    D
    <75      FAIL
  • the more severe, the more we deduct

  • this counts only open issues. The assumption is that all open issues are real and pose risk.

    • within GH we can easily close / mark as accepted / state that there is “no bandwidth to fix” etc.
def set_score(severity_df : pd.Series):
    """
    Helper function to calculate the score metrics
    :param severity_df: DataFrame object with statistics based on GH Sec issues
    :return: malus (how much to deduct from the optimal start value)
    """

    malus = 0

    for key, value in severity_df.iteritems():
        key = key.lower()

        # this requires python 3.10
        match key:
            case "critical":
                malus = malus + 4 * value
            case "high":
                malus = malus + 3 * value
            case "moderate":
                malus = malus + 2 * value
            case "medium":
                malus = malus + 2 * value
            case "low":
                malus = malus + 1 * value
            case _:
                sys.exit("Unknown key:" + key)

    return malus
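
A quick hypothetical sanity check of the formula:

# hypothetical example: 1 critical, 2 moderate, 3 low open findings
example = pd.Series({"critical": 1, "moderate": 2, "low": 3})
malus = set_score(severity_df=example)  # 1*4 + 2*2 + 3*1 = 11
print(100 - malus)                      # 89, grade "C" according to the table above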

Report management (Sharepoint document retention + draw regular baseline and file it)


  • Store the most recent reports in Sharepoint (./Current / GH org / repo / naming_convention.pdf)

  • files will be synced from local ./reports sub-directory

  • the remote retention will be the same based on remote file count for Current

  • Weekly “Monday baseline” reports will be archived (Weekly / GH org / repo / YYYY-CW) based on the Pass / Fail count per org and repo

  • retained for up to 4 months (1 quarter)

  • Email notifications via Microsoft 365 PowerAutomate on top of Sharepoint Online (no SMTP setup here)

  • Email notification profile: if the baseline is not met by a service, mail the repo owner

def retain_reports_in_sharepoint(sharepoint_appsec_org="", sharepoint_appsec_space="", sharepoint_appsec_user="", sharepoint_appsec_pw="", sharepoint_space_report_folder="Current"):
    """
    This function rotates the PDF reports in a dedicated Sharepoint file system
    :param sharepoint_appsec_org: string, MS Office 365 org
    :param sharepoint_appsec_space: string, Sharepoint within the org
    :param sharepoint_appsec_user: string, username to programmatically access the Sharepoint
    :param sharepoint_appsec_pw: string, from env SP_APPSEC_PW, authentication data
    :param sharepoint_space_report_folder: base folder for the PDFs
    :return:
    """


    from shareplum import Site
    from shareplum import Office365
    from shareplum.site import Version
    from pathlib import Path, PurePath
    import os

    
    def list_local_reports_for_sharepoint_upload():
        """
        list local files and put them in lists depending on the pass / fail state
        :return: file_pass_list, file_fail_list
        """
        
        file_pass_list = list()
        file_fail_list = list()

        # "./reports" sub-dir is hardcoded here
        for path, subdirs, files in os.walk("./reports"):
            for name in files:
                # filter out local files not intended for the sync
                # these are dev files or not reports because they are not PDFs
                if "checkpoint" not in name and ".pdf" in name:
                    if "_fail_" in name:
                        file_fail_list.append(PurePath(path, name))
                    if "_pass_" in name:
                        file_pass_list.append(PurePath(path, name))
        
        # Debug
        for item in file_fail_list:
            print("> Prepping reports for upload (fail)",  file=sys.stdout)
            print("\t" + str(item))
        
        for item in file_pass_list:
            print("> Prepping reports for upload (pass)",  file=sys.stdout)
            print("\t" + str(item))
        
        return file_pass_list, file_fail_list
            
    
    def upload_report_pdfs(file_list : list, status=""):
        """
        Data upload to Sharepoint
        :param file_list: list of files to upload
        :param status: Pass or Fail
        :return:
        """

        auth_cookie = Office365(sharepoint_appsec_org,
                               username=sharepoint_appsec_user, 
                               password=sharepoint_appsec_pw).GetCookies()
        site = Site(sharepoint_appsec_space, version=Version.v2016, authcookie=auth_cookie)

        
        for report_file_path in file_list:
         
            # "Current" is the folder name, also for PowerAutomate, therefore default
            sharepoint_space_sub_folder = sharepoint_space_report_folder + "/" + status # + "/"
            # sharepoint_space_sub_folder = sharepoint_space_sub_folder + str(report_file_path).split(os.sep)[1] + "/" + str(report_file_path).split(os.sep)[-1]
        
            # print(sharepoint_space_sub_folder)
        
            folder = site.Folder("Shared Documents/" + sharepoint_space_sub_folder)
            
            # example value:
            # report_file_path = "reports/pink/customer-account-service/customer-account-service_1648080000_pass_.pdf"
            report_file_name = str(report_file_path).split(os.sep)[-1]
            # print(report_file_name)

            with open(report_file_path, mode='rb') as file:
                    file_content = file.read()
                    folder.upload_file(file_content, report_file_name)

    
    # execution flow
    file_pass_list, file_fail_list = list_local_reports_for_sharepoint_upload()
    
    upload_report_pdfs(file_pass_list, status="Pass")
    upload_report_pdfs(file_fail_list, status="Fail") 

Weekly report and Baseline pass / fail assessment incl. Slack summary

  • Table in Slack with (n top, bottom) projects that pass / fail
def weekly_report_baseline_assess(slack_result_dict: dict, slack_webhook_url=""):
    """
    Posts the score digest to Slack (full table on Mondays, changes only otherwise)
    :param slack_result_dict: dict of {project: [code score, dependency score]}
    :param slack_webhook_url: Slack webhook URL (SLACK_WEBHOOK_URL via env)
    :return:
    """

    import os
    from os.path import exists
    from datetime import date
    from tabulate import tabulate
    import pandas as pd

    lb = os.linesep

    def log_to_slack(slack_result_dict: dict, slack_webhook_url=""):

        datestamp = date.today()

        print("> Prepping Slack summary",  file=sys.stdout)

        headers = ['Code score', 'Dependency score']
        # sorted by code score, then by sbom score
        data = pd.DataFrame.from_dict(slack_result_dict).T
        data.columns = headers
        data = data.sort_values(headers, kind='quicksort')

        # results of last check (path to txt file)
        prior_results_path = str(get_tempdir()) + os.sep + "prior_ghsec_report.txt"
        

        # 0 is for Monday == inventory day
        if datestamp.weekday() == 0:
            print(tabulate(data, headers, tablefmt='psql'),  file=sys.stdout)
        else:
            # only filter data when it's not Monday
            data = data.loc[ (data['Code score'] < 99) | (data['Dependency score'] < 99) ]

            # if possible perform differential data frame analysis and report changes only
            if exists(prior_results_path):
                # prior results have been saved and can be used for diffing
                current_results_path = str(get_tempdir()) + os.sep + "current_ghsec_report.txt"
                current_results = open(current_results_path, 'w')
                print(tabulate(data, headers),  file=current_results)
                current_results.close()

                with open(prior_results_path, 'r') as result_before:
                    with open(current_results_path, 'r') as result_current:
                        
                        lines_result_before = result_before.readlines()#[:3]          
                        lines_result_current= result_current.readlines()#[:3]
                        
                        diff = set(lines_result_current).difference(lines_result_before)
                        
                        if len(diff) <= 0:
                            return
                # update result status for next diff
                os.rename(current_results_path, prior_results_path)
             
        print(tabulate(data, headers, tablefmt='psql'),  file=sys.stdout)
        
        slack_text = lb + str(datestamp) + lb
        slack_text+= tabulate(data, headers=headers)
        # replace prior results txt with slack txt
        with open(prior_results_path, 'w') as prior_results_file:
            prior_results_file.write(slack_text)

        # post data as code block because Slack offers no tables feature in its markup
        slack_post_datatable = {"text": """```""" + slack_text + """```"""}

        requests.post(
            slack_webhook_url, data=json.dumps(slack_post_datatable),
            headers={'Content-Type': 'application/json'}
        )

    log_to_slack(slack_result_dict, slack_webhook_url)
  • this is kind of hacky: Slack doesn’t allow you to post tables in its markup, but you can use tabulate to generate ASCII formatted tables and post these using the code tags. That results in a monospaced message. Fair enough imho.

Main function

Just some glue. If Excel had some better automation capabilities…

def main():

    # setup and test environment if it's suitable
    check_python_version()
    # wide screen terminal output settings for pandas
    set_pandas_output_opts()

    # read config (before cmd flags to easily override the values)
    # convention:
    # * secrets only via os environment
    # * static content via config file

    cfg = read_config()

    # use config variables 
    verbose = cfg.getboolean('DEFAULT', 'VERBOSE')
    ...
    # setup GitHub static OAuth 
    auth_token = os.environ["GH_TOKEN"]
    hed = {'Authorization': 'Bearer ' + auth_token}

    # read feature flags from config
    sast_enabled = cfg.getboolean('DEFAULT', 'CODE_CHECKS')  # code analysis
   


    # Here starts the batch processing in 2 levels: per project and for all
    ##########################################################
    # per project result processing actions
    # for each project perform the metrical security analysis
    for project in projects.split(','):

        org_and_repo = project.split("/")

        if verbose:
            print(org_and_repo)

        # lift result variables to higher scope and set default values
        # (used by the Slack, PDF, and Elasticsearch branches below)
        code_score = 0
        sbom_score = 0

        org = org_and_repo[0]
        repo = org_and_repo[1]

        ### call functions handling tasks using the retrieved parameters

        # sast stands for 
        # https://en.wikipedia.org/wiki/Static_application_security_testing 
        if sast_enabled:
            code_score, code_severity_list = get_sast_issues_score(verbose=verbose,
                                                                   hed=hed,
                                                                   scope="repos",
                                                                   org=org,
                                                                   repo=repo,
                                                                   rest_api_endpoint=rest_api_endpoint)

            if verbose:
                print("\n\n")

        # sbom stands for 
        # https://en.wikipedia.org/wiki/Software_bill_of_materials 
        # this is equivalent to software composition analysis here
        if deps_enabled:
            sbom_score, dependabot_severity_list = get_sbom_issues_score(hed=hed,
                                                                         repo=repo,
                                                                         org=org,
                                                                         graphql_url=graphql_api_endpoint,
                                                                         verbose=verbose)

            if verbose:
                print("\n\n")

        # Slack integration sends a digest of the test results
        # here we can only collect one result at a time into a globally defined dict
        if slack_enabled:
            slack_result_dict[project] = [ 100 - code_score, 100 - sbom_score ] 
            

        # pdf reports hold metrics per project and link to the findings in GitHub
        # this is to enable reviews
        if pdf_reports_enabled:
            if compliance_baseline_hist:
                prepare_baseline_plot_from_elastic_data(username=elastic_user,
                                                        pw=elastic_pw,
                                                        index=elastic_index,
                                                        elastic_ep=elastic_endpoint,
                                                        project=project)

            make_pdf_report(reported_app_name=org + "/" + repo,
                            codeql_table=code_severity_list,
                            sbom_table=dependabot_severity_list,
                            code_score=code_score,
                            sbom_score=sbom_score,
                            web_endpoint=web_endpoint,
                            filename=repo + "_" + str(timestamp_day_epoch) + ".pdf",
                            pdf_reports_to_keep=pdf_reports_to_keep,
                            compliance_baseline_hist=compliance_baseline_hist,
                            appsec_baseline_lnk=appsec_baseline_doc_lnk)

        # kibana holds the metrics for up to 1 year (statistics, baseline compliance)
        # these stats can be retrieved for the report to account for the baseline over time
        # credentials for this are in Filebeat (separate forwarder)
        if elasticsearch_logging_enabled:
            make_logs_for_elastic(project_name=org + "/" + repo,
                                  code_score=code_score,
                                  sbom_score=sbom_score,
                                  filepath=str(Path.home()) + "/logs/dashboard.json")

    ##########################################################
    # global result processing actions
    # sharepoint rotates recent reports for reference
    # this can work in conjunction with Kibana to add a baseline compliance history
    if sharepoint_enabled:
        retain_reports_in_sharepoint(sharepoint_appsec_org=sp_appsec_org,
                                     sharepoint_appsec_space=sp_appsec_space,
                                     sharepoint_appsec_user=sp_appsec_user,
                                     sharepoint_appsec_pw=sp_appsec_pw,
                                     sharepoint_space_report_folder="Current")
        
    if slack_enabled:
        weekly_report_baseline_assess(slack_result_dict, slack_webhook_url)   
  • it’s really just batch processing with CI

Technical summary

The code is basic. The use case is basic. Basics matter.

Further reading

  • A more sophisticated policy-oriented scoring scheme is available here.
  • Technical compliance and security management approaches are available here.