Skip to content

Commit fac256c

Browse files
committed
N+ support; fix channel blocking
1 parent a54bbbc commit fac256c

File tree

11 files changed

+473
-77
lines changed

11 files changed

+473
-77
lines changed

charts/nginx-gateway-fabric/templates/tmp-nginx-agent-conf.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,8 @@ data:
1818
- configuration
1919
- certificates
2020
- metrics
21+
{{- if .Values.nginx.plus }}
22+
- api-action
23+
{{- end }}
2124
log:
2225
level: debug

deploy/experimental-nginx-plus/deploy.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ data:
163163
- configuration
164164
- certificates
165165
- metrics
166+
- api-action
166167
log:
167168
level: debug
168169
kind: ConfigMap

deploy/nginx-plus/deploy.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ data:
158158
- configuration
159159
- certificates
160160
- metrics
161+
- api-action
161162
log:
162163
level: debug
163164
kind: ConfigMap

deploy/snippets-filters-nginx-plus/deploy.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ data:
160160
- configuration
161161
- certificates
162162
- metrics
163+
- api-action
163164
log:
164165
level: debug
165166
kind: ConfigMap

internal/mode/static/handler.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,12 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
185185
h.setLatestConfiguration(&cfg)
186186

187187
if h.cfg.plus {
188-
h.cfg.nginxUpdater.UpdateUpstreamServers()
188+
// TODO(sberman): hardcode this deployment name until we support provisioning data planes
189+
deployment := types.NamespacedName{
190+
Name: "tmp-nginx-deployment",
191+
Namespace: h.cfg.gatewayPodConfig.Namespace,
192+
}
193+
err = h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, cfg)
189194
} else {
190195
err = h.updateNginxConf(ctx, cfg)
191196
}
@@ -314,7 +319,9 @@ func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.C
314319

315320
// If using NGINX Plus, update upstream servers using the API.
316321
if h.cfg.plus {
317-
h.cfg.nginxUpdater.UpdateUpstreamServers()
322+
if err := h.cfg.nginxUpdater.UpdateUpstreamServers(ctx, deployment, conf); err != nil {
323+
return err
324+
}
318325
}
319326

320327
return nil

internal/mode/static/nginx/agent/agent.go

Lines changed: 84 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ package agent
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/go-logr/logr"
9+
pb "github.com/nginx/agent/v3/api/grpc/mpi/v1"
10+
"google.golang.org/protobuf/types/known/structpb"
811
"k8s.io/apimachinery/pkg/types"
912
"sigs.k8s.io/controller-runtime/pkg/client"
1013

14+
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/broadcast"
1115
agentgrpc "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/nginx/agent/grpc"
16+
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/dataplane"
17+
"github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver"
1218
)
1319

1420
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -17,8 +23,16 @@ import (
1723

1824
// NginxUpdater is an interface for updating NGINX using the NGINX agent.
1925
type NginxUpdater interface {
20-
UpdateConfig(context.Context, types.NamespacedName, []File) error
21-
UpdateUpstreamServers()
26+
UpdateConfig(
27+
ctx context.Context,
28+
deploymentNsName types.NamespacedName,
29+
files []File,
30+
) error
31+
UpdateUpstreamServers(
32+
ctx context.Context,
33+
deploymentNsName types.NamespacedName,
34+
conf dataplane.Configuration,
35+
) error
2236
}
2337

2438
// NginxUpdaterImpl implements the NginxUpdater interface.
@@ -76,10 +90,76 @@ func (n *NginxUpdaterImpl) UpdateConfig(ctx context.Context, nsName types.Namesp
7690

7791
// UpdateUpstreamServers sends an APIRequest to the agent to update upstream servers using the NGINX Plus API.
7892
// Only applicable when using NGINX Plus.
79-
func (n *NginxUpdaterImpl) UpdateUpstreamServers() {
93+
func (n *NginxUpdaterImpl) UpdateUpstreamServers(
94+
ctx context.Context,
95+
nsName types.NamespacedName,
96+
conf dataplane.Configuration,
97+
) error {
8098
if !n.plus {
81-
return
99+
return nil
82100
}
83101

84102
n.logger.Info("Updating upstream servers using NGINX Plus API")
103+
104+
deployment := n.nginxDeployments.GetOrStore(ctx, nsName)
105+
if deployment == nil {
106+
return fmt.Errorf("failed to register nginx deployment %q", nsName.Name)
107+
}
108+
broadcaster := deployment.GetBroadcaster()
109+
110+
var updateErr error
111+
for _, upstream := range conf.Upstreams {
112+
msg := broadcast.NginxAgentMessage{
113+
Type: broadcast.APIRequest,
114+
NGINXPlusAction: &pb.NGINXPlusAction{
115+
Action: &pb.NGINXPlusAction_UpdateHttpUpstreamServers{
116+
UpdateHttpUpstreamServers: buildUpstreamServers(upstream),
117+
},
118+
},
119+
}
120+
121+
if err := broadcaster.Send(msg); err != nil {
122+
updateErr = errors.Join(updateErr, fmt.Errorf(
123+
"couldn't update upstream %q via the API: %w", upstream.Name, err))
124+
}
125+
}
126+
127+
return updateErr
128+
}
129+
130+
func buildUpstreamServers(upstream dataplane.Upstream) *pb.UpdateHTTPUpstreamServers {
131+
servers := make([]*structpb.Struct, 0, len(upstream.Endpoints))
132+
133+
for _, endpoint := range upstream.Endpoints {
134+
port, format := getPortAndIPFormat(endpoint)
135+
value := fmt.Sprintf(format, endpoint.Address, port)
136+
137+
server := &structpb.Struct{
138+
Fields: map[string]*structpb.Value{
139+
"server": structpb.NewStringValue(value),
140+
},
141+
}
142+
143+
servers = append(servers, server)
144+
}
145+
146+
return &pb.UpdateHTTPUpstreamServers{
147+
HttpUpstreamName: upstream.Name,
148+
Servers: servers,
149+
}
150+
}
151+
152+
func getPortAndIPFormat(ep resolver.Endpoint) (string, string) {
153+
var port string
154+
155+
if ep.Port != 0 {
156+
port = fmt.Sprintf(":%d", ep.Port)
157+
}
158+
159+
format := "%s%s"
160+
if ep.IPv6 {
161+
format = "[%s]%s"
162+
}
163+
164+
return port, format
85165
}

internal/mode/static/nginx/agent/agentfakes/fake_nginx_updater.go

Lines changed: 55 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)