
Commit 2406a71

Merge pull request #356 from anthonybishopric/service

Add watches for label selectors

2 parents 3009999 + 0a62928

7 files changed: +409 −9 lines

pkg/labels/applicator.go

Lines changed: 10 additions & 0 deletions
@@ -61,4 +61,14 @@ type Applicator interface {

	// Return all objects of the given type that match the given selector
	GetMatches(selector labels.Selector, labelType Type) ([]Labeled, error)

	// Watch a label selector of a given type and see updates to that set
	// of Labeled over time. If an error occurs, the Applicator implementation
	// is responsible for handling it and recovering gracefully.
	//
	// The watch may be terminated by the underlying implementation without signaling on
	// the quit channel - this will be indicated by the closing of the result channel. For
	// this reason, the safest way to process the result of this channel is to use
	// it in a for loop. Alternatively, you may check for nullity of the result []Labeled
	WatchMatches(selector labels.Selector, labelType Type, quitCh chan struct{}) chan *[]Labeled
}
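As the comment notes, ranging over the returned channel handles both termination paths cleanly, because the implementation closes the channel when the watch ends. A minimal consumer sketch, assuming a hypothetical watchPods helper, an existing Applicator value, and an "fmt" import (none of which are part of this commit):

// watchPods is a hypothetical consumer of WatchMatches, not part of this commit.
func watchPods(applicator Applicator, selector labels.Selector) {
	quitCh := make(chan struct{})
	defer close(quitCh)
	for labeledPtr := range applicator.WatchMatches(selector, POD, quitCh) {
		if labeledPtr == nil {
			continue // defensive nil check, per the "nullity" note above
		}
		for _, labeled := range *labeledPtr {
			fmt.Println(labeled.ID) // assumes "fmt" is imported
		}
	}
	// The loop exits when the implementation closes the result channel,
	// whether quitCh was closed or the watch terminated on its own.
}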

pkg/labels/consul_aggregator.go

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
package labels

import (
	"sync"
	"time"

	"github.com/square/p2/pkg/kp/consulutil"
	"github.com/square/p2/pkg/logging"

	"github.com/square/p2/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/square/p2/Godeps/_workspace/src/github.com/hashicorp/consul/api"
	"github.com/square/p2/Godeps/_workspace/src/k8s.io/kubernetes/pkg/labels"
)

// minimum required time between retrievals of label subtrees.
var AggregationRateCap = 10 * time.Second

// linked list of watches with control channels
type selectorWatch struct {
	selector labels.Selector
	resultCh chan *[]Labeled
	canceled chan struct{}
	next     *selectorWatch
}

func (s *selectorWatch) append(w *selectorWatch) {
	if w == nil {
		return
	}
	if s.next != nil {
		s.next.append(w)
	} else {
		s.next = w
	}
}

func (s *selectorWatch) delete(w *selectorWatch) {
	if w == nil {
		return
	}
	if s.next == w {
		s.next = w.next
	} else if s.next != nil {
		s.next.delete(w)
	}
}

type consulAggregator struct {
	logger         logging.Logger
	labelType      Type
	watcherLock    sync.Mutex
	path           string
	kv             consulutil.ConsulLister
	watchers       *selectorWatch
	labeledCache   []Labeled // cached contents of the label subtree
	aggregatorQuit chan struct{}
}

func NewConsulAggregator(labelType Type, kv consulutil.ConsulLister, logger logging.Logger) *consulAggregator {
	return &consulAggregator{
		kv:             kv,
		logger:         logger,
		labelType:      labelType,
		path:           typePath(labelType),
		aggregatorQuit: make(chan struct{}),
	}
}

// Add a new selector to the aggregator. New values on the output channel may not appear
// right away.
func (c *consulAggregator) Watch(selector labels.Selector, quitCh chan struct{}) chan *[]Labeled {
	resCh := make(chan *[]Labeled)
	select {
	case <-c.aggregatorQuit:
		c.logger.WithField("selector", selector.String()).Warnln("New selector added after aggregator was closed")
		close(resCh)
		return resCh
	default:
	}
	c.watcherLock.Lock()
	defer c.watcherLock.Unlock()
	watch := &selectorWatch{
		selector: selector,
		resultCh: resCh,
		canceled: make(chan struct{}),
	}
	if c.watchers == nil {
		c.watchers = watch
	} else {
		c.watchers.append(watch)
	}
	go func() {
		select {
		case <-quitCh:
		case <-c.aggregatorQuit:
		}
		c.removeWatch(watch)
	}()
	return watch.resultCh
}

func (c *consulAggregator) removeWatch(watch *selectorWatch) {
	c.watcherLock.Lock()
	defer c.watcherLock.Unlock()
	close(watch.canceled)
	close(watch.resultCh)
	if c.watchers == watch {
		c.watchers = c.watchers.next
	} else {
		c.watchers.delete(watch)
	}
}

func (c *consulAggregator) Quit() {
	close(c.aggregatorQuit)
}

// Aggregate does the labor of querying Consul for all labels under a given type,
// applying each watcher's label selector to the results and sending those results on each
// watcher's output channel respectively.
// Aggregate will loop forever, constantly sending matches to each watcher
// until Quit() has been invoked.
func (c *consulAggregator) Aggregate() {
	outPairs := make(chan api.KVPairs)
	done := make(chan struct{})
	outErrors := make(chan error)
	go consulutil.WatchPrefix(c.path, c.kv, outPairs, done, outErrors)
	for {
		loopTime := time.After(AggregationRateCap)
		select {
		case <-c.aggregatorQuit:
			return
		case pairs := <-outPairs:
			// Convert watch result to []Labeled
			c.labeledCache = make([]Labeled, len(pairs))
			for i, kvp := range pairs {
				val, err := convertKVPToLabeled(kvp)
				if err != nil {
					c.logger.WithErrorAndFields(err, logrus.Fields{
						"key":   kvp.Key,
						"value": string(kvp.Value),
					}).Errorln("Invalid key encountered, skipping this value")
					continue
				}
				c.labeledCache[i] = val
			}

			// Iterate over each watcher and send the []Labeled
			// that match the watcher's selector to the watcher's out channel.
			c.watcherLock.Lock()
			watcher := c.watchers
			for watcher != nil {
				matches := []Labeled{}
				for _, labeled := range c.labeledCache {
					if watcher.selector.Matches(labeled.Labels) {
						matches = append(matches, labeled)
					}
				}
				select {
				case watcher.resultCh <- &matches:
				case <-watcher.canceled:
				}
				watcher = watcher.next
			}
			c.watcherLock.Unlock()
		}
		select {
		case <-c.aggregatorQuit:
			return
		case <-loopTime:
		}
	}
}
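The tests that follow exercise this flow end to end: construct one aggregator per label type, run Aggregate in a goroutine, and register selectors with Watch. A minimal wiring sketch, assuming kv is an existing consulutil.ConsulLister and logger is a logging.Logger provided elsewhere (neither is part of this commit):

// Minimal sketch, not part of this commit; kv and logger are assumed to exist.
aggregator := NewConsulAggregator(POD, kv, logger)
go aggregator.Aggregate()
defer aggregator.Quit()

quitCh := make(chan struct{})
defer close(quitCh)

canary := labels.Everything().Add("deployment", labels.EqualsOperator, []string{"canary"})
for matches := range aggregator.Watch(canary, quitCh) {
	// Sends are rate-limited by AggregationRateCap, so updates arrive
	// at most roughly once per cap interval.
	logger.WithField("count", len(*matches)).Infoln("canary set updated")
}

One consequence of the Aggregate loop above: watchers are served sequentially, and each send blocks until the watcher receives or cancels, so a consumer that stops reading without closing its quit channel can hold up the remaining watchers in that pass.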
Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
package labels

import (
	"testing"
	"time"

	"github.com/square/p2/pkg/logging"

	. "github.com/square/p2/Godeps/_workspace/src/github.com/anthonybishopric/gotcha"
	"github.com/square/p2/Godeps/_workspace/src/k8s.io/kubernetes/pkg/labels"
)

func alterAggregationTime(dur time.Duration) {
	AggregationRateCap = dur
}

func fakeLabeledPods() map[string][]byte {
	return map[string][]byte{
		objectPath(POD, "maroono"):  []byte(`{"color": "red", "deployment": "production"}`),
		objectPath(POD, "emeralda"): []byte(`{"color": "green", "deployment": "canary"}`),
		objectPath(POD, "slashi"):   []byte(`{"color": "red", "deployment": "canary"}`),
	}
}

// Check that two clients can share an aggregator
func TestTwoClients(t *testing.T) {
	alterAggregationTime(time.Millisecond)

	fakeKV := &fakeLabelStore{fakeLabeledPods(), nil}
	aggreg := NewConsulAggregator(POD, fakeKV, logging.DefaultLogger)
	go aggreg.Aggregate()
	defer aggreg.Quit()

	quitCh := make(chan struct{})
	labeledChannel1 := aggreg.Watch(labels.Everything().Add("color", labels.EqualsOperator, []string{"green"}), quitCh)
	labeledChannel2 := aggreg.Watch(labels.Everything().Add("deployment", labels.EqualsOperator, []string{"canary"}), quitCh)

	var checked string
	for i := 0; i < 2; i++ {
		select {
		case <-time.After(time.Second):
			t.Fatal("Should not have taken a second to get results")
		case labeledPtr := <-labeledChannel1:
			Assert(t).IsNotNil(labeledPtr, "ptr should not have been nil")
			labeled := *labeledPtr
			Assert(t).AreNotEqual("green", checked, "Should not have already checked the green selector result")
			checked = "green" // ensure that both sides get checked
			Assert(t).AreEqual(1, len(labeled), "Should have received one result from the color watch")
			Assert(t).AreEqual("emeralda", labeled[0].ID, "should have received the emerald app")
		case labeledPtr := <-labeledChannel2:
			Assert(t).IsNotNil(labeledPtr, "ptr should not have been nil")
			labeled := *labeledPtr
			Assert(t).AreNotEqual("canary", checked, "Should not have already checked the canary selector result")
			checked = "canary" // ensure that both sides get checked
			Assert(t).AreEqual(2, len(labeled), "Should have received two results from the canary watch")
			emeraldaIndex := 0
			slashiIndex := 1
			if labeled[0].ID == "slashi" { // order doesn't matter
				emeraldaIndex, slashiIndex = slashiIndex, emeraldaIndex
			}
			Assert(t).AreEqual("emeralda", labeled[emeraldaIndex].ID, "should have received the emerald app")
			Assert(t).AreEqual("slashi", labeled[slashiIndex].ID, "should have received the slashi app")
		}
	}
}

func TestQuitAggregateAfterResults(t *testing.T) {
	alterAggregationTime(time.Millisecond)

	fakeKV := &fakeLabelStore{fakeLabeledPods(), nil}
	aggreg := NewConsulAggregator(POD, fakeKV, logging.DefaultLogger)
	go aggreg.Aggregate()

	quitCh := make(chan struct{})
	res := aggreg.Watch(labels.Everything().Add("color", labels.EqualsOperator, []string{"green"}), quitCh)

	// Quit now. We expect that the aggregator will close the res channels
	aggreg.Quit()
	success := make(chan struct{})
	go func() {
		for _ = range res {
		}
		success <- struct{}{}
	}()

	select {
	case <-success:
	case <-time.After(time.Second):
		t.Fatal("Should not be waiting or processing results after a second")
	}
}

func TestQuitAggregateBeforeResults(t *testing.T) {
	alterAggregationTime(time.Millisecond)

	// this channel prevents the List from returning, so the aggregator
	// must quit prior to entering the loop
	trigger := make(chan struct{})
	fakeKV := &fakeLabelStore{fakeLabeledPods(), trigger}
	aggreg := NewConsulAggregator(POD, fakeKV, logging.DefaultLogger)
	go aggreg.Aggregate()

	quitCh := make(chan struct{})
	res := aggreg.Watch(labels.Everything().Add("color", labels.EqualsOperator, []string{"green"}), quitCh)

	// Quit now. We expect that the aggregator will close the res channels
	aggreg.Quit()

	select {
	case labeled := <-res:
		Assert(t).IsNotNil(labeled, "Should not have received any results")
	case <-time.After(time.Second):
		t.Fatal("Should still be waiting or processing results after a second")
	}
}

func TestQuitIndividualWatch(t *testing.T) {
	alterAggregationTime(time.Millisecond)

	fakeKV := &fakeLabelStore{fakeLabeledPods(), nil}
	aggreg := NewConsulAggregator(POD, fakeKV, logging.DefaultLogger)
	go aggreg.Aggregate()

	quitCh1 := make(chan struct{})
	labeledChannel1 := aggreg.Watch(labels.Everything().Add("color", labels.EqualsOperator, []string{"green"}), quitCh1)

	quitCh2 := make(chan struct{})
	labeledChannel2 := aggreg.Watch(labels.Everything().Add("deployment", labels.EqualsOperator, []string{"production"}), quitCh2)

	close(quitCh1) // this should not interrupt the flow of messages to the second channel

	// iterate twice to show that we are not waiting on other now-closed channels
	for i := 0; i < 2; i++ {
		select {
		case <-time.After(time.Second):
			t.Fatalf("Should not have taken a second to get results on iteration %v", i)
		case labeledPtr := <-labeledChannel2:
			Assert(t).IsNotNil(labeledPtr, "ptr should not have been nil")
			labeled := *labeledPtr
			Assert(t).AreEqual(1, len(labeled), "Should have one result with a production deployment")
			Assert(t).AreEqual("maroono", labeled[0].ID, "Should have received maroono as the one production deployment")
		}
	}

	// drain the first channel to show that it was closed. We do this
	// in a loop since it is possible that a value was sent on the channel
	success := make(chan struct{})
	go func() {
		for _ = range labeledChannel1 {
		}
		success <- struct{}{}
	}()
	select {
	case <-time.After(time.Second):
		t.Fatal("Should not have taken a second to see the closed label channel")
	case <-success:
	}
}
