Skip to content

Commit cf0dab7

Browse files
committed
dbus: allow multiple calls for the same unit to *Unit
As well as add a test to prove StopUnit is reentrant Signed-off-by: Peter Hunt <[email protected]>
1 parent d25876d commit cf0dab7

File tree

3 files changed

+56
-5
lines changed

3 files changed

+56
-5
lines changed

dbus/dbus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ type Conn struct {
9595
sigobj dbus.BusObject
9696

9797
jobListener struct {
98-
jobs map[dbus.ObjectPath]chan<- string
98+
jobs map[dbus.ObjectPath][]chan<- string
9999
sync.Mutex
100100
}
101101
subStateSubscriber struct {
@@ -207,7 +207,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) {
207207
}
208208

209209
c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64)
210-
c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string)
210+
c.jobListener.jobs = make(map[dbus.ObjectPath][]chan<- string)
211211

212212
// Setup the listeners on jobs so that we can get completions
213213
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,

dbus/methods.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ func (c *Conn) jobComplete(signal *dbus.Signal) {
4444

4545
_ = dbus.Store(signal.Body, &id, &job, &unit, &result)
4646
c.jobListener.Lock()
47-
out, ok := c.jobListener.jobs[job]
47+
outs, ok := c.jobListener.jobs[job]
4848
if ok {
49-
out <- result
49+
for _, out := range outs {
50+
out <- result
51+
}
5052
delete(c.jobListener.jobs, job)
5153
}
5254
c.jobListener.Unlock()
@@ -65,7 +67,10 @@ func (c *Conn) startJob(ctx context.Context, ch chan<- string, job string, args
6567
}
6668

6769
if ch != nil {
68-
c.jobListener.jobs[p] = ch
70+
if _, ok := c.jobListener.jobs[p]; !ok {
71+
c.jobListener.jobs[p] = make([]chan<- string, 0, 1)
72+
}
73+
c.jobListener.jobs[p] = append(c.jobListener.jobs[p], ch)
6974
}
7075

7176
// ignore error since 0 is fine if conversion fails

dbus/methods_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,3 +1749,49 @@ func TestAttachProcessesToUnit(t *testing.T) {
17491749
func TestAttachProcessesToUnitWithSubcgroup(t *testing.T) {
17501750
testAttachProcessesToUnit(t, "/test-subcgroup")
17511751
}
1752+
1753+
func TestStopUnitReentrant(t *testing.T) {
1754+
target := "start-stop.service"
1755+
conn := setupConn(t)
1756+
1757+
setupUnit(target, conn, t)
1758+
linkUnit(target, conn, t)
1759+
1760+
jobSize := len(conn.jobListener.jobs)
1761+
1762+
reschan := make(chan string)
1763+
// Buffered channels are important for multiple calls to the same job,
1764+
// so the order of when we pull them out doesn't need to matter.
1765+
reschan2 := make(chan string, 1)
1766+
reschan3 := make(chan string, 1)
1767+
errChan := make(chan error, 2)
1768+
_, err := conn.StartUnit(target, "replace", reschan)
1769+
if err != nil {
1770+
t.Fatal(err)
1771+
}
1772+
1773+
<-reschan
1774+
1775+
go func() {
1776+
_, err = conn.StopUnit(target, "replace", reschan2)
1777+
errChan <- err
1778+
}()
1779+
go func() {
1780+
_, err = conn.StopUnit(target, "replace", reschan3)
1781+
errChan <- err
1782+
}()
1783+
1784+
<-reschan2
1785+
<-reschan3
1786+
1787+
for i := 0; i < 2; i++ {
1788+
if err := <-errChan; err != nil {
1789+
t.Fatal(err)
1790+
}
1791+
}
1792+
1793+
currentJobSize := len(conn.jobListener.jobs)
1794+
if jobSize != currentJobSize {
1795+
t.Fatal("JobListener jobs leaked")
1796+
}
1797+
}

0 commit comments

Comments
 (0)