Skip to content

Commit ab05296

Browse files
authored
Allow multiple management planes (#1124)
1 parent 6943136 commit ab05296

File tree

15 files changed

+420
-101
lines changed

15 files changed

+420
-101
lines changed

internal/command/command_plugin.go

Lines changed: 96 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"log/slog"
1111
"sync"
1212

13+
"github.com/nginx/agent/v3/internal/model"
14+
1315
"google.golang.org/protobuf/types/known/timestamppb"
1416

1517
mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
@@ -38,75 +40,102 @@ type (
3840
}
3941

4042
CommandPlugin struct {
41-
messagePipe bus.MessagePipeInterface
42-
config *config.Config
43-
subscribeCancel context.CancelFunc
44-
conn grpc.GrpcConnectionInterface
45-
commandService commandService
46-
subscribeChannel chan *mpi.ManagementPlaneRequest
47-
subscribeMutex sync.Mutex
43+
messagePipe bus.MessagePipeInterface
44+
config *config.Config
45+
subscribeCancel context.CancelFunc
46+
conn grpc.GrpcConnectionInterface
47+
commandService commandService
48+
subscribeChannel chan *mpi.ManagementPlaneRequest
49+
commandServerType model.ServerType
50+
subscribeMutex sync.Mutex
4851
}
4952
)
5053

