@@ -35,6 +35,7 @@ type HistoryQueueOpt struct {
35
35
}
36
36
37
37
type HistoryQueue struct {
38
+ // mu protects active, refs and deleted maps
38
39
mu sync.Mutex
39
40
initOnce sync.Once
40
41
HistoryQueueOpt
@@ -132,6 +133,10 @@ func (h *HistoryQueue) delete(ref string, sync bool) error {
132
133
return nil
133
134
}
134
135
delete (h .deleted , ref )
136
+ h .ps .Send (& controlapi.BuildHistoryEvent {
137
+ Type : controlapi .BuildHistoryEventType_DELETED ,
138
+ Record : & controlapi.BuildHistoryRecord {Ref : ref },
139
+ })
135
140
if err := h .DB .Update (func (tx * bolt.Tx ) error {
136
141
b := tx .Bucket ([]byte (recordsBucket ))
137
142
if b == nil {
@@ -559,7 +564,10 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
559
564
if req .Ref != "" && e .Ref != req .Ref {
560
565
continue
561
566
}
562
- sub .ps .Send (& controlapi.BuildHistoryEvent {
567
+ if _ , ok := h .deleted [e .Ref ]; ok {
568
+ continue
569
+ }
570
+ sub .send (& controlapi.BuildHistoryEvent {
563
571
Type : controlapi .BuildHistoryEventType_STARTED ,
564
572
Record : e ,
565
573
})
@@ -568,6 +576,7 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
568
576
h .mu .Unlock ()
569
577
570
578
if ! req .ActiveOnly {
579
+ events := []* controlapi.BuildHistoryEvent {}
571
580
if err := h .DB .View (func (tx * bolt.Tx ) error {
572
581
b := tx .Bucket ([]byte (recordsBucket ))
573
582
if b == nil {
@@ -581,17 +590,31 @@ func (h *HistoryQueue) Listen(ctx context.Context, req *controlapi.BuildHistoryR
581
590
if err := br .Unmarshal (dt ); err != nil {
582
591
return errors .Wrapf (err , "failed to unmarshal build record %s" , key )
583
592
}
584
- if err := f ( & controlapi.BuildHistoryEvent {
593
+ events = append ( events , & controlapi.BuildHistoryEvent {
585
594
Record : & br ,
586
595
Type : controlapi .BuildHistoryEventType_COMPLETE ,
587
- }); err != nil {
588
- return err
589
- }
596
+ })
590
597
return nil
591
598
})
592
599
}); err != nil {
593
600
return err
594
601
}
602
+ // filter out records that have been marked for deletion
603
+ h .mu .Lock ()
604
+ for i , e := range events {
605
+ if _ , ok := h .deleted [e .Record .Ref ]; ok {
606
+ events [i ] = nil
607
+ }
608
+ }
609
+ h .mu .Unlock ()
610
+ for _ , e := range events {
611
+ if e .Record == nil {
612
+ continue
613
+ }
614
+ if err := f (e ); err != nil {
615
+ return err
616
+ }
617
+ }
595
618
}
596
619
597
620
if req .EarlyExit {
0 commit comments