Skip to content

Commit 34e59ca

Browse files
committed
progress: fix progress writer pause and unpause to prevent panics
This changes the progress printer's pause and unpause implementation to be reentrant to prevent race conditions and it also allows the status updates to be buffered when the display is paused. The previous implementation mixed the pause implementation with the finish implementation and could cause a send on closed channel panic because it could close the status channel before it had finished being used. Now, the status channel is not closed. When the display is enabled, the status channel will be forwarded to an internal channel that is used to display the updates. When the display is paused, the status channel will have the statuses buffered in memory to be sent when the progress display is resumed. The `Unpause` method has also been renamed to `Resume`. Signed-off-by: Jonathan A. Sternberg <[email protected]>
1 parent 02ab492 commit 34e59ca

File tree

2 files changed

+148
-45
lines changed

2 files changed

+148
-45
lines changed

monitor/monitor.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,8 @@ func (m *Monitor) Close() error {
113113

114114
// RunMonitor provides an interactive session for running and managing containers via specified IO.
115115
func RunMonitor(ctx context.Context, invokeConfig *build.InvokeConfig, rCtx *build.ResultHandle, stdin io.ReadCloser, stdout, stderr io.WriteCloser, progress *progress.Printer) error {
116-
if err := progress.Pause(); err != nil {
117-
return err
118-
}
119-
defer progress.Unpause()
116+
progress.Pause()
117+
defer progress.Resume()
120118

121119
defer stdin.Close()
122120

@@ -475,10 +473,10 @@ func printError(err error, printer *progress.Printer) error {
475473
if err == nil {
476474
return nil
477475
}
478-
if err := printer.Pause(); err != nil {
479-
return err
480-
}
481-
defer printer.Unpause()
476+
477+
printer.Pause()
478+
defer printer.Resume()
479+
482480
for _, s := range errdefs.Sources(err) {
483481
s.Print(os.Stderr)
484482
}

util/progress/printer.go

Lines changed: 142 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,24 @@ import (
1616
"go.opentelemetry.io/otel/metric"
1717
)
1818

19+
type printerState int
20+
21+
const (
22+
printerStateDone printerState = iota
23+
printerStateRunning
24+
printerStatePaused
25+
)
26+
1927
type Printer struct {
20-
status chan *client.SolveStatus
28+
out console.File
29+
mode progressui.DisplayMode
30+
opt *printerOpts
31+
32+
status chan *client.SolveStatus
33+
interrupt chan interruptRequest
34+
state printerState
2135

22-
ready chan struct{}
2336
done chan struct{}
24-
paused chan struct{}
2537
closeOnce sync.Once
2638

2739
err error
@@ -54,13 +66,23 @@ func (p *Printer) IsDone() bool {
5466
}
5567

5668
func (p *Printer) Pause() error {
57-
p.paused = make(chan struct{})
58-
return p.Wait()
69+
done := make(chan struct{})
70+
p.interrupt <- interruptRequest{
71+
desiredState: printerStatePaused,
72+
done: done,
73+
}
74+
75+
// Need to wait for a response to confirm we have control
76+
// of the console output.
77+
<-done
78+
return nil
5979
}
6080

61-
func (p *Printer) Unpause() {
62-
close(p.paused)
63-
<-p.ready
81+
func (p *Printer) Resume() {
82+
p.interrupt <- interruptRequest{
83+
desiredState: printerStateRunning,
84+
}
85+
// Do not care about waiting for a response.
6486
}
6587

6688
func (p *Printer) Write(s *client.SolveStatus) {
@@ -115,42 +137,114 @@ func NewPrinter(ctx context.Context, out console.File, mode progressui.DisplayMo
115137
}
116138

117139
pw := &Printer{
118-
ready: make(chan struct{}),
119-
metrics: opt.mw,
140+
out: out,
141+
mode: mode,
142+
opt: opt,
143+
status: make(chan *client.SolveStatus),
144+
interrupt: make(chan interruptRequest),
145+
state: printerStateRunning,
146+
done: make(chan struct{}),
120147
}
148+
go pw.run(ctx, d)
149+
150+
return pw, nil
151+
}
152+
153+
func (p *Printer) run(ctx context.Context, d progressui.Display) {
154+
defer close(p.done)
155+
defer close(p.interrupt)
156+
157+
var ss []*client.SolveStatus
158+
for p.state != printerStateDone {
159+
switch p.state {
160+
case printerStatePaused:
161+
ss, p.err = p.bufferDisplay(ctx, ss)
162+
case printerStateRunning:
163+
var warnings []client.VertexWarning
164+
warnings, ss, p.err = p.updateDisplay(ctx, d, ss)
165+
p.warnings = append(p.warnings, warnings...)
166+
167+
d, _ = p.newDisplay()
168+
}
169+
}
170+
171+
if p.opt.onclose != nil {
172+
p.opt.onclose()
173+
}
174+
}
175+
176+
func (p *Printer) newDisplay() (progressui.Display, error) {
177+
return progressui.NewDisplay(p.out, p.mode, p.opt.displayOpts...)
178+
}
179+
180+
func (p *Printer) updateDisplay(ctx context.Context, d progressui.Display, ss []*client.SolveStatus) ([]client.VertexWarning, []*client.SolveStatus, error) {
181+
p.logMu.Lock()
182+
p.logSourceMap = map[digest.Digest]any{}
183+
p.logMu.Unlock()
184+
185+
resumeLogs := logutil.Pause(logrus.StandardLogger())
186+
defer resumeLogs()
187+
188+
interruptCh := make(chan interruptRequest, 1)
189+
ingress := make(chan *client.SolveStatus)
190+
121191
go func() {
192+
defer close(ingress)
193+
defer close(interruptCh)
194+
195+
for _, s := range ss {
196+
ingress <- s
197+
}
198+
122199
for {
123-
pw.status = make(chan *client.SolveStatus)
124-
pw.done = make(chan struct{})
125-
pw.closeOnce = sync.Once{}
126-
127-
pw.logMu.Lock()
128-
pw.logSourceMap = map[digest.Digest]any{}
129-
pw.logMu.Unlock()
130-
131-
resumeLogs := logutil.Pause(logrus.StandardLogger())
132-
close(pw.ready)
133-
// not using shared context to not disrupt display but let is finish reporting errors
134-
pw.warnings, pw.err = d.UpdateFrom(ctx, pw.status)
135-
resumeLogs()
136-
close(pw.done)
137-
138-
if opt.onclose != nil {
139-
opt.onclose()
140-
}
141-
if pw.paused == nil {
142-
break
200+
select {
201+
case s, ok := <-p.status:
202+
if !ok {
203+
return
204+
}
205+
ingress <- s
206+
case req := <-p.interrupt:
207+
interruptCh <- req
208+
return
209+
case <-ctx.Done():
210+
return
143211
}
212+
}
213+
}()
214+
215+
warnings, err := d.UpdateFrom(context.Background(), ingress)
216+
if err == nil {
217+
err = context.Cause(ctx)
218+
}
144219

145-
pw.ready = make(chan struct{})
146-
<-pw.paused
147-
pw.paused = nil
220+
interrupt := <-interruptCh
221+
p.state = interrupt.desiredState
222+
interrupt.close()
223+
return warnings, nil, err
224+
}
148225

149-
d, _ = progressui.NewDisplay(out, mode, opt.displayOpts...)
226+
// bufferDisplay will buffer display updates from the status channel into a
227+
// slice.
228+
//
229+
// This method returns if either status gets closed or if an interrupt is received.
230+
func (p *Printer) bufferDisplay(ctx context.Context, ss []*client.SolveStatus) ([]*client.SolveStatus, error) {
231+
for {
232+
select {
233+
case s, ok := <-p.status:
234+
if !ok {
235+
p.state = printerStateDone
236+
return ss, nil
237+
}
238+
ss = append(ss, s)
239+
case req := <-p.interrupt:
240+
p.state = req.desiredState
241+
req.close()
242+
return ss, nil
243+
case <-ctx.Done():
244+
p.state = printerStateDone
245+
return nil, context.Cause(ctx)
150246
}
151-
}()
152-
<-pw.ready
153-
return pw, nil
247+
}
154248
}
155249

156250
func (p *Printer) WriteBuildRef(target string, ref string) {
@@ -221,3 +315,14 @@ func dedupWarnings(inp []client.VertexWarning) []client.VertexWarning {
221315
}
222316
return res
223317
}
318+
319+
type interruptRequest struct {
320+
desiredState printerState
321+
done chan<- struct{}
322+
}
323+
324+
func (req *interruptRequest) close() {
325+
if req.done != nil {
326+
close(req.done)
327+
}
328+
}

0 commit comments

Comments
 (0)