Skip to content

Commit 90cbfda

Browse files
author
Brandon Philips
committed
feat(dbus): introduce SubscriptionSet
A SubscriptionSet is similar to conn.Subscribe but does filtering to a set of provided units.
1 parent e943fc7 commit 90cbfda

File tree

5 files changed

+136
-4
lines changed

5 files changed

+136
-4
lines changed

dbus/set.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package dbus
2+
3+
type set struct {
4+
data map[string]bool
5+
}
6+
7+
func (s *set) Add(value string) {
8+
s.data[value] = true
9+
}
10+
11+
func (s *set) Contains(value string) (exists bool) {
12+
_, exists = s.data[value]
13+
return
14+
}
15+
16+
func (s *set) Length() (int) {
17+
return len(s.data)
18+
}
19+
20+
func newSet() (*set) {
21+
return &set{make(map[string] bool)}
22+
}

dbus/subscription.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,13 @@ func (c *Conn) initDispatch() {
9595
// Returns two unbuffered channels which will receive all changed units every
9696
// interval. Deleted units are sent as nil.
9797
func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
98-
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 })
98+
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
9999
}
100100

101101
// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
102-
// size of the channels and the comparison function for detecting changes.
103-
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool) (<-chan map[string]*UnitStatus, <-chan error) {
102+
// size of the channels, the comparison function for detecting changes and a filter
103+
// function for cutting down on the noise that your channel receives.
104+
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func (string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
104105
old := make(map[string]*UnitStatus)
105106
statusChan := make(chan map[string]*UnitStatus, buffer)
106107
errChan := make(chan error, buffer)
@@ -113,6 +114,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
113114
if err == nil {
114115
cur := make(map[string]*UnitStatus)
115116
for i := range units {
117+
if filterUnit != nil && filterUnit(units[i].Name) {
118+
continue
119+
}
116120
cur[units[i].Name] = &units[i]
117121
}
118122

@@ -132,7 +136,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
132136

133137
old = cur
134138

135-
statusChan <- changed
139+
if len(changed) != 0 {
140+
statusChan <- changed
141+
}
136142
} else {
137143
errChan <- err
138144
}

dbus/subscription_set.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package dbus
2+
3+
import (
4+
"time"
5+
)
6+
7+
// SubscriptionSet returns a subscription set which is like conn.Subscribe but
8+
// can filter to only return events for a set of units.
9+
type SubscriptionSet struct {
10+
*set
11+
conn *Conn
12+
}
13+
14+
15+
func (s *SubscriptionSet) filter(unit string) bool {
16+
return !s.Contains(unit)
17+
}
18+
19+
// Subscribe starts listening for dbus events for all of the units in the set.
20+
// Returns channels identical to conn.SubscribeUnits.
21+
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
22+
// TODO: Make fully evented by using systemd 209 with properties changed values
23+
return s.conn.SubscribeUnitsCustom(time.Second, 0,
24+
func(u1, u2 *UnitStatus) bool { return *u1 != *u2 },
25+
func(unit string) bool { return s.filter(unit) },
26+
)
27+
}
28+
29+
// NewSubscriptionSet returns a new subscription set.
30+
func (conn *Conn) NewSubscriptionSet() (*SubscriptionSet) {
31+
return &SubscriptionSet{newSet(), conn}
32+
}

dbus/subscription_set_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package dbus
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
// TestSubscribeUnit exercises the basics of subscription of a particular unit.
9+
func TestSubscriptionSetUnit(t *testing.T) {
10+
target := "subscribe-events-set.service"
11+
12+
conn, err := New()
13+
14+
if err != nil {
15+
t.Fatal(err)
16+
}
17+
18+
err = conn.Subscribe()
19+
if err != nil {
20+
t.Fatal(err)
21+
}
22+
23+
subSet := conn.NewSubscriptionSet()
24+
evChan, errChan := subSet.Subscribe()
25+
26+
subSet.Add(target)
27+
setupUnit(target, conn, t)
28+
29+
job, err := conn.StartUnit(target, "replace")
30+
if err != nil {
31+
t.Fatal(err)
32+
}
33+
34+
if job != "done" {
35+
t.Fatal("Couldn't start", target)
36+
}
37+
38+
timeout := make(chan bool, 1)
39+
go func() {
40+
time.Sleep(3 * time.Second)
41+
close(timeout)
42+
}()
43+
44+
for {
45+
select {
46+
case changes := <-evChan:
47+
tCh, ok := changes[target]
48+
49+
if !ok {
50+
t.Fatal("Unexpected event %v", changes)
51+
}
52+
53+
if tCh.ActiveState == "active" && tCh.Name == target {
54+
goto success
55+
}
56+
case err = <-errChan:
57+
t.Fatal(err)
58+
case <-timeout:
59+
t.Fatal("Reached timeout")
60+
}
61+
}
62+
63+
success:
64+
return
65+
}
66+
67+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[Unit]
2+
Description=start stop test
3+
4+
[Service]
5+
ExecStart=/bin/sleep 400

0 commit comments

Comments
 (0)