Skip to content

Commit fc7560f

Browse files
authored
[AI-6101] Add support for customizable cache keys for persistent cache (#21316)
* Add support for customizable cache keys to store log cursor * Add changelog * Remove PersistentCacheKey lefotver from AgentCheck * Fix docstring mistakes * Reduce cache key scope to checks with the same name * Add test to cover restart behavior * Address most of the comments on the PR * Fix imports in tests * Replace manual sorthing by mutable_hash utility * Update changelog message * Remove CacheKeyManager * Reword docstrings * Finalize update of ConfigSet * Reframe implementation towards a invalidation strategy more than the full key derivation * Fix module renaming and keep tests dry * Fix typo in prefix method * Simplify implementation to use persistent_cache_id * Simplify id namespace calculation
1 parent df8de38 commit fc7560f

File tree

5 files changed

+252
-30
lines changed

5 files changed

+252
-30
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add support for customizable cache keys to be used by the agent persistent cache. This allows integrations developers to define when the cache will be invalidated for each integration.

datadog_checks_base/datadog_checks/base/checks/base.py

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,10 @@
1111
import re
1212
from collections import deque
1313
from os.path import basename
14-
from typing import ( # noqa: F401
14+
from typing import (
1515
TYPE_CHECKING,
1616
Any,
17-
AnyStr,
18-
Callable,
19-
Deque,
20-
Dict,
21-
List,
22-
Optional,
23-
Sequence,
24-
Set,
25-
Tuple,
26-
Union,
17+
Deque, # noqa: F401
2718
)
2819

2920
import lazy_loader
@@ -294,10 +285,16 @@ def __init__(self, *args, **kwargs):
294285
# Functions that will be called exactly once (if successful) before the first check run
295286
self.check_initializations = deque() # type: Deque[Callable[[], None]]
296287

297-
self.check_initializations.append(self.load_configuration_models)
288+
self.check_initializations.extend(
289+
[
290+
self.load_configuration_models,
291+
self.__initialize_persistent_cache_key_prefix,
292+
]
293+
)
298294

299295
self.__formatted_tags = None
300296
self.__logs_enabled = None
297+
self.__persistent_cache_key_prefix: str = ""
301298

302299
if os.environ.get("GOFIPS", "0") == "1":
303300
enable_fips()
@@ -491,6 +488,18 @@ def in_developer_mode(self):
491488
self._log_deprecation('in_developer_mode')
492489
return False
493490

491+
def persistent_cache_id(self) -> str:
492+
"""
493+
Returns the ID that identifies this check instance in the Agent persistent cache.
494+
495+
Overriding this method modifies the default behavior of the AgentCheck and can
496+
be used to customize when the persistent cache is invalidated. The default behavior
497+
defines the persistent cache ID as the digest of the full check configuration.
498+
499+
Some per-check isolation is still applied to avoid different checks with the same ID to share the same keys.
500+
"""
501+
return self.check_id.split(":")[-1]
502+
494503
def log_typos_in_options(self, user_config, models_config, level):
495504
# See Performance Optimizations in this package's README.md.
496505
from jellyfish import jaro_winkler_similarity
@@ -1009,13 +1018,15 @@ def send_log(self, data, cursor=None, stream='default'):
10091018
attributes['timestamp'] = int(timestamp * 1000)
10101019

10111020
datadog_agent.send_log(json.encode(attributes), self.check_id)
1021+
10121022
if cursor is not None:
1013-
self.write_persistent_cache('log_cursor_{}'.format(stream), json.encode(cursor))
1023+
self.write_persistent_cache(f'log_cursor_{stream}', json.encode(cursor))
10141024

10151025
def get_log_cursor(self, stream='default'):
10161026
# type: (str) -> dict[str, Any] | None
10171027
"""Returns the most recent log cursor from disk."""
1018-
data = self.read_persistent_cache('log_cursor_{}'.format(stream))
1028+
data = self.read_persistent_cache(f'log_cursor_{stream}')
1029+
10191030
return json.decode(data) if data else None
10201031

10211032
def _log_deprecation(self, deprecation_key, *args):
@@ -1082,9 +1093,9 @@ def entrypoint(self, *args, **kwargs):
10821093

10831094
return entrypoint
10841095

1085-
def _persistent_cache_id(self, key):
1086-
# type: (str) -> str
1087-
return '{}_{}'.format(self.check_id, key)
1096+
def __initialize_persistent_cache_key_prefix(self):
1097+
namespace = ':'.join(self.check_id.split(':')[:-1])
1098+
self.__persistent_cache_key_prefix = f'{namespace}:{self.persistent_cache_id()}_'
10881099

10891100
def read_persistent_cache(self, key):
10901101
# type: (str) -> str
@@ -1094,9 +1105,9 @@ def read_persistent_cache(self, key):
10941105
key (str):
10951106
the key to retrieve
10961107
"""
1097-
return datadog_agent.read_persistent_cache(self._persistent_cache_id(key))
1108+
return datadog_agent.read_persistent_cache(f"{self.__persistent_cache_key_prefix}{key}")
10981109

1099-
def write_persistent_cache(self, key, value):
1110+
def write_persistent_cache(self, key: str, value: str):
11001111
# type: (str, str) -> None
11011112
"""Stores `value` in a persistent cache for this check instance.
11021113
The cache is located in a path where the agent is guaranteed to have read & write permissions. Namely in
@@ -1110,7 +1121,7 @@ def write_persistent_cache(self, key, value):
11101121
value (str):
11111122
the value to store
11121123
"""
1113-
datadog_agent.write_persistent_cache(self._persistent_cache_id(key), value)
1124+
datadog_agent.write_persistent_cache(f"{self.__persistent_cache_key_prefix}{key}", value)
11141125

11151126
def set_external_tags(self, external_tags):
11161127
# type: (Sequence[ExternalTagType]) -> None
@@ -1282,13 +1293,7 @@ def run(self):
12821293

12831294
run_with_isolation(self, aggregator, datadog_agent)
12841295
else:
1285-
while self.check_initializations:
1286-
initialization = self.check_initializations.popleft()
1287-
try:
1288-
initialization()
1289-
except Exception:
1290-
self.check_initializations.appendleft(initialization)
1291-
raise
1296+
self.run_check_initializations()
12921297

12931298
instance = copy.deepcopy(self.instances[0])
12941299

@@ -1328,6 +1333,15 @@ def run(self):
13281333

13291334
return error_report
13301335

1336+
def run_check_initializations(self):
1337+
while self.check_initializations:
1338+
initialization = self.check_initializations.popleft()
1339+
try:
1340+
initialization()
1341+
except Exception:
1342+
self.check_initializations.appendleft(initialization)
1343+
raise
1344+
13311345
def event(self, event):
13321346
# type: (Event) -> None
13331347
"""Send an event.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from collections.abc import Collection
2+
3+
from datadog_checks.base import AgentCheck
4+
from datadog_checks.base.utils.containers import hash_mutable
5+
6+
7+
def config_set_persistent_cache_id(
8+
check: AgentCheck,
9+
init_config_options: Collection[str] | None = None,
10+
instance_config_options: Collection[str] | None = None,
11+
):
12+
"""
13+
Returns an ID for the persisitent cache derives from a subset of the check's config options.
14+
15+
If the value of any of the provided options changes, the generate cache ID will change.
16+
17+
Parameters:
18+
check: the check instance the key is going to be used for.
19+
init_config_options: the subset of init_config options to use to generate the cache ID.
20+
instance_config_options: the subset of config options to use to generate the cache ID.
21+
"""
22+
23+
if not init_config_options and not instance_config_options:
24+
raise ValueError("At least one of init_config_options or instance_config_options must be provided")
25+
26+
set_init_config_options = set(init_config_options) if init_config_options else set()
27+
set_instance_config_options = set(instance_config_options) if instance_config_options else set()
28+
29+
init_config_values = tuple(value for key, value in check.init_config.items() if key in set_init_config_options)
30+
instance_config_values = tuple(value for key, value in check.instance.items() if key in set_instance_config_options)
31+
32+
selected_values = init_config_values + instance_config_values
33+
return str(hash_mutable(selected_values)).replace("-", "")

datadog_checks_base/tests/base/checks/test_agent_check.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ def test_check_version():
4242

4343

4444
def test_persistent_cache(datadog_agent):
45-
check = AgentCheck()
46-
check.check_id = 'test'
45+
check = AgentCheck(init_config={}, instances=[{}])
46+
check.check_id = 'test:123'
47+
check.run_check_initializations()
4748

4849
check.write_persistent_cache('foo', 'bar')
4950

50-
assert datadog_agent.read_persistent_cache('test_foo') == 'bar'
51+
assert datadog_agent.read_persistent_cache('test:123_foo') == 'bar'
5152
assert check.read_persistent_cache('foo') == 'bar'
5253

5354

@@ -558,6 +559,45 @@ def test_cursor(self, datadog_agent):
558559
)
559560
assert check.get_log_cursor() == {'data': '2'}
560561

562+
def custom_persistent_cache_id_check(self) -> AgentCheck:
563+
class TestCheck(AgentCheck):
564+
def persistent_cache_id(self) -> str:
565+
return "always_the_same"
566+
567+
return TestCheck(name="test", init_config={}, instances=[{}])
568+
569+
def test_cursor_with_custom_cache_invalidation_strategy_after_restart(self):
570+
check = self.custom_persistent_cache_id_check()
571+
check.check_id = 'test:bar:123'
572+
check.send_log({'message': 'foo'}, cursor={'data': '1'})
573+
574+
assert check.get_log_cursor() == {'data': '1'}
575+
576+
new_check = self.custom_persistent_cache_id_check()
577+
new_check.check_id = 'test:bar:123456'
578+
assert new_check.get_log_cursor() == {'data': '1'}
579+
580+
check = self.custom_persistent_cache_id_check()
581+
check.check_id = 'test:bar:123'
582+
check.send_log({'message': 'foo'}, cursor={'data': '1'})
583+
584+
assert check.get_log_cursor() == {'data': '1'}
585+
586+
new_check = self.custom_persistent_cache_id_check()
587+
new_check.check_id = 'test:bar:123456'
588+
assert new_check.get_log_cursor() == {'data': '1'}
589+
590+
def test_cursor_invalidated_for_different_persistent_check_id_part(self):
591+
check = self.custom_persistent_cache_id_check()
592+
check.check_id = 'test:bar:123'
593+
check.send_log({'message': 'foo'}, cursor={'data': '1'})
594+
595+
assert check.get_log_cursor() == {'data': '1'}
596+
597+
new_check = self.custom_persistent_cache_id_check()
598+
new_check.check_id = 'test:bar:123456'
599+
assert new_check.get_log_cursor() == {'data': '1'}
600+
561601
def test_no_cursor(self, datadog_agent):
562602
check = AgentCheck('check_name', {}, [{}])
563603
check.check_id = 'test'
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from typing import Any
2+
3+
import pytest
4+
5+
from datadog_checks.base.checks.base import AgentCheck
6+
from datadog_checks.base.utils.containers import hash_mutable
7+
from datadog_checks.base.utils.persistent_cache import config_set_persistent_cache_id
8+
9+
10+
@pytest.fixture(scope='module')
11+
def config() -> dict[str, str]:
12+
return {
13+
'init_option1': 'init_value1',
14+
'init_option2': 'init_value2',
15+
'global': 'init_global_value',
16+
}
17+
18+
19+
@pytest.fixture(scope='module')
20+
def instance() -> dict[str, str]:
21+
return {
22+
'instance_option1': 'instance_value1',
23+
'instance_option2': 'instance_value2',
24+
'global': 'instance_global_value',
25+
}
26+
27+
28+
def build_check(init_config: dict[str, Any], instance: dict[str, Any]) -> AgentCheck:
29+
return TestCheck('test', init_config, [instance])
30+
31+
32+
@pytest.fixture(scope='module')
33+
def check(config: dict[str, Any], instance: dict[str, Any]) -> AgentCheck:
34+
return build_check(config, instance)
35+
36+
37+
@pytest.fixture(scope='module')
38+
def cache_id(check: AgentCheck) -> str:
39+
return config_set_persistent_cache_id(check, init_config_options=['init_option1'])
40+
41+
42+
class TestCheck(AgentCheck):
43+
def check(self, instance):
44+
pass
45+
46+
47+
def normalized_hash(value: object) -> str:
48+
return str(hash_mutable(value)).replace("-", "")
49+
50+
51+
def test_config_set_caches(cache_id: str):
52+
assert cache_id == normalized_hash(('init_value1',))
53+
54+
55+
def test_initialization_fails_without_any_options(check: AgentCheck):
56+
with pytest.raises(ValueError):
57+
config_set_persistent_cache_id(check)
58+
59+
60+
def test_same_invalidation_token_on_changes_in_unlesected_other_options(config: dict[str, Any], check: AgentCheck):
61+
cache_id = config_set_persistent_cache_id(check, init_config_options=['init_option1'])
62+
expected_cache_id = normalized_hash(('init_value1',))
63+
assert cache_id == expected_cache_id
64+
65+
config['init_option2'] = 'something elese'
66+
cache_id = config_set_persistent_cache_id(check, init_config_options=['init_option1'])
67+
assert cache_id == expected_cache_id
68+
69+
70+
@pytest.mark.parametrize(
71+
'extra_option',
72+
[
73+
["item1", "item2"],
74+
("item1", "item3"),
75+
{"key1": "item1", "key2": "item2"},
76+
{"key1": {"key2": "item2", "key3": "item3"}},
77+
],
78+
ids=["list", "tuple", "dict", "nested_dict"],
79+
)
80+
def test_support_for_complex_option_values(
81+
check: AgentCheck,
82+
instance: dict[str, Any],
83+
extra_option: list[str] | tuple[str, str] | dict[str, str] | dict[str, dict[str, str]],
84+
):
85+
instance['extra_option'] = extra_option
86+
cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option'])
87+
expected_cache_id = normalized_hash((extra_option,))
88+
assert cache_id == expected_cache_id
89+
90+
91+
def deep_reverse(obj: Any) -> Any:
92+
if isinstance(obj, dict):
93+
return {k: deep_reverse(v) for k, v in reversed(list(obj.items()))}
94+
if isinstance(obj, list):
95+
return [deep_reverse(e) for e in reversed(obj)]
96+
if isinstance(obj, tuple):
97+
return tuple(deep_reverse(e) for e in reversed(obj))
98+
return obj
99+
100+
101+
@pytest.mark.parametrize(
102+
'extra_option',
103+
[
104+
["item1", "item2"],
105+
("item1", "item2"),
106+
{"key1": "item1", "key2": "item2"},
107+
{
108+
"key1": {"key2": "item2", "key3": ["item3", "item4"]},
109+
},
110+
],
111+
ids=["list", "tuple", "dict", "nested_dict"],
112+
)
113+
def test_order_does_not_affect_key(
114+
check: AgentCheck,
115+
instance: dict[str, Any],
116+
extra_option: list[str] | tuple[str, str] | dict[str, str] | dict[str, dict[str, str]],
117+
):
118+
instance['extra_option'] = extra_option
119+
cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option'])
120+
expected_cache_id = normalized_hash((extra_option,))
121+
122+
instance['extra_option'] = deep_reverse(extra_option)
123+
cache_id = config_set_persistent_cache_id(check, instance_config_options=['extra_option'])
124+
assert cache_id == expected_cache_id
125+
126+
127+
def test_same_option_names_in_init_config_and_instance_config(check: AgentCheck, instance: dict[str, Any]):
128+
cache_id = config_set_persistent_cache_id(check, init_config_options=['global'])
129+
expected_cache_id = normalized_hash(('init_global_value',))
130+
131+
# Modifying the same option name in instance has no effect on key
132+
instance['global'] = 'something'
133+
134+
assert cache_id == expected_cache_id

0 commit comments

Comments
 (0)