deep_walk
August 19, 2023
deep_walk
Overview
#
I’m usually pretty self-conscious about my code and given that I have yet to write a blog post about my coding adventures, so here goes!
deep_walk
was the name of Python helper function I wrote as an addition to Panther’s global helpers which was tailored specifically to handle arbitrarily nested dictionary structures. This was in response to an edge-case encountered by a customer trying to parse logs where a key contained a list of dictionaries. The current pattern was to use the deep_get function which essentially calls .get
on nested dictionaries which is inadequate for lists. To work around this limitation, a user would need to use nested deep_get
calls with the inner call accessing the contents of the list which can cause issues when no list elements are present, thus requiring additional validations.
Eessentially, the following would need to be done:
a_value: Any = deep_get(deep_get(a_dict, "list_key", default=[])[0], "desired_key", default="<NOT FOUND>")
But as mentioned, handling an empty list would need to be considered since [0]
would break on an empty list:
list_key: List[Any] = deep_get(a_dict, "list_key", default=[])
# `if list_key` would also work
if len(list_key) > 0:
a_value: Any = deep_get(list_key[0], "desired_key", default="<NOT FOUND>")
When I saw this, I decided that I would hack around on a better approach to this problem and came up with the deep_walk
implementation. I set out to improve on deep_get
’s functionality and to cover dictionary structures where said dictionaries could potentially contain any or all of the following (even nested keys containing of these types):
- Keys with string or integer values
- Keys with dictionary values
- Keys with list values
Within the latter two, further nesting of dictionaries and lists was also possible and each of these nested elements could also be empty (either directly or within another nested key). The goal of this function is to search the structure for the value of a given key and return its value if present or return a default value if not. While seeminly straightforward in theory, handling the various data types and ensuring that each branch of the structure was appropriately searched resulted in some interesting logistical choices. I also wanted to resiliently handle the very unlikely scenario that an empty list of nested empty lists was handled such that the default value was returned in cases like this. To top it all off, I wanted to include three modes of functionality:
- Return the first value found (if it exists)
- Return the last value found (if it exists)
- Return all values found as a list (if they exist)
To outline the general requirements (in no specific order):
- Take a dictionary object and search its structure for a desired value
- Return the value if found
- Return the default value if not found
- Recursively handle nested dictionaries by using the
get
method - Iterate through nested lists and apppriately handle the types found within
- Safely handle empty lists or lists of infinitely many empty lists
- Safely handle
None
values
The Function Itself #
Code #
Without further ado, the code (with comments removed for brevity) is as follows:
def deep_walk(
obj: Optional[Any], *keys: str, default: Optional[str] = None, return_val: str = "all"
) -> Union[Optional[Any], Optional[List[Any]]]:
def _empty_list(sub_obj: Any):
return (
all(_empty_list(next_obj) for next_obj in sub_obj)
if isinstance(sub_obj, Sequence) and not isinstance(sub_obj, str)
else False
)
if not keys:
return default if _empty_list(obj) else obj
current_key = keys[0]
found: OrderedDict = OrderedDict()
if isinstance(obj, Mapping):
next_key = obj.get(current_key, None)
return (
deep_walk(next_key, *keys[1:], default=default, return_val=return_val)
if next_key is not None
else default
)
if isinstance(obj, Sequence) and not isinstance(obj, str):
for item in obj:
value = deep_walk(item, *keys, default=default, return_val=return_val)
if value is not None:
if isinstance(value, Sequence) and not isinstance(value, str):
for sub_item in value:
found[sub_item] = None
else:
found[value] = None
found_list: list[Any] = list(found.keys())
if not found_list:
return default
return {
"first": found_list[0],
"last": found_list[-1],
"all": found_list[0] if len(found_list) == 1 else found_list,
}.get(return_val, "all")
There’s a lot going on in this function to keep everything self-contained (though splitting it up wouldn’t be the worst idea); the remainder of this section will focus on breaking the code down and explaining the functionality.
Context #
Before diving into the code, showing how the function is called will offer some context. Given the following, contrived dictionary (formatted for readability):
a_dict = {
"key": {
"multiple_nested_lists_with_dict": [[[
{ "very_nested_key": "very_nested_value" }
]]]
}
}
returning the value of very_nested_key
would handled like so:
deep_walk(a_dict, "key", "multiple_nested_lists_with_dict", "very_nested_key", default="")
To clarify:
a_dict
is the dictionary object outlined abovekey
,multiple_nested_lists_with_dict
, andvery_nested_key
is the path to inspect for the desired value (which in this case should bevery_nested_value
)default=""
is the default value to return if the desired value is not found in the structure (in this case, an empty string but something more obvious would be appropriate)- By default,
return_val
, which affects the behavior ofdeep_walk
, returns all of the retrieved values, though it can also be set tofirst
orlast
explicitly to override this behavior
Within the deep_walk
function, the dictionary object is used as a starting for the traversal and the keys are the path the traversal should take.
Core Functionality #
Starting out, the keys are inspected:
if not keys:
return default if _empty_list(obj) else obj
In the event there be no more keys to iterate through, either the default value is returned if the remaining object is an empty list or the remaining object is returned.
To determine the former outcome, the _empty_list()
function is used:
def _empty_list(sub_obj: Any):
return (
all(_empty_list(next_obj) for next_obj in sub_obj)
if isinstance(sub_obj, Sequence) and not isinstance(sub_obj, str)
else False
)
This function calls itself recursively on each Sequence
object passed in to determine its emptiness. If the object is not empty, it can be traversed.
If there are keys remaining in *keys
, the first key is assigned to the current_key
variable and used until the next recursive call to deep_walk
. The bulk of the deep_walk
function is contained within the two if
-statements:
if isinstance(obj, Mapping):
next_key = obj.get(current_key, None)
return (
deep_walk(next_key, *keys[1:], default=default, return_val=return_val)
if next_key is not None
else default
)
if isinstance(obj, Sequence) and not isinstance(obj, str):
for item in obj:
value = deep_walk(item, *keys, default=default, return_val=return_val)
if value is not None:
if isinstance(value, Sequence) and not isinstance(value, str):
for sub_item in value:
found[sub_item] = None
else:
found[value] = None
To cover the two paths:
-
If the object passed into
deep_walk
is of theMapping
type (i.e., a dictionary), the next key is retrieved by running a.get
on thecurrent_key
(or,keys[0
). Usingnext_key
:deep_walk
is called recursively using thenext_key
object as theobj
argument ifnext_key
is notNone
- The remaining keys are passed to the recursive
deep_walk
call - Neither the default return value or the behavior of
deep_walk
is modified
-
If the object passed into
deep_walk
(usually after a recursive call) is aSequence
type (i.e., a list), the object is iterated through and the subsequent value is retrieved by a recursivedeep_walk
call- If the value is not
None
and the proceeding value is anotherSequence
(but not a string since strings are themselves sequences which would break the functionality), the object is iterated over and added to thefound
OrderedDict- An
OrderedDict
is used to preserve the order in which the retrieved values are stored
- An
- If the value is not a
Sequence
, the value is directly added to the OrderedDict
- If the value is not
To wrap up the functionality of deep_walk
, the keys of the OrderedDict
are added to a list:
found_list: list[Any] = list(found.keys())
Since we used an OrderedDict and stored the retrieved values as keys, we can drop all of the values without issue.
The return logic of deep_walk
is as follows:
if not found_list:
return default
return {
"first": found_list[0],
"last": found_list[-1],
"all": found_list[0] if len(found_list) == 1 else found_list,
}.get(return_val, "all")
If no keys were added to the found
OrderedDict, the default value is returned.
Finally, a dictionary outlining the behavior of return_val
is contstructed:
"first"
returns the first element offound_list
"last"
returns the last element offound_list
"all"
returns the first element offound_list
if the length of the list is one, otherwise the entire list is returned
As mentioned above, "all"
is used when returning the retrieved values.
Testing #
To test the functionality of deep_walk
, I employed two distinct methods:
- Randomly-generated structures to stress-test the recursive functionality
- Contrived edge-cases or expected use-cases
The former involved generating structures of two types:
- Structures with the expected traversal path included for the test
- Structures that will always return the default value
Randomly Structured Tests #
To generate the random structures, I used a function to create not only the dictionaries and lists, but also the key-value pairs contained within. These pairs were just random strings of characters (or integers) with no discernible pattern.
For context, all of this code exists inside of a unittest.TestCase
Class’ methods.
To generate the KV pair, the following code is used:
@staticmethod
def random_kv_pair() -> dict:
return {
"".join(
secrets.choice(string.hexdigits)
for _ in range(secrets.SystemRandom().randrange(1, 6))
): "".join(
secrets.choice(string.hexdigits)
for _ in range(secrets.SystemRandom().randrange(1, 6))
)
if secrets.choice([True, False])
else secrets.randbelow(secrets.SystemRandom().randrange(1, 11) ** 10)
for _ in range(secrets.SystemRandom().randrange(1, 6))
}
secrets.SystemRandom
is used since it is more secure than therandom
module
Depending on the “random” outcome, either a string or integer value is tied to the key and each string is between one and five characters long for the sake of performance.
The code to generate the structures is split between two similar, yet distinct methods.
The first method is for generating structures that have a list of keys and an expected value to retrieve based on the keys:
def generate_random_test_case_success(self, max_depth=10):
def _generate(keys=None, depth=0):
if keys is None:
keys = []
kv_pair = self.random_kv_pair()
key = secrets.choice(list(kv_pair.keys()))
if depth == max_depth:
return kv_pair, keys + [key], kv_pair[key]
nested, keys, expected = _generate(keys + [key], depth + 1)
kv_pair[key] = nested
if secrets.choice(["dict", "list"]) == "list":
kv_pair[key] = [nested for _ in range(secrets.SystemRandom().randrange(1, 6))]
return kv_pair, keys, expected
return _generate()
This method generates test cases that should always succeed. Each individual test case is not reproducable but the outcome of each is. The method calls the KV pair generation method above and creates a random dictionary structure up to ten levels deep and stores a list of generated keys to be returned.
The test for this method runs one-thousand times:
def test_deep_walk_success_random(self):
for _ in range(1000):
data, keys, expected = self.generate_random_test_case_success()
self.assertEqual(p_b_h.deep_walk(data, *keys, default=""), expected)
Since the list of keys is returned, a known traversal path and value at the end of the path for the object is known.
The second method is for generating structures that will not have a known path or expected value. Test cases generated by this method will always return the default value.
def generate_random_test_case_default(self, depth=0, max_depth=10):
def _generate():
kv_pair = self.random_kv_pair()
key = secrets.choice(list(kv_pair.keys()))
if secrets.choice(["dict", "list"]) == "list":
if depth != max_depth:
return [{key: self.generate_random_test_case_default(depth + 1, max_depth)}]
return [kv_pair]
if depth != max_depth:
return {key: self.generate_random_test_case_default(depth + 1, max_depth)}
return kv_pair
nonexistent_keys = [
"".join(secrets.choice(string.hexdigits) for _ in range(10))
for _ in range(secrets.SystemRandom().randrange(1, max_depth))
]
return _generate(), nonexistent_keys
Essentially, the method generates a random structure as well as a list of keys that cannot possibly collide with the contents of the the dictionary. This method also generates test cases that should always succeed, but in the opposite way that generate_random_test_case_success
would.
The test for this method also runs one-thousand times:
def test_deep_walk_default_random(self):
for _ in range(1000):
data, keys = self.generate_random_test_case_default()
self.assertEqual(p_b_h.deep_walk(data, *keys, default="NOT FOUND"), "NOT FOUND")
Manual Tests #
To test deep_walk
manually, I wrote up a handful of interesting edge-cases and expected structures:
a_dict = {
"key": {
"inner_key": [{"nested_key": "nested_value"}, {"nested_key": "nested_value2"}],
"very_nested": [
{
"outer_key": [
{
"nested_key": "value",
"nested_key2": [{"nested_key3": "value2"}],
},
{
"nested_key": "value4",
"nested_key2": [{"nested_key3": "value2"}],
},
],
"outer_key2": [{"nested_key4": "value3"}],
}
],
"another_key": "value6",
"empty_list_key": [],
"multiple_empty_lists_key1": [[]],
"multiple_empty_lists_key2": [[[]]],
"multiple_empty_lists_key3": [[[[[[]]]]]],
"multiple_nested_lists_with_dict": [
[[{"very_nested_key": "very_nested_value"}]]
],
"nested_dict_key": {"nested_dict_value": "value7"},
"none_value": None,
}
}
Most of these keys are straightforward, but the notable keys in the keys
dictionary are:
"very_nested": [ ... ]
"empty_list_key": []
"multiple_empty_lists_key3": [[[[[[]]]]]]
"none_value": None
Each of these covers an edge-case deep_walk
was built to address, namely duplicate keys, empty lists, and None
values –
- The result of the
very_nested
key is["value", "value4"]
which is due to the defaultreturn_val
ofdeep_walk
being"all"
. - The result of the
empty_list_key
key is""
- The result of the
multiple_empty_lists_key3
key is also""
- The result of the
none_value
is also""
To round out this post, deep_walk
was over-engineered to handle any dictionary structure resiliently, even in cases where the structure makes little to no sense even if the performance impact is a little higher than supporting only nested dictionaries. Perhaps in the future I’ll work on adding a function that scans a dictionary for a value without needing to know the path. The UX of such a function would be much higher than determining the list of keys needed to reach a value, though the performance trade-off would be yet higher still. While this effort covered only one function, it was fun to ideate and iterate on, especially when coming up with ways to tests the functionality.
Golang Version #
I also ported this code over to Golang; the repository is located here and the package is also on Go’s site here.
The implementation is much the same as Python, even down to the test generation. Go has its idiosyncracies that change a few things but the overall structure and functionality remains the same. While the package itself is useful, it would be easier to have a “greedy” approach that traverses the entirety of a map for a given key rather than needing to know a path to traverse.
In the context of normalized logs, a path will be known hence the original implementation details. For broader use, the greedy approach – while less performant – would benefit wider usage of the package.
Late 2023 Update #
I spent some time during my end-of-year break working on the Go version of deepwalk
and made several additions and improvements.
- I updated the
DeepWalk
method to support the traversal of struct fields - I added a
DeepSearch
function that can traverse a data structure for a key without needing to know the top-level path required to reach that key - I used Cobra to make
deepwalk
a CLI so that the binary can be executed against a file or string object to retrieve the desired value - I added a GoReleaser Workflow to create signed releases each time a new tag/release is cut
- I added an SBOM workflow to attach a report each time a release is created as well
2024 Update #
deepwalk
is now v2
and utilizes a naive, iterative approach to traversing data structures. This approach is both easier and faster since the entire traversal path does not need to be determined beforehand.
I stripped away a lot of the cruft and may need to re-add some of it, but I’ll look into that going forward. For now, the package is a lot more straightforward and minimal.
Resources #
To read more about this function or other code used for writing Panther detections, check out these resources: