@@ -168,10 +168,11 @@ def migrate_events(
168
168
opt_destination_id : int | None ,
169
169
opt_eventstream_state : Mapping [str , Any ] | None ,
170
170
) -> tuple [int , Mapping [str , Any ]]:
171
+ extra = {"source_id" : args .source_id , "project_id" : project .id }
171
172
logger .info (
172
173
"migrate_events.start" ,
173
174
extra = {
174
- "source_id" : args . source_id ,
175
+ ** extra ,
175
176
"opt_destination_id" : opt_destination_id ,
176
177
"migrate_args" : args ,
177
178
},
@@ -202,7 +203,7 @@ def migrate_events(
202
203
destination .update (** get_group_backfill_attributes (caches , destination , events ))
203
204
204
205
update_open_periods (source , destination )
205
- logger .info ("migrate_events.migrate" , extra = {"destination_id" : destination_id })
206
+ logger .info ("migrate_events.migrate" , extra = {** extra , "destination_id" : destination_id })
206
207
207
208
if isinstance (args , InitialUnmergeArgs ) or opt_eventstream_state is None :
208
209
eventstream_state = args .replacement .start_snuba_replacement (
@@ -522,6 +523,8 @@ def unlock_hashes(project_id: int, locked_primary_hashes: Sequence[str]) -> None
522
523
)
523
524
def unmerge (* posargs : Any , ** kwargs : Any ) -> None :
524
525
args = UnmergeArgsBase .parse_arguments (* posargs , ** kwargs )
526
+ extra = {"source_id" : args .source_id , "project_id" : args .project_id }
527
+ logger .info ("unmerge.start.task" , extra = extra )
525
528
526
529
source = Group .objects .get (project_id = args .project_id , id = args .source_id )
527
530
@@ -552,22 +555,14 @@ def unmerge(*posargs: Any, **kwargs: Any) -> None:
552
555
# Convert Event objects to GroupEvent objects
553
556
events : list [GroupEvent ] = [event .for_group (source ) for event in raw_events ]
554
557
# Log info related to this unmerge
555
- logger .info (
556
- "unmerge.check" ,
557
- extra = {
558
- "source_id" : source .id ,
559
- "num_events" : len (events ),
560
- },
561
- )
558
+ logger .info ("unmerge.check" , extra = {** extra , "num_events" : len (events )})
562
559
563
560
# If there are no more events to process, we're done with the migration.
564
561
if not events :
565
562
unlock_hashes (args .project_id , locked_primary_hashes )
566
- for unmerge_key , (group_id , eventstream_state ) in args .destinations .items ():
563
+ for unmerge_key , (_ , eventstream_state ) in args .destinations .items ():
567
564
logger .warning (
568
- "Unmerge complete (eventstream state: %s)" ,
569
- eventstream_state ,
570
- extra = {"source_id" : source .id },
565
+ "Unmerge complete (eventstream state: %s)" , eventstream_state , extra = extra
571
566
)
572
567
if eventstream_state :
573
568
args .replacement .stop_snuba_replacement (eventstream_state )
@@ -597,7 +592,7 @@ def unmerge(*posargs: Any, **kwargs: Any) -> None:
597
592
logger .info (
598
593
"unmerge.destinations" ,
599
594
extra = {
600
- "source_id" : source . id ,
595
+ ** extra ,
601
596
"source_events" : len (source_events ),
602
597
"destination_events" : len (destination_events ),
603
598
"source_fields_reset" : source_fields_reset ,
0 commit comments