Skip to content

Commit 2fac0eb

Browse files
committed
Use state file for updating N+ upstreams
Problem: When splitting the data and control plane, we want to be able to maintain the ability to dynamically update upstreams without a reload if using NGINX Plus. With the current process, we write the upstreams into the nginx conf but don't reload, then call the API to actually do the update. Writing into the conf will trigger a reload from the agent once we split, however. There were also two bugs: - if metrics were disabled, the nginx plus client was not initialized, preventing API calls from occuring and instead a reload occurred - stream upstreams were not updated with the API Solution: Don't write the upstream servers into the nginx conf anymore when using NGINX Plus. Instead, utilize the `state` file option that the NGINX Plus API will populate with the upstream servers. This way we can just call the API and don't unintentionally reload by writing servers into the conf. Also added support for updating stream upstreams, and fixed the initialization bug.
1 parent 245b516 commit 2fac0eb

File tree

14 files changed

+693
-193
lines changed

14 files changed

+693
-193
lines changed

internal/mode/static/handler.go

Lines changed: 84 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
182182

183183
h.setLatestConfiguration(&cfg)
184184

185-
err = h.updateUpstreamServers(
186-
ctx,
187-
logger,
188-
cfg,
189-
)
185+
if h.cfg.plus {
186+
err = h.updateUpstreamServers(logger, cfg)
187+
} else {
188+
err = h.updateNginxConf(ctx, logger, cfg)
189+
}
190190
case state.ClusterStateChange:
191191
h.version++
192192
cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
@@ -198,10 +198,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
198198

199199
h.setLatestConfiguration(&cfg)
200200

201-
err = h.updateNginxConf(
202-
ctx,
203-
cfg,
204-
)
201+
err = h.updateNginxConf(ctx, logger, cfg)
205202
}
206203

207204
var nginxReloadRes status.NginxReloadResult
@@ -306,7 +303,11 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr
306303
}
307304

308305
// updateNginxConf updates nginx conf files and reloads nginx.
309-
func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error {
306+
func (h *eventHandlerImpl) updateNginxConf(
307+
ctx context.Context,
308+
logger logr.Logger,
309+
conf dataplane.Configuration,
310+
) error {
310311
files := h.cfg.generator.Generate(conf)
311312
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
312313
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
@@ -316,89 +317,111 @@ func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.C
316317
return fmt.Errorf("failed to reload NGINX: %w", err)
317318
}
318319

320+
// If using NGINX Plus, update upstream servers using the API.
321+
if err := h.updateUpstreamServers(logger, conf); err != nil {
322+
return fmt.Errorf("failed to update upstream servers: %w", err)
323+
}
324+
319325
return nil
320326
}
321327

