Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion snuba/lw_deletions/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
self.__tables = storage.get_deletion_settings().tables
self.__formatter: Formatter = formatter
self.__metrics = metrics
self.__last_ongoing_mutations_check: Optional[float] = None

def poll(self) -> None:
self.__next_step.poll()
Expand Down Expand Up @@ -155,8 +156,17 @@ def _execute_delete(self, conditions: Sequence[ConditionsBag]) -> None:
raise LWDeleteQueryException(exc.message)

def _check_ongoing_mutations(self) -> None:
now = time.time()
if (
self.__last_ongoing_mutations_check is not None
and now - self.__last_ongoing_mutations_check < 1.0
):
raise TooManyOngoingMutationsError(
"ongoing mutations check is throttled to once per second"
)
start = time.time()
ongoing_mutations = _num_ongoing_mutations(self.__storage.get_cluster(), self.__tables)
self.__last_ongoing_mutations_check = time.time()
max_ongoing_mutations = typing.cast(
int,
get_int_config(
Expand All @@ -167,7 +177,6 @@ def _check_ongoing_mutations(self) -> None:
self.__metrics.timing("ongoing_mutations_query_ms", (time.time() - start) * 1000)
max_ongoing_mutations = int(settings.MAX_ONGOING_MUTATIONS_FOR_DELETE)
if ongoing_mutations > max_ongoing_mutations:

raise TooManyOngoingMutationsError(
f"{ongoing_mutations} mutations for {self.__tables} table(s) is above max ongoing mutations: {max_ongoing_mutations} "
)
Expand Down
Loading