Skip to content

Commit 049cfcd

Browse files
authored
Merge pull request #177 from digitalocean/katco/libvirt-qmp-mon-close-chan
In `Events` functions, always close the returned channel when done
2 parents ac9e0b6 + 59f18a6 commit 049cfcd

File tree

5 files changed

+151
-110
lines changed

5 files changed

+151
-110
lines changed

qemu/block.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016 The go-qemu Authors.
1+
// Copyright 2022 The go-qemu Authors.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@ package qemu
1717
import (
1818
"errors"
1919
"fmt"
20+
"io"
2021
"path/filepath"
2122
"time"
2223

@@ -232,10 +233,8 @@ func waitForSignal(d *Domain, signal string, timeout time.Duration, fn func() er
232233
if err != nil {
233234
return err
234235
}
235-
defer func() { done <- struct{}{} }()
236+
defer close(done)
236237

237-
// start listening for events prior to command execution. QMP events
238-
// may emit before the command returns.
239238
jobErr := make(chan error)
240239
go func() {
241240
jobErr <- waitForJob(events, signal, timeout)
@@ -248,17 +247,20 @@ func waitForSignal(d *Domain, signal string, timeout time.Duration, fn func() er
248247
return <-jobErr
249248
}
250249

251-
// waitForJob monitors the domain's QMP event stream, waiting
252-
// for the provided signal, timeout, or BLOCK_JOB_ERROR.
253-
// An error is returned should either BLOCK_JOB_ERROR or timeout occur.
254-
func waitForJob(events chan qmp.Event, signal string, timeout time.Duration) error {
250+
// waitForJob monitors the domain's QMP event stream, waiting for the provided
251+
// signal. An error is returned when a BLOCK_JOB_ERROR is seen, a timeout
252+
// occurs, or the underlying channel is closed.
253+
func waitForJob(events <-chan qmp.Event, signal string, timeout time.Duration) error {
255254
// Consider events stalled after timeout for X amount of time total,
256255
// rather than X amount of time without an incoming event
257256
stalled := time.After(timeout)
258257

259258
for {
260259
select {
261-
case e := <-events:
260+
case e, ok := <-events:
261+
if !ok {
262+
return io.EOF
263+
}
262264
switch e.Event {
263265
case signal:
264266
return nil

qemu/block_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package qemu
1616

1717
import (
1818
"errors"
19+
"io"
1920
"testing"
2021

2122
"github.com/digitalocean/go-qemu/qmp"
@@ -104,22 +105,22 @@ func TestCommitActiveBlockJob(t *testing.T) {
104105
func TestCommitBlockJobError(t *testing.T) {
105106
d, done := testDomain(t, func(_ qmp.Command) (interface{}, error) {
106107
return success{}, nil
107-
})
108-
d.m.(*testMonitor).eventErrors = true
108+
}, withEventErrors)
109109
defer done()
110110

111111
disk := BlockDevice{}
112112
err := disk.Commit(d, "/tmp/foo", "job-id", defaultTestTimeout)
113113
if err == nil {
114114
t.Error("expected block job error to cause failure")
115+
} else if errors.Is(err, io.EOF) {
116+
t.Error("didn't expect the event stream to close")
115117
}
116118
}
117119

118120
func TestCommitTimeout(t *testing.T) {
119121
d, done := testDomain(t, func(_ qmp.Command) (interface{}, error) {
120122
return success{}, nil
121-
})
122-
d.m.(*testMonitor).eventTimeout = true
123+
}, withEventTimeout)
123124
defer done()
124125

125126
disk := BlockDevice{Device: "test"}
@@ -157,8 +158,7 @@ func TestJobComplete(t *testing.T) {
157158
func TestJobCompleteEventError(t *testing.T) {
158159
d, done := testDomain(t, func(_ qmp.Command) (interface{}, error) {
159160
return success{}, nil
160-
})
161-
d.m.(*testMonitor).eventErrors = true
161+
}, withEventErrors)
162162
defer done()
163163

164164
job := BlockJob{Device: "test"}
@@ -171,8 +171,7 @@ func TestJobCompleteEventError(t *testing.T) {
171171
func TestJobCompleteTimeout(t *testing.T) {
172172
d, done := testDomain(t, func(_ qmp.Command) (interface{}, error) {
173173
return success{}, nil
174-
})
175-
d.m.(*testMonitor).eventTimeout = true
174+
}, withEventTimeout)
176175
defer done()
177176

178177
job := BlockJob{Device: "test"}

qemu/domain.go

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2016 The go-qemu Authors.
1+
// Copyright 2022 The go-qemu Authors.
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import (
2727
"os"
2828
"path/filepath"
2929
"strings"
30+
"sync"
3031
"time"
3132

3233
"github.com/digitalocean/go-qemu/qmp"
@@ -40,13 +41,14 @@ var (
4041

4142
// Domain represents a QEMU instance.
4243
type Domain struct {
43-
Name string
44-
m qmp.Monitor
45-
rm *raw.Monitor
46-
done chan struct{}
47-
connect chan chan qmp.Event
48-
disconnect chan chan qmp.Event
49-
listeners []chan qmp.Event
44+
Name string
45+
m qmp.Monitor
46+
rm *raw.Monitor
47+
cancel context.CancelFunc
48+
listeners struct {
49+
sync.Mutex
50+
value []chan<- qmp.Event
51+
}
5052

5153
eventsUnsupported bool
5254

@@ -57,7 +59,7 @@ type Domain struct {
5759
// qmp.Monitor. Close must be called when done with a Domain to avoid leaking
5860
// resources.
5961
func (d *Domain) Close() error {
60-
close(d.done)
62+
d.cancel()
6163
return d.m.Disconnect()
6264
}
6365

@@ -358,48 +360,45 @@ func (d *Domain) PackageVersion() (string, error) {
358360
return vers.Package, nil
359361
}
360362

361-
// Events streams QEMU QMP events.
362-
// Two channels are returned, the first contains events emitted by the domain.
363-
// The second is used to signal completion of event processing.
364-
// It is the responsibility of the caller to always signal when finished.
363+
// Events streams QEMU QMP events. Two channels are returned, the first contains
364+
// events emitted by the domain. The second is used to signal completion of
365+
// event processing. It is the responsibility of the caller to always close this
366+
// channel when finished.
365367
func (d *Domain) Events() (chan qmp.Event, chan struct{}, error) {
366368
if d.eventsUnsupported {
367369
return nil, nil, qmp.ErrEventsNotSupported
368370
}
369371

370372
stream := make(chan qmp.Event)
371-
done := make(chan struct{})
373+
// The previous expectation was that you write to this channel, not
374+
// close it, so ensure we continue to support this.
375+
done := make(chan struct{}, 1)
372376

373377
// handle disconnection
374378
go func() {
375379
<-done
376-
// drain anything that gets sent on the channel
377-
// because the disconnect won't be processed if the
378-
// listenAndServe loop is waiting for the listener
379-
// to read from the unbuffered channel.
380+
// If the caller has indicated they are done, they will
381+
// no longer be reading from the stream, and there needs
382+
// to be something which unblocks the main broadcast
383+
// goroutine for writes to this stream so we can remove
384+
// the stream from the list of listeners.
380385
go func() {
381386
for range stream {
382387
}
383388
}()
384-
d.disconnect <- stream
385-
close(stream)
386-
close(done)
389+
d.closeAndRemoveListener(stream)
387390
}()
388391

389-
// add stream to broadcast
390-
d.connect <- stream
391-
392+
d.addListener(stream)
392393
return stream, done, nil
393394
}
394395

395396
// listenAndServe handles a domain's event broadcast service.
396-
func (d *Domain) listenAndServe() error {
397-
ctx, cancel := context.WithCancel(context.Background())
397+
func (d *Domain) listenAndServe(ctx context.Context) error {
398398
stream, err := d.m.Events(ctx)
399399
if err != nil {
400-
cancel()
401400
// let Event() inform the user events are not supported
402-
if err == qmp.ErrEventsNotSupported {
401+
if errors.Is(err, qmp.ErrEventsNotSupported) {
403402
d.eventsUnsupported = true
404403
return nil
405404
}
@@ -408,41 +407,56 @@ func (d *Domain) listenAndServe() error {
408407
}
409408

410409
go func() {
411-
defer cancel()
412-
for {
413-
select {
414-
case <-d.done:
415-
return
416-
case client := <-d.connect:
417-
d.addListener(client)
418-
case client := <-d.disconnect:
419-
d.removeListener(client)
420-
case event := <-stream:
421-
d.broadcast(event)
422-
}
410+
// When we're done broadcasting, ensure all of our listeners
411+
// become aware.
412+
defer d.closeAndRemoveListeners()
413+
for event := range stream {
414+
d.broadcast(event)
423415
}
424416
}()
425417

426418
return nil
427419
}
428420

429-
// addListener adds the given stream to the domain's event broadcast.
430-
func (d *Domain) addListener(stream chan qmp.Event) {
431-
d.listeners = append(d.listeners, stream)
421+
// addListener adds the given stream to the domain's event broadcast. The main
422+
// broadcast goroutine takes ownership of the goroutine's lifetime.
423+
func (d *Domain) addListener(stream chan<- qmp.Event) {
424+
d.listeners.Lock()
425+
defer d.listeners.Unlock()
426+
d.listeners.value = append(d.listeners.value, stream)
427+
}
428+
429+
// closeAndRemoveListeners closes all listeners and removes them from the list.
430+
func (d *Domain) closeAndRemoveListeners() {
431+
d.listeners.Lock()
432+
defer d.listeners.Unlock()
433+
for _, l := range d.listeners.value {
434+
close(l)
435+
}
436+
d.listeners.value = nil
432437
}
433438

434-
// removeListener removes the given stream from the domain's event broadcast.
435-
func (d *Domain) removeListener(stream chan qmp.Event) {
436-
for i, client := range d.listeners {
439+
// closeAndRemoveListener closes the listener and removes it from the domain's
440+
// event broadcast.
441+
func (d *Domain) closeAndRemoveListener(stream chan<- qmp.Event) {
442+
d.listeners.Lock()
443+
defer d.listeners.Unlock()
444+
445+
listeners := d.listeners.value
446+
for i, client := range listeners {
437447
if client == stream {
438-
d.listeners = append(d.listeners[:i], d.listeners[i+1:]...)
448+
close(client)
449+
listeners = append(listeners[:i], listeners[i+1:]...)
439450
}
440451
}
452+
d.listeners.value = listeners
441453
}
442454

443455
// broadcast sends the provided event to all event listeners.
444456
func (d *Domain) broadcast(event qmp.Event) {
445-
for _, stream := range d.listeners {
457+
d.listeners.Lock()
458+
defer d.listeners.Unlock()
459+
for _, stream := range d.listeners.value {
446460
stream <- event
447461
}
448462
}
@@ -451,13 +465,15 @@ func (d *Domain) broadcast(event qmp.Event) {
451465
// QMP communication is handled by the provided monitor socket.
452466
func NewDomain(m qmp.Monitor, name string) (*Domain, error) {
453467
d := &Domain{
454-
Name: name,
455-
m: m,
456-
rm: raw.NewMonitor(m),
457-
done: make(chan struct{}),
458-
connect: make(chan chan qmp.Event),
459-
disconnect: make(chan chan qmp.Event),
460-
listeners: []chan qmp.Event{},
468+
Name: name,
469+
m: m,
470+
rm: raw.NewMonitor(m),
471+
listeners: struct {
472+
sync.Mutex
473+
value []chan<- qmp.Event
474+
}{
475+
value: []chan<- qmp.Event{},
476+
},
461477

462478
// By default, try to generate decently random file names
463479
// for temporary files.
@@ -474,7 +490,13 @@ func NewDomain(m qmp.Monitor, name string) (*Domain, error) {
474490
}
475491

476492
// start event broadcast
477-
err := d.listenAndServe()
493+
ctx, cancel := context.WithCancel(context.Background())
494+
d.cancel = cancel
495+
err := d.listenAndServe(ctx)
496+
if err != nil {
497+
cancel()
498+
return nil, err
499+
}
478500

479-
return d, err
501+
return d, nil
480502
}

0 commit comments

Comments
 (0)