@@ -157,7 +157,13 @@ func (s *SourceManager) EnumerateAndScan(ctx context.Context, sourceName string,
157
157
ctx := context .WithValues (ctx ,
158
158
"source_manager_worker_id" , common .RandomID (5 ),
159
159
)
160
- defer common .Recover (ctx )
160
+ defer common .RecoverWithHandler (ctx , func (err error ) {
161
+ progress .ReportError (Fatal {err })
162
+ select {
163
+ case s .firstErr <- err :
164
+ default :
165
+ }
166
+ })
161
167
defer cancel (nil )
162
168
if err := s .run (ctx , source , progress , targets ... ); err != nil {
163
169
select {
@@ -458,10 +464,13 @@ func (s *SourceManager) enumerateWithUnits(ctx context.Context, source SourceUni
458
464
459
465
// Produce units.
460
466
func () {
461
- // TODO: Catch panics and add to report.
462
467
report .StartEnumerating (time .Now ())
463
468
defer func () { report .EndEnumerating (time .Now ()) }()
464
469
ctx .Logger ().V (2 ).Info ("enumerating source with units" )
470
+ defer common .RecoverWithHandler (ctx , func (err error ) {
471
+ report .ReportError (Fatal {err })
472
+ catchFirstFatal (Fatal {err })
473
+ })
465
474
if err := source .Enumerate (ctx , reporter ); err != nil {
466
475
report .ReportError (Fatal {err })
467
476
catchFirstFatal (Fatal {err })
@@ -523,13 +532,17 @@ func (s *SourceManager) runWithUnits(ctx context.Context, source SourceUnitEnumC
523
532
default :
524
533
}
525
534
}
535
+
526
536
// Produce units.
527
537
go func () {
528
- // TODO: Catch panics and add to report.
529
538
report .StartEnumerating (time .Now ())
530
539
defer func () { report .EndEnumerating (time .Now ()) }()
531
540
defer close (unitReporter .unitCh )
532
541
ctx .Logger ().V (2 ).Info ("enumerating source" )
542
+ defer common .RecoverWithHandler (ctx , func (err error ) {
543
+ report .ReportError (Fatal {err })
544
+ catchFirstFatal (Fatal {err })
545
+ })
533
546
if err := source .Enumerate (ctx , unitReporter ); err != nil {
534
547
report .ReportError (Fatal {err })
535
548
catchFirstFatal (Fatal {err })
@@ -551,11 +564,14 @@ func (s *SourceManager) runWithUnits(ctx context.Context, source SourceUnitEnumC
551
564
// Consume units and produce chunks.
552
565
unitPool .Go (func () error {
553
566
report .StartUnitChunking (unit , time .Now ())
554
- // TODO: Catch panics and add to report.
555
567
defer close (chunkReporter .chunkCh )
556
568
id , kind := unit .SourceUnitID ()
557
569
ctx := context .WithValues (ctx , "unit_kind" , kind , "unit" , id )
558
570
ctx .Logger ().V (3 ).Info ("chunking unit" )
571
+ defer common .RecoverWithHandler (ctx , func (err error ) {
572
+ report .ReportError (Fatal {ChunkError {Unit : unit , Err : err }})
573
+ catchFirstFatal (Fatal {err })
574
+ })
559
575
if err := source .ChunkUnit (ctx , unit , chunkReporter ); err != nil {
560
576
report .ReportError (Fatal {ChunkError {Unit : unit , Err : err }})
561
577
catchFirstFatal (Fatal {err })
@@ -597,11 +613,14 @@ func (s *SourceManager) scanWithUnit(ctx context.Context, source SourceUnitChunk
597
613
var chunkErr error
598
614
go func () {
599
615
report .StartUnitChunking (unit , time .Now ())
600
- // TODO: Catch panics and add to report.
601
616
defer close (chunkReporter .chunkCh )
602
617
id , kind := unit .SourceUnitID ()
603
618
ctx := context .WithValues (ctx , "unit_kind" , kind , "unit" , id )
604
619
ctx .Logger ().V (3 ).Info ("chunking unit" )
620
+ defer common .RecoverWithHandler (ctx , func (err error ) {
621
+ report .ReportError (Fatal {ChunkError {Unit : unit , Err : err }})
622
+ chunkErr = Fatal {err }
623
+ })
605
624
if err := source .ChunkUnit (ctx , unit , chunkReporter ); err != nil {
606
625
report .ReportError (Fatal {ChunkError {Unit : unit , Err : err }})
607
626
chunkErr = Fatal {err }
0 commit comments