Skip to content

Commit 90ece6e

Browse files
authored
Merge pull request #248 from nefeli/property_subscriber
dbus: add SetPropertiesSubscriber method
2 parents 176f854 + 6579259 commit 90ece6e

File tree

6 files changed

+195
-18
lines changed

6 files changed

+195
-18
lines changed

dbus/dbus.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package dbus
1717

1818
import (
19+
"encoding/hex"
1920
"fmt"
2021
"os"
2122
"strconv"
@@ -60,6 +61,27 @@ func PathBusEscape(path string) string {
6061
return string(n)
6162
}
6263

64+
// pathBusUnescape is the inverse of PathBusEscape.
65+
func pathBusUnescape(path string) string {
66+
if path == "_" {
67+
return ""
68+
}
69+
n := []byte{}
70+
for i := 0; i < len(path); i++ {
71+
c := path[i]
72+
if c == '_' && i+2 < len(path) {
73+
res, err := hex.DecodeString(path[i+1 : i+3])
74+
if err == nil {
75+
n = append(n, res...)
76+
}
77+
i += 2
78+
} else {
79+
n = append(n, c)
80+
}
81+
}
82+
return string(n)
83+
}
84+
6385
// Conn is a connection to systemd's dbus endpoint.
6486
type Conn struct {
6587
// sysconn/sysobj are only used to call dbus methods
@@ -74,13 +96,18 @@ type Conn struct {
7496
jobs map[dbus.ObjectPath]chan<- string
7597
sync.Mutex
7698
}
77-
subscriber struct {
99+
subStateSubscriber struct {
78100
updateCh chan<- *SubStateUpdate
79101
errCh chan<- error
80102
sync.Mutex
81103
ignore map[dbus.ObjectPath]int64
82104
cleanIgnore int64
83105
}
106+
propertiesSubscriber struct {
107+
updateCh chan<- *PropertiesUpdate
108+
errCh chan<- error
109+
sync.Mutex
110+
}
84111
}
85112

86113
// New establishes a connection to any available bus and authenticates.
@@ -152,7 +179,7 @@ func NewConnection(dialBus func() (*dbus.Conn, error)) (*Conn, error) {
152179
sigobj: systemdObject(sigconn),
153180
}
154181

155-
c.subscriber.ignore = make(map[dbus.ObjectPath]int64)
182+
c.subStateSubscriber.ignore = make(map[dbus.ObjectPath]int64)
156183
c.jobListener.jobs = make(map[dbus.ObjectPath]chan<- string)
157184

158185
// Setup the listeners on jobs so that we can get completions

dbus/dbus_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,25 @@ func TestPathBusEscape(t *testing.T) {
6767

6868
}
6969

70+
func TestPathBusUnescape(t *testing.T) {
71+
for in, want := range map[string]string{
72+
"_": "",
73+
"foo_2eservice": "foo.service",
74+
"foobar": "foobar",
75+
"woof_40woof_2eservice": "[email protected]",
76+
"_30123456": "0123456",
77+
"account_5fdb_2eservice": "account_db.service",
78+
"got_2ddashes": "got-dashes",
79+
"foobar_": "foobar_",
80+
"foobar_2": "foobar_2",
81+
} {
82+
got := pathBusUnescape(in)
83+
if got != want {
84+
t.Errorf("bad result for pathBusUnescape(%s): got %q, want %q", in, got, want)
85+
}
86+
}
87+
}
88+
7089
// TestNew ensures that New() works without errors.
7190
func TestNew(t *testing.T) {
7291
_, err := New()

dbus/methods.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,3 +584,8 @@ func (c *Conn) Reload() error {
584584
func unitPath(name string) dbus.ObjectPath {
585585
return dbus.ObjectPath("/org/freedesktop/systemd1/unit/" + PathBusEscape(name))
586586
}
587+
588+
// unitName returns the unescaped base element of the supplied escaped path
589+
func unitName(dpath dbus.ObjectPath) string {
590+
return pathBusUnescape(path.Base(string(dpath)))
591+
}

dbus/methods_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1524,3 +1524,20 @@ func TestReload(t *testing.T) {
15241524
t.Fatal(err)
15251525
}
15261526
}
1527+
1528+
func TestUnitName(t *testing.T) {
1529+
for _, unit := range []string{
1530+
"",
1531+
"foo.service",
1532+
"foobar",
1533+
1534+
"0123456",
1535+
"account_db.service",
1536+
"got-dashes",
1537+
} {
1538+
got := unitName(unitPath(unit))
1539+
if got != unit {
1540+
t.Errorf("bad result for unitName(%s): got %q, want %q", unit, got, unit)
1541+
}
1542+
}
1543+
}

dbus/subscription.go

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ func (c *Conn) dispatch() {
7070
c.jobComplete(signal)
7171
}
7272

73-
if c.subscriber.updateCh == nil {
73+
if c.subStateSubscriber.updateCh == nil &&
74+
c.propertiesSubscriber.updateCh == nil {
7475
continue
7576
}
7677

@@ -84,6 +85,12 @@ func (c *Conn) dispatch() {
8485
case "org.freedesktop.DBus.Properties.PropertiesChanged":
8586
if signal.Body[0].(string) == "org.freedesktop.systemd1.Unit" {
8687
unitPath = signal.Path
88+
89+
if len(signal.Body) >= 2 {
90+
if changed, ok := signal.Body[1].(map[string]dbus.Variant); ok {
91+
c.sendPropertiesUpdate(unitPath, changed)
92+
}
93+
}
8794
}
8895
}
8996

@@ -169,15 +176,19 @@ type SubStateUpdate struct {
169176
// is full, it attempts to write an error to errCh; if errCh is full, the error
170177
// passes silently.
171178
func (c *Conn) SetSubStateSubscriber(updateCh chan<- *SubStateUpdate, errCh chan<- error) {
172-
c.subscriber.Lock()
173-
defer c.subscriber.Unlock()
174-
c.subscriber.updateCh = updateCh
175-
c.subscriber.errCh = errCh
179+
c.subStateSubscriber.Lock()
180+
defer c.subStateSubscriber.Unlock()
181+
c.subStateSubscriber.updateCh = updateCh
182+
c.subStateSubscriber.errCh = errCh
176183
}
177184

178185
func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
179-
c.subscriber.Lock()
180-
defer c.subscriber.Unlock()
186+
c.subStateSubscriber.Lock()
187+
defer c.subStateSubscriber.Unlock()
188+
189+
if c.subStateSubscriber.updateCh == nil {
190+
return
191+
}
181192

182193
if c.shouldIgnore(unitPath) {
183194
return
@@ -186,7 +197,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
186197
info, err := c.GetUnitPathProperties(unitPath)
187198
if err != nil {
188199
select {
189-
case c.subscriber.errCh <- err:
200+
case c.subStateSubscriber.errCh <- err:
190201
default:
191202
}
192203
}
@@ -196,10 +207,10 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
196207

197208
update := &SubStateUpdate{name, substate}
198209
select {
199-
case c.subscriber.updateCh <- update:
210+
case c.subStateSubscriber.updateCh <- update:
200211
default:
201212
select {
202-
case c.subscriber.errCh <- errors.New("update channel full!"):
213+
case c.subStateSubscriber.errCh <- errors.New("update channel full!"):
203214
default:
204215
}
205216
}
@@ -222,7 +233,7 @@ func (c *Conn) sendSubStateUpdate(unitPath dbus.ObjectPath) {
222233
// the properties).
223234

224235
func (c *Conn) shouldIgnore(path dbus.ObjectPath) bool {
225-
t, ok := c.subscriber.ignore[path]
236+
t, ok := c.subStateSubscriber.ignore[path]
226237
return ok && t >= time.Now().UnixNano()
227238
}
228239

@@ -231,20 +242,62 @@ func (c *Conn) updateIgnore(path dbus.ObjectPath, info map[string]interface{}) {
231242

232243
// unit is unloaded - it will trigger bad systemd dbus behavior
233244
if info["LoadState"].(string) == "not-found" {
234-
c.subscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
245+
c.subStateSubscriber.ignore[path] = time.Now().UnixNano() + ignoreInterval
235246
}
236247
}
237248

238249
// without this, ignore would grow unboundedly over time
239250
func (c *Conn) cleanIgnore() {
240251
now := time.Now().UnixNano()
241-
if c.subscriber.cleanIgnore < now {
242-
c.subscriber.cleanIgnore = now + cleanIgnoreInterval
252+
if c.subStateSubscriber.cleanIgnore < now {
253+
c.subStateSubscriber.cleanIgnore = now + cleanIgnoreInterval
243254

244-
for p, t := range c.subscriber.ignore {
255+
for p, t := range c.subStateSubscriber.ignore {
245256
if t < now {
246-
delete(c.subscriber.ignore, p)
257+
delete(c.subStateSubscriber.ignore, p)
247258
}
248259
}
249260
}
250261
}
262+
263+
// PropertiesUpdate holds a map of a unit's changed properties
264+
type PropertiesUpdate struct {
265+
UnitName string
266+
Changed map[string]dbus.Variant
267+
}
268+
269+
// SetPropertiesSubscriber writes to updateCh when any unit's properties
270+
// change. Every property change reported by systemd will be sent; that is, no
271+
// transitions will be "missed" (as they might be with SetSubStateSubscriber).
272+
// However, state changes will only be written to the channel with non-blocking
273+
// writes. If updateCh is full, it attempts to write an error to errCh; if
274+
// errCh is full, the error passes silently.
275+
func (c *Conn) SetPropertiesSubscriber(updateCh chan<- *PropertiesUpdate, errCh chan<- error) {
276+
c.propertiesSubscriber.Lock()
277+
defer c.propertiesSubscriber.Unlock()
278+
c.propertiesSubscriber.updateCh = updateCh
279+
c.propertiesSubscriber.errCh = errCh
280+
}
281+
282+
// we don't need to worry about shouldIgnore() here because
283+
// sendPropertiesUpdate doesn't call GetProperties()
284+
func (c *Conn) sendPropertiesUpdate(unitPath dbus.ObjectPath, changedProps map[string]dbus.Variant) {
285+
c.propertiesSubscriber.Lock()
286+
defer c.propertiesSubscriber.Unlock()
287+
288+
if c.propertiesSubscriber.updateCh == nil {
289+
return
290+
}
291+
292+
update := &PropertiesUpdate{unitName(unitPath), changedProps}
293+
294+
select {
295+
case c.propertiesSubscriber.updateCh <- update:
296+
default:
297+
select {
298+
case c.propertiesSubscriber.errCh <- errors.New("update channel is full"):
299+
default:
300+
}
301+
return
302+
}
303+
}

dbus/subscription_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,59 @@ func TestSubStateSubscription(t *testing.T) {
151151
}
152152
}
153153
}
154+
155+
// TestPropertiesSubscription exercises the basics of property change event subscriptions
156+
func TestPropertiesSubscription(t *testing.T) {
157+
target := "subscribe-events.service"
158+
159+
conn, err := New()
160+
defer conn.Close()
161+
if err != nil {
162+
t.Fatal(err)
163+
}
164+
165+
err = conn.Subscribe()
166+
if err != nil {
167+
t.Fatal(err)
168+
}
169+
170+
updateCh := make(chan *PropertiesUpdate)
171+
errCh := make(chan error)
172+
conn.SetPropertiesSubscriber(updateCh, errCh)
173+
174+
setupUnit(target, conn, t)
175+
linkUnit(target, conn, t)
176+
177+
reschan := make(chan string)
178+
_, err = conn.StartUnit(target, "replace", reschan)
179+
if err != nil {
180+
t.Fatal(err)
181+
}
182+
183+
job := <-reschan
184+
if job != "done" {
185+
t.Fatal("Couldn't start", target)
186+
}
187+
188+
timeout := make(chan bool, 1)
189+
go func() {
190+
time.Sleep(3 * time.Second)
191+
close(timeout)
192+
}()
193+
194+
for {
195+
select {
196+
case update := <-updateCh:
197+
if update.UnitName == target {
198+
subState, ok := update.Changed["SubState"].Value().(string)
199+
if ok && subState == "running" {
200+
return // success
201+
}
202+
}
203+
case err := <-errCh:
204+
t.Fatal(err)
205+
case <-timeout:
206+
t.Fatal("Reached timeout")
207+
}
208+
}
209+
}

0 commit comments

Comments
 (0)