322-
// updateUpstreamServers is called only when endpoints have changed. It updates nginx conf files and then:
323-
// - if using NGINX Plus, determines which servers have changed and uses the N+ API to update them;
324-
// - otherwise if not using NGINX Plus, or an error was returned from the API, reloads nginx.
325-
func (h *eventHandlerImpl) updateUpstreamServers(
326-
ctx context.Context,
327-
logger logr.Logger,
328-
conf dataplane.Configuration,
329-
) error {
330-
isPlus := h.cfg.nginxRuntimeMgr.IsPlus()
331-
332-
files := h.cfg.generator.Generate(conf)
333-
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
334-
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
328+
// updateUpstreamServers determines which servers have changed and uses the NGINX Plus API to update them.
329+
// Only applicable when using NGINX Plus.
330+
func (h *eventHandlerImpl) updateUpstreamServers(logger logr.Logger, conf dataplane.Configuration) error {
331+
if !h.cfg.plus {
332+
return nil
335333
}
336334

337-
reload := func() error {
338-
if err := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version); err != nil {
339-
return fmt.Errorf("failed to reload NGINX: %w", err)
340-
}
335+
prevUpstreams, prevStreamUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
336+
if err != nil {
337+
return fmt.Errorf("failed to get upstreams from API: %w", err)
338+
}
341339

342-
return nil
340+
type upstream struct {
341+
name string
342+
servers []ngxclient.UpstreamServer
343343
}
344+
var upstreams []upstream
344345

345-
if isPlus {
346-
type upstream struct {
347-
name string
348-
servers []ngxclient.UpstreamServer
346+
for _, u := range conf.Upstreams {
347+
confUpstream := upstream{
348+
name: u.Name,
349+
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
349350
}
350-
var upstreams []upstream
351351

352-
prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
353-
if err != nil {
354-
logger.Error(err, "failed to get upstreams from API, reloading configuration instead")
355-
return reload()
352+
if u, ok := prevUpstreams[confUpstream.name]; ok {
353+
if !serversEqual(confUpstream.servers, u.Peers) {
354+
upstreams = append(upstreams, confUpstream)
355+
}
356356
}
357+
}
357358

358-
for _, u := range conf.Upstreams {
359-
confUpstream := upstream{
360-
name: u.Name,
361-
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
362-
}
359+
type streamUpstream struct {
360+
name string
361+
servers []ngxclient.StreamUpstreamServer
362+
}
363+
var streamUpstreams []streamUpstream
363364

364-
if u, ok := prevUpstreams[confUpstream.name]; ok {
365-
if !serversEqual(confUpstream.servers, u.Peers) {
366-
upstreams = append(upstreams, confUpstream)
367-
}
368-
}
365+
for _, u := range conf.StreamUpstreams {
366+
confUpstream := streamUpstream{
367+
name: u.Name,
368+
servers: ngxConfig.ConvertStreamEndpoints(u.Endpoints),
369369
}
370370

371-
var reloadPlus bool
372-
for _, upstream := range upstreams {
373-
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
374-
logger.Error(
375-
err, "couldn't update upstream via the API, reloading configuration instead",
376-
"upstreamName", upstream.name,
377-
)
378-
reloadPlus = true
371+
if u, ok := prevStreamUpstreams[confUpstream.name]; ok {
372+
if !serversEqual(confUpstream.servers, u.Peers) {
373+
streamUpstreams = append(streamUpstreams, confUpstream)
379374
}
380375
}
376+
}
377+
378+
for _, upstream := range upstreams {
379+
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
380+
logger.Error(err, "couldn't update upstream via the API", "upstreamName", upstream.name)
381+
}
382+
}
381383

382-
if !reloadPlus {
383-
return nil
384+
for _, upstream := range streamUpstreams {
385+
if err := h.cfg.nginxRuntimeMgr.UpdateStreamServers(upstream.name, upstream.servers); err != nil {
386+
logger.Error(err, "couldn't update stream upstream via the API", "upstreamName", upstream.name)
384387
}
385388
}
386389

387-
return reload()
390+
return nil
388391
}
389392

390-
func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool {
393+
// serversEqual accepts lists of either UpstreamServer/Peer or StreamUpstreamServer/StreamPeer and determines
394+
// if the server names within these lists are equal.
395+
func serversEqual[
396+
upstreamServer ngxclient.UpstreamServer | ngxclient.StreamUpstreamServer,
397+
peer ngxclient.Peer | ngxclient.StreamPeer,
398+
](newServers []upstreamServer, oldServers []peer) bool {
391399
if len(newServers) != len(oldServers) {
392400
return false
393401
}
394402

403+
getServerVal := func(T any) string {
404+
var server string
405+
switch t := T.(type) {
406+
case ngxclient.UpstreamServer:
407+
server = t.Server
408+
case ngxclient.StreamUpstreamServer:
409+
server = t.Server
410+
case ngxclient.Peer:
411+
server = t.Server
412+
case ngxclient.StreamPeer:
413+
server = t.Server
414+
}
415+
return server
416+
}
417+
395418
diff := make(map[string]struct{}, len(newServers))
396419
for _, s := range newServers {
397-
diff[s.Server] = struct{}{}
420+
diff[getServerVal(s)] = struct{}{}
398421
}
399422

400423
for _, s := range oldServers {
401-
if _, ok := diff[s.Server]; !ok {
424+
if _, ok := diff[getServerVal(s)]; !ok {
402425
return false
403426
}
404427
}

internal/mode/static/handler_test.go

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -423,20 +423,29 @@ var _ = Describe("eventHandler", func() {
423423
},
424424
},
425425
}
426-
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
426+
427+
streamUpstreams := ngxclient.StreamUpstreams{
428+
"two": ngxclient.StreamUpstream{
429+
Peers: []ngxclient.StreamPeer{
430+
{Server: "server2"},
431+
},
432+
},
433+
}
434+
435+
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, streamUpstreams, nil)
427436
})
428437

429438
When("running NGINX Plus", func() {
430439
It("should call the NGINX Plus API", func() {
431-
fakeNginxRuntimeMgr.IsPlusReturns(true)
440+
handler.cfg.plus = true
432441

433442
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
434443

435444
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
436445
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
437446

438-
Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
439-
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
447+
Expect(fakeGenerator.GenerateCallCount()).To(Equal(0))
448+
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(0))
440449
Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1))
441450
})
442451
})
@@ -463,19 +472,11 @@ var _ = Describe("eventHandler", func() {
463472
Name: "one",
464473
},
465474
},
466-
}
467-
468-
type callCounts struct {
469-
generate int
470-
update int
471-
reload int
472-
}
473-
474-
assertCallCounts := func(cc callCounts) {
475-
Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate))
476-
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate))
477-
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update))
478-
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload))
475+
StreamUpstreams: []dataplane.Upstream{
476+
{
477+
Name: "two",
478+
},
479+
},
479480
}
480481

