deep_walk

deep_walk

August 19, 2023

deep_walk Overview #

I’m usually pretty self-conscious about my code and given that I have yet to write a blog post about my coding adventures, so here goes!

deep_walk was the name of Python helper function I wrote as an addition to Panther’s global helpers which was tailored specifically to handle arbitrarily nested dictionary structures. This was in response to an edge-case encountered by a customer trying to parse logs where a key contained a list of dictionaries. The current pattern was to use the deep_get function which essentially calls .get on nested dictionaries which is inadequate for lists. To work around this limitation, a user would need to use nested deep_get calls with the inner call accessing the contents of the list which can cause issues when no list elements are present, thus requiring additional validations.

Eessentially, the following would need to be done:

a_value: Any = deep_get(deep_get(a_dict, "list_key", default=[])[0], "desired_key", default="<NOT FOUND>")

But as mentioned, handling an empty list would need to be considered since [0] would break on an empty list:

list_key: List[Any] = deep_get(a_dict, "list_key", default=[])

# `if list_key` would also work
if len(list_key) > 0:
    a_value: Any = deep_get(list_key[0], "desired_key", default="<NOT FOUND>")

When I saw this, I decided that I would hack around on a better approach to this problem and came up with the deep_walk implementation. I set out to improve on deep_get’s functionality and to cover dictionary structures where said dictionaries could potentially contain any or all of the following (even nested keys containing of these types):

  • Keys with string or integer values
  • Keys with dictionary values
  • Keys with list values

Within the latter two, further nesting of dictionaries and lists was also possible and each of these nested elements could also be empty (either directly or within another nested key). The goal of this function is to search the structure for the value of a given key and return its value if present or return a default value if not. While seeminly straightforward in theory, handling the various data types and ensuring that each branch of the structure was appropriately searched resulted in some interesting logistical choices. I also wanted to resiliently handle the very unlikely scenario that an empty list of nested empty lists was handled such that the default value was returned in cases like this. To top it all off, I wanted to include three modes of functionality:

  • Return the first value found (if it exists)
  • Return the last value found (if it exists)
  • Return all values found as a list (if they exist)

To outline the general requirements (in no specific order):

  • Take a dictionary object and search its structure for a desired value
    • Return the value if found
    • Return the default value if not found
  • Recursively handle nested dictionaries by using the get method
  • Iterate through nested lists and apppriately handle the types found within
  • Safely handle empty lists or lists of infinitely many empty lists
  • Safely handle None values

The Function Itself #

Code #

Without further ado, the code (with comments removed for brevity) is as follows:

def deep_walk(
    obj: Optional[Any], *keys: str, default: Optional[str] = None, return_val: str = "all"
) -> Union[Optional[Any], Optional[List[Any]]]:
    def _empty_list(sub_obj: Any):
        return (
            all(_empty_list(next_obj) for next_obj in sub_obj)
            if isinstance(sub_obj, Sequence) and not isinstance(sub_obj, str)
            else False
        )

    if not keys:
        return default if _empty_list(obj) else obj

    current_key = keys[0]
    found: OrderedDict = OrderedDict()

    if isinstance(obj, Mapping):
        next_key = obj.get(current_key, None)
        return (
            deep_walk(next_key, *keys[1:], default=default, return_val=return_val)
            if next_key is not None
            else default
        )
    if isinstance(obj, Sequence) and not isinstance(obj, str):
        for item in obj:
            value = deep_walk(item, *keys, default=default, return_val=return_val)
            if value is not None:
                if isinstance(value, Sequence) and not isinstance(value, str):
                    for sub_item in value:
                        found[sub_item] = None
                else:
                    found[value] = None

    found_list: list[Any] = list(found.keys())
    if not found_list:
        return default
    return {
        "first": found_list[0],
        "last": found_list[-1],
        "all": found_list[0] if len(found_list) == 1 else found_list,
    }.get(return_val, "all")