51-
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface) *CommandPlugin {
54+
func NewCommandPlugin(agentConfig *config.Config, grpcConnection grpc.GrpcConnectionInterface,
55+
commandServerType model.ServerType,
56+
) *CommandPlugin {
5257
return &CommandPlugin{
53-
config: agentConfig,
54-
conn: grpcConnection,
55-
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
58+
config: agentConfig,
59+
conn: grpcConnection,
60+
subscribeChannel: make(chan *mpi.ManagementPlaneRequest),
61+
commandServerType: commandServerType,
5662
}
5763
}
5864

5965
func (cp *CommandPlugin) Init(ctx context.Context, messagePipe bus.MessagePipeInterface) error {
60-
slog.DebugContext(ctx, "Starting command plugin")
66+
newCtx := context.WithValue(
67+
ctx,
68+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
69+
)
70+
slog.DebugContext(newCtx, "Starting command plugin")
6171

6272
cp.messagePipe = messagePipe
6373
cp.commandService = NewCommandService(cp.conn.CommandServiceClient(), cp.config, cp.subscribeChannel)
6474

65-
go cp.monitorSubscribeChannel(ctx)
75+
go cp.monitorSubscribeChannel(newCtx)
6676

6777
return nil
6878
}
6979

7080
func (cp *CommandPlugin) Close(ctx context.Context) error {
71-
slog.InfoContext(ctx, "Closing command plugin")
81+
newCtx := context.WithValue(
82+
ctx,
83+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
84+
)
85+
slog.InfoContext(newCtx, "Closing command plugin")
7286

7387
cp.subscribeMutex.Lock()
7488
if cp.subscribeCancel != nil {
7589
cp.subscribeCancel()
7690
}
7791
cp.subscribeMutex.Unlock()
7892

79-
return cp.conn.Close(ctx)
93+
return cp.conn.Close(newCtx)
8094
}
8195

8296
func (cp *CommandPlugin) Info() *bus.Info {
8397
return &bus.Info{
84-
Name: "command",
98+
Name: cp.commandServerType.String(),
8599
}
86100
}
87101

88102
func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
89-
switch msg.Topic {
90-
case bus.ConnectionResetTopic:
91-
cp.processConnectionReset(ctx, msg)
92-
case bus.ResourceUpdateTopic:
93-
cp.processResourceUpdate(ctx, msg)
94-
case bus.InstanceHealthTopic:
95-
cp.processInstanceHealth(ctx, msg)
96-
case bus.DataPlaneHealthResponseTopic:
97-
cp.processDataPlaneHealth(ctx, msg)
98-
case bus.DataPlaneResponseTopic:
99-
cp.processDataPlaneResponse(ctx, msg)
100-
default:
101-
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
103+
slog.DebugContext(ctx, "Processing command")
104+
105+
if logger.ServerType(ctx) == "" {
106+
ctx = context.WithValue(
107+
ctx,
108+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
109+
)
110+
}
111+
112+
if logger.ServerType(ctx) == cp.commandServerType.String() {
113+
switch msg.Topic {
114+
case bus.ConnectionResetTopic:
115+
cp.processConnectionReset(ctx, msg)
116+
case bus.ResourceUpdateTopic:
117+
cp.processResourceUpdate(ctx, msg)
118+
case bus.InstanceHealthTopic:
119+
cp.processInstanceHealth(ctx, msg)
120+
case bus.DataPlaneHealthResponseTopic:
121+
cp.processDataPlaneHealth(ctx, msg)
122+
case bus.DataPlaneResponseTopic:
123+
cp.processDataPlaneResponse(ctx, msg)
124+
default:
125+
slog.DebugContext(ctx, "Command plugin received unknown topic", "topic", msg.Topic)
126+
}
102127
}
103128
}
104129

105130
func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) {
106131
slog.DebugContext(ctx, "Command plugin received resource update message")
107132
if resource, ok := msg.Data.(*mpi.Resource); ok {
108133
if !cp.commandService.IsConnected() {
109-
cp.createConnection(ctx, resource)
134+
newCtx := context.WithValue(
135+
ctx,
136+
logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, cp.commandServerType.String()),
137+
)
138+
cp.createConnection(newCtx, resource)
110139
} else {
111140
statusErr := cp.commandService.UpdateDataPlaneStatus(ctx, resource)
112141
if statusErr != nil {
@@ -145,13 +174,14 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
145174
correlationID := logger.CorrelationID(ctx)
146175
if err != nil {
147176
slog.ErrorContext(ctx, "Unable to update data plane health", "error", err)
148-
cp.messagePipe.Process(ctx, &bus.Message{
177+
178+
cp.processDataPlaneResponse(ctx, &bus.Message{
149179
Topic: bus.DataPlaneResponseTopic,
150180
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
151181
"Failed to send the health status update", err.Error()),
152182
})
153183
}
154-
cp.messagePipe.Process(ctx, &bus.Message{
184+
cp.processDataPlaneResponse(ctx, &bus.Message{
155185
Topic: bus.DataPlaneResponseTopic,
156186
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
157187
"Successfully sent health status update", ""),
@@ -208,6 +238,7 @@ func (cp *CommandPlugin) Subscriptions() []string {
208238
}
209239
}
210240

241+
// nolint: revive, cyclop
211242
func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
212243
for {
213244
select {
@@ -226,12 +257,28 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
226257
slog.InfoContext(ctx, "Received management plane config upload request")
227258
cp.handleConfigUploadRequest(newCtx, message)
228259
case *mpi.ManagementPlaneRequest_ConfigApplyRequest:
260+
if cp.commandServerType != model.Command {
261+
slog.WarnContext(newCtx, "Auxiliary command server can not perform config apply",
262+
"command_server_type", cp.commandServerType.String())
263+
cp.handleInvalidRequest(newCtx, message, "Config apply failed",
264+
message.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId())
265+
266+
return
267+
}
229268
slog.InfoContext(ctx, "Received management plane config apply request")
230269
cp.handleConfigApplyRequest(newCtx, message)
231270
case *mpi.ManagementPlaneRequest_HealthRequest:
232271
slog.InfoContext(ctx, "Received management plane health request")
233272
cp.handleHealthRequest(newCtx)
234273
case *mpi.ManagementPlaneRequest_ActionRequest:
274+
if cp.commandServerType != model.Command {
275+
slog.WarnContext(newCtx, "Auxiliary command server can not perform api action",
276+
"command_server_type", cp.commandServerType.String())
277+
cp.handleInvalidRequest(newCtx, message, "API action failed",
278+
message.GetActionRequest().GetInstanceId())
279+
280+
return
281+
}
235282
slog.InfoContext(ctx, "Received management plane action request")
236283
cp.handleAPIActionRequest(newCtx, message)
237284
default:
@@ -320,6 +367,23 @@ func (cp *CommandPlugin) handleHealthRequest(newCtx context.Context) {
320367
cp.messagePipe.Process(newCtx, &bus.Message{Topic: bus.DataPlaneHealthRequestTopic})
321368
}
322369

370+
func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
371+
request *mpi.ManagementPlaneRequest, message, instanceID string,
372+
) {
373+
err := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
374+
MessageMeta: request.GetMessageMeta(),
375+
CommandResponse: &mpi.CommandResponse{
376+
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
377+
Message: message,
378+
Error: "Can not perform write action as auxiliary command server",
379+
},
380+
InstanceId: instanceID,
381+
})
382+
if err != nil {
383+
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
384+
}
385+
}
386+
323387
func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
324388
message, err string,
325389
) *mpi.DataPlaneResponse {

internal/command/command_plugin_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/nginx/agent/v3/internal/model"
15+
1416
pkg "github.com/nginx/agent/v3/pkg/config"
1517
"github.com/nginx/agent/v3/pkg/id"
1618

@@ -32,14 +34,14 @@ import (
3234
)
3335

3436
func TestCommandPlugin_Info(t *testing.T) {
35-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
37+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
3638
info := commandPlugin.Info()
3739

3840
assert.Equal(t, "command", info.Name)
3941
}
4042

4143
func TestCommandPlugin_Subscriptions(t *testing.T) {
42-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
44+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
4345
subscriptions := commandPlugin.Subscriptions()
4446

4547
assert.Equal(
@@ -60,7 +62,7 @@ func TestCommandPlugin_Init(t *testing.T) {
6062
messagePipe := busfakes.NewFakeMessagePipe()
6163
fakeCommandService := &commandfakes.FakeCommandService{}
6264

63-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
65+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
6466
err := commandPlugin.Init(ctx, messagePipe)
6567
require.NoError(t, err)
6668

@@ -79,7 +81,7 @@ func TestCommandPlugin_createConnection(t *testing.T) {
7981
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
8082
messagePipe := busfakes.NewFakeMessagePipe()
8183

82-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
84+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
8385
err := commandPlugin.Init(ctx, messagePipe)
8486
commandPlugin.commandService = commandService
8587
require.NoError(t, err)
@@ -111,7 +113,7 @@ func TestCommandPlugin_Process(t *testing.T) {
111113
messagePipe := busfakes.NewFakeMessagePipe()
112114
fakeCommandService := &commandfakes.FakeCommandService{}
113115

114-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
116+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
115117
err := commandPlugin.Init(ctx, messagePipe)
116118
require.NoError(t, err)
117119
defer commandPlugin.Close(ctx)
@@ -219,7 +221,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
219221

220222
agentConfig := types.AgentConfig()
221223
agentConfig.Features = test.configFeatures
222-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
224+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
223225
err := commandPlugin.Init(ctx, messagePipe)
224226
require.NoError(tt, err)
225227
defer commandPlugin.Close(ctx)
@@ -319,7 +321,7 @@ func TestCommandPlugin_FeatureDisabled(t *testing.T) {
319321

320322
agentConfig.Features = test.configFeatures
321323

322-
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{})
324+
commandPlugin := NewCommandPlugin(agentConfig, &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
323325
err := commandPlugin.Init(ctx, messagePipe)
324326
commandPlugin.commandService = fakeCommandService
325327
require.NoError(tt, err)
@@ -344,7 +346,7 @@ func TestMonitorSubscribeChannel(t *testing.T) {
344346
logBuf := &bytes.Buffer{}
345347
stub.StubLoggerWith(logBuf)
346348

347-
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
349+
cp := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
348350
cp.subscribeCancel = cncl
349351

350352
message := protos.CreateManagementPlaneRequest()
@@ -383,7 +385,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
383385
Error: "",
384386
},
385387
}
386-
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{})
388+
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
387389
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
388390
expected.GetCommandResponse().GetStatus(),
389391
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())

internal/command/command_service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandSe
261261
cs.subscribeClientMutex.Unlock()
262262

263263
cs.isConnected.Store(false)
264+
264265
resp, err := cs.CreateConnection(ctx, cs.resource)
265266
if err != nil {
266267
return err

0 commit comments

Comments
 (0)