From 2eb0944f412df6545727e75db9a5fb42216f958d Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 20 Nov 2025 15:02:16 +0000 Subject: [PATCH 1/2] refactor remote config add labels --- api/grpc/mpi/v1/helpers.go | 27 +++ api/grpc/mpi/v1/helpers_test.go | 42 +++++ internal/app.go | 2 +- internal/bus/message_pipe.go | 150 ++++++++++++++++- internal/bus/message_pipe_test.go | 38 ++++- internal/bus/topics.go | 45 ++--- internal/collector/otel_collector_plugin.go | 39 +++-- .../collector/otel_collector_plugin_test.go | 8 +- internal/command/command_plugin.go | 79 +++------ internal/command/command_plugin_test.go | 35 ++-- internal/command/command_service.go | 26 +-- internal/command/command_service_test.go | 35 ---- .../commandfakes/fake_command_service.go | 157 +++++++++--------- internal/config/config.go | 4 +- internal/config/mapper.go | 18 +- internal/file/file_plugin.go | 29 ++-- internal/file/file_plugin_test.go | 1 - internal/resource/resource_plugin.go | 31 ++-- internal/resource/resource_plugin_test.go | 1 - internal/watcher/watcher_plugin.go | 29 ++-- internal/watcher/watcher_plugin_test.go | 1 - 21 files changed, 467 insertions(+), 330 deletions(-) diff --git a/api/grpc/mpi/v1/helpers.go b/api/grpc/mpi/v1/helpers.go index 02a5adb83..a4ce745f8 100644 --- a/api/grpc/mpi/v1/helpers.go +++ b/api/grpc/mpi/v1/helpers.go @@ -6,6 +6,8 @@ package v1 import ( + "log/slog" + "google.golang.org/protobuf/types/known/structpb" ) @@ -45,3 +47,28 @@ func ConvertToStructs(input map[string]any) ([]*structpb.Struct, error) { return structs, nil } + +func ConvertToMap(input []*structpb.Struct) map[string]any { + convertedMap := make(map[string]any) + for _, value := range input { + for key, field := range value.GetFields() { + kind := field.GetKind() + switch kind.(type) { + case *structpb.Value_StringValue: + convertedMap[key] = field.GetStringValue() + case *structpb.Value_NumberValue: + convertedMap[key] = int(field.GetNumberValue()) + case *structpb.Value_StructValue: + convertedMap[key] = field.GetStructValue() + case *structpb.Value_ListValue: + convertedMap[key] = field.GetListValue() + case *structpb.Value_BoolValue: + convertedMap[key] = field.GetBoolValue() + default: + slog.Warn("unknown type for map conversion", "value", kind) + } + } + } + + return convertedMap +} diff --git a/api/grpc/mpi/v1/helpers_test.go b/api/grpc/mpi/v1/helpers_test.go index e23aa8040..c5a225850 100644 --- a/api/grpc/mpi/v1/helpers_test.go +++ b/api/grpc/mpi/v1/helpers_test.go @@ -70,3 +70,45 @@ func TestConvertToStructs(t *testing.T) { }) } } + +func TestConvertToMaps(t *testing.T) { + tests := []struct { + name string + expected map[string]any + input []*structpb.Struct + }{ + { + name: "Test 1: Valid input with simple key-value pairs", + expected: map[string]any{ + "key1": "value1", + "key2": 123, + "key3": true, + }, + input: []*structpb.Struct{ + { + Fields: map[string]*structpb.Value{ + "key1": structpb.NewStringValue("value1"), + }, + }, + { + Fields: map[string]*structpb.Value{ + "key2": structpb.NewNumberValue(123), + }, + }, + { + Fields: map[string]*structpb.Value{ + "key3": structpb.NewBoolValue(true), + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ConvertToMap(tt.input) + + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/internal/app.go b/internal/app.go index 6f10aa1bb..b0d442057 100644 --- a/internal/app.go +++ b/internal/app.go @@ -50,7 +50,7 @@ func (a *App) Run(ctx context.Context) error { slog.String("commit", a.commit), ) - messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize) + messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize, agentConfig) err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig)) if err != nil { slog.ErrorContext(ctx, "Failed to register plugins", "error", err) diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go index aabba3dfb..12419c35b 100644 --- a/internal/bus/message_pipe.go +++ b/internal/bus/message_pipe.go @@ -8,9 +8,15 @@ package bus import ( "context" "log/slog" + "reflect" "sync" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" + "github.com/nginx/agent/v3/internal/config" + "github.com/nginx/agent/v3/internal/logger" + "github.com/nginx/agent/v3/pkg/id" messagebus "github.com/vardius/message-bus" + "google.golang.org/protobuf/types/known/timestamppb" ) type ( @@ -45,20 +51,24 @@ type ( Info() *Info Process(ctx context.Context, msg *Message) Subscriptions() []string + Reconfigure(ctx context.Context, agentConfig *config.Config) error } MessagePipe struct { + agentConfig *config.Config bus messagebus.MessageBus messageChannel chan *MessageWithContext plugins []Plugin pluginsMutex sync.Mutex + configMutex sync.Mutex } ) -func NewMessagePipe(size int) *MessagePipe { +func NewMessagePipe(size int, agentConfig *config.Config) *MessagePipe { return &MessagePipe{ messageChannel: make(chan *MessageWithContext, size), pluginsMutex: sync.Mutex{}, + agentConfig: agentConfig, } } @@ -110,6 +120,7 @@ func (p *MessagePipe) Process(ctx context.Context, messages ...*Message) { } } +//nolint:contextcheck,revive // need to use context from the message for the correlationID not use the parent context func (p *MessagePipe) Run(ctx context.Context) { p.pluginsMutex.Lock() p.initPlugins(ctx) @@ -126,7 +137,17 @@ func (p *MessagePipe) Run(ctx context.Context) { return case m := <-p.messageChannel: - p.bus.Publish(m.message.Topic, m.ctx, m.message) + if m.message != nil { + switch m.message.Topic { + case AgentConfigUpdateTopic: + p.handleAgentConfigUpdateTopic(m.ctx, m.message) + case ConnectionAgentConfigUpdateTopic: + p.handleConnectionAgentConfigUpdateTopic(m.ctx, m.message) + default: + slog.InfoContext(ctx, "Publishing message", "topic", m.message.Topic, "message----", m.message) + p.bus.Publish(m.message.Topic, m.ctx, m.message) + } + } } } } @@ -157,6 +178,114 @@ func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int { return -1 } +func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentConfig, topic, correlationID string) { + var reconfigureError error + p.configMutex.Lock() + defer p.configMutex.Unlock() + currentConfig := p.agentConfig + + // convert agent config from *mpi.AgentConfig to *config.Config + updateAgentConfig := config.FromAgentRemoteConfigProto(agentConfig) + + // The check for updates to the config needs to be done here as the command plugin needs the latest agent config + // to be sent in response to create connection requests. + p.updateConfig(ctx, updateAgentConfig) + + // Reconfigure each plugin with the new agent config + for _, plugin := range p.plugins { + slog.InfoContext(ctx, "Reconfigure plugin", "plugin", plugin.Info().Name) + reconfigureError = plugin.Reconfigure(ctx, p.agentConfig) + if reconfigureError != nil { + slog.ErrorContext(ctx, "Reconfigure plugin failed", "plugin", plugin.Info().Name) + break + } + } + + if reconfigureError != nil { + slog.ErrorContext(ctx, "Error updating plugin with updated agent config, reverting", + "error", reconfigureError.Error()) + + // If the agent update was received from a create connection request no data plane response needs to be sent + if topic == AgentConfigUpdateTopic { + response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, + "Failed to update agent config", reconfigureError.Error()) + p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response}) + } + + p.agentConfig = currentConfig + for _, plugin := range p.plugins { + err := plugin.Reconfigure(ctx, currentConfig) + if err != nil { + slog.ErrorContext(ctx, "Error reverting agent config", "error", err.Error()) + } + } + } + + slog.InfoContext(ctx, "Finished reconfigure plugin", "plugins", p.plugins) + if topic == AgentConfigUpdateTopic { + response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, + "Successfully updated agent config", "") + p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response}) + } +} + +func (p *MessagePipe) handleConnectionAgentConfigUpdateTopic(ctx context.Context, msg *Message) { + slog.DebugContext(ctx, "Handling connection agent config update topic", "topic", msg.Topic) + agentConfig, ok := msg.Data.(*mpi.AgentConfig) + if !ok { + slog.ErrorContext(ctx, "Failed to parse agent config update message") + return + } + + p.Reconfigure(ctx, agentConfig, msg.Topic, "") +} + +func (p *MessagePipe) handleAgentConfigUpdateTopic(ctx context.Context, msg *Message) { + slog.DebugContext(ctx, "Received agent config update topic", "topic", msg.Topic) + mpRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) + if !ok { + slog.ErrorContext(ctx, "Failed to parse agent config update message") + return + } + + reconfigureRequest, ok := mpRequest.GetRequest().(*mpi.ManagementPlaneRequest_UpdateAgentConfigRequest) + if !ok { + slog.ErrorContext(ctx, "Failed to parse agent config update message") + return + } + + correlationID := reconfigureRequest.UpdateAgentConfigRequest.GetMessageMeta().GetCorrelationId() + p.Reconfigure(ctx, reconfigureRequest.UpdateAgentConfigRequest.GetAgentConfig(), msg.Topic, correlationID) +} + +func (p *MessagePipe) updateConfig(ctx context.Context, updateAgentConfig *config.Config) { + slog.InfoContext(ctx, "Updating agent config") + if updateAgentConfig.Log != nil && !reflect.DeepEqual(p.agentConfig.Log, updateAgentConfig.Log) { + slog.DebugContext(ctx, "Agent log level has been updated", "previous", p.agentConfig.Log, + "update", updateAgentConfig.Log) + p.agentConfig.Log = updateAgentConfig.Log + + slogger := logger.New( + p.agentConfig.Log.Path, + p.agentConfig.Log.Level, + ) + slog.SetDefault(slogger) + } + + if updateAgentConfig.Labels != nil && !reflect.DeepEqual(p.agentConfig.Labels, updateAgentConfig.Labels) { + slog.DebugContext(ctx, "Agent labels have been updated", "previous", p.agentConfig.Labels, + "update", updateAgentConfig.Labels) + p.agentConfig.Labels = updateAgentConfig.Labels + + // OTel Headers also need to be updated when labels have been updated + if p.agentConfig.Collector != nil { + slog.DebugContext(ctx, "Agent OTel headers have been updated") + config.AddLabelsAsOTelHeaders(p.agentConfig.Collector, updateAgentConfig.Labels) + } + } + slog.DebugContext(ctx, "Updated agent config") +} + func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error { if index != -1 { p.plugins = append(p.plugins[:index], p.plugins[index+1:]...) @@ -209,3 +338,20 @@ func (p *MessagePipe) initPlugins(ctx context.Context) { } } } + +func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, + message, err string, +) *mpi.DataPlaneResponse { + return &mpi.DataPlaneResponse{ + MessageMeta: &mpi.MessageMeta{ + MessageId: id.GenerateMessageID(), + CorrelationId: correlationID, + Timestamp: timestamppb.Now(), + }, + CommandResponse: &mpi.CommandResponse{ + Status: status, + Message: message, + Error: err, + }, + } +} diff --git a/internal/bus/message_pipe_test.go b/internal/bus/message_pipe_test.go index d45ef6c9a..8618f219d 100644 --- a/internal/bus/message_pipe_test.go +++ b/internal/bus/message_pipe_test.go @@ -6,9 +6,12 @@ package bus import ( "context" + "log/slog" "testing" "time" + "github.com/nginx/agent/v3/internal/config" + "github.com/nginx/agent/v3/test/types" "github.com/stretchr/testify/require" "github.com/stretchr/testify/assert" @@ -41,6 +44,11 @@ func (*testPlugin) Subscriptions() []string { return []string{"test.message"} } +func (p *testPlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + p.Called() + return nil +} + func TestMessagePipe(t *testing.T) { messages := []*Message{ {Topic: "test.message", Data: 1}, @@ -58,7 +66,7 @@ func TestMessagePipe(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) pipelineDone := make(chan bool) - messagePipe := NewMessagePipe(100) + messagePipe := NewMessagePipe(100, types.AgentConfig()) err := messagePipe.Register(10, []Plugin{plugin}) require.NoError(t, err) @@ -84,7 +92,7 @@ func TestMessagePipe_DeRegister(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - messagePipe := NewMessagePipe(100) + messagePipe := NewMessagePipe(100, types.AgentConfig()) err := messagePipe.Register(100, []Plugin{plugin}) require.NoError(t, err) @@ -105,7 +113,7 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) pipelineDone := make(chan bool) - messagePipe := NewMessagePipe(100) + messagePipe := NewMessagePipe(100, types.AgentConfig()) err := messagePipe.Register(10, []Plugin{plugin}) require.NoError(t, err) @@ -121,3 +129,27 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) { assert.True(t, messagePipe.IsPluginRegistered(plugin.Info().Name)) assert.False(t, messagePipe.IsPluginRegistered("metrics")) } + +func TestMessagePipe_updateConfig(t *testing.T) { + initialConfig := types.AgentConfig() + initialConfig.Log.Level = "INFO" + initialConfig.Log.Path = "" + + messagePipe := NewMessagePipe(100, initialConfig) + originalLogger := slog.Default() + + updatedConfig := &config.Config{ + Log: &config.Log{ + Path: "/etc/nginx-agent/", + Level: "DEBUG", + }, + } + + messagePipe.updateConfig(context.Background(), updatedConfig) + + require.Equal(t, messagePipe.agentConfig.Log.Path, updatedConfig.Log.Path) + require.Equal(t, messagePipe.agentConfig.Log.Level, updatedConfig.Log.Level) + + newLogger := slog.Default() + require.NotEqual(t, originalLogger, newLogger) +} diff --git a/internal/bus/topics.go b/internal/bus/topics.go index a4b0e341c..5ad699527 100644 --- a/internal/bus/topics.go +++ b/internal/bus/topics.go @@ -6,26 +6,27 @@ package bus const ( - AddInstancesTopic = "add-instances" - UpdatedInstancesTopic = "updated-instances" - DeletedInstancesTopic = "deleted-instances" - ResourceUpdateTopic = "resource-update" - NginxConfigUpdateTopic = "nginx-config-update" - InstanceHealthTopic = "instance-health" - ConfigUploadRequestTopic = "config-upload-request" - DataPlaneResponseTopic = "data-plane-response" - ConnectionCreatedTopic = "connection-created" - CredentialUpdatedTopic = "credential-updated" - ConnectionResetTopic = "connection-reset" - ConfigApplyRequestTopic = "config-apply-request" - WriteConfigSuccessfulTopic = "write-config-successful" - ReloadSuccessfulTopic = "reload-successful" - EnableWatchersTopic = "enable-watchers" - ConfigApplyFailedTopic = "config-apply-failed" - ConfigApplyCompleteTopic = "config-apply-complete" - RollbackWriteTopic = "rollback-write" - DataPlaneHealthRequestTopic = "data-plane-health-request" - DataPlaneHealthResponseTopic = "data-plane-health-response" - APIActionRequestTopic = "api-action-request" - AgentConfigUpdateTopic = "agent-config-update" + AddInstancesTopic = "add-instances" + UpdatedInstancesTopic = "updated-instances" + DeletedInstancesTopic = "deleted-instances" + ResourceUpdateTopic = "resource-update" + NginxConfigUpdateTopic = "nginx-config-update" + InstanceHealthTopic = "instance-health" + ConfigUploadRequestTopic = "config-upload-request" + DataPlaneResponseTopic = "data-plane-response" + ConnectionCreatedTopic = "connection-created" + CredentialUpdatedTopic = "credential-updated" + ConnectionResetTopic = "connection-reset" + ConfigApplyRequestTopic = "config-apply-request" + WriteConfigSuccessfulTopic = "write-config-successful" + ReloadSuccessfulTopic = "reload-successful" + EnableWatchersTopic = "enable-watchers" + ConfigApplyFailedTopic = "config-apply-failed" + ConfigApplyCompleteTopic = "config-apply-complete" + RollbackWriteTopic = "rollback-write" + DataPlaneHealthRequestTopic = "data-plane-health-request" + DataPlaneHealthResponseTopic = "data-plane-health-response" + APIActionRequestTopic = "api-action-request" + AgentConfigUpdateTopic = "agent-config-update" + ConnectionAgentConfigUpdateTopic = "connection-agent-config-update" ) diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index 07a05f4db..0a82556bb 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -13,6 +13,7 @@ import ( "net" "net/url" "os" + "reflect" "strings" "sync" "time" @@ -55,11 +56,11 @@ type ( service types.CollectorInterface config *config.Config mu *sync.Mutex - agentConfigMutex *sync.Mutex cancel context.CancelFunc previousNAPSysLogServer string debugOTelConfigPath string stopped bool + agentConfigMutex sync.Mutex } ) @@ -101,7 +102,7 @@ func NewCollector(conf *config.Config) (*Collector, error) { service: oTelCollector, stopped: true, mu: &sync.Mutex{}, - agentConfigMutex: &sync.Mutex{}, + agentConfigMutex: sync.Mutex{}, previousNAPSysLogServer: "", debugOTelConfigPath: debugOTelConfigPath, }, nil @@ -194,8 +195,6 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) { oc.handleNginxConfigUpdate(ctx, msg) case bus.ResourceUpdateTopic: oc.handleResourceUpdate(ctx, msg) - case bus.AgentConfigUpdateTopic: - oc.handleAgentConfigUpdate(ctx, msg) default: slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic) } @@ -206,10 +205,25 @@ func (oc *Collector) Subscriptions() []string { return []string{ bus.ResourceUpdateTopic, bus.NginxConfigUpdateTopic, - bus.AgentConfigUpdateTopic, } } +func (oc *Collector) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + slog.DebugContext(ctx, "OTel collector plugin received agent config update message") + + oc.agentConfigMutex.Lock() + defer oc.agentConfigMutex.Unlock() + + if !reflect.DeepEqual(oc.config.Collector.Extensions.HeadersSetter.Headers, + agentConfig.Collector.Extensions.HeadersSetter.Headers) { + slog.DebugContext(ctx, "OTel collector headers have changed, restarting collector") + oc.config = agentConfig + oc.restartCollector(ctx) + } + + return nil +} + // Process receivers and log warning for sub-optimal configurations func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) { for _, receiver := range receivers { @@ -334,21 +348,6 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message) } } -func (oc *Collector) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "OTel collector plugin received agent config update message") - - oc.agentConfigMutex.Lock() - defer oc.agentConfigMutex.Unlock() - - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return - } - - oc.config = agentConfig -} - func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool { resourceProcessorUpdated := false diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index db21215c0..7e5dc503e 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -147,7 +147,7 @@ func TestCollector_InitAndClose(t *testing.T) { require.NoError(t, err, "NewCollector should not return an error with valid config") ctx := context.Background() - messagePipe := bus.NewMessagePipe(10) + messagePipe := bus.NewMessagePipe(10, types.AgentConfig()) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(t, err) @@ -315,7 +315,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { collector.service = createFakeCollector() ctx := context.Background() - messagePipe := bus.NewMessagePipe(10) + messagePipe := bus.NewMessagePipe(10, types.AgentConfig()) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(tt, err) @@ -400,7 +400,7 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { collector.service = createFakeCollector() ctx := context.Background() - messagePipe := bus.NewMessagePipe(10) + messagePipe := bus.NewMessagePipe(10, types.AgentConfig()) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(tt, err) @@ -462,7 +462,7 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) { collector.service = createFakeCollector() ctx := context.Background() - messagePipe := bus.NewMessagePipe(10) + messagePipe := bus.NewMessagePipe(10, types.AgentConfig()) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(tt, err) diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index 15537b0e5..2ce0c6842 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -37,7 +37,7 @@ type ( Subscribe(ctx context.Context) IsConnected() bool CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error) - UpdateAgentConfig(ctx context.Context, request *mpi.AgentConfig) (*config.Config, error) + Reconfigure(ctx context.Context, request *config.Config) error } CommandPlugin struct { @@ -49,6 +49,7 @@ type ( subscribeChannel chan *mpi.ManagementPlaneRequest commandServerType model.ServerType subscribeMutex sync.Mutex + agentConfigMutex sync.Mutex } ) @@ -144,6 +145,16 @@ func (cp *CommandPlugin) Subscriptions() []string { } } +func (cp *CommandPlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + cp.agentConfigMutex.Lock() + defer cp.agentConfigMutex.Unlock() + + cp.config = agentConfig + err := cp.commandService.Reconfigure(ctx, agentConfig) + + return err +} + func (cp *CommandPlugin) processResourceUpdate(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Command plugin received resource update message") if resource, ok := msg.Data.(*mpi.Resource); ok { @@ -183,21 +194,13 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res }) if createConnectionResponse.GetAgentConfig() != nil { - newAgentConfig, updateConfigError := cp.commandService.UpdateAgentConfig( - ctx, - createConnectionResponse.GetAgentConfig(), + slog.DebugContext( + ctx, "Notifying other plugins of agent configuration update from create connection response", ) - if updateConfigError != nil { - slog.ErrorContext(ctx, "Unable to update agent configuration", "error", updateConfigError) - } else { - slog.DebugContext( - ctx, "Notifying other plugins of agent configuration update from create connection response", - ) - cp.messagePipe.Process(ctx, &bus.Message{ - Topic: bus.AgentConfigUpdateTopic, - Data: newAgentConfig, - }) - } + cp.messagePipe.Process(ctx, &bus.Message{ + Topic: bus.ConnectionAgentConfigUpdateTopic, + Data: createConnectionResponse.GetAgentConfig(), + }) } } } @@ -334,7 +337,7 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { return } - cp.handleAgentConfigUpdateRequest(newCtx, message) + cp.messagePipe.Process(ctx, &bus.Message{Topic: bus.AgentConfigUpdateTopic, Data: message}) default: slog.DebugContext(newCtx, "Management plane request not implemented yet") } @@ -438,50 +441,6 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, } } -func (cp *CommandPlugin) handleAgentConfigUpdateRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) { - newAgentConfig, err := cp.commandService.UpdateAgentConfig( - ctx, - request.GetUpdateAgentConfigRequest().GetAgentConfig(), - ) - if err != nil { - slog.ErrorContext(ctx, "Unable to update agent configuration", "error", err) - - responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: request.GetMessageMeta().GetCorrelationId(), - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, - Message: "Failed to update agent configuration", - }, - }) - - if responseError != nil { - slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError) - } - } else { - cp.messagePipe.Process(ctx, &bus.Message{Topic: bus.AgentConfigUpdateTopic, Data: newAgentConfig}) - - responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{ - MessageMeta: &mpi.MessageMeta{ - MessageId: id.GenerateMessageID(), - CorrelationId: request.GetMessageMeta().GetCorrelationId(), - Timestamp: timestamppb.Now(), - }, - CommandResponse: &mpi.CommandResponse{ - Status: mpi.CommandResponse_COMMAND_STATUS_OK, - Message: "Successfully updated agent configuration", - }, - }) - - if responseError != nil { - slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError) - } - } -} - func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, message, err string, ) *mpi.DataPlaneResponse { diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index 885b4e979..87af33a9a 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -119,7 +119,7 @@ func TestCommandPlugin_createConnection(t *testing.T) { messages := messagePipe.Messages() assert.Len(t, messages, 2) assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic) - assert.Equal(t, bus.AgentConfigUpdateTopic, messages[1].Topic) + assert.Equal(t, bus.ConnectionAgentConfigUpdateTopic, messages[1].Topic) } func TestCommandPlugin_Process(t *testing.T) { @@ -244,7 +244,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { }, }, expectedTopic: &bus.Message{Topic: bus.AgentConfigUpdateTopic}, - request: "UpdateAgentConfigRequest", + request: "AgentConfigUpdateTopic", configFeatures: config.DefaultFeatures(), }, } @@ -277,24 +277,21 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { assert.Len(tt, messages, 1) assert.Equal(tt, test.expectedTopic.Topic, messages[0].Topic) - if test.request == "UpdateAgentConfigRequest" { - data, ok := messages[0].Data.(*config.Config) + mp, ok := messages[0].Data.(*mpi.ManagementPlaneRequest) + + switch test.request { + case "UploadRequest": + assert.True(tt, ok) + require.NotNil(tt, mp.GetConfigUploadRequest()) + case "ApplyRequest": + assert.True(tt, ok) + require.NotNil(tt, mp.GetConfigApplyRequest()) + case "APIActionRequest": + assert.True(tt, ok) + require.NotNil(tt, mp.GetActionRequest()) + case "AgentConfigUpdateTopic": assert.True(tt, ok) - require.NotNil(tt, data) - } else { - mp, ok := messages[0].Data.(*mpi.ManagementPlaneRequest) - - switch test.request { - case "UploadRequest": - assert.True(tt, ok) - require.NotNil(tt, mp.GetConfigUploadRequest()) - case "ApplyRequest": - assert.True(tt, ok) - require.NotNil(tt, mp.GetConfigApplyRequest()) - case "APIActionRequest": - assert.True(tt, ok) - require.NotNil(tt, mp.GetActionRequest()) - } + require.NotNil(tt, mp.GetUpdateAgentConfigRequest()) } }) } diff --git a/internal/command/command_service.go b/internal/command/command_service.go index a2e61a435..3c101bf80 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -10,7 +10,6 @@ import ( "errors" "fmt" "log/slog" - "reflect" "sync" "sync/atomic" @@ -48,6 +47,7 @@ type ( subscribeClientMutex sync.Mutex configApplyRequestQueueMutex sync.Mutex resourceMutex sync.Mutex + agentConfigMutex sync.Mutex } ) @@ -183,26 +183,14 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m ) } -func (cs *CommandService) UpdateAgentConfig( - ctx context.Context, - mpiConfig *mpi.AgentConfig, -) (*config.Config, error) { - slog.InfoContext(ctx, "Updating agent configuration", "config", mpiConfig) - - updatedLog := config.FromAgentConfigLogProto(mpiConfig.GetLog()) - - if mpiConfig.GetLog() != nil && !reflect.DeepEqual(cs.agentConfig.Log, updatedLog) { - slog.InfoContext(ctx, "Updating log configuration", "log", updatedLog) - cs.agentConfig.Log = updatedLog +func (cs *CommandService) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + cs.agentConfigMutex.Lock() + defer cs.agentConfigMutex.Unlock() - slogger := logger.New( - cs.agentConfig.Log.Path, - cs.agentConfig.Log.Level, - ) - slog.SetDefault(slogger) - } + slog.DebugContext(ctx, "Command plugin is reconfiguring to update agent configuration", "config", agentConfig) + cs.agentConfig = agentConfig - return cs.agentConfig, nil + return nil } // Subscribe to the Management Plane for incoming commands. diff --git a/internal/command/command_service_test.go b/internal/command/command_service_test.go index a554ac428..d91e9fe0f 100644 --- a/internal/command/command_service_test.go +++ b/internal/command/command_service_test.go @@ -377,41 +377,6 @@ func TestCommandService_SendDataPlaneResponse_configApplyRequest(t *testing.T) { wg.Wait() } -func TestCommandService_UpdateAgentConfiguration(t *testing.T) { - ctx := context.Background() - commandServiceClient := &v1fakes.FakeCommandServiceClient{} - - initialConfig := types.AgentConfig() - initialConfig.Log.Level = "INFO" - initialConfig.Log.Path = "" - - commandService := NewCommandService( - commandServiceClient, - initialConfig, - make(chan *mpi.ManagementPlaneRequest), - ) - commandService.isConnected.Store(true) - - originalLogger := slog.Default() - - updatedConfig := &mpi.AgentConfig{ - Log: &mpi.Log{ - LogLevel: mpi.Log_LOG_LEVEL_DEBUG, - LogPath: "/etc/nginx-agent", - }, - } - - newAgentConfig, err := commandService.UpdateAgentConfig(ctx, updatedConfig) - require.NoError(t, err) - require.Equal(t, "DEBUG", commandService.agentConfig.Log.Level) - require.Equal(t, "/etc/nginx-agent", commandService.agentConfig.Log.Path) - require.Equal(t, "DEBUG", newAgentConfig.Log.Level) - require.Equal(t, "/etc/nginx-agent", newAgentConfig.Log.Path) - - updatedLogger := slog.Default() - require.NotEqual(t, originalLogger, updatedLogger) -} - func TestCommandService_isValidRequest(t *testing.T) { ctx := context.Background() commandServiceClient := &v1fakes.FakeCommandServiceClient{} diff --git a/internal/command/commandfakes/fake_command_service.go b/internal/command/commandfakes/fake_command_service.go index 21c52824a..9d5087028 100644 --- a/internal/command/commandfakes/fake_command_service.go +++ b/internal/command/commandfakes/fake_command_service.go @@ -34,6 +34,18 @@ type FakeCommandService struct { isConnectedReturnsOnCall map[int]struct { result1 bool } + ReconfigureStub func(context.Context, *config.Config) error + reconfigureMutex sync.RWMutex + reconfigureArgsForCall []struct { + arg1 context.Context + arg2 *config.Config + } + reconfigureReturns struct { + result1 error + } + reconfigureReturnsOnCall map[int]struct { + result1 error + } SendDataPlaneResponseStub func(context.Context, *v1.DataPlaneResponse) error sendDataPlaneResponseMutex sync.RWMutex sendDataPlaneResponseArgsForCall []struct { @@ -51,20 +63,6 @@ type FakeCommandService struct { subscribeArgsForCall []struct { arg1 context.Context } - UpdateAgentConfigStub func(context.Context, *v1.AgentConfig) (*config.Config, error) - updateAgentConfigMutex sync.RWMutex - updateAgentConfigArgsForCall []struct { - arg1 context.Context - arg2 *v1.AgentConfig - } - updateAgentConfigReturns struct { - result1 *config.Config - result2 error - } - updateAgentConfigReturnsOnCall map[int]struct { - result1 *config.Config - result2 error - } UpdateClientStub func(context.Context, v1.CommandServiceClient) error updateClientMutex sync.RWMutex updateClientArgsForCall []struct { @@ -223,6 +221,68 @@ func (fake *FakeCommandService) IsConnectedReturnsOnCall(i int, result1 bool) { }{result1} } +func (fake *FakeCommandService) Reconfigure(arg1 context.Context, arg2 *config.Config) error { + fake.reconfigureMutex.Lock() + ret, specificReturn := fake.reconfigureReturnsOnCall[len(fake.reconfigureArgsForCall)] + fake.reconfigureArgsForCall = append(fake.reconfigureArgsForCall, struct { + arg1 context.Context + arg2 *config.Config + }{arg1, arg2}) + stub := fake.ReconfigureStub + fakeReturns := fake.reconfigureReturns + fake.recordInvocation("Reconfigure", []interface{}{arg1, arg2}) + fake.reconfigureMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCommandService) ReconfigureCallCount() int { + fake.reconfigureMutex.RLock() + defer fake.reconfigureMutex.RUnlock() + return len(fake.reconfigureArgsForCall) +} + +func (fake *FakeCommandService) ReconfigureCalls(stub func(context.Context, *config.Config) error) { + fake.reconfigureMutex.Lock() + defer fake.reconfigureMutex.Unlock() + fake.ReconfigureStub = stub +} + +func (fake *FakeCommandService) ReconfigureArgsForCall(i int) (context.Context, *config.Config) { + fake.reconfigureMutex.RLock() + defer fake.reconfigureMutex.RUnlock() + argsForCall := fake.reconfigureArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeCommandService) ReconfigureReturns(result1 error) { + fake.reconfigureMutex.Lock() + defer fake.reconfigureMutex.Unlock() + fake.ReconfigureStub = nil + fake.reconfigureReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeCommandService) ReconfigureReturnsOnCall(i int, result1 error) { + fake.reconfigureMutex.Lock() + defer fake.reconfigureMutex.Unlock() + fake.ReconfigureStub = nil + if fake.reconfigureReturnsOnCall == nil { + fake.reconfigureReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.reconfigureReturnsOnCall[i] = struct { + result1 error + }{result1} +} + func (fake *FakeCommandService) SendDataPlaneResponse(arg1 context.Context, arg2 *v1.DataPlaneResponse) error { fake.sendDataPlaneResponseMutex.Lock() ret, specificReturn := fake.sendDataPlaneResponseReturnsOnCall[len(fake.sendDataPlaneResponseArgsForCall)] @@ -317,71 +377,6 @@ func (fake *FakeCommandService) SubscribeArgsForCall(i int) context.Context { return argsForCall.arg1 } -func (fake *FakeCommandService) UpdateAgentConfig(arg1 context.Context, arg2 *v1.AgentConfig) (*config.Config, error) { - fake.updateAgentConfigMutex.Lock() - ret, specificReturn := fake.updateAgentConfigReturnsOnCall[len(fake.updateAgentConfigArgsForCall)] - fake.updateAgentConfigArgsForCall = append(fake.updateAgentConfigArgsForCall, struct { - arg1 context.Context - arg2 *v1.AgentConfig - }{arg1, arg2}) - stub := fake.UpdateAgentConfigStub - fakeReturns := fake.updateAgentConfigReturns - fake.recordInvocation("UpdateAgentConfig", []interface{}{arg1, arg2}) - fake.updateAgentConfigMutex.Unlock() - if stub != nil { - return stub(arg1, arg2) - } - if specificReturn { - return ret.result1, ret.result2 - } - return fakeReturns.result1, fakeReturns.result2 -} - -func (fake *FakeCommandService) UpdateAgentConfigCallCount() int { - fake.updateAgentConfigMutex.RLock() - defer fake.updateAgentConfigMutex.RUnlock() - return len(fake.updateAgentConfigArgsForCall) -} - -func (fake *FakeCommandService) UpdateAgentConfigCalls(stub func(context.Context, *v1.AgentConfig) (*config.Config, error)) { - fake.updateAgentConfigMutex.Lock() - defer fake.updateAgentConfigMutex.Unlock() - fake.UpdateAgentConfigStub = stub -} - -func (fake *FakeCommandService) UpdateAgentConfigArgsForCall(i int) (context.Context, *v1.AgentConfig) { - fake.updateAgentConfigMutex.RLock() - defer fake.updateAgentConfigMutex.RUnlock() - argsForCall := fake.updateAgentConfigArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 -} - -func (fake *FakeCommandService) UpdateAgentConfigReturns(result1 *config.Config, result2 error) { - fake.updateAgentConfigMutex.Lock() - defer fake.updateAgentConfigMutex.Unlock() - fake.UpdateAgentConfigStub = nil - fake.updateAgentConfigReturns = struct { - result1 *config.Config - result2 error - }{result1, result2} -} - -func (fake *FakeCommandService) UpdateAgentConfigReturnsOnCall(i int, result1 *config.Config, result2 error) { - fake.updateAgentConfigMutex.Lock() - defer fake.updateAgentConfigMutex.Unlock() - fake.UpdateAgentConfigStub = nil - if fake.updateAgentConfigReturnsOnCall == nil { - fake.updateAgentConfigReturnsOnCall = make(map[int]struct { - result1 *config.Config - result2 error - }) - } - fake.updateAgentConfigReturnsOnCall[i] = struct { - result1 *config.Config - result2 error - }{result1, result2} -} - func (fake *FakeCommandService) UpdateClient(arg1 context.Context, arg2 v1.CommandServiceClient) error { fake.updateClientMutex.Lock() ret, specificReturn := fake.updateClientReturnsOnCall[len(fake.updateClientArgsForCall)] @@ -580,12 +575,12 @@ func (fake *FakeCommandService) Invocations() map[string][][]interface{} { defer fake.createConnectionMutex.RUnlock() fake.isConnectedMutex.RLock() defer fake.isConnectedMutex.RUnlock() + fake.reconfigureMutex.RLock() + defer fake.reconfigureMutex.RUnlock() fake.sendDataPlaneResponseMutex.RLock() defer fake.sendDataPlaneResponseMutex.RUnlock() fake.subscribeMutex.RLock() defer fake.subscribeMutex.RUnlock() - fake.updateAgentConfigMutex.RLock() - defer fake.updateAgentConfigMutex.RUnlock() fake.updateClientMutex.RLock() defer fake.updateClientMutex.RUnlock() fake.updateDataPlaneHealthMutex.RLock() diff --git a/internal/config/config.go b/internal/config/config.go index 5d70d0277..330afc842 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -125,7 +125,7 @@ func ResolveConfig() (*Config, error) { } defaultCollector(collector, config) - addLabelsAsOTelHeaders(collector, config.Labels) + AddLabelsAsOTelHeaders(collector, config.Labels) slog.Debug("Agent config", "config", config) slog.Info("Excluded files from being watched for file changes", "exclude_files", @@ -345,7 +345,7 @@ func addDefaultVMHostMetricsReceiver(collector *Collector) { } } -func addLabelsAsOTelHeaders(collector *Collector, labels map[string]any) { +func AddLabelsAsOTelHeaders(collector *Collector, labels map[string]any) { slog.Debug("Adding labels as headers to collector", "labels", labels) if collector.Extensions.HeadersSetter != nil { for key, value := range labels { diff --git a/internal/config/mapper.go b/internal/config/mapper.go index 641744be1..b418f0d90 100644 --- a/internal/config/mapper.go +++ b/internal/config/mapper.go @@ -124,11 +124,21 @@ func ToAuxiliaryCommandServerProto(cmd *Command) *mpi.AuxiliaryCommandServer { return protoConfig } -func FromAgentConfigLogProto(log *mpi.Log) *Log { - return &Log{ - Level: MapConfigLogLevelToSlogLevel(log.GetLogLevel()), - Path: log.GetLogPath(), +func FromAgentRemoteConfigProto(config *mpi.AgentConfig) *Config { + conf := &Config{} + + if config.GetLabels() != nil { + conf.Labels = mpi.ConvertToMap(config.GetLabels()) } + + if config.GetLog() != nil { + conf.Log = &Log{ + Level: MapConfigLogLevelToSlogLevel(config.GetLog().GetLogLevel()), + Path: config.GetLog().GetLogPath(), + } + } + + return conf } func ToAgentConfigLogProto(log *Log) *mpi.Log { diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 7a2fa208d..8cbc402e4 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -113,8 +113,6 @@ func (fp *FilePlugin) Process(ctx context.Context, msg *bus.Message) { fp.handleReloadSuccess(ctxWithMetadata, msg) case bus.ConfigApplyFailedTopic: fp.handleConfigApplyFailedRequest(ctxWithMetadata, msg) - case bus.AgentConfigUpdateTopic: - fp.handleAgentConfigUpdate(ctxWithMetadata, msg) default: slog.DebugContext(ctxWithMetadata, "File plugin received unknown topic", "topic", msg.Topic) } @@ -140,10 +138,20 @@ func (fp *FilePlugin) Subscriptions() []string { bus.ConfigApplyFailedTopic, bus.ReloadSuccessfulTopic, bus.ConfigApplyCompleteTopic, - bus.AgentConfigUpdateTopic, } } +func (fp *FilePlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + slog.DebugContext(ctx, "File plugin is reconfiguring to update agent configuration") + + fp.agentConfigMutex.Lock() + defer fp.agentConfigMutex.Unlock() + + fp.config = agentConfig + + return nil +} + func (fp *FilePlugin) enableWatchers(ctx context.Context, configContext *model.NginxConfigContext, instanceID string, @@ -416,21 +424,6 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: response}) } -func (fp *FilePlugin) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "File plugin received agent config update message") - - fp.agentConfigMutex.Lock() - defer fp.agentConfigMutex.Unlock() - - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return - } - - fp.config = agentConfig -} - func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, message, instanceID, err string, ) *mpi.DataPlaneResponse { diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index 74aa6024a..2955dacf3 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -61,7 +61,6 @@ func TestFilePlugin_Subscriptions(t *testing.T) { bus.ConfigApplyFailedTopic, bus.ReloadSuccessfulTopic, bus.ConfigApplyCompleteTopic, - bus.AgentConfigUpdateTopic, }, filePlugin.Subscriptions(), ) diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index c76138333..30b718e7d 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -29,7 +29,7 @@ type Resource struct { messagePipe bus.MessagePipeInterface resourceService resourceServiceInterface agentConfig *config.Config - agentConfigMutex *sync.Mutex + agentConfigMutex sync.Mutex } type errResponse struct { @@ -123,8 +123,6 @@ func (r *Resource) Process(ctx context.Context, msg *bus.Message) { r.handleRollbackWrite(ctx, msg) case bus.APIActionRequestTopic: r.handleAPIActionRequest(ctx, msg) - case bus.AgentConfigUpdateTopic: - r.handleAgentConfigUpdate(ctx, msg) default: slog.DebugContext(ctx, "Unknown topic", "topic", msg.Topic) } @@ -138,10 +136,20 @@ func (*Resource) Subscriptions() []string { bus.WriteConfigSuccessfulTopic, bus.RollbackWriteTopic, bus.APIActionRequestTopic, - bus.AgentConfigUpdateTopic, } } +func (r *Resource) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + slog.DebugContext(ctx, "Resource plugin is reconfiguring to update agent configuration") + + r.agentConfigMutex.Lock() + defer r.agentConfigMutex.Unlock() + + r.agentConfig = agentConfig + + return nil +} + func (r *Resource) handleAPIActionRequest(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Resource plugin received api action request message") managementPlaneRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest) @@ -286,18 +294,3 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) } - -func (r *Resource) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Resource plugin received agent config update message") - - r.agentConfigMutex.Lock() - defer r.agentConfigMutex.Unlock() - - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return - } - - r.agentConfig = agentConfig -} diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index df45879f4..0a7165b17 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -845,7 +845,6 @@ func TestResource_Subscriptions(t *testing.T) { bus.WriteConfigSuccessfulTopic, bus.RollbackWriteTopic, bus.APIActionRequestTopic, - bus.AgentConfigUpdateTopic, }, resourcePlugin.Subscriptions()) } diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index c6af0dc00..99504be2c 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -146,8 +146,6 @@ func (w *Watcher) Process(ctx context.Context, msg *bus.Message) { w.handleHealthRequest(ctx) case bus.EnableWatchersTopic: w.handleEnableWatchers(ctx, msg) - case bus.AgentConfigUpdateTopic: - w.handleAgentConfigUpdate(ctx, msg) default: slog.DebugContext(ctx, "Watcher plugin unknown topic", "topic", msg.Topic) } @@ -158,10 +156,20 @@ func (*Watcher) Subscriptions() []string { bus.ConfigApplyRequestTopic, bus.DataPlaneHealthRequestTopic, bus.EnableWatchersTopic, - bus.AgentConfigUpdateTopic, } } +func (w *Watcher) Reconfigure(ctx context.Context, agentConfig *config.Config) error { + slog.DebugContext(ctx, "Watcher plugin is reconfiguring to update agent configuration") + + w.agentConfigMutex.Lock() + defer w.agentConfigMutex.Unlock() + + w.agentConfig = agentConfig + + return nil +} + func (w *Watcher) handleEnableWatchers(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Watcher plugin received enable watchers message") enableWatchersMessage, ok := msg.Data.(*model.EnableWatchers) @@ -319,18 +327,3 @@ func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance ) } } - -func (w *Watcher) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) { - slog.DebugContext(ctx, "Watcher plugin received agent config update message") - - w.agentConfigMutex.Lock() - defer w.agentConfigMutex.Unlock() - - agentConfig, ok := msg.Data.(*config.Config) - if !ok { - slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data) - return - } - - w.agentConfig = agentConfig -} diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 424009c60..9331d320c 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -248,7 +248,6 @@ func TestWatcher_Subscriptions(t *testing.T) { bus.ConfigApplyRequestTopic, bus.DataPlaneHealthRequestTopic, bus.EnableWatchersTopic, - bus.AgentConfigUpdateTopic, }, watcherPlugin.Subscriptions(), ) From f52b49577652c440507ab6da9c8e640d264794f0 Mon Sep 17 00:00:00 2001 From: Aphral Griffin Date: Thu, 20 Nov 2025 15:20:44 +0000 Subject: [PATCH 2/2] refactor remote config add labels --- internal/resource/resource_plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index 30b718e7d..87c3313bb 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -29,7 +29,7 @@ type Resource struct { messagePipe bus.MessagePipeInterface resourceService resourceServiceInterface agentConfig *config.Config - agentConfigMutex sync.Mutex + agentConfigMutex *sync.Mutex } type errResponse struct {