Skip to content

Commit 35c0a62

Browse files
authored
tests(deletions): Add CustomTaskQueue to enable testing (#103974)
The changes allow replacing the task_queue with a custom class to support testing of cleanup functions.
1 parent e83058a commit 35c0a62

File tree

2 files changed

+103
-31
lines changed

2 files changed

+103
-31
lines changed

src/sentry/runner/commands/cleanup.py

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
logger = logging.getLogger(__name__)
2626

2727
TRANSACTION_PREFIX = "cleanup"
28+
DELETES_BY_PROJECT_CHUNK_SIZE = 100
2829

2930
if TYPE_CHECKING:
3031
from sentry.db.models.base import BaseModel
@@ -85,28 +86,15 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
8586
# Configure within each Process
8687
import logging
8788

88-
from sentry.utils.imports import import_string
89-
9089
logger = logging.getLogger("sentry.cleanup")
9190

9291
from sentry.runner import configure
9392

9493
configure()
9594

96-
from sentry import deletions, models, options, similarity
95+
from sentry import options
9796
from sentry.utils import metrics
9897

99-
skip_child_relations_models = [
100-
# Handled by other parts of cleanup
101-
models.EventAttachment,
102-
models.UserReport,
103-
models.Group,
104-
models.GroupEmailThread,
105-
models.GroupRuleStatus,
106-
# Handled by TTL
107-
similarity,
108-
]
109-
11098
while True:
11199
j = task_queue.get()
112100
if j == _STOP_WORKER:
@@ -124,21 +112,7 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
124112
with sentry_sdk.start_transaction(
125113
op="cleanup", name=f"{TRANSACTION_PREFIX}.multiprocess_worker"
126114
):
127-
model = import_string(model_name)
128-
task = deletions.get(
129-
model=model,
130-
query={"id__in": chunk},
131-
skip_models=skip_child_relations_models,
132-
transaction_id=uuid4().hex,
133-
)
134-
135-
while True:
136-
debug_output(f"Processing chunk of {len(chunk)} {model_name} objects")
137-
metrics.incr(
138-
"cleanup.chunk_processed", tags={"model": model_name}, amount=len(chunk)
139-
)
140-
if not task.chunk(apply_filter=True):
141-
break
115+
task_execution(model_name, chunk)
142116
except Exception:
143117
metrics.incr(
144118
"cleanup.error",
@@ -154,6 +128,37 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
154128
task_queue.task_done()
155129

156130

131+
def task_execution(model_name: str, chunk: tuple[int, ...]) -> None:
    """Run the deletion task for one chunk of ids of a single model.

    ``model_name`` is a dotted path resolved through ``import_string``. A deletion
    task restricted to ``id__in=chunk`` is built and its ``chunk()`` method is
    called repeatedly until it returns a falsy value. Before every call a debug
    line is emitted and the ``cleanup.chunk_processed`` metric is incremented by
    ``len(chunk)``.
    """
    from sentry import deletions, models, similarity
    from sentry.utils import metrics
    from sentry.utils.imports import import_string

    # Relations the deletion task is told to skip.
    handled_by_other_cleanup_parts = [
        models.EventAttachment,
        models.UserReport,
        models.Group,
        models.GroupEmailThread,
        models.GroupRuleStatus,
    ]
    handled_by_ttl = [similarity]

    model_cls = import_string(model_name)
    deletion_task = deletions.get(
        model=model_cls,
        query={"id__in": chunk},
        skip_models=handled_by_other_cleanup_parts + handled_by_ttl,
        transaction_id=uuid4().hex,
    )

    # The chunk never changes while the task runs, so size and message are computed once.
    chunk_size = len(chunk)
    progress_message = f"Processing chunk of {chunk_size} {model_name} objects"

    has_more = True
    while has_more:
        debug_output(progress_message)
        metrics.incr("cleanup.chunk_processed", tags={"model": model_name}, amount=chunk_size)
        has_more = deletion_task.chunk(apply_filter=True)
160+
161+
157162
@click.command()
158163
@click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.")
159164
@click.option("--project", help="Limit truncation to only entries from project.")
@@ -760,7 +765,7 @@ def run_bulk_deletes_by_project(
760765
order_by=order_by,
761766
)
762767

763-
for chunk in q.iterator(chunk_size=100):
768+
for chunk in q.iterator(chunk_size=DELETES_BY_PROJECT_CHUNK_SIZE):
764769
task_queue.put((imp, chunk))
765770
except Exception:
766771
capture_exception(

tests/sentry/runner/commands/test_cleanup.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,35 @@
11
from __future__ import annotations
22

3+
from unittest.mock import patch
4+
35
from sentry.constants import ObjectStatus
4-
from sentry.runner.commands.cleanup import prepare_deletes_by_project
6+
from sentry.models.group import Group
7+
from sentry.runner.commands.cleanup import (
8+
prepare_deletes_by_project,
9+
run_bulk_deletes_by_project,
10+
task_execution,
11+
)
512
from sentry.silo.base import SiloMode
613
from sentry.testutils.cases import TestCase
14+
from sentry.testutils.helpers.datetime import before_now
715
from sentry.testutils.silo import assume_test_silo_mode
816

917

18+
class SynchronousTaskQueue:
    """Test double for the cleanup work queue that runs every task inline.

    Only ``put`` and ``join`` of the _WorkQueue protocol are provided. ``put``
    executes the task immediately in the calling thread instead of handing it
    to a worker process.
    """

    def __init__(self) -> None:
        # Every item passed to put() is recorded here so tests can inspect the calls.
        self.put_calls: list[tuple[str, tuple[int, ...]]] = []

    def put(self, item: tuple[str, tuple[int, ...]]) -> None:
        """Record ``item`` and run it right away through ``task_execution``."""
        self.put_calls.append(item)
        model_name = item[0]
        id_chunk = item[1]
        task_execution(model_name, id_chunk)

    def join(self) -> None:
        """Do nothing: every task has already finished by the time ``put`` returns."""
        return None
31+
32+
1033
class PrepareDeletesByProjectTest(TestCase):
1134
def test_no_filters(self) -> None:
1235
"""Test that without filters, all active projects are included."""
@@ -104,3 +127,47 @@ def test_region_silo_mode_returns_projects(self) -> None:
104127
assert sorted(project_ids) == [project1.id, project2.id]
105128
# Should have model tuples to delete
106129
assert len(to_delete) > 0
130+
131+
132+
class RunBulkQueryDeletesByProjectTest(TestCase):
"""Tests for run_bulk_deletes_by_project driven through SynchronousTaskQueue."""
133+
def test_run_bulk_query_deletes_by_project(self) -> None:
134+
"""Test that the function runs bulk query deletes by project as expected."""
135+
days = 30
136+
# Create the groups out of order to test that the chunks are created in the correct order.
137+
self.create_group(last_seen=before_now(days=days + 4))
138+
self.create_group()
139+
self.create_group(last_seen=before_now(days=days + 2))
140+
self.create_group(last_seen=before_now(days=days + 3))
141+
142+
assert Group.objects.count() == 4
143+
assert Group.objects.filter(last_seen__lt=before_now(days=days)).count() == 3
144+
ids = list(
145+
Group.objects.filter(last_seen__lt=before_now(days=days)).values_list("id", flat=True)
146+
)
147+
148+
# The chunk size is patched down to 2 so the three old groups need two queue items.
with (
149+
assume_test_silo_mode(SiloMode.REGION),
150+
patch("sentry.runner.commands.cleanup.DELETES_BY_PROJECT_CHUNK_SIZE", 2),
151+
):
152+
task_queue = SynchronousTaskQueue()
153+
154+
models_attempted: set[str] = set()
155+
run_bulk_deletes_by_project(
156+
task_queue=task_queue, # type: ignore[arg-type] # It partially implements the queue protocol
157+
project_id=None,
158+
start_from_project_id=None,
159+
is_filtered=lambda model: False,
160+
days=days,
161+
models_attempted=models_attempted,
162+
)
163+
assert models_attempted == {"group", "projectdebugfile"}
164+
165+
assert len(task_queue.put_calls) == 2
166+
# Verify we deleted all expected groups (order may vary due to non-unique last_seen)
167+
all_deleted_ids: set[int] = set()
168+
for call in task_queue.put_calls:
169+
assert call[0] == "sentry.models.group.Group"
170+
all_deleted_ids.update(call[1])
171+
assert all_deleted_ids == set(ids)
172+
173+
# Only the group created without an explicit last_seen should remain.
assert Group.objects.all().count() == 1

0 commit comments

Comments
 (0)