SOC – Testing Microsoft Sentinel Analytic rules at scale

By Iwan Timmer and Sjoerd van den Bedem

Summary

As a SOC we are continuously improving our detection rules to improve coverage and false positive rates, all while new customers keep coming in. Working at scale is key. To ensure our improvements do not introduce regressions we want to continuously test our detection rules without human interaction. In this case: testing Microsoft Sentinel Analytic rules across our entire use case library during development and pre-deployment. There was no public solution available, so we had to build our own testing framework.

This article shows our main challenges during the development of our test framework and how we solved them. In technical terms: we demonstrate a test method for Microsoft Sentinel’s (KQL) Analytic rules, utilizing Python’s built-in unittest framework, based on a Terraform plan.

Sentinel and Terraform setup

In our Northwave Security Operation Center (SOC) we are monitoring our customers by developing and deploying detection rules which create alarms on specific behaviors. The alarms are then analyzed and handled by our analysts.
For most customers we use Microsoft’s Sentinel as SIEM (Security Information and Event Management; storing and analyzing log data) in which we deploy our detection rules. Sentinel provides us with cloud-based log storage and methods to query the logs for cyberattacks. Queries can be run upon request or scheduled as Analytics rules and are written in the KQL language. When an Analytic rule finds a match in the log data a Sentinel alarm is created.

To deploy Sentinel Analytic rules at scale Northwave utilizes Terraform, an infrastructure as code tool. Terraform gives us the ability to quickly spin up resources like Sentinel workspaces and deploy the Analytic rules automatically, without any interaction. The desired environment is described with a list of rules in simple Terraform configuration files. All Sentinel Analytic rules the SOC uses are described in this Terraform configuration format. The configuration files are stored in Git for version control and auditing.

Deploying changes configured with Terraform is a two-stage process: first a Terraform “plan” will determine if resources need to be modified, created, or deleted; then a Terraform “deploy” will make the previously determined changes.

The Information flow:

Terraform lookups

Terraform enables us to share and re-use configuration across use cases and customers, helping us scale. A Terraform detection rule definition is mostly a template with references to other configuration variables or methods, inserting all technical details and tuning for a specific customer. The configuration is written in a semi KQL statement, which is not directly valid in Microsoft Sentinel. At “plan” time Terraform builds the actual KQL Analytics Rule by following all references and inclusions, which is valid KQL.

For illustration see the simplified Terraform code below. Describing a generic detection rule regarding “Azure ATP”. All var, module, local definitions are included from elsewhere in Terraform.

Example Terraform:

let variables = externaldata(excluded_computers: dynamic) [[email protected]’${module.ucvarsblob.url}${var.token}’] with(format='multijson');
let excluded_computers = variables | project excluded_computers;
union ${local.query_workspaces_string}
| where ProviderName == "Azure Advanced Threat Protection"
| where AlertSeverity in ("High", "Critical")
${chomp(module.allowlist_generator.allowlist_insert)}

The references make it possible to easily include customer provided data or reuse “allow list” mechanisms, but hampers testing as there is no valid KQL to test yet. Simply writing customized valid KQL for every individual instance undermines our scaling philosophy: reuse as many code and components to ensure quality and consistency.

To have a valid KQL query to test we need to extract the resolved KQL-statement from Terraform’s “plan”. Luckily the Terraform plan can be exported in a JSON format.

Test setup

Time to write our test framework. As we use Python a lot at Northwave, we implemented our testing framework on top of the Python Unit Testing Framework [i], aka unittest. This gives us a nice Pythonic way of writing tests, which is easy to understand.

Definition time: Our goal is to run the KQL queries from the Terraform configuration, with minimal modification, in a real Sentinel Workspace, with data-input we control, and check if the rule generates an alarm or not. Test cases should succeed within seconds per test case.

The first step is to setup an empty test Sentinel Workspace through Terraform so all KQL queries are “missing”. This empty workspace will later also be used to run our test queries.

