Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/sentry/tasks/unmerge.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,11 @@ def migrate_events(
opt_destination_id: int | None,
opt_eventstream_state: Mapping[str, Any] | None,
) -> tuple[int, Mapping[str, Any]]:
extra = {"source_id": args.source_id, "project_id": project.id}
logger.info(
"migrate_events.start",
extra={
"source_id": args.source_id,
**extra,
"opt_destination_id": opt_destination_id,
"migrate_args": args,
},
Expand Down Expand Up @@ -202,7 +203,7 @@ def migrate_events(
destination.update(**get_group_backfill_attributes(caches, destination, events))

update_open_periods(source, destination)
logger.info("migrate_events.migrate", extra={"destination_id": destination_id})
logger.info("migrate_events.migrate", extra={**extra, "destination_id": destination_id})

if isinstance(args, InitialUnmergeArgs) or opt_eventstream_state is None:
eventstream_state = args.replacement.start_snuba_replacement(
Expand Down Expand Up @@ -522,6 +523,8 @@ def unlock_hashes(project_id: int, locked_primary_hashes: Sequence[str]) -> None
)
def unmerge(*posargs: Any, **kwargs: Any) -> None:
args = UnmergeArgsBase.parse_arguments(*posargs, **kwargs)
extra = {"source_id": args.source_id, "project_id": args.project_id}
logger.info("unmerge.start.task", extra=extra)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe @nora-shap and I would have benefited from having this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would have helped me and @nora-shap when we were debugging.


source = Group.objects.get(project_id=args.project_id, id=args.source_id)

Expand Down Expand Up @@ -552,22 +555,14 @@ def unmerge(*posargs: Any, **kwargs: Any) -> None:
# Convert Event objects to GroupEvent objects
events: list[GroupEvent] = [event.for_group(source) for event in raw_events]
# Log info related to this unmerge
logger.info(
"unmerge.check",
extra={
"source_id": source.id,
"num_events": len(events),
},
)
logger.info("unmerge.check", extra={**extra, "num_events": len(events)})

# If there are no more events to process, we're done with the migration.
if not events:
unlock_hashes(args.project_id, locked_primary_hashes)
for unmerge_key, (group_id, eventstream_state) in args.destinations.items():
for unmerge_key, (_, eventstream_state) in args.destinations.items():
logger.warning(
"Unmerge complete (eventstream state: %s)",
eventstream_state,
extra={"source_id": source.id},
"Unmerge complete (eventstream state: %s)", eventstream_state, extra=extra
)
if eventstream_state:
args.replacement.stop_snuba_replacement(eventstream_state)
Expand Down Expand Up @@ -597,7 +592,7 @@ def unmerge(*posargs: Any, **kwargs: Any) -> None:
logger.info(
"unmerge.destinations",
extra={
"source_id": source.id,
**extra,
"source_events": len(source_events),
"destination_events": len(destination_events),
"source_fields_reset": source_fields_reset,
Expand Down
Loading