There’s a lot going on in this function to keep everything self-contained (though splitting it up wouldn’t be the worst idea); the remainder of this section will focus on breaking the code down and explaining the functionality.

Context #

Before diving into the code, showing how the function is called will offer some context. Given the following, contrived dictionary (formatted for readability):

a_dict = {
    "key": {
        "multiple_nested_lists_with_dict": [[[
            { "very_nested_key": "very_nested_value" }
        ]]]
    }
}

returning the value of very_nested_key would handled like so:

deep_walk(a_dict, "key", "multiple_nested_lists_with_dict", "very_nested_key", default="")

To clarify:

  • a_dict is the dictionary object outlined above
  • key, multiple_nested_lists_with_dict, and very_nested_key is the path to inspect for the desired value (which in this case should be very_nested_value)
  • default="" is the default value to return if the desired value is not found in the structure (in this case, an empty string but something more obvious would be appropriate)
  • By default, return_val, which affects the behavior of deep_walk, returns all of the retrieved values, though it can also be set to first or last explicitly to override this behavior

Within the deep_walk function, the dictionary object is used as a starting for the traversal and the keys are the path the traversal should take.

Core Functionality #

Starting out, the keys are inspected:

if not keys:
    return default if _empty_list(obj) else obj

In the event there be no more keys to iterate through, either the default value is returned if the remaining object is an empty list or the remaining object is returned. To determine the former outcome, the _empty_list() function is used:

def _empty_list(sub_obj: Any):
    return (
        all(_empty_list(next_obj) for next_obj in sub_obj)
        if isinstance(sub_obj, Sequence) and not isinstance(sub_obj, str)
        else False
    )

This function calls itself recursively on each Sequence object passed in to determine its emptiness. If the object is not empty, it can be traversed.

If there are keys remaining in *keys, the first key is assigned to the current_key variable and used until the next recursive call to deep_walk. The bulk of the deep_walk function is contained within the two if-statements:

if isinstance(obj, Mapping):
    next_key = obj.get(current_key, None)
    return (
        deep_walk(next_key, *keys[1:], default=default, return_val=return_val)
        if next_key is not None
        else default
    )
if isinstance(obj, Sequence) and not isinstance(obj, str):
    for item in obj:
        value = deep_walk(item, *keys, default=default, return_val=return_val)
        if value is not None:
            if isinstance(value, Sequence) and not isinstance(value, str):
                for sub_item in value:
                    found[sub_item] = None
            else:
                found[value] = None

