Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions api/grpc/mpi/v1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package v1

import (
"log/slog"

"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -45,3 +47,28 @@ func ConvertToStructs(input map[string]any) ([]*structpb.Struct, error) {

return structs, nil
}

func ConvertToMap(input []*structpb.Struct) map[string]any {
convertedMap := make(map[string]any)
for _, value := range input {
for key, field := range value.GetFields() {
kind := field.GetKind()
switch kind.(type) {
case *structpb.Value_StringValue:
convertedMap[key] = field.GetStringValue()
case *structpb.Value_NumberValue:
convertedMap[key] = int(field.GetNumberValue())
case *structpb.Value_StructValue:
convertedMap[key] = field.GetStructValue()
case *structpb.Value_ListValue:
convertedMap[key] = field.GetListValue()
case *structpb.Value_BoolValue:
convertedMap[key] = field.GetBoolValue()
default:
slog.Warn("unknown type for map conversion", "value", kind)
}
}
}

return convertedMap
}
42 changes: 42 additions & 0 deletions api/grpc/mpi/v1/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,45 @@ func TestConvertToStructs(t *testing.T) {
})
}
}

func TestConvertToMaps(t *testing.T) {
tests := []struct {
name string
expected map[string]any
input []*structpb.Struct
}{
{
name: "Test 1: Valid input with simple key-value pairs",
expected: map[string]any{
"key1": "value1",
"key2": 123,
"key3": true,
},
input: []*structpb.Struct{
{
Fields: map[string]*structpb.Value{
"key1": structpb.NewStringValue("value1"),
},
},
{
Fields: map[string]*structpb.Value{
"key2": structpb.NewNumberValue(123),
},
},
{
Fields: map[string]*structpb.Value{
"key3": structpb.NewBoolValue(true),
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ConvertToMap(tt.input)

assert.Equal(t, tt.expected, got)
})
}
}
2 changes: 1 addition & 1 deletion internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (a *App) Run(ctx context.Context) error {
slog.String("commit", a.commit),
)

messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize)
messagePipe := bus.NewMessagePipe(defaultMessagePipeChannelSize, agentConfig)
err = messagePipe.Register(defaultQueueSize, plugin.LoadPlugins(ctx, agentConfig))
if err != nil {
slog.ErrorContext(ctx, "Failed to register plugins", "error", err)
Expand Down
150 changes: 148 additions & 2 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ package bus
import (
"context"
"log/slog"
"reflect"
"sync"

mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/logger"
"github.com/nginx/agent/v3/pkg/id"
messagebus "github.com/vardius/message-bus"
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Expand Down Expand Up @@ -45,20 +51,24 @@ type (
Info() *Info
Process(ctx context.Context, msg *Message)
Subscriptions() []string
Reconfigure(ctx context.Context, agentConfig *config.Config) error
}

MessagePipe struct {
agentConfig *config.Config
bus messagebus.MessageBus
messageChannel chan *MessageWithContext
plugins []Plugin
pluginsMutex sync.Mutex
configMutex sync.Mutex
}
)

func NewMessagePipe(size int) *MessagePipe {
func NewMessagePipe(size int, agentConfig *config.Config) *MessagePipe {
return &MessagePipe{
messageChannel: make(chan *MessageWithContext, size),
pluginsMutex: sync.Mutex{},
agentConfig: agentConfig,
}
}

Expand Down Expand Up @@ -110,6 +120,7 @@ func (p *MessagePipe) Process(ctx context.Context, messages ...*Message) {
}
}

//nolint:contextcheck,revive // need to use context from the message for the correlationID not use the parent context
func (p *MessagePipe) Run(ctx context.Context) {
p.pluginsMutex.Lock()
p.initPlugins(ctx)
Expand All @@ -126,7 +137,17 @@ func (p *MessagePipe) Run(ctx context.Context) {

return
case m := <-p.messageChannel:
p.bus.Publish(m.message.Topic, m.ctx, m.message)
if m.message != nil {
switch m.message.Topic {
case AgentConfigUpdateTopic:
p.handleAgentConfigUpdateTopic(m.ctx, m.message)
case ConnectionAgentConfigUpdateTopic:
p.handleConnectionAgentConfigUpdateTopic(m.ctx, m.message)
default:
slog.InfoContext(ctx, "Publishing message", "topic", m.message.Topic, "message----", m.message)
p.bus.Publish(m.message.Topic, m.ctx, m.message)
}
}
}
}
}
Expand Down Expand Up @@ -157,6 +178,114 @@ func (p *MessagePipe) Index(pluginName string, plugins []Plugin) int {
return -1
}

func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentConfig, topic, correlationID string) {
var reconfigureError error
p.configMutex.Lock()
defer p.configMutex.Unlock()
currentConfig := p.agentConfig

// convert agent config from *mpi.AgentConfig to *config.Config
updateAgentConfig := config.FromAgentRemoteConfigProto(agentConfig)

// The check for updates to the config needs to be done here as the command plugin needs the latest agent config
// to be sent in response to create connection requests.
p.updateConfig(ctx, updateAgentConfig)

// Reconfigure each plugin with the new agent config
for _, plugin := range p.plugins {
slog.InfoContext(ctx, "Reconfigure plugin", "plugin", plugin.Info().Name)
reconfigureError = plugin.Reconfigure(ctx, p.agentConfig)
if reconfigureError != nil {
slog.ErrorContext(ctx, "Reconfigure plugin failed", "plugin", plugin.Info().Name)
break
}
}

if reconfigureError != nil {
slog.ErrorContext(ctx, "Error updating plugin with updated agent config, reverting",
"error", reconfigureError.Error())

// If the agent update was received from a create connection request no data plane response needs to be sent
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to update agent config", reconfigureError.Error())
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}

p.agentConfig = currentConfig
for _, plugin := range p.plugins {
err := plugin.Reconfigure(ctx, currentConfig)
if err != nil {
slog.ErrorContext(ctx, "Error reverting agent config", "error", err.Error())
}
}
}

