Skip to content

Commit abb50b3

Browse files
haircommanderLuap99
authored andcommitted
dbus: allow multiple calls for the same unit to *Unit
As well as add a test to prove StopUnit is reentrant There are cases that multiple StopUnits may be called for the same job. For instance, in kubernetes, the Kubelet does so in some cases, when it races against itself to delete a pod. Instead of fixing in the caller, this can be worked around in this library. Instead of losing track of an old job if a new StopUnit is called for the same job, we can just track multiple jobs, and signal to multiple channels when the job finishes. see kubernetes/kubernetes#135826 and kubernetes/kubernetes#135825 for some more context Signed-off-by: Peter Hunt <[email protected]>
1 parent 27f6bea commit abb50b3

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

dbus/dbus.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ type Conn struct {
9595
sigobj dbus.BusObject
9696

9797
jobListener struct {
98-
jobs map[dbus.ObjectPath]chan<- string
98+
jobs map[dbus.ObjectPath][]chan<- string
9999
sync.Mutex
100100
}
101101
subStateSubscriber struct {
@@ -207,7 +207,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) {
207207
}
208208

209209
c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64)
210-
c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string)
210+
c.jobListener.jobs = make(map[dbus.ObjectPath][]chan<- string)
211211

212212
// Setup the listeners on jobs so that we can get completions
213213
c.sigconn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0,

dbus/methods.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ func (c *Conn) jobComplete(signal *dbus.Signal) {
4444

4545
_ = dbus.Store(signal.Body, &id, &job, &unit, &result)
4646
c.jobListener.Lock()
47-
out, ok := c.jobListener.jobs[job]
48-
if ok {
47+
for _, out := range c.jobListener.jobs[job] {
4948
out <- result
50-
delete(c.jobListener.jobs, job)
5149
}
50+
delete(c.jobListener.jobs, job)
5251
c.jobListener.Unlock()
5352
}
5453

@@ -65,7 +64,7 @@ func (c *Conn) startJob(ctx context.Context, ch chan<- string, job string, args
6564
}
6665

6766
if ch != nil {
68-
c.jobListener.jobs[p] = ch
67+
c.jobListener.jobs[p] = append(c.jobListener.jobs[p], ch)
6968
}
7069

7170
// ignore error since 0 is fine if conversion fails

dbus/methods_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,3 +1749,49 @@ func TestAttachProcessesToUnit(t *testing.T) {
17491749
func TestAttachProcessesToUnitWithSubcgroup(t *testing.T) {
17501750
testAttachProcessesToUnit(t, "/test-subcgroup")
17511751
}
1752+
1753+
func TestStopUnitReentrant(t *testing.T) {
1754+
target := "start-stop.service"
1755+
conn := setupConn(t)
1756+
1757+
setupUnit(target, conn, t)
1758+
linkUnit(target, conn, t)
1759+
1760+
jobSize := len(conn.jobListener.jobs)
1761+
1762+
reschan := make(chan string)
1763+
// Buffered channels are important for multiple calls to the same job,
1764+
// so the order of when we pull them out doesn't need to matter.
1765+
reschan2 := make(chan string, 1)
1766+
reschan3 := make(chan string, 1)
1767+
errChan := make(chan error, 2)
1768+
_, err := conn.StartUnit(target, "replace", reschan)
1769+
if err != nil {
1770+
t.Fatal(err)
1771+
}
1772+
1773+
<-reschan
1774+
1775+
go func() {
1776+
_, err := conn.StopUnit(target, "replace", reschan2)
1777+
errChan <- err
1778+
}()
1779+
go func() {
1780+
_, err := conn.StopUnit(target, "replace", reschan3)
1781+
errChan <- err
1782+
}()
1783+
1784+
<-reschan2
1785+
<-reschan3
1786+
1787+
for range 2 {
1788+
if err := <-errChan; err != nil {
1789+
t.Fatal(err)
1790+
}
1791+
}
1792+
1793+
currentJobSize := len(conn.jobListener.jobs)
1794+
if jobSize != currentJobSize {
1795+
t.Fatal("JobListener jobs leaked")
1796+
}
1797+
}

0 commit comments

Comments
 (0)