Skip to content

Commit c45cd8f

Browse files
committed
history: handle gracefulstop when history is active
When GracefulStop is called gRPC waits for current requests to finish before closing. While this is generally the behavior we want, it is not always same for the History.Listen endpoint. That endpoint is usually open even if buildkit is not actively processing any builds, because client may be waiting for new events. The new logic is that if GracefulStop will happen, history will close active listeners if there are no active builds. If there are active builds then active listeners will be closed after all the active builds have completed their finalizers. Signed-off-by: Tonis Tiigi <[email protected]>
1 parent 9dd9f64 commit c45cd8f

File tree

3 files changed

+36
-2
lines changed

3 files changed

+36
-2
lines changed

cmd/buildkitd/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ func main() {
341341
return err
342342
}
343343

344-
controller, err := newController(c, &cfg)
344+
controller, err := newController(ctx, c, &cfg)
345345
if err != nil {
346346
return err
347347
}
@@ -758,7 +758,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
758758
return tlsConf, nil
759759
}
760760

761-
func newController(c *cli.Context, cfg *config.Config) (*control.Controller, error) {
761+
func newController(ctx context.Context, c *cli.Context, cfg *config.Config) (*control.Controller, error) {
762762
sessionManager, err := session.NewManager()
763763
if err != nil {
764764
return nil, err
@@ -851,6 +851,7 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
851851
ContentStore: w.ContentStore(),
852852
HistoryConfig: cfg.History,
853853
GarbageCollect: w.GarbageCollect,
854+
GracefulStop: ctx.Done(),
854855
})
855856
}
856857

control/control.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type Opt struct {
7171
ContentStore *containerdsnapshot.Store
7272
HistoryConfig *config.HistoryConfig
7373
GarbageCollect func(context.Context) error
74+
GracefulStop <-chan struct{}
7475
}
7576

7677
type Controller struct { // TODO: ControlService
@@ -95,6 +96,7 @@ func NewController(opt Opt) (*Controller, error) {
9596
ContentStore: opt.ContentStore,
9697
CleanConfig: opt.HistoryConfig,
9798
GarbageCollect: opt.GarbageCollect,
99+
GracefulStop: opt.GracefulStop,
98100
})
99101
if err != nil {
100102
return nil, errors.Wrap(err, "failed to create history queue")

solver/llbsolver/history.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type HistoryQueueOpt struct {
4747
ContentStore *containerdsnapshot.Store
4848
CleanConfig *config.HistoryConfig
4949
GarbageCollect func(context.Context) error
50+
GracefulStop <-chan struct{}
5051
}
5152

5253
type HistoryQueue struct {
@@ -137,6 +138,16 @@ func NewHistoryQueue(opt HistoryQueueOpt) (*HistoryQueue, error) {
137138
}
138139
}()
139140

141+
go func() {
142+
<-h.opt.GracefulStop
143+
h.mu.Lock()
144+
defer h.mu.Unlock()
145+
// if active builds then close will happen in finalizer
146+
if len(h.finalizers) == 0 && len(h.active) == 0 {
147+
go h.ps.Close()
148+
}
149+
}()
150+
140151
return h, nil
141152
}
142153

@@ -637,6 +648,14 @@ func (h *HistoryQueue) AcquireFinalizer(ref string) (<-chan struct{}, func()) {
637648
<-f.done
638649
h.mu.Lock()
639650
delete(h.finalizers, ref)
651+
// if gracefulstop then release listeners after finalize
652+
if len(h.finalizers) == 0 {
653+
select {
654+
case <-h.opt.GracefulStop:
655+
go h.ps.Close()
656+
default:
657+
}
658+
}
640659
h.mu.Unlock()
641660
}()
642661
return trigger, sync.OnceFunc(func() {
@@ -1032,6 +1051,18 @@ func (p *pubsub[T]) Send(v T) {
10321051
p.mu.Unlock()
10331052
}
10341053

1054+
func (p *pubsub[T]) Close() {
1055+
p.mu.Lock()
1056+
channels := make([]*channel[T], 0, len(p.m))
1057+
for c := range p.m {
1058+
channels = append(channels, c)
1059+
}
1060+
p.mu.Unlock()
1061+
for _, c := range channels {
1062+
c.close()
1063+
}
1064+
}
1065+
10351066
type channel[T any] struct {
10361067
ps *pubsub[T]
10371068
ch chan T

0 commit comments

Comments
 (0)