Skip to content

Commit 57eb3a8

Browse files
authored
Update credential watcher to allow second credential watcher (#1132)
1 parent 5c6f0c9 commit 57eb3a8

File tree

9 files changed

+165
-79
lines changed

9 files changed

+165
-79
lines changed

internal/command/command_plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
380380
CommandResponse: &mpi.CommandResponse{
381381
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
382382
Message: message,
383-
Error: "Can not perform write action as auxiliary command server",
383+
Error: "Unable to process request. Management plane is configured as read only.",
384384
},
385385
InstanceId: instanceID,
386386
})

internal/config/mapper.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,40 @@ func ToCommandProto(cmd *Command) *mpi.CommandServer {
8585

8686
return protoConfig
8787
}
88+
89+
// ToAuxiliaryCommandServerProto maps the AgentConfig Command struct back to the AuxiliaryCommandServer proto message
90+
func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer {
91+
protoConfig := &mpi.AuxiliaryCommandServer{}
92+
93+
// Map ServerConfig to the ServerSettings
94+
if cmd.Server != nil {
95+
protoServerType := mpi.ServerSettings_SERVER_SETTINGS_TYPE_UNDEFINED
96+
if cmd.Server.Type == Grpc {
97+
protoServerType = mpi.ServerSettings_SERVER_SETTINGS_TYPE_GRPC
98+
}
99+
100+
protoConfig.Server = &mpi.ServerSettings{
101+
Host: cmd.Server.Host,
102+
Port: int32(cmd.Server.Port),
103+
Type: protoServerType,
104+
}
105+
}
106+
107+
// Map AuthConfig to AuthSettings
108+
if cmd.Auth != nil {
109+
protoConfig.Auth = &mpi.AuthSettings{}
110+
}
111+
112+
// Map TLSConfig to TLSSettings
113+
if cmd.TLS != nil {
114+
protoConfig.Tls = &mpi.TLSSettings{
115+
Cert: cmd.TLS.Cert,
116+
Key: cmd.TLS.Key,
117+
Ca: cmd.TLS.Ca,
118+
ServerName: cmd.TLS.ServerName,
119+
SkipVerify: cmd.TLS.SkipVerify,
120+
}
121+
}
122+
123+
return protoConfig
124+
}

internal/file/file_plugin.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) {
8787
)
8888
}
8989

90-
if logger.ServerType(ctx) == fp.serverType.String() || logger.ServerType(ctx) == "" {
90+
if logger.ServerType(ctx) == fp.serverType.String() {
9191
switch msg.Topic {
9292
case bus.ConnectionResetTopic:
9393
fp.handleConnectionReset(ctx, msg)
@@ -358,7 +358,6 @@ func (fp *FilePlugin) handleNginxConfigUpdate(ctx context.Context, msg *bus.Mess
358358
fp.fileManagerService.ConfigUpdate(ctx, nginxConfigContext)
359359
}
360360

361-
// nolint: dupl
362361
func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Message) {
363362
slog.DebugContext(ctx, "File plugin received config upload request message")
364363
managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)

internal/watcher/credentials/credential_watcher_service.go

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313
"sync/atomic"
1414
"time"
1515

16+
"github.com/nginx/agent/v3/internal/model"
17+
18+
"github.com/nginx/agent/v3/internal/grpc"
19+
1620
"github.com/fsnotify/fsnotify"
1721
"github.com/nginx/agent/v3/internal/config"
1822
"github.com/nginx/agent/v3/internal/logger"
@@ -28,56 +32,75 @@ var emptyEvent = fsnotify.Event{
2832
}
2933

3034
type CredentialUpdateMessage struct {
31-
CorrelationID slog.Attr
35+
CorrelationID slog.Attr
36+
GrpcConnection *grpc.GrpcConnection
37+
ServerType model.ServerType
3238
}
3339

3440
type CredentialWatcherService struct {
3541
agentConfig *config.Config
3642
watcher *fsnotify.Watcher
3743
filesBeingWatched *sync.Map
3844
filesChanged *atomic.Bool
45+
serverType model.ServerType
46+
watcherMutex sync.Mutex
3947
}
4048

41-
func NewCredentialWatcherService(agentConfig *config.Config) *CredentialWatcherService {
49+
func NewCredentialWatcherService(agentConfig *config.Config, serverType model.ServerType) *CredentialWatcherService {
4250
filesChanged := &atomic.Bool{}
4351
filesChanged.Store(false)
4452

4553
return &CredentialWatcherService{
4654
agentConfig: agentConfig,
4755
filesBeingWatched: &sync.Map{},
4856
filesChanged: filesChanged,
57+
serverType: serverType,
58+
watcherMutex: sync.Mutex{},
4959
}
5060
}
5161

5262
func (cws *CredentialWatcherService) Watch(ctx context.Context, ch chan<- CredentialUpdateMessage) {
53-
slog.DebugContext(ctx, "Starting credential watcher monitoring")
63+
newCtx := context.WithValue(
64+
ctx,
65+
logger.ServerTypeContextKey,
66+
slog.Any(logger.ServerTypeKey, cws.serverType.String()),
67+
)
68+
slog.DebugContext(newCtx, "Starting credential watcher monitoring")
5469

5570
ticker := time.NewTicker(monitoringInterval)
5671
watcher, err := fsnotify.NewWatcher()
5772
if err != nil {
58-
slog.ErrorContext(ctx, "Failed to create credential watcher", "error", err)
73+
slog.ErrorContext(newCtx, "Failed to create credential watcher", "error", err)
5974
return
6075
}
6176

6277
cws.watcher = watcher
6378

64-
cws.watchFiles(ctx, credentialPaths(cws.agentConfig))
79+
cws.watcherMutex.Lock()
80+
commandServer := cws.agentConfig.Command
81+
82+
if cws.serverType == model.Auxiliary {
83+
commandServer = cws.agentConfig.AuxiliaryCommand
84+
}
85+
86+
cws.watchFiles(newCtx, credentialPaths(commandServer))
87+
cws.watcherMutex.Unlock()
6588

6689
for {
6790
select {
68-
case <-ctx.Done():
91+
case <-newCtx.Done():
6992
closeError := cws.watcher.Close()
7093
if closeError != nil {
71-
slog.ErrorContext(ctx, "Unable to close credential watcher", "error", closeError)
94+
slog.ErrorContext(newCtx, "Unable to close credential watcher", "error", closeError)
7295
}
7396

7497
return
7598
case event := <-cws.watcher.Events:
76-
cws.handleEvent(ctx, event)
99+
cws.handleEvent(newCtx, event)
77100
case <-ticker.C:
78-
cws.checkForUpdates(ctx, ch)
101+
cws.checkForUpdates(newCtx, ch)
79102
case watcherError := <-cws.watcher.Errors:
80-
slog.ErrorContext(ctx, "Unexpected error in credential watcher", "error", watcherError)
103+
slog.ErrorContext(newCtx, "Unexpected error in credential watcher", "error", watcherError)
81104
}
82105
}
83106
}
@@ -146,31 +169,50 @@ func (cws *CredentialWatcherService) checkForUpdates(ctx context.Context, ch cha
146169
slog.Any(logger.CorrelationIDKey, logger.GenerateCorrelationID()),
147170
)
148171

172+
cws.watcherMutex.Lock()
173+
defer cws.watcherMutex.Unlock()
174+
175+
commandServer := cws.agentConfig.Command
176+
if cws.serverType == model.Auxiliary {
177+
commandServer = cws.agentConfig.AuxiliaryCommand
178+
}
179+
180+
conn, err := grpc.NewGrpcConnection(newCtx, cws.agentConfig, commandServer)
181+
if err != nil {
182+
slog.ErrorContext(newCtx, "Unable to create new grpc connection", "error", err)
183+
cws.filesChanged.Store(false)
184+
185+
return
186+
}
149187
slog.DebugContext(ctx, "Credential watcher has detected changes")
150-
ch <- CredentialUpdateMessage{CorrelationID: logger.CorrelationIDAttr(newCtx)}
188+
ch <- CredentialUpdateMessage{
189+
CorrelationID: logger.CorrelationIDAttr(newCtx),
190+
ServerType: cws.serverType,
191+
GrpcConnection: conn,
192+
}
151193
cws.filesChanged.Store(false)
152194
}
153195
}
154196

155-
func credentialPaths(agentConfig *config.Config) []string {
197+
func credentialPaths(agentConfig *config.Command) []string {
156198
var paths []string
157199

158-
if agentConfig.Command.Auth != nil {
159-
if agentConfig.Command.Auth.TokenPath != "" {
160-
paths = append(paths, agentConfig.Command.Auth.TokenPath)
200+
if agentConfig.Auth != nil {
201+
if agentConfig.Auth.TokenPath != "" {
202+
paths = append(paths, agentConfig.Auth.TokenPath)
161203
}
162204
}
163205

164206
// agent's tls certs
165-
if agentConfig.Command.TLS != nil {
166-
if agentConfig.Command.TLS.Ca != "" {
167-
paths = append(paths, agentConfig.Command.TLS.Ca)
207+
if agentConfig.TLS != nil {
208+
if agentConfig.TLS.Ca != "" {
209+
paths = append(paths, agentConfig.TLS.Ca)
168210
}
169-
if agentConfig.Command.TLS.Cert != "" {
170-
paths = append(paths, agentConfig.Command.TLS.Cert)
211+
if agentConfig.TLS.Cert != "" {
212+
paths = append(paths, agentConfig.TLS.Cert)
171213
}
172-
if agentConfig.Command.TLS.Key != "" {
173-
paths = append(paths, agentConfig.Command.TLS.Key)
214+
if agentConfig.TLS.Key != "" {
215+
paths = append(paths, agentConfig.TLS.Key)
174216
}
175217
}
176218

internal/watcher/credentials/credential_watcher_service_test.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"testing"
1414
"time"
1515

16+
"github.com/nginx/agent/v3/internal/model"
17+
1618
"github.com/nginx/agent/v3/internal/config"
1719

1820
"github.com/fsnotify/fsnotify"
@@ -22,15 +24,15 @@ import (
2224
)
2325

2426
func TestCredentialWatcherService_TestNewCredentialWatcherService(t *testing.T) {
25-
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig())
27+
credentialWatcherService := NewCredentialWatcherService(types.AgentConfig(), model.Command)
2628

2729
assert.Empty(t, credentialWatcherService.filesBeingWatched)
2830
assert.False(t, credentialWatcherService.filesChanged.Load())
2931
}
3032

3133
func TestCredentialWatcherService_Watch(t *testing.T) {
3234
ctx := context.Background()
33-
cws := NewCredentialWatcherService(types.AgentConfig())
35+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
3436
watcher, err := fsnotify.NewWatcher()
3537
require.NoError(t, err)
3638
cws.watcher = watcher
@@ -61,7 +63,7 @@ func TestCredentialWatcherService_Watch(t *testing.T) {
6163
}
6264

6365
func TestCredentialWatcherService_isWatching(t *testing.T) {
64-
cws := NewCredentialWatcherService(types.AgentConfig())
66+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
6567
assert.False(t, cws.isWatching("test-file"))
6668
cws.filesBeingWatched.Store("test-file", true)
6769
assert.True(t, cws.isWatching("test-file"))
@@ -80,7 +82,7 @@ func TestCredentialWatcherService_isEventSkippable(t *testing.T) {
8082

8183
func TestCredentialWatcherService_addWatcher(t *testing.T) {
8284
ctx := context.Background()
83-
cws := NewCredentialWatcherService(types.AgentConfig())
85+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
8486
watcher, err := fsnotify.NewWatcher()
8587
require.NoError(t, err)
8688
cws.watcher = watcher
@@ -105,7 +107,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
105107
var files []string
106108

107109
ctx := context.Background()
108-
cws := NewCredentialWatcherService(types.AgentConfig())
110+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
109111
watcher, err := fsnotify.NewWatcher()
110112
require.NoError(t, err)
111113
cws.watcher = watcher
@@ -137,7 +139,7 @@ func TestCredentialWatcherService_watchFiles(t *testing.T) {
137139

138140
func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
139141
ctx := context.Background()
140-
cws := NewCredentialWatcherService(types.AgentConfig())
142+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
141143
watcher, err := fsnotify.NewWatcher()
142144
require.NoError(t, err)
143145
cws.watcher = watcher
@@ -164,7 +166,7 @@ func TestCredentialWatcherService_checkForUpdates(t *testing.T) {
164166

165167
func TestCredentialWatcherService_handleEvent(t *testing.T) {
166168
ctx := context.Background()
167-
cws := NewCredentialWatcherService(types.AgentConfig())
169+
cws := NewCredentialWatcherService(types.AgentConfig(), model.Command)
168170
watcher, err := fsnotify.NewWatcher()
169171
require.NoError(t, err)
170172
cws.watcher = watcher
@@ -232,7 +234,7 @@ func Test_credentialPaths(t *testing.T) {
232234
}
233235
for _, tt := range tests {
234236
t.Run(tt.name, func(t *testing.T) {
235-
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig), "credentialPaths(%v)", tt.agentConfig)
237+
assert.Equalf(t, tt.want, credentialPaths(tt.agentConfig.Command), "credentialPaths(%v)", tt.agentConfig)
236238
})
237239
}
238240
}

internal/watcher/instance/instance_watcher_service.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
308308
slog.WarnContext(ctx, "Unable to convert config to labels structure", "error", convertErr)
309309
}
310310

311-
return &mpi.Instance{
311+
instance := &mpi.Instance{
312312
InstanceMeta: &mpi.InstanceMeta{
313313
InstanceId: iw.agentConfig.UUID,
314314
InstanceType: mpi.InstanceMeta_INSTANCE_TYPE_AGENT,
@@ -334,6 +334,13 @@ func (iw *InstanceWatcherService) agentInstance(ctx context.Context) *mpi.Instan
334334
Details: nil,
335335
},
336336
}
337+
338+
if iw.agentConfig.IsAuxiliaryCommandGrpcClientConfigured() {
339+
instance.GetInstanceConfig().GetAgentConfig().AuxiliaryCommand = config.
340+
ToAuxiliaryCommandServerProto(iw.agentConfig.AuxiliaryCommand)
341+
}
342+
343+
return instance
337344
}
338345

339346
func compareInstances(oldInstancesMap, instancesMap map[string]*mpi.Instance) (

0 commit comments

Comments
 (0)