Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 41 additions & 63 deletions internal/mode/static/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,11 @@

h.setLatestConfiguration(&cfg)

err = h.updateUpstreamServers(
ctx,
logger,
cfg,
)
if h.cfg.plus {
err = h.updateUpstreamServers(logger, cfg)
} else {
err = h.updateNginxConf(ctx, logger, cfg)
}
case state.ClusterStateChange:
h.version++
cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
Expand All @@ -198,10 +198,7 @@

h.setLatestConfiguration(&cfg)

err = h.updateNginxConf(
ctx,
cfg,
)
err = h.updateNginxConf(ctx, logger, cfg)
}

var nginxReloadRes status.NginxReloadResult
Expand Down Expand Up @@ -306,7 +303,11 @@
}

// updateNginxConf updates nginx conf files and reloads nginx.
func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error {
func (h *eventHandlerImpl) updateNginxConf(
ctx context.Context,
logger logr.Logger,
conf dataplane.Configuration,
) error {
files := h.cfg.generator.Generate(conf)
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
Expand All @@ -316,75 +317,52 @@
return fmt.Errorf("failed to reload NGINX: %w", err)
}

return nil
}

// updateUpstreamServers is called only when endpoints have changed. It updates nginx conf files and then:
// - if using NGINX Plus, determines which servers have changed and uses the N+ API to update them;
// - otherwise if not using NGINX Plus, or an error was returned from the API, reloads nginx.
func (h *eventHandlerImpl) updateUpstreamServers(
ctx context.Context,
logger logr.Logger,
conf dataplane.Configuration,
) error {
isPlus := h.cfg.nginxRuntimeMgr.IsPlus()

files := h.cfg.generator.Generate(conf)
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
// If using NGINX Plus, update upstream servers using the API.
if err := h.updateUpstreamServers(logger, conf); err != nil {
return fmt.Errorf("failed to update upstream servers: %w", err)

Check warning on line 322 in internal/mode/static/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/handler.go#L322

Added line #L322 was not covered by tests
}

reload := func() error {
if err := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version); err != nil {
return fmt.Errorf("failed to reload NGINX: %w", err)
}
return nil
}

// updateUpstreamServers determines which servers have changed and uses the NGINX Plus API to update them.
// Only applicable when using NGINX Plus.
func (h *eventHandlerImpl) updateUpstreamServers(logger logr.Logger, conf dataplane.Configuration) error {
if !h.cfg.plus {
return nil
}

if isPlus {
type upstream struct {
name string
servers []ngxclient.UpstreamServer
}
var upstreams []upstream

prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
if err != nil {
logger.Error(err, "failed to get upstreams from API, reloading configuration instead")
return reload()
}
type upstream struct {
name string
servers []ngxclient.UpstreamServer
}
var upstreams []upstream

for _, u := range conf.Upstreams {
confUpstream := upstream{
name: u.Name,
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
}
prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
if err != nil {
return fmt.Errorf("failed to get upstreams from API: %w", err)
}

if u, ok := prevUpstreams[confUpstream.name]; ok {
if !serversEqual(confUpstream.servers, u.Peers) {
upstreams = append(upstreams, confUpstream)
}
}
for _, u := range conf.Upstreams {
confUpstream := upstream{
name: u.Name,
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
}

var reloadPlus bool
for _, upstream := range upstreams {
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
logger.Error(
err, "couldn't update upstream via the API, reloading configuration instead",
"upstreamName", upstream.name,
)
reloadPlus = true
if u, ok := prevUpstreams[confUpstream.name]; ok {
if !serversEqual(confUpstream.servers, u.Peers) {
upstreams = append(upstreams, confUpstream)
}
}
}

if !reloadPlus {
return nil
for _, upstream := range upstreams {
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
logger.Error(err, "couldn't update upstream via the API", "upstreamName", upstream.name)

Check warning on line 361 in internal/mode/static/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/mode/static/handler.go#L361

Added line #L361 was not covered by tests
}
}

return reload()
return nil
}

func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool {
Expand Down
52 changes: 11 additions & 41 deletions internal/mode/static/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,15 @@ var _ = Describe("eventHandler", func() {

When("running NGINX Plus", func() {
It("should call the NGINX Plus API", func() {
fakeNginxRuntimeMgr.IsPlusReturns(true)
handler.cfg.plus = true

handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)

dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())

Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
Expect(fakeGenerator.GenerateCallCount()).To(Equal(0))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(0))
Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1))
})
})
Expand Down Expand Up @@ -465,19 +465,6 @@ var _ = Describe("eventHandler", func() {
},
}

type callCounts struct {
generate int
update int
reload int
}

assertCallCounts := func(cc callCounts) {
Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate))
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate))
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update))
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload))
}