Then we generate a Terraform plan, which contains the complete KQL Analytic rules to be created. A Terraform plan consist of all the changes that Terraform thinks are necessary to migrate the current state of an environment to the requested state. In our case the current state is an empty Sentinel workspace and the requested state is one with all Analytic rules deployed.

Due to technical reasons, next to every KQL rule we deploy a JSON file to the Azure Blob Storage[ii] with Terraform as well. This JSON can then be referenced in our query, for instance containing a list of events to be filtered out. Terraform will output both the templated KQL and the JSON blob when generating a plan.

Terraform test config

When we have our Sentinel workspace, we can add the rules we want to test to our Terraform configuration. An example of a Terraform configuration for a detection rule in our setup may look like:

module test_uc_1234 {
  source = “../abstract-uc-modules/abstract-ucs/abstract-uc-1234”
  customer_data = module.test.customer_data
  use_case_id = 1234
  excluded_computers = [“testexclude”]
}

This short statement instructs Terraform to take the templated KQL from “source” and fill it with the other data definitions. This generates the following resources in the Terraform plan:

KQL:

let variables = externaldata(excluded_computers: dynamic) [[email protected]'https://example.blob.core.windows.net/example-sac/CUST_uc123.json'] with(format='multijson');
let excluded_computers = variables | project excluded_computers;
workspace("c162847b-5f5c-424c-ab24-a2e95c93168d").SecurityAlert
| where ProviderName == "Azure Advanced Threat Protection"
| where AlertSeverity in ("High", "Critical")
| where Computer !in (excluded_computers)

With KQL syntax highlighting:

Blob:

{“excluded_computers”:[“testexclude”]}

Note that we must make some changes to the KQL in our test code to ensure a query does not depend on yet unavailable data. For example, the reference to the JSON blob.

Test case definition

Let’s move on to the test case definition: A Python unittest definition consisting of a Python class that will set up the test environment and has one or multiple test functions. The setup part is implemented in the SentinelTestCase class and will setup the connection to the Sentinel test workspace. Every test function consists of one or more assert functions that will test an input against an expected output. If an assert function fails, the whole test function will be stopped and marked as failed and the next test function will be run.

from sktf.test import SentinelTestCase
from sktf.kql import time


class TestAzureATP(SentinelTestCase):
    def test_high_critical(self):
        (_, rules, blob) = self.get_uc_by_abstract(1234)

        self.assertAlarmsEqual(self.workspace.run_rules(rules, blob, time(300), {
            "SecurityAlert": [
                {"TimeGenerated": time(0), "ProviderName": "Azure Advanced Threat Protection", "AlertSeverity": "Critical", "Computer": "computer01"},
                {"TimeGenerated": time(0), "ProviderName": "Azure Advanced Threat Protection", "AlertSeverity": "High", "Computer": "computer01"},
                {"TimeGenerated": time(0), "ProviderName": "Azure Advanced Threat Protection", "AlertSeverity": "Low", "Computer": "computer01"},
                {"TimeGenerated": time(0), "ProviderName": "Azure Advanced Threat Protection", "AlertSeverity": "High", "Computer": "textexclude"},
                {"TimeGenerated": time(0), "ProviderName": "MCAS", "AlertSeverity": "Critical", "Computer": "computer02"},
            ]
        }), [
            {"Computer": "computer01", "AlertSeverity": "Critical"},
            {"Computer": "computer01", "AlertSeverity": "High"}
        ])

With Python syntax highlighting:

The get_uc_by_abstract function performs the logic to extract the rule details and the content of the blobs from the Terraform plan. This part is very specific to how we have structured our rules in Terraform. The important part is that it returns the KQL and blob content from the Terraform plan.

Mitigating external calls

Let’s get back to the external calls in the original KQL:

let variables = externaldata(excluded_computers: dynamic) [[email protected]'https://example.blob.core.windows.net/example-sac/CUST_uc123.json'] with(format='multijson');
let excluded_computers = variables | project excluded_computers;
workspace("c162847b-5f5c-424c-ab24-a2e95c93168d").SecurityAlert
| where ProviderName == "Azure Advanced Threat Protection"
| where AlertSeverity in ("High", "Critical")
| where Computer !in (excluded_computers)

In the KQL we marked all the parts yellow that are referencing tables and external data that is not available. To be able to test this query against the provided events in the unittest and use the non-existing externaldata blob, the run_rules functions in our test code will perform some transformations on the query before it is sent to the LogAnalytics query API.

Normally you would query a table from the LogAnalytics workspace underlying Microsoft Sentinel. But it takes some time for LogAnalytics to process new data if you would upload it and it will only store data for a certain amount of time. So instead of querying the table, the provided events will be converted to an inline datatable statement and all references to the table will be replaced in the query. The run_rules function creates a new variable for the Table events as in some rules the same Table is referenced multiple times. A datatable also needs the value types of all the columns. Therefore run_rules will query Sentinel to get the table schema of the referenced tables and fill in None for all columns which were not filled in by the provided event data. We also replace the externaldata statements with an inline datatable statement. This externaldata reference imports our JSON with suppression information. We do not use an additional variable as every externaldata source is only used once in every query.

To make the test as real as possible we also replicate some of the Analytics Rule query behavior of Sentinel, and therefore also take the query period into account. We do this by checking if the TimeGenerated field of the event data is between a certain period. We also explicitly set the query_now variable (third parameter of the run_rules function) to have this query run at a fixed point in time. The resulting query will now look like the code below and will no longer referencing any data outside of the query itself. For clarity the added lines are marked green, and the changed parts are marked yellow.

set query_now = datetime(2022-01-01 00:00:05);
let _SecurityAlert = datatable(TimeGenerated:datetime, ProviderName:string, AlertSeverity:string, Computer:string) [
    datetime(2022-01-01), "Azure Advanced Threat Protection", "Critical", "computer01",
    datetime(2022-01-01), "Azure Advanced Threat Protection", "High", "computer01",
    datetime(2022-01-01), "Azure Advanced Threat Protection", "Low", "computer01",
    datetime(2022-01-01), "Azure Advanced Threat Protection", "High", "testexclude",
    datetime(2022-01-01), "MCAS", "High", "computer01",
] | where TimeGenerated between (ago(10m) .. now());
let variables = datatable(excluded_computers:dynamic) [
    dynamic(["testexclude"])
];
let excluded_computers = variables | project excluded_computers;
_SecurityAlert
| where ProviderName == "Azure Advanced Threat Protection"
| where AlertSeverity in ("High", "Critical")
| where Computer !in (excluded_computers)

While there is a C# and JavaScript parser available for KQL, we use regex to find and replace the relevant parts. For our use case this worked well enough. Since LogAnalytics is a closed source cloud solution, we must run it using the LogAnalytics API. And a workspace specification is required although no actual data from the workspace is queried. Hence we create the Sentinel workspace through Terraform.

Test execution

The modified KQL query is directly executed in the (empty) Sentinel workspace created earlier. The result from the KQL query is then checked in the unittest Test Case. The assertAlarmsEqual() function will check if the returned results in the expected list of alarms. A test will fail if more or less than the number of expected alarms is returned. In this case we expect a high and critical alarm for the computer computer01.

Performance

Running tests is pretty quick. Our testing Terraform plan currently takes under 15s and we average a 1 second duration per KQL query test.

Conclusion

We succeeded in creating a testing framework for our Terraform deployed Microsoft Sentinel Analytic rules. Run-time performance is good while using Python unittest gives us flexibility and a familiar approach. With the testing framework we deploy our changes with confidence across customers. The test suite caught bugs in our more complex rules during development, the litmus test of every test suite. Mission accomplished.