481482
BeforeEach(func() {
@@ -486,47 +487,39 @@ var _ = Describe("eventHandler", func() {
486487
},
487488
},
488489
}
489-
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
490+
491+
streamUpstreams := ngxclient.StreamUpstreams{
492+
"two": ngxclient.StreamUpstream{
493+
Peers: []ngxclient.StreamPeer{
494+
{Server: "server2"},
495+
},
496+
},
497+
}
498+
499+
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, streamUpstreams, nil)
490500
})
491501

492502
When("running NGINX Plus", func() {
493503
BeforeEach(func() {
494-
fakeNginxRuntimeMgr.IsPlusReturns(true)
504+
handler.cfg.plus = true
495505
})
496506

497507
It("should update servers using the NGINX Plus API", func() {
498-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
499-
500-
assertCallCounts(callCounts{generate: 1, update: 1, reload: 0})
501-
})
502-
503-
It("should reload when GET API returns an error", func() {
504-
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error"))
505-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
506-
507-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
508+
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed())
509+
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(1))
508510
})
509511

510-
It("should reload when POST API returns an error", func() {
511-
fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error"))
512-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
513-
514-
assertCallCounts(callCounts{generate: 1, update: 1, reload: 1})
512+
It("should return error when GET API returns an error", func() {
513+
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, nil, errors.New("error"))
514+
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).ToNot(Succeed())
515515
})
516516
})
517517

518518
When("not running NGINX Plus", func() {
519-
It("should update servers by reloading", func() {
520-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
519+
It("should not do anything", func() {
520+
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed())
521521

522-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
523-
})
524-
525-
It("should return an error when reloading fails", func() {
526-
fakeNginxRuntimeMgr.ReloadReturns(errors.New("error"))
527-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed())
528-
529-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
522+
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(0))
530523
})
531524
})
532525
})
@@ -612,7 +605,7 @@ var _ = Describe("eventHandler", func() {
612605
})
613606

614607
var _ = Describe("serversEqual", func() {
615-
DescribeTable("determines if server lists are equal",
608+
DescribeTable("determines if HTTP server lists are equal",
616609
func(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer, equal bool) {
617610
Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
618611
},
@@ -649,6 +642,43 @@ var _ = Describe("serversEqual", func() {
649642
true,
650643
),
651644
)
645+
DescribeTable("determines if stream server lists are equal",
646+
func(newServers []ngxclient.StreamUpstreamServer, oldServers []ngxclient.StreamPeer, equal bool) {
647+
Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
648+
},
649+
Entry("different length",
650+
[]ngxclient.StreamUpstreamServer{
651+
{Server: "server1"},
652+
},
653+
[]ngxclient.StreamPeer{
654+
{Server: "server1"},
655+
{Server: "server2"},
656+
},
657+
false,
658+
),
659+
Entry("differing elements",
660+
[]ngxclient.StreamUpstreamServer{
661+
{Server: "server1"},
662+
{Server: "server2"},
663+
},
664+
[]ngxclient.StreamPeer{
665+
{Server: "server1"},
666+
{Server: "server3"},
667+
},
668+
false,
669+
),
670+
Entry("same elements",
671+
[]ngxclient.StreamUpstreamServer{
672+
{Server: "server1"},
673+
{Server: "server2"},
674+
},
675+
[]ngxclient.StreamPeer{
676+
{Server: "server1"},
677+
{Server: "server2"},
678+
},
679+
true,
680+
),
681+
)
652682
})
653683

654684
var _ = Describe("getGatewayAddresses", func() {

0 commit comments

Comments
 (0)