From 751674dafd815bf69cf48d11ca05eaede4eac003 Mon Sep 17 00:00:00 2001 From: Saylor Berman Date: Wed, 11 Dec 2024 13:44:40 -0700 Subject: [PATCH] Use state file for updating N+ upstreams Problem: When splitting the data and control plane, we want to be able to maintain the ability to dynamically update upstreams without a reload if using NGINX Plus. With the current process, we write the upstreams into the nginx conf but don't reload, then call the API to actually do the update. Writing into the conf will trigger a reload from the agent once we split, however. Solution: Don't write the upstream servers into the nginx conf anymore when using NGINX Plus. Instead, utilize the `state` file option that the NGINX Plus API will populate with the upstream servers. This way we can just call the API and don't unintentionally reload by writing servers into the conf. --- internal/mode/static/handler.go | 104 +++++++----------- internal/mode/static/handler_test.go | 52 ++------- .../mode/static/nginx/config/http/config.go | 7 +- .../mode/static/nginx/config/stream/config.go | 7 +- .../mode/static/nginx/config/upstreams.go | 16 ++- .../static/nginx/config/upstreams_template.go | 7 +- .../static/nginx/config/upstreams_test.go | 65 ++++++++--- 7 files changed, 125 insertions(+), 133 deletions(-) diff --git a/internal/mode/static/handler.go b/internal/mode/static/handler.go index e09732e7f0..9f7efd163d 100644 --- a/internal/mode/static/handler.go +++ b/internal/mode/static/handler.go @@ -182,11 +182,11 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.setLatestConfiguration(&cfg) - err = h.updateUpstreamServers( - ctx, - logger, - cfg, - ) + if h.cfg.plus { + err = h.updateUpstreamServers(logger, cfg) + } else { + err = h.updateNginxConf(ctx, logger, cfg) + } case state.ClusterStateChange: h.version++ cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version) @@ -198,10 +198,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log h.setLatestConfiguration(&cfg) - err = h.updateNginxConf( - ctx, - cfg, - ) + err = h.updateNginxConf(ctx, logger, cfg) } var nginxReloadRes status.NginxReloadResult @@ -306,7 +303,11 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr } // updateNginxConf updates nginx conf files and reloads nginx. -func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error { +func (h *eventHandlerImpl) updateNginxConf( + ctx context.Context, + logger logr.Logger, + conf dataplane.Configuration, +) error { files := h.cfg.generator.Generate(conf) if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil { return fmt.Errorf("failed to replace NGINX configuration files: %w", err) @@ -316,75 +317,52 @@ func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.C return fmt.Errorf("failed to reload NGINX: %w", err) } - return nil -} - -// updateUpstreamServers is called only when endpoints have changed. It updates nginx conf files and then: -// - if using NGINX Plus, determines which servers have changed and uses the N+ API to update them; -// - otherwise if not using NGINX Plus, or an error was returned from the API, reloads nginx. -func (h *eventHandlerImpl) updateUpstreamServers( - ctx context.Context, - logger logr.Logger, - conf dataplane.Configuration, -) error { - isPlus := h.cfg.nginxRuntimeMgr.IsPlus() - - files := h.cfg.generator.Generate(conf) - if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil { - return fmt.Errorf("failed to replace NGINX configuration files: %w", err) + // If using NGINX Plus, update upstream servers using the API. + if err := h.updateUpstreamServers(logger, conf); err != nil { + return fmt.Errorf("failed to update upstream servers: %w", err) } - reload := func() error { - if err := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version); err != nil { - return fmt.Errorf("failed to reload NGINX: %w", err) - } + return nil +} +// updateUpstreamServers determines which servers have changed and uses the NGINX Plus API to update them. +// Only applicable when using NGINX Plus. +func (h *eventHandlerImpl) updateUpstreamServers(logger logr.Logger, conf dataplane.Configuration) error { + if !h.cfg.plus { return nil } - if isPlus { - type upstream struct { - name string - servers []ngxclient.UpstreamServer - } - var upstreams []upstream - - prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams() - if err != nil { - logger.Error(err, "failed to get upstreams from API, reloading configuration instead") - return reload() - } + type upstream struct { + name string + servers []ngxclient.UpstreamServer + } + var upstreams []upstream - for _, u := range conf.Upstreams { - confUpstream := upstream{ - name: u.Name, - servers: ngxConfig.ConvertEndpoints(u.Endpoints), - } + prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams() + if err != nil { + return fmt.Errorf("failed to get upstreams from API: %w", err) + } - if u, ok := prevUpstreams[confUpstream.name]; ok { - if !serversEqual(confUpstream.servers, u.Peers) { - upstreams = append(upstreams, confUpstream) - } - } + for _, u := range conf.Upstreams { + confUpstream := upstream{ + name: u.Name, + servers: ngxConfig.ConvertEndpoints(u.Endpoints), } - var reloadPlus bool - for _, upstream := range upstreams { - if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil { - logger.Error( - err, "couldn't update upstream via the API, reloading configuration instead", - "upstreamName", upstream.name, - ) - reloadPlus = true + if u, ok := prevUpstreams[confUpstream.name]; ok { + if !serversEqual(confUpstream.servers, u.Peers) { + upstreams = append(upstreams, confUpstream) } } + } - if !reloadPlus { - return nil + for _, upstream := range upstreams { + if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil { + logger.Error(err, "couldn't update upstream via the API", "upstreamName", upstream.name) } } - return reload() + return nil } func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool { diff --git a/internal/mode/static/handler_test.go b/internal/mode/static/handler_test.go index 67bf0e8e0e..39408b0c30 100644 --- a/internal/mode/static/handler_test.go +++ b/internal/mode/static/handler_test.go @@ -428,15 +428,15 @@ var _ = Describe("eventHandler", func() { When("running NGINX Plus", func() { It("should call the NGINX Plus API", func() { - fakeNginxRuntimeMgr.IsPlusReturns(true) + handler.cfg.plus = true handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch) dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1) Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty()) - Expect(fakeGenerator.GenerateCallCount()).To(Equal(1)) - Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1)) + Expect(fakeGenerator.GenerateCallCount()).To(Equal(0)) + Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(0)) Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1)) }) }) @@ -465,19 +465,6 @@ var _ = Describe("eventHandler", func() { }, } - type callCounts struct { - generate int - update int - reload int - } - - assertCallCounts := func(cc callCounts) { - Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate)) - Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate)) - Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update)) - Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload)) - } - BeforeEach(func() { upstreams := ngxclient.Upstreams{ "one": ngxclient.Upstream{ @@ -491,42 +478,25 @@ var _ = Describe("eventHandler", func() { When("running NGINX Plus", func() { BeforeEach(func() { - fakeNginxRuntimeMgr.IsPlusReturns(true) + handler.cfg.plus = true }) It("should update servers using the NGINX Plus API", func() { - Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed()) - - assertCallCounts(callCounts{generate: 1, update: 1, reload: 0}) + Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed()) + Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(1)) }) - It("should reload when GET API returns an error", func() { + It("should return error when GET API returns an error", func() { fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error")) - Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed()) - - assertCallCounts(callCounts{generate: 1, update: 0, reload: 1}) - }) - - It("should reload when POST API returns an error", func() { - fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error")) - Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed()) - - assertCallCounts(callCounts{generate: 1, update: 1, reload: 1}) + Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).ToNot(Succeed()) }) }) When("not running NGINX Plus", func() { - It("should update servers by reloading", func() { - Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed()) - - assertCallCounts(callCounts{generate: 1, update: 0, reload: 1}) - }) - - It("should return an error when reloading fails", func() { - fakeNginxRuntimeMgr.ReloadReturns(errors.New("error")) - Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed()) + It("should not do anything", func() { + Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed()) - assertCallCounts(callCounts{generate: 1, update: 0, reload: 1}) + Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(0)) }) }) }) diff --git a/internal/mode/static/nginx/config/http/config.go b/internal/mode/static/nginx/config/http/config.go index 24aecaa3e4..6d063dc8a7 100644 --- a/internal/mode/static/nginx/config/http/config.go +++ b/internal/mode/static/nginx/config/http/config.go @@ -82,9 +82,10 @@ const ( // Upstream holds all configuration for an HTTP upstream. type Upstream struct { - Name string - ZoneSize string // format: 512k, 1m - Servers []UpstreamServer + Name string + ZoneSize string // format: 512k, 1m + StateFile string + Servers []UpstreamServer } // UpstreamServer holds all configuration for an HTTP upstream server. diff --git a/internal/mode/static/nginx/config/stream/config.go b/internal/mode/static/nginx/config/stream/config.go index ddc215eea7..0e9c7a4631 100644 --- a/internal/mode/static/nginx/config/stream/config.go +++ b/internal/mode/static/nginx/config/stream/config.go @@ -15,9 +15,10 @@ type Server struct { // Upstream holds all configuration for a stream upstream. type Upstream struct { - Name string - ZoneSize string // format: 512k, 1m - Servers []UpstreamServer + Name string + ZoneSize string // format: 512k, 1m + StateFile string // FIXME(sberman): temporary until stream upstreams template is split in UpstreamSettingsPolicy work. + Servers []UpstreamServer } // UpstreamServer holds all configuration for a stream upstream server. diff --git a/internal/mode/static/nginx/config/upstreams.go b/internal/mode/static/nginx/config/upstreams.go index 88c66c47fd..612a84a46f 100644 --- a/internal/mode/static/nginx/config/upstreams.go +++ b/internal/mode/static/nginx/config/upstreams.go @@ -27,6 +27,8 @@ const ( ossZoneSizeStream = "512k" // plusZoneSize is the upstream zone size for nginx plus. plusZoneSizeStream = "1m" + // stateDir is the directory for storing state files. + stateDir = "/var/lib/nginx/state" ) func (g GeneratorImpl) executeUpstreams(conf dataplane.Configuration) []executeResult { @@ -101,15 +103,18 @@ func (g GeneratorImpl) createUpstreams(upstreams []dataplane.Upstream) []http.Up } func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream { + var stateFile string zoneSize := ossZoneSize if g.plus { zoneSize = plusZoneSize + stateFile = fmt.Sprintf("%s/%s.conf", stateDir, up.Name) } if len(up.Endpoints) == 0 { return http.Upstream{ - Name: up.Name, - ZoneSize: zoneSize, + Name: up.Name, + ZoneSize: zoneSize, + StateFile: stateFile, Servers: []http.UpstreamServer{ { Address: nginx503Server, @@ -130,9 +135,10 @@ func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream { } return http.Upstream{ - Name: up.Name, - ZoneSize: zoneSize, - Servers: upstreamServers, + Name: up.Name, + ZoneSize: zoneSize, + StateFile: stateFile, + Servers: upstreamServers, } } diff --git a/internal/mode/static/nginx/config/upstreams_template.go b/internal/mode/static/nginx/config/upstreams_template.go index a04915bec8..40d5740ad0 100644 --- a/internal/mode/static/nginx/config/upstreams_template.go +++ b/internal/mode/static/nginx/config/upstreams_template.go @@ -12,8 +12,13 @@ upstream {{ $u.Name }} { {{ if $u.ZoneSize -}} zone {{ $u.Name }} {{ $u.ZoneSize }}; {{ end -}} - {{ range $server := $u.Servers }} + + {{- if $u.StateFile }} + state {{ $u.StateFile }}; + {{- else }} + {{ range $server := $u.Servers }} server {{ $server.Address }}; + {{- end }} {{- end }} } {{ end -}} diff --git a/internal/mode/static/nginx/config/upstreams_test.go b/internal/mode/static/nginx/config/upstreams_test.go index 5b3a8268a3..8cfeab1f9c 100644 --- a/internal/mode/static/nginx/config/upstreams_test.go +++ b/internal/mode/static/nginx/config/upstreams_test.go @@ -289,29 +289,60 @@ func TestCreateUpstreamPlus(t *testing.T) { t.Parallel() gen := GeneratorImpl{plus: true} - stateUpstream := dataplane.Upstream{ - Name: "multiple-endpoints", - Endpoints: []resolver.Endpoint{ - { - Address: "10.0.0.1", - Port: 80, + tests := []struct { + msg string + stateUpstream dataplane.Upstream + expectedUpstream http.Upstream + }{ + { + msg: "with endpoints", + stateUpstream: dataplane.Upstream{ + Name: "endpoints", + Endpoints: []resolver.Endpoint{ + { + Address: "10.0.0.1", + Port: 80, + }, + }, + }, + expectedUpstream: http.Upstream{ + Name: "endpoints", + ZoneSize: plusZoneSize, + StateFile: stateDir + "/endpoints.conf", + Servers: []http.UpstreamServer{ + { + Address: "10.0.0.1:80", + }, + }, }, }, - } - expectedUpstream := http.Upstream{ - Name: "multiple-endpoints", - ZoneSize: plusZoneSize, - Servers: []http.UpstreamServer{ - { - Address: "10.0.0.1:80", + { + msg: "no endpoints", + stateUpstream: dataplane.Upstream{ + Name: "no-endpoints", + Endpoints: []resolver.Endpoint{}, + }, + expectedUpstream: http.Upstream{ + Name: "no-endpoints", + ZoneSize: plusZoneSize, + StateFile: stateDir + "/no-endpoints.conf", + Servers: []http.UpstreamServer{ + { + Address: nginx503Server, + }, + }, }, }, } - result := gen.createUpstream(stateUpstream) - - g := NewWithT(t) - g.Expect(result).To(Equal(expectedUpstream)) + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + t.Parallel() + g := NewWithT(t) + result := gen.createUpstream(test.stateUpstream) + g.Expect(result).To(Equal(test.expectedUpstream)) + }) + } } func TestExecuteStreamUpstreams(t *testing.T) {