slog.InfoContext(ctx, "Finished reconfigure plugin", "plugins", p.plugins)
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully updated agent config", "")
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}
}

func (p *MessagePipe) handleConnectionAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
slog.DebugContext(ctx, "Handling connection agent config update topic", "topic", msg.Topic)
agentConfig, ok := msg.Data.(*mpi.AgentConfig)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

p.Reconfigure(ctx, agentConfig, msg.Topic, "")
}

func (p *MessagePipe) handleAgentConfigUpdateTopic(ctx context.Context, msg *Message) {
slog.DebugContext(ctx, "Received agent config update topic", "topic", msg.Topic)
mpRequest, ok := msg.Data.(*mpi.ManagementPlaneRequest)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

reconfigureRequest, ok := mpRequest.GetRequest().(*mpi.ManagementPlaneRequest_UpdateAgentConfigRequest)
if !ok {
slog.ErrorContext(ctx, "Failed to parse agent config update message")
return
}

correlationID := reconfigureRequest.UpdateAgentConfigRequest.GetMessageMeta().GetCorrelationId()
p.Reconfigure(ctx, reconfigureRequest.UpdateAgentConfigRequest.GetAgentConfig(), msg.Topic, correlationID)
}

func (p *MessagePipe) updateConfig(ctx context.Context, updateAgentConfig *config.Config) {
slog.InfoContext(ctx, "Updating agent config")
if updateAgentConfig.Log != nil && !reflect.DeepEqual(p.agentConfig.Log, updateAgentConfig.Log) {
slog.DebugContext(ctx, "Agent log level has been updated", "previous", p.agentConfig.Log,
"update", updateAgentConfig.Log)
p.agentConfig.Log = updateAgentConfig.Log

slogger := logger.New(
p.agentConfig.Log.Path,
p.agentConfig.Log.Level,
)
slog.SetDefault(slogger)
}

if updateAgentConfig.Labels != nil && !reflect.DeepEqual(p.agentConfig.Labels, updateAgentConfig.Labels) {
slog.DebugContext(ctx, "Agent labels have been updated", "previous", p.agentConfig.Labels,
"update", updateAgentConfig.Labels)
p.agentConfig.Labels = updateAgentConfig.Labels

// OTel Headers also need to be updated when labels have been updated
if p.agentConfig.Collector != nil {
slog.DebugContext(ctx, "Agent OTel headers have been updated")
config.AddLabelsAsOTelHeaders(p.agentConfig.Collector, updateAgentConfig.Labels)
}
}
slog.DebugContext(ctx, "Updated agent config")
}

func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) error {
if index != -1 {
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)
Expand Down Expand Up @@ -209,3 +338,20 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
}
}
}

func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: id.GenerateMessageID(),
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: status,
Message: message,
Error: err,
},
}
}
38 changes: 35 additions & 3 deletions internal/bus/message_pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package bus

import (
"context"
"log/slog"
"testing"
"time"

"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/test/types"
"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -41,6 +44,11 @@ func (*testPlugin) Subscriptions() []string {
return []string{"test.message"}
}

func (p *testPlugin) Reconfigure(ctx context.Context, agentConfig *config.Config) error {
p.Called()
return nil
}

func TestMessagePipe(t *testing.T) {
messages := []*Message{
{Topic: "test.message", Data: 1},
Expand All @@ -58,7 +66,7 @@ func TestMessagePipe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
pipelineDone := make(chan bool)

messagePipe := NewMessagePipe(100)
messagePipe := NewMessagePipe(100, types.AgentConfig())
err := messagePipe.Register(10, []Plugin{plugin})

require.NoError(t, err)
Expand All @@ -84,7 +92,7 @@ func TestMessagePipe_DeRegister(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

messagePipe := NewMessagePipe(100)
messagePipe := NewMessagePipe(100, types.AgentConfig())
err := messagePipe.Register(100, []Plugin{plugin})

require.NoError(t, err)
Expand All @@ -105,7 +113,7 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
pipelineDone := make(chan bool)

messagePipe := NewMessagePipe(100)
messagePipe := NewMessagePipe(100, types.AgentConfig())
err := messagePipe.Register(10, []Plugin{plugin})

require.NoError(t, err)
Expand All @@ -121,3 +129,27 @@ func TestMessagePipe_IsPluginRegistered(t *testing.T) {
assert.True(t, messagePipe.IsPluginRegistered(plugin.Info().Name))
assert.False(t, messagePipe.IsPluginRegistered("metrics"))
}

func TestMessagePipe_updateConfig(t *testing.T) {
initialConfig := types.AgentConfig()
initialConfig.Log.Level = "INFO"
initialConfig.Log.Path = ""

messagePipe := NewMessagePipe(100, initialConfig)
originalLogger := slog.Default()

updatedConfig := &config.Config{
Log: &config.Log{
Path: "/etc/nginx-agent/",
Level: "DEBUG",
},
}

messagePipe.updateConfig(context.Background(), updatedConfig)

require.Equal(t, messagePipe.agentConfig.Log.Path, updatedConfig.Log.Path)
require.Equal(t, messagePipe.agentConfig.Log.Level, updatedConfig.Log.Level)

newLogger := slog.Default()
require.NotEqual(t, originalLogger, newLogger)
}
Loading
Loading