Skip to content

Commit 2e01b2d

Browse files
authored
fix(reprocessing): Migrate event attachments from "remaining" events. (#28025)
1 parent d752f76 commit 2e01b2d

File tree

3 files changed

+50
-21
lines changed

3 files changed

+50
-21
lines changed

src/sentry/reprocessing2.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,22 @@
9898
_REDIS_SYNC_TTL = 3600 * 24
9999

100100

101-
# Note: Event attachments and group reports are migrated in save_event.
101+
# There are only a few group-related models per group, so they are migrated
102+
# all at once.
102103
GROUP_MODELS_TO_MIGRATE = DIRECT_GROUP_RELATED_MODELS + (models.Activity,)
103104

104105
# If we were to move groupinbox to the new, empty group, inbox would show the
105106
# empty, unactionable group while it is reprocessing. Let post-process take
106107
# care of assigning GroupInbox like normally.
107108
GROUP_MODELS_TO_MIGRATE = tuple(x for x in GROUP_MODELS_TO_MIGRATE if x != models.GroupInbox)
108109

110+
# Event attachments and group reports are per-event. This means that:
111+
#
112+
# 1. they are migrated as part of the processing pipeline (post-process/save-event)
113+
# 2. there are many of them per group. For remaining events, we need to chunk
114+
# up those queries so they do not become too slow
115+
EVENT_MODELS_TO_MIGRATE = (models.EventAttachment, models.UserReport)
116+
109117

110118
class CannotReprocess(Exception):
111119
pass

src/sentry/tasks/reprocessing2.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import sentry_sdk
55
from django.db import transaction
66

7-
from sentry import eventstore, eventstream, models, nodestore
7+
from sentry import eventstore, eventstream, nodestore
88
from sentry.eventstore.models import Event
99
from sentry.tasks.base import instrumented_task, retry
1010
from sentry.utils.query import celery_run_batch_query
@@ -158,14 +158,13 @@ def handle_remaining_events(
158158

159159
from sentry import buffer
160160
from sentry.models.group import Group
161+
from sentry.reprocessing2 import EVENT_MODELS_TO_MIGRATE
161162

162163
assert remaining_events in ("delete", "keep")
163164

164165
if remaining_events == "delete":
165-
models.EventAttachment.objects.filter(
166-
project_id=project_id, event_id__in=event_ids
167-
).delete()
168-
models.UserReport.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
166+
for cls in EVENT_MODELS_TO_MIGRATE:
167+
cls.objects.filter(project_id=project_id, event_id__in=event_ids).delete()
169168

170169
# Remove from nodestore
171170
node_ids = [Event.generate_node_id(project_id, event_id) for event_id in event_ids]
@@ -176,6 +175,11 @@ def handle_remaining_events(
176175
project_id, event_ids, from_timestamp=from_timestamp, to_timestamp=to_timestamp
177176
)
178177
elif remaining_events == "keep":
178+
for cls in EVENT_MODELS_TO_MIGRATE:
179+
cls.objects.filter(project_id=project_id, event_id__in=event_ids).update(
180+
group_id=new_group_id
181+
)
182+
179183
eventstream.replace_group_unsafe(
180184
project_id,
181185
event_ids,

tests/sentry/tasks/test_reprocessing2.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,27 @@
2626
from sentry.utils.cache import cache_key_for_event
2727

2828

29+
def _create_event_attachment(evt, type):
30+
file = File.objects.create(name="foo", type=type)
31+
file.putfile(BytesIO(b"hello world"))
32+
EventAttachment.objects.create(
33+
event_id=evt.event_id,
34+
group_id=evt.group_id,
35+
project_id=evt.project_id,
36+
file_id=file.id,
37+
type=file.type,
38+
name="foo",
39+
)
40+
41+
42+
def _create_user_report(evt):
43+
UserReport.objects.create(
44+
project_id=evt.project_id,
45+
event_id=evt.event_id,
46+
name="User",
47+
)
48+
49+
2950
@pytest.fixture(autouse=True)
3051
def reprocessing_feature(monkeypatch):
3152
monkeypatch.setattr("sentry.tasks.reprocessing2.GROUP_REPROCESSING_CHUNK_SIZE", 1)
@@ -237,6 +258,9 @@ def event_preprocessor(data):
237258
event_id: eventstore.get_event_by_id(default_project.id, event_id) for event_id in event_ids
238259
}
239260

261+
for evt in old_events.values():
262+
_create_user_report(evt)
263+
240264
(group_id,) = {e.group_id for e in old_events.values()}
241265

242266
with burst_task_runner() as burst:
@@ -257,6 +281,12 @@ def event_preprocessor(data):
257281
elif remaining_events == "keep":
258282
assert event.group_id != group_id
259283
assert dict(event.data) == dict(old_events[event_id].data)
284+
assert (
285+
UserReport.objects.get(
286+
project_id=default_project.id, event_id=event_id
287+
).group_id
288+
!= group_id
289+
)
260290
else:
261291
raise ValueError(remaining_events)
262292
else:
@@ -314,22 +344,9 @@ def event_preprocessor(data):
314344

315345
for evt in (event, event_to_delete):
316346
for type in ("event.attachment", "event.minidump"):
317-
file = File.objects.create(name="foo", type=type)
318-
file.putfile(BytesIO(b"hello world"))
319-
EventAttachment.objects.create(
320-
event_id=evt.event_id,
321-
group_id=evt.group_id,
322-
project_id=default_project.id,
323-
file_id=file.id,
324-
type=file.type,
325-
name="foo",
326-
)
347+
_create_event_attachment(evt, type)
327348

328-
UserReport.objects.create(
329-
project_id=default_project.id,
330-
event_id=evt.event_id,
331-
name="User",
332-
)
349+
_create_user_report(evt)
333350

334351
with burst_task_runner() as burst:
335352
reprocess_group(default_project.id, event.group_id, max_events=1)

0 commit comments

Comments
 (0)