
Commit 5d4742a

fix(aci): Make rule/workflow redis buffer interactions non-transactional (#97917)
Clustered Redis doesn't support transactional pipelines, which are the default in the Buffer code. However, we aren't heavily reliant on that transactionality: in most cases we perform a single operation and add an expiry. We don't rely on expiry for key removal; keys are removed or cleared explicitly. In the main multi-key cases where the change could matter, a partial keyset is arguably more correct than no keys at all. We still need to be careful, but on the whole this seems safe, and it is necessary for the move to a standard cluster.
1 parent 29e47e1 commit 5d4742a
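
To make the pattern concrete, here is a minimal sketch in plain redis-py, outside Sentry's Buffer wrapper; the helper name, key, and TTL are illustrative, not from the patch. With transaction=True a pipeline is wrapped in MULTI/EXEC, which a Redis Cluster rejects when the batched keys hash to different slots; with transaction=False the pipeline is only a command batch, so it can run against a cluster.

import redis

client = redis.Redis()  # assumption: any reachable Redis or Redis Cluster endpoint

def push_to_hash(key: str, field: str, value: str, ttl: int = 3600) -> None:
    # Non-transactional pipeline: commands are batched into one round trip,
    # but there is no MULTI/EXEC envelope and no atomicity guarantee.
    pipe = client.pipeline(transaction=False)
    pipe.hset(key, field, value)
    pipe.expire(key, ttl)  # expiry is a safety net; keys are still cleared explicitly
    pipe.execute()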

File tree

1 file changed (+13 / -11 lines)

src/sentry/buffer/redis.py

Lines changed: 13 additions & 11 deletions
@@ -332,12 +332,12 @@ def get_redis_connection(self, key: str, transaction: bool = True) -> Pipeline:
         pipe = conn.pipeline(transaction=transaction)
         return pipe
 
-    def _execute_redis_operation(
+    def _execute_redis_operation_no_txn(
         self, key: str, operation: RedisOperation, *args: Any, **kwargs: Any
     ) -> Any:
         metrics_str = f"redis_buffer.{operation.value}"
         metrics.incr(metrics_str)
-        pipe = self.get_redis_connection(self.pending_key)
+        pipe = self.get_redis_connection(self.pending_key, transaction=False)
         getattr(pipe, operation.value)(key, *args, **kwargs)
         if args:
             pipe.expire(key, self.key_expire)
@@ -356,7 +356,7 @@ def _execute_sharded_redis_operation(
 
         metrics_str = f"redis_buffer.{operation.value}"
         metrics.incr(metrics_str, amount=len(keys))
-        pipe = self.get_redis_connection(self.pending_key)
+        pipe = self.get_redis_connection(self.pending_key, transaction=False)
         for key in keys:
             getattr(pipe, operation.value)(key, *args, **kwargs)
         if args:
@@ -369,10 +369,10 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
             value_dict = {v: now for v in value}
         else:
             value_dict = {value: now}
-        self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, value_dict)
+        self._execute_redis_operation_no_txn(key, RedisOperation.SORTED_SET_ADD, value_dict)
 
     def get_sorted_set(self, key: str, min: float, max: float) -> list[tuple[int, float]]:
-        redis_set = self._execute_redis_operation(
+        redis_set = self._execute_redis_operation_no_txn(
             key,
             RedisOperation.SORTED_SET_GET_RANGE,
             min=min,
@@ -410,7 +410,9 @@ def bulk_get_sorted_set(
         return data_to_timestamps
 
     def delete_key(self, key: str, min: float, max: float) -> None:
-        self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max)
+        self._execute_redis_operation_no_txn(
+            key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max
+        )
 
     def delete_keys(self, keys: list[str], min: float, max: float) -> None:
         self._execute_sharded_redis_operation(
@@ -427,7 +429,7 @@ def delete_hash(
         fields: list[str],
     ) -> None:
         key = self._make_key(model, filters)
-        pipe = self.get_redis_connection(self.pending_key)
+        pipe = self.get_redis_connection(self.pending_key, transaction=False)
         for field in fields:
             getattr(pipe, RedisOperation.HASH_DELETE.value)(key, field)
         pipe.expire(key, self.key_expire)
@@ -441,7 +443,7 @@ def push_to_hash(
         value: str,
     ) -> None:
         key = self._make_key(model, filters)
-        self._execute_redis_operation(key, RedisOperation.HASH_ADD, field, value)
+        self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD, field, value)
 
     def push_to_hash_bulk(
         self,
@@ -450,11 +452,11 @@ def push_to_hash_bulk(
         data: dict[str, str],
     ) -> None:
         key = self._make_key(model, filters)
-        self._execute_redis_operation(key, RedisOperation.HASH_ADD_BULK, data)
+        self._execute_redis_operation_no_txn(key, RedisOperation.HASH_ADD_BULK, data)
 
     def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) -> dict[str, str]:
         key = self._make_key(model, field)
-        redis_hash = self._execute_redis_operation(key, RedisOperation.HASH_GET_ALL)
+        redis_hash = self._execute_redis_operation_no_txn(key, RedisOperation.HASH_GET_ALL)
         decoded_hash = {}
         for k, v in redis_hash.items():
             if isinstance(k, bytes):
@@ -467,7 +469,7 @@ def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) ->
 
     def get_hash_length(self, model: type[models.Model], field: dict[str, BufferField]) -> int:
         key = self._make_key(model, field)
-        return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
+        return self._execute_redis_operation_no_txn(key, RedisOperation.HASH_LENGTH)
 
     def incr(
         self,