BeforeEach(func() {
upstreams := ngxclient.Upstreams{
"one": ngxclient.Upstream{
Expand All @@ -491,42 +478,25 @@ var _ = Describe("eventHandler", func() {

When("running NGINX Plus", func() {
BeforeEach(func() {
fakeNginxRuntimeMgr.IsPlusReturns(true)
handler.cfg.plus = true
})

It("should update servers using the NGINX Plus API", func() {
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 1, reload: 0})
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed())
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(1))
})

It("should reload when GET API returns an error", func() {
It("should return error when GET API returns an error", func() {
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
})

It("should reload when POST API returns an error", func() {
fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 1, reload: 1})
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).ToNot(Succeed())
})
})

When("not running NGINX Plus", func() {
It("should update servers by reloading", func() {
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
})

It("should return an error when reloading fails", func() {
fakeNginxRuntimeMgr.ReloadReturns(errors.New("error"))
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed())
It("should not do anything", func() {
Expect(handler.updateUpstreamServers(ctlrZap.New(), conf)).To(Succeed())

assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(0))
})
})
})
Expand Down
7 changes: 4 additions & 3 deletions internal/mode/static/nginx/config/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ const (

// Upstream holds all configuration for an HTTP upstream.
type Upstream struct {
Name string
ZoneSize string // format: 512k, 1m
Servers []UpstreamServer
Name string
ZoneSize string // format: 512k, 1m
StateFile string
Servers []UpstreamServer
}

// UpstreamServer holds all configuration for an HTTP upstream server.
Expand Down
7 changes: 4 additions & 3 deletions internal/mode/static/nginx/config/stream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ type Server struct {

// Upstream holds all configuration for a stream upstream.
type Upstream struct {
Name string
ZoneSize string // format: 512k, 1m
Servers []UpstreamServer
Name string
ZoneSize string // format: 512k, 1m
StateFile string // FIXME(sberman): temporary until stream upstreams template is split in UpstreamSettingsPolicy work.
Servers []UpstreamServer
}

// UpstreamServer holds all configuration for a stream upstream server.
Expand Down
16 changes: 11 additions & 5 deletions internal/mode/static/nginx/config/upstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
ossZoneSizeStream = "512k"
// plusZoneSize is the upstream zone size for nginx plus.
plusZoneSizeStream = "1m"
// stateDir is the directory for storing state files.
stateDir = "/var/lib/nginx/state"
)

func (g GeneratorImpl) executeUpstreams(conf dataplane.Configuration) []executeResult {
Expand Down Expand Up @@ -101,15 +103,18 @@ func (g GeneratorImpl) createUpstreams(upstreams []dataplane.Upstream) []http.Up
}

func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream {
var stateFile string
zoneSize := ossZoneSize
if g.plus {
zoneSize = plusZoneSize
stateFile = fmt.Sprintf("%s/%s.conf", stateDir, up.Name)
}

if len(up.Endpoints) == 0 {
return http.Upstream{
Name: up.Name,
ZoneSize: zoneSize,
Name: up.Name,
ZoneSize: zoneSize,
StateFile: stateFile,
Servers: []http.UpstreamServer{
{
Address: nginx503Server,
Expand All @@ -130,9 +135,10 @@ func (g GeneratorImpl) createUpstream(up dataplane.Upstream) http.Upstream {
}

return http.Upstream{
Name: up.Name,
ZoneSize: zoneSize,
Servers: upstreamServers,
Name: up.Name,
ZoneSize: zoneSize,
StateFile: stateFile,
Servers: upstreamServers,
}
}

Expand Down
7 changes: 6 additions & 1 deletion internal/mode/static/nginx/config/upstreams_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ upstream {{ $u.Name }} {
{{ if $u.ZoneSize -}}
zone {{ $u.Name }} {{ $u.ZoneSize }};
{{ end -}}
{{ range $server := $u.Servers }}

{{- if $u.StateFile }}
state {{ $u.StateFile }};
{{- else }}
{{ range $server := $u.Servers }}
server {{ $server.Address }};
{{- end }}
{{- end }}
}
{{ end -}}
Expand Down
Loading
Loading