@@ -35,6 +35,7 @@ type HistoryQueueOpt struct {
35
35
}
36
36
37
37
type HistoryQueue struct {
38
+ // mu protects active, refs and deleted maps
38
39
mu sync.Mutex
39
40
initOnce sync.Once
40
41
HistoryQueueOpt
@@ -559,6 +560,9 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
559
560
if req .Ref != "" && e .Ref != req .Ref {
560
561
continue
561
562
}
563
+ if _ , ok := h .deleted [e .Ref ]; ok {
564
+ continue
565
+ }
562
566
sub .ps .Send (& controlapi.BuildHistoryEvent {
563
567
Type : controlapi .BuildHistoryEventType_STARTED ,
564
568
Record : e ,
@@ -568,6 +572,7 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
568
572
h .mu .Unlock ()
569
573
570
574
if ! req .ActiveOnly {
575
+ events := []* controlapi.BuildHistoryEvent {}
571
576
if err := h .DB .View (func (tx * bolt.Tx ) error {
572
577
b := tx .Bucket ([]byte (recordsBucket ))
573
578
if b == nil {
@@ -581,17 +586,31 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
581
586
if err := br .Unmarshal (dt ); err != nil {
582
587
return errors .Wrapf (err , "failed to unmarshal build record %s" , key )
583
588
}
584
- if err := f ( & controlapi.BuildHistoryEvent {
589
+ events = append ( events , & controlapi.BuildHistoryEvent {
585
590
Record : & br ,
586
591
Type : controlapi .BuildHistoryEventType_COMPLETE ,
587
- }); err != nil {
588
- return err
589
- }
592
+ })
590
593
return nil
591
594
})
592
595
}); err != nil {
593
596
return err
594
597
}
598
+ // filter out records that have been marked for deletion
599
+ h .mu .Lock ()
600
+ for i , e := range events {
601
+ if _ , ok := h .deleted [e .Record .Ref ]; ok {
602
+ events [i ] = nil
603
+ }
604
+ }
605
+ h .mu .Unlock ()
606
+ for _ , e := range events {
607
+ if e .Record == nil {
608
+ continue
609
+ }
610
+ if err := f (e ); err != nil {
611
+ return err
612
+ }
613
+ }
595
614
}
596
615
597
616
if req .EarlyExit {
0 commit comments