To cover the two paths:

  • If the object passed into deep_walk is of the Mapping type (i.e., a dictionary), the next key is retrieved by running a .get on the current_key (or, keys[0). Using next_key:

    • deep_walk is called recursively using the next_key object as the obj argument if next_key is not None
    • The remaining keys are passed to the recursive deep_walk call
    • Neither the default return value or the behavior of deep_walk is modified
  • If the object passed into deep_walk (usually after a recursive call) is a Sequence type (i.e., a list), the object is iterated through and the subsequent value is retrieved by a recursive deep_walk call

    • If the value is not None and the proceeding value is another Sequence (but not a string since strings are themselves sequences which would break the functionality), the object is iterated over and added to the found OrderedDict
      • An OrderedDict is used to preserve the order in which the retrieved values are stored
    • If the value is not a Sequence, the value is directly added to the OrderedDict

To wrap up the functionality of deep_walk, the keys of the OrderedDict are added to a list:

found_list: list[Any] = list(found.keys())

Since we used an OrderedDict and stored the retrieved values as keys, we can drop all of the values without issue.

The return logic of deep_walk is as follows:

if not found_list:
    return default
return {
    "first": found_list[0],
    "last": found_list[-1],
    "all": found_list[0] if len(found_list) == 1 else found_list,
}.get(return_val, "all")

If no keys were added to the found OrderedDict, the default value is returned. Finally, a dictionary outlining the behavior of return_val is contstructed:

  • "first" returns the first element of found_list
  • "last" returns the last element of found_list
  • "all" returns the first element of found_list if the length of the list is one, otherwise the entire list is returned

As mentioned above, "all" is used when returning the retrieved values.

Testing #

To test the functionality of deep_walk, I employed two distinct methods:

  • Randomly-generated structures to stress-test the recursive functionality
  • Contrived edge-cases or expected use-cases

The former involved generating structures of two types:

  • Structures with the expected traversal path included for the test
  • Structures that will always return the default value

Randomly Structured Tests #

To generate the random structures, I used a function to create not only the dictionaries and lists, but also the key-value pairs contained within. These pairs were just random strings of characters (or integers) with no discernible pattern.

For context, all of this code exists inside of a unittest.TestCase Class’ methods.

To generate the KV pair, the following code is used:

@staticmethod
def random_kv_pair() -> dict:
    return {
        "".join(
            secrets.choice(string.hexdigits)
            for _ in range(secrets.SystemRandom().randrange(1, 6))
        ): "".join(
            secrets.choice(string.hexdigits)
            for _ in range(secrets.SystemRandom().randrange(1, 6))
        )
        if secrets.choice([True, False])
        else secrets.randbelow(secrets.SystemRandom().randrange(1, 11) ** 10)
        for _ in range(secrets.SystemRandom().randrange(1, 6))
    }

secrets.SystemRandom is used since it is more secure than the random module

Depending on the “random” outcome, either a string or integer value is tied to the key and each string is between one and five characters long for the sake of performance.

The code to generate the structures is split between two similar, yet distinct methods.

The first method is for generating structures that have a list of keys and an expected value to retrieve based on the keys:

def generate_random_test_case_success(self, max_depth=10):
    def _generate(keys=None, depth=0):
        if keys is None:
            keys = []
        kv_pair = self.random_kv_pair()
        key = secrets.choice(list(kv_pair.keys()))
        if depth == max_depth:
            return kv_pair, keys + [key], kv_pair[key]
        nested, keys, expected = _generate(keys + [key], depth + 1)
        kv_pair[key] = nested
        if secrets.choice(["dict", "list"]) == "list":
            kv_pair[key] = [nested for _ in range(secrets.SystemRandom().randrange(1, 6))]
        return kv_pair, keys, expected

    return _generate()

This method generates test cases that should always succeed. Each individual test case is not reproducable but the outcome of each is. The method calls the KV pair generation method above and creates a random dictionary structure up to ten levels deep and stores a list of generated keys to be returned.

The test for this method runs one-thousand times:

def test_deep_walk_success_random(self):
    for _ in range(1000):
        data, keys, expected = self.generate_random_test_case_success()
        self.assertEqual(p_b_h.deep_walk(data, *keys, default=""), expected)

Since the list of keys is returned, a known traversal path and value at the end of the path for the object is known.

The second method is for generating structures that will not have a known path or expected value. Test cases generated by this method will always return the default value.

def generate_random_test_case_default(self, depth=0, max_depth=10):
    def _generate():
        kv_pair = self.random_kv_pair()
        key = secrets.choice(list(kv_pair.keys()))
        if secrets.choice(["dict", "list"]) == "list":
            if depth != max_depth:
                return [{key: self.generate_random_test_case_default(depth + 1, max_depth)}]
            return [kv_pair]
        if depth != max_depth:
            return {key: self.generate_random_test_case_default(depth + 1, max_depth)}
        return kv_pair

    nonexistent_keys = [
        "".join(secrets.choice(string.hexdigits) for _ in range(10))
        for _ in range(secrets.SystemRandom().randrange(1, max_depth))
    ]
    return _generate(), nonexistent_keys

Essentially, the method generates a random structure as well as a list of keys that cannot possibly collide with the contents of the the dictionary. This method also generates test cases that should always succeed, but in the opposite way that generate_random_test_case_success would.

The test for this method also runs one-thousand times:

 def test_deep_walk_default_random(self):
    for _ in range(1000):
        data, keys = self.generate_random_test_case_default()
        self.assertEqual(p_b_h.deep_walk(data, *keys, default="NOT FOUND"), "NOT FOUND")

Manual Tests #

To test deep_walk manually, I wrote up a handful of interesting edge-cases and expected structures:

a_dict = {
    "key": {
        "inner_key": [{"nested_key": "nested_value"}, {"nested_key": "nested_value2"}],
        "very_nested": [
            {
                "outer_key": [
                    {
                        "nested_key": "value",
                        "nested_key2": [{"nested_key3": "value2"}],
                    },
                    {
                        "nested_key": "value4",
                        "nested_key2": [{"nested_key3": "value2"}],
                    },
                ],
                "outer_key2": [{"nested_key4": "value3"}],
            }
        ],
        "another_key": "value6",
        "empty_list_key": [],
        "multiple_empty_lists_key1": [[]],
        "multiple_empty_lists_key2": [[[]]],
        "multiple_empty_lists_key3": [[[[[[]]]]]],
        "multiple_nested_lists_with_dict": [
            [[{"very_nested_key": "very_nested_value"}]]
        ],
        "nested_dict_key": {"nested_dict_value": "value7"},
        "none_value": None,
    }
}

Most of these keys are straightforward, but the notable keys in the keys dictionary are:

  • "very_nested": [ ... ]
  • "empty_list_key": []
  • "multiple_empty_lists_key3": [[[[[[]]]]]]
  • "none_value": None

Each of these covers an edge-case deep_walk was built to address, namely duplicate keys, empty lists, and None values –

  1. The result of the very_nested key is ["value", "value4"] which is due to the default return_val of deep_walk being "all".
  2. The result of the empty_list_key key is ""
  3. The result of the multiple_empty_lists_key3 key is also ""
  4. The result of the none_value is also ""

To round out this post, deep_walk was over-engineered to handle any dictionary structure resiliently, even in cases where the structure makes little to no sense even if the performance impact is a little higher than supporting only nested dictionaries. Perhaps in the future I’ll work on adding a function that scans a dictionary for a value without needing to know the path. The UX of such a function would be much higher than determining the list of keys needed to reach a value, though the performance trade-off would be yet higher still. While this effort covered only one function, it was fun to ideate and iterate on, especially when coming up with ways to tests the functionality.

Golang Version #

I also ported this code over to Golang; the repository is located here and the package is also on Go’s site here.

The implementation is much the same as Python, even down to the test generation. Go has its idiosyncracies that change a few things but the overall structure and functionality remains the same. While the package itself is useful, it would be easier to have a “greedy” approach that traverses the entirety of a map for a given key rather than needing to know a path to traverse.

In the context of normalized logs, a path will be known hence the original implementation details. For broader use, the greedy approach – while less performant – would benefit wider usage of the package.

Late 2023 Update #

I spent some time during my end-of-year break working on the Go version of deepwalk and made several additions and improvements.

  • I updated the DeepWalk method to support the traversal of struct fields
  • I added a DeepSearch function that can traverse a data structure for a key without needing to know the top-level path required to reach that key
  • I used Cobra to make deepwalk a CLI so that the binary can be executed against a file or string object to retrieve the desired value
  • I added a GoReleaser Workflow to create signed releases each time a new tag/release is cut
  • I added an SBOM workflow to attach a report each time a release is created as well

2024 Update #

deepwalk is now v2 and utilizes a naive, iterative approach to traversing data structures. This approach is both easier and faster since the entire traversal path does not need to be determined beforehand.

I stripped away a lot of the cruft and may need to re-add some of it, but I’ll look into that going forward. For now, the package is a lot more straightforward and minimal.

Resources #

To read more about this function or other code used for writing Panther detections, check out these resources: