
Commit c8cfb23

cathteng authored and andrewshie-sentry committed
ref(aci): handle sharding project list key for buffer processing (#97638)
1 parent 450bde5 commit c8cfb23

5 files changed (+148, -8 lines)

src/sentry/buffer/base.py

Lines changed: 9 additions & 1 deletion
@@ -58,9 +58,14 @@ def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) ->
     def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         raise NotImplementedError

-    def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
+    def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]:
         return []

+    def bulk_get_sorted_set(
+        self, keys: list[str], min: float, max: float
+    ) -> dict[int, list[float]]:
+        return {}
+
     def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
         return None

@@ -92,6 +97,9 @@ def delete_hash(
     def delete_key(self, key: str, min: float, max: float) -> None:
         return None

+    def delete_keys(self, keys: list[str], min: float, max: float) -> None:
+        return None
+
     def incr(
         self,
         model: type[models.Model],

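The base-class additions only define the contract: bulk_get_sorted_set returns one dict keyed by member with every matching score from every key appended, and delete_keys is a per-key range delete. To make those shapes concrete, here is a minimal in-memory sketch that mirrors the interface; it is not part of the commit, and the class and _sets attribute are made up for illustration.

from collections import defaultdict
from time import time


class InMemorySortedSetBuffer:
    """Hypothetical stand-in mirroring the Buffer sorted-set contract; not Sentry code."""

    def __init__(self) -> None:
        # key -> {member: score timestamp}
        self._sets: dict[str, dict[int, float]] = defaultdict(dict)

    def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
        for member in value if isinstance(value, list) else [value]:
            self._sets[key][member] = time()

    def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]:
        return [(m, ts) for m, ts in self._sets[key].items() if min <= ts <= max]

    def bulk_get_sorted_set(self, keys: list[str], min: float, max: float) -> dict[int, list[float]]:
        # Same shape as the Redis backend: members are merged across keys, so a member
        # present in several shard keys ends up with several score timestamps.
        merged: dict[int, list[float]] = defaultdict(list)
        for key in keys:
            for member, ts in self.get_sorted_set(key, min, max):
                merged[member].append(ts)
        return merged

    def delete_key(self, key: str, min: float, max: float) -> None:
        self._sets[key] = {m: ts for m, ts in self._sets[key].items() if not (min <= ts <= max)}

    def delete_keys(self, keys: list[str], min: float, max: float) -> None:
        for key in keys:
            self.delete_key(key, min, max)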
src/sentry/buffer/redis.py

Lines changed: 51 additions & 1 deletion
@@ -2,6 +2,7 @@

 import logging
 import pickle
+from collections import defaultdict
 from collections.abc import Callable
 from dataclasses import dataclass
 from datetime import date, datetime, timezone
@@ -368,6 +369,26 @@ def _execute_redis_operation(
             pipe.expire(key, self.key_expire)
         return pipe.execute()[0]

+    def _execute_sharded_redis_operation(
+        self,
+        keys: list[str],
+        operation: RedisOperation,
+        *args: Any,
+        **kwargs: Any,
+    ) -> Any:
+        """
+        Execute a Redis operation on a list of keys, using the same args and kwargs for each key.
+        """
+
+        metrics_str = f"redis_buffer.{operation.value}"
+        metrics.incr(metrics_str, amount=len(keys))
+        pipe = self.get_redis_connection(self.pending_key)
+        for key in keys:
+            getattr(pipe, operation.value)(key, *args, **kwargs)
+            if args:
+                pipe.expire(key, self.key_expire)
+        return pipe.execute()
+
     def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
         now = time()
         if isinstance(value, list):
@@ -376,7 +397,7 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
             value_dict = {value: now}
         self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, value_dict)

-    def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
+    def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]:
         redis_set = self._execute_redis_operation(
             key,
             RedisOperation.SORTED_SET_GET_RANGE,
@@ -393,9 +414,38 @@ def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, datetime]]:
             decoded_set.append(data_and_timestamp)
         return decoded_set

+    def bulk_get_sorted_set(
+        self, keys: list[str], min: float, max: float
+    ) -> dict[int, list[float]]:
+        data_to_timestamps: dict[int, list[float]] = defaultdict(list)
+
+        redis_set = self._execute_sharded_redis_operation(
+            keys,
+            RedisOperation.SORTED_SET_GET_RANGE,
+            min=min,
+            max=max,
+            withscores=True,
+        )
+        for result in redis_set:
+            for items in result:
+                item = items[0]
+                if isinstance(item, bytes):
+                    item = item.decode("utf-8")
+                data_to_timestamps[int(item)].append(items[1])
+
+        return data_to_timestamps
+
     def delete_key(self, key: str, min: float, max: float) -> None:
         self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max)

+    def delete_keys(self, keys: list[str], min: float, max: float) -> None:
+        self._execute_sharded_redis_operation(
+            keys,
+            RedisOperation.SORTED_SET_DELETE_RANGE,
+            min=min,
+            max=max,
+        )
+
     def delete_hash(
         self,
         model: type[models.Model],

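As a rough usage sketch of the new bulk path (assuming an already configured RedisBuffer instance named buf; the key names, ids, and timestamps below are illustrative and not from this commit), the bulk read issues one pipelined sorted-set range read per shard key and merges the results by project id:

from time import time

# `buf` is assumed to be a configured RedisBuffer; "projects" and "projects:1" are made-up shard keys.
buf.push_to_sorted_set("projects", 17)
buf.push_to_sorted_set("projects:1", [17, 42])

now = time()
merged = buf.bulk_get_sorted_set(["projects", "projects:1"], min=0, max=now)
# A project id present in several shards keeps a single entry with one score per shard, e.g.
# {17: [1723470000.12, 1723470000.15], 42: [1723470000.15]}

buf.delete_keys(["projects", "projects:1"], min=0, max=now)
assert buf.bulk_get_sorted_set(["projects", "projects:1"], min=0, max=time()) == {}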
src/sentry/rules/processing/buffer_processing.py

Lines changed: 25 additions & 6 deletions
@@ -32,6 +32,8 @@ class BufferHashKeys:

 class DelayedProcessingBase(ABC):
     buffer_key: ClassVar[str]
+    buffer_shards: ClassVar[int] = 1  # 1 shard will use the original buffer key
+    buffer_separator: ClassVar[str] = ":"
     option: ClassVar[str | None]

     def __init__(self, project_id: int):
@@ -47,6 +49,13 @@ def hash_args(self) -> BufferHashKeys:
     def processing_task(self) -> Task:
         raise NotImplementedError

+    @classmethod
+    def get_buffer_keys(cls) -> list[str]:
+        return [
+            f"{cls.buffer_key}{cls.buffer_separator}{shard}" if shard > 0 else cls.buffer_key
+            for shard in range(cls.buffer_shards)
+        ]
+

 delayed_processing_registry = Registry[type[DelayedProcessingBase]]()

@@ -156,21 +165,31 @@ def process_buffer() -> None:
         # The staler this timestamp, the more likely it'll miss some recently updated projects,
         # and the more likely we'll have frequently updated projects that are never actually
         # retrieved and processed here.
-        fetch_time = datetime.now(tz=timezone.utc)
-        project_ids = buffer.backend.get_sorted_set(
-            handler.buffer_key, min=0, max=fetch_time.timestamp()
+        fetch_time = datetime.now(tz=timezone.utc).timestamp()
+        buffer_keys = handler.get_buffer_keys()
+        all_project_ids_and_timestamps = buffer.backend.bulk_get_sorted_set(
+            buffer_keys,
+            min=0,
+            max=fetch_time,
         )
+
         if should_emit_logs:
             log_str = ", ".join(
-                f"{project_id}: {timestamp}" for project_id, timestamp in project_ids
+                f"{project_id}: {timestamps}"
+                for project_id, timestamps in all_project_ids_and_timestamps.items()
            )
             log_name = f"{processing_type}.project_id_list"
             logger.info(log_name, extra={"project_ids": log_str})

-        for project_id, _ in project_ids:
+        project_ids = list(all_project_ids_and_timestamps.keys())
+        for project_id in project_ids:
             process_in_batches(project_id, processing_type)

-        buffer.backend.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
+        buffer.backend.delete_keys(
+            buffer_keys,
+            min=0,
+            max=fetch_time,
+        )


 if not redis_buffer_registry.has(BufferHookEvent.FLUSH):

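The shard-key layout produced by get_buffer_keys is easiest to see with a throwaway subclass; the handler name and buffer_key below are illustrative and not part of this change.

from sentry.rules.processing.buffer_processing import DelayedProcessingBase


class DemoHandler(DelayedProcessingBase):  # hypothetical handler, never registered or instantiated
    buffer_key = "demo_pending_projects"   # illustrative key name
    buffer_shards = 3


DemoHandler.get_buffer_keys()
# -> ["demo_pending_projects", "demo_pending_projects:1", "demo_pending_projects:2"]
# With the default buffer_shards = 1, only the bare buffer_key is returned, so existing
# single-key data keeps being read and deleted exactly as before.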
tests/sentry/buffer/test_redis.py

Lines changed: 49 additions & 0 deletions
@@ -1,6 +1,7 @@
 import copy
 import datetime
 import pickle
+import random
 from collections import defaultdict
 from collections.abc import Mapping
 from unittest import mock
@@ -281,6 +282,54 @@ def test_enqueue(self) -> None:
         result = json.loads(project_ids_to_rule_data[project_id2][0].get(f"{rule2_id}:{group3_id}"))
         assert result.get("event_id") == event4_id

+    def test_get_bulk_sorted_set(self) -> None:
+        shards = 3
+        project_ids = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
+        for id in project_ids:
+            shard = random.randrange(shards)
+            if shard == 0:
+                key = PROJECT_ID_BUFFER_LIST_KEY
+            else:
+                key = f"{PROJECT_ID_BUFFER_LIST_KEY}:{shard}"
+            self.buf.push_to_sorted_set(key=key, value=id)
+
+        buffer_keys = [
+            f"{PROJECT_ID_BUFFER_LIST_KEY}:{shard}" if shard > 0 else PROJECT_ID_BUFFER_LIST_KEY
+            for shard in range(shards)
+        ]
+
+        project_ids_and_timestamps = self.buf.bulk_get_sorted_set(
+            buffer_keys,
+            min=0,
+            max=datetime.datetime.now().timestamp(),
+        )
+        assert len(project_ids_and_timestamps) == 4
+        assert set(project_ids_and_timestamps.keys()) == set(project_ids)
+
+        self.buf.delete_keys(
+            buffer_keys,
+            min=0,
+            max=datetime.datetime.now().timestamp(),
+        )
+        project_ids_and_timestamps = self.buf.bulk_get_sorted_set(
+            buffer_keys,
+            min=0,
+            max=datetime.datetime.now().timestamp(),
+        )
+        assert len(project_ids_and_timestamps) == 0
+
+    def test_bulk_sorted_set_single_key(self) -> None:
+        project_ids = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
+        for id in project_ids:
+            self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=id)
+        project_ids_and_timestamps = self.buf.bulk_get_sorted_set(
+            [PROJECT_ID_BUFFER_LIST_KEY],
+            min=0,
+            max=datetime.datetime.now().timestamp(),
+        )
+        assert len(project_ids_and_timestamps) == 4
+        assert set(project_ids_and_timestamps.keys()) == set(project_ids)
+
     def test_buffer_hook_registry(self) -> None:
         """Test that we can add an event to the registry and that the callback is invoked"""
         mock = Mock()

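The test above routes writes to shards with random.randrange, and this commit does not add a production write-side routing rule. One deterministic possibility, shown purely as an assumed sketch (the helper name and modulo strategy are not from this change):

def shard_key_for(buffer_key: str, project_id: int, shards: int, separator: str = ":") -> str:
    """Hypothetical helper: pick a stable shard key for a project id (shard 0 keeps the bare key)."""
    shard = project_id % shards
    return buffer_key if shard == 0 else f"{buffer_key}{separator}{shard}"


# shard_key_for("projects_pending", 17, 3) -> "projects_pending:2"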
tests/sentry/rules/processing/test_buffer_processing.py

Lines changed: 14 additions & 0 deletions
@@ -245,6 +245,20 @@ def test_skips_processing_with_option(self, mock_process_in_batches) -> None:
             self.project_two.id,
         }

+    @patch("sentry.rules.processing.buffer_processing.process_in_batches")
+    @patch("sentry.rules.processing.delayed_processing.DelayedRule.buffer_shards", 3)
+    def test_fetches_with_shards(self, mock_process_in_batches: MagicMock) -> None:
+        project = self.create_project()
+        buffer.backend.push_to_sorted_set(key=f"{PROJECT_ID_BUFFER_LIST_KEY}", value=project.id)
+        buffer.backend.push_to_sorted_set(
+            key=f"{PROJECT_ID_BUFFER_LIST_KEY}:1", value=self.project_two.id
+        )
+        buffer.backend.push_to_sorted_set(key=f"{PROJECT_ID_BUFFER_LIST_KEY}:2", value=project.id)
+
+        process_buffer()
+
+        assert mock_process_in_batches.call_count == 3
+

 class ProcessInBatchesTest(CreateEventTestCase):
     def setUp(self) -> None:
