Skip to content

Commit c8999d5

Browse files
authored
Merge pull request #29 from deverton-godaddy/deverton/ignore-more-events
2 parents 0d25ae1 + 1a194ee commit c8999d5

File tree

3 files changed

+96
-28
lines changed

3 files changed

+96
-28
lines changed

main.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,21 @@ func run(conf config) error {
9595
return fmt.Errorf("unable to connect to Nomad: %s", err)
9696
}
9797

98+
self, err := nomad_client.Agent().Self()
99+
if err != nil {
100+
return fmt.Errorf("unable to query local agent info: %s", err)
101+
}
102+
103+
clientStats, ok := self.Stats["client"]
104+
if !ok {
105+
return fmt.Errorf("not running on a client node")
106+
}
107+
108+
nodeID, ok := clientStats["node_id"]
109+
if !ok {
110+
return fmt.Errorf("unable to get local node ID")
111+
}
112+
98113
// Step 1: Leader election
99114
ctx, cancel := context.WithCancel(context.Background())
100115
defer cancel()
@@ -114,7 +129,7 @@ func run(conf config) error {
114129
}
115130

116131
zap.S().Debug("Starting endpoint reaper")
117-
endpoint_reaper, err := reapers.NewEndpointReaper(cilium_client, nomad_client.Allocations(), nomad_client.EventStream())
132+
endpoint_reaper, err := reapers.NewEndpointReaper(cilium_client, nomad_client.Allocations(), nomad_client.EventStream(), nodeID)
118133
if err != nil {
119134
return err
120135
}

reapers/endpoints.go

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66
"math"
7-
"net"
7+
"sort"
88
"strings"
99
"time"
1010

@@ -28,15 +28,17 @@ type EndpointReaper struct {
2828
cilium EndpointUpdater
2929
nomadAllocations AllocationInfo
3030
nomadEventStream EventStreamer
31+
nodeID string
3132
}
3233

3334
// NewEndpointReaper creates a new EndpointReaper. This will run an initial reconciliation before
3435
// returning the reaper
35-
func NewEndpointReaper(ciliumClient EndpointUpdater, nomadAllocations AllocationInfo, nomadEventStream EventStreamer) (*EndpointReaper, error) {
36+
func NewEndpointReaper(ciliumClient EndpointUpdater, nomadAllocations AllocationInfo, nomadEventStream EventStreamer, nodeID string) (*EndpointReaper, error) {
3637
reaper := EndpointReaper{
3738
cilium: ciliumClient,
3839
nomadAllocations: nomadAllocations,
3940
nomadEventStream: nomadEventStream,
41+
nodeID: nodeID,
4042
}
4143

4244
// Do the initial reconciliation loop
@@ -57,7 +59,7 @@ func (e *EndpointReaper) Run(ctx context.Context) (<-chan bool, error) {
5759
eventChan, err := e.nomadEventStream.Stream(
5860
ctx,
5961
map[nomad_api.Topic][]string{
60-
nomad_api.TopicJob: {},
62+
nomad_api.TopicAllocation: {"*"},
6163
},
6264
math.MaxInt64,
6365
&nomad_api.QueryOptions{
@@ -123,13 +125,12 @@ func (e *EndpointReaper) reconcile() error {
123125
zap.L().Debug("checking each endpoint", zap.Int("endpoints-total", len(endpoints)))
124126

125127
for _, endpoint := range endpoints {
126-
endpointID := endpoint_id.NewCiliumID(endpoint.ID)
127128
containerID := endpoint.Status.ExternalIdentifiers.ContainerID
128129

129130
// Only managing endpoints with container IDs
130131
if containerID == "" {
131132
zap.L().Debug("Skipping endpoint that is not associated with a container",
132-
zap.String("endpoint-id", endpointID),
133+
zap.Int64("endpoint-id", endpoint.ID),
133134
)
134135
continue
135136
}
@@ -139,7 +140,7 @@ func (e *EndpointReaper) reconcile() error {
139140
if err != nil {
140141
zap.L().Warn("Couldn't fetch allocation from Nomad",
141142
zap.String("container-id", containerID),
142-
zap.String("endpoint-id", endpointID),
143+
zap.Int64("endpoint-id", endpoint.ID),
143144
zap.Error(err),
144145
)
145146
continue
@@ -148,15 +149,15 @@ func (e *EndpointReaper) reconcile() error {
148149
if allocation != nil {
149150
zap.L().Debug("Patching labels on endpoint",
150151
zap.String("container-id", containerID),
151-
zap.String("endpoint-id", endpointID),
152+
zap.Int64("endpoint-id", endpoint.ID),
152153
zap.Error(err),
153154
)
154155

155-
e.labelEndpoint(endpointID, allocation)
156+
e.labelEndpoint(endpoint, allocation)
156157
} else {
157158
zap.L().Debug("Skipping endpoint as allocation not in Nomad",
158159
zap.String("container-id", containerID),
159-
zap.String("endpoint-id", endpointID),
160+
zap.Int64("endpoint-id", endpoint.ID),
160161
zap.Error(err),
161162
)
162163
}
@@ -185,6 +186,17 @@ func (e *EndpointReaper) handleAllocationUpdated(event nomad_api.Event) {
185186
return
186187
}
187188

189+
if allocation.NodeID != e.nodeID {
190+
zap.L().Debug("Allocation is not for this node, ignoring",
191+
zap.String("event-type", event.Type),
192+
zap.Uint64("event-index", event.Index),
193+
zap.String("allocation-node-id", allocation.NodeID),
194+
zap.String("container-id", allocation.ID),
195+
zap.String("node-id", e.nodeID),
196+
)
197+
return
198+
}
199+
188200
if allocation.NetworkStatus == nil || allocation.NetworkStatus.Address == "" {
189201
zap.L().Debug("Allocation has no IP address, ignoring",
190202
zap.String("event-type", event.Type),
@@ -194,15 +206,23 @@ func (e *EndpointReaper) handleAllocationUpdated(event nomad_api.Event) {
194206
return
195207
}
196208

197-
allocationIP := net.ParseIP(allocation.NetworkStatus.Address)
198-
endpointID := endpoint_id.NewIPPrefixID(allocationIP)
209+
if allocation.ServerTerminalStatus() || allocation.ClientTerminalStatus() {
210+
zap.L().Debug("Allocation is terminating, ignoring",
211+
zap.String("event-type", event.Type),
212+
zap.Uint64("event-index", event.Index),
213+
zap.String("container-id", allocation.ID),
214+
zap.String("client-status", allocation.ClientStatus),
215+
zap.String("desired-status", allocation.DesiredStatus),
216+
)
217+
return
218+
}
199219

200-
endpoint, err := e.cilium.EndpointGet(endpointID)
220+
endpoint, err := e.cilium.EndpointGet(endpoint_id.NewID(endpoint_id.ContainerIdPrefix, allocation.ID))
201221
if err != nil {
202222
fields := []zap.Field{zap.String("event-type", event.Type),
203223
zap.Uint64("event-index", event.Index),
204224
zap.String("container-id", allocation.ID),
205-
zap.String("endpoint-id", endpointID),
225+
zap.Int64("endpoint-id", endpoint.ID),
206226
zap.Error(err),
207227
}
208228
if strings.Contains(err.Error(), "getEndpointIdNotFound") {
@@ -222,20 +242,38 @@ func (e *EndpointReaper) handleAllocationUpdated(event nomad_api.Event) {
222242
zap.String("event-type", event.Type),
223243
zap.Uint64("event-index", event.Index),
224244
zap.String("container-id", allocation.ID),
225-
zap.String("endpoint-id", endpointID),
245+
zap.Int64("endpoint-id", endpoint.ID),
226246
zap.Error(err),
227247
)
228248
return
229249
}
230250
}
231251

232-
e.labelEndpoint(endpoint_id.NewCiliumID(endpoint.ID), allocation)
252+
e.labelEndpoint(endpoint, allocation)
253+
}
254+
255+
// stringArrayEqual reports whether left and right hold the same strings with
// the same multiplicities, regardless of the order in which they appear.
//
// Slices of different lengths are never equal; two empty (or nil) slices are.
// The comparison is done on sorted copies, so neither argument is modified:
// sort.Strings sorts in place, and sorting the arguments directly would
// silently reorder slices the caller goes on to use.
func stringArrayEqual(left []string, right []string) bool {
	if len(left) != len(right) {
		return false
	}

	// Copy before sorting so the callers' slices keep their original order.
	sortedLeft := append([]string(nil), left...)
	sortedRight := append([]string(nil), right...)

	sort.Strings(sortedLeft)
	sort.Strings(sortedRight)

	for i, v := range sortedLeft {
		if v != sortedRight[i] {
			return false
		}
	}

	return true
}
234272

235-
func (e *EndpointReaper) labelEndpoint(endpointID string, allocation *nomad_api.Allocation) {
236-
labels := models.Labels{fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, jobIDLabel, allocation.JobID)}
237-
labels = append(labels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, namespaceLabel, allocation.Namespace))
238-
labels = append(labels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, taskGroupLabel, allocation.TaskGroup))
273+
func (e *EndpointReaper) labelEndpoint(endpoint *models.Endpoint, allocation *nomad_api.Allocation) {
274+
newLabels := models.Labels{fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, jobIDLabel, allocation.JobID)}
275+
newLabels = append(newLabels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, namespaceLabel, allocation.Namespace))
276+
newLabels = append(newLabels, fmt.Sprintf("%s:%s=%s", netreapLabelPrefix, taskGroupLabel, allocation.TaskGroup))
239277

240278
// Combine the metadata from the job and the task group with the task group taking precedence
241279
metadata := make(map[string]string)
@@ -252,26 +290,41 @@ func (e *EndpointReaper) labelEndpoint(endpointID string, allocation *nomad_api.
252290
}
253291

254292
for k, v := range metadata {
255-
labels = append(labels, fmt.Sprintf("%s:%s=%s", nomadLabelPrefix, k, v))
293+
newLabels = append(newLabels, fmt.Sprintf("%s:%s=%s", nomadLabelPrefix, k, v))
294+
}
295+
296+
oldLabels := models.Labels{}
297+
oldLabels = append(oldLabels, endpoint.Status.Labels.SecurityRelevant...)
298+
oldLabels = append(oldLabels, endpoint.Status.Labels.Derived...)
299+
300+
if stringArrayEqual(oldLabels, newLabels) {
301+
zap.L().Debug("Labels unchanged so not patching endpoint",
302+
zap.String("container-id", allocation.ID),
303+
zap.Int64("endpoint-id", endpoint.ID),
304+
zap.Strings("new-labels", newLabels),
305+
zap.Strings("old-labels", oldLabels),
306+
)
307+
308+
return
256309
}
257310

258311
ecr := &models.EndpointChangeRequest{
259312
ContainerID: allocation.ID,
260313
ContainerName: allocation.Name,
261-
Labels: labels,
314+
Labels: newLabels,
262315
State: models.EndpointStateWaitingDashForDashIdentity.Pointer(),
263316
}
264317

265318
f := func() error {
266-
err := e.cilium.EndpointPatch(endpointID, ecr)
319+
err := e.cilium.EndpointPatch(endpoint_id.NewCiliumID(endpoint.ID), ecr)
267320
if err != nil {
268321
// The Cilium client endpoints pass errors through Hint() that does fmt.Errorf to all errors without wrapping
269322
// so we have to treat them as strings
270323
if strings.Contains(err.Error(), "patchEndpointIdTooManyRequests") {
271324
zap.L().Warn("Hit Cilium API rate limit, retrying",
272325
zap.String("container-id", allocation.ID),
273-
zap.String("endpoint-id", endpointID),
274-
zap.Strings("labels", labels),
326+
zap.Int64("endpoint-id", endpoint.ID),
327+
zap.Strings("labels", newLabels),
275328
)
276329
return err
277330
}
@@ -288,8 +341,8 @@ func (e *EndpointReaper) labelEndpoint(endpointID string, allocation *nomad_api.
288341
if permanent, ok := err.(*backoff.PermanentError); ok {
289342
zap.L().Error("Error while patching the endpoint labels of container",
290343
zap.String("container-id", allocation.ID),
291-
zap.String("endpoint-id", endpointID),
292-
zap.Strings("labels", labels),
344+
zap.Int64("endpoint-id", endpoint.ID),
345+
zap.Strings("labels", newLabels),
293346
zap.Error(permanent.Unwrap()),
294347
)
295348
}

reapers/endpoints_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func TestEndpointReconcile(t *testing.T) {
140140
for _, tt := range tests {
141141
tt := tt
142142
t.Run(tt.name, func(t *testing.T) {
143-
reaper, err := NewEndpointReaper(tt.cilium, tt.nomadAllocations, nil)
143+
reaper, err := NewEndpointReaper(tt.cilium, tt.nomadAllocations, nil, "")
144144
if err != nil {
145145
t.Fatalf("unexpected error creating poller %v", err)
146146
}

0 commit comments

Comments
 (0)