Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
732 changes: 499 additions & 233 deletions api/grpc/mpi/v1/command.pb.go

Large diffs are not rendered by default.

469 changes: 469 additions & 0 deletions api/grpc/mpi/v1/command.pb.validate.go

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions api/grpc/mpi/v1/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ message UpdateDataPlaneStatusRequest {
// Respond to a UpdateDataPlaneStatusRequest - intentionally empty
message UpdateDataPlaneStatusResponse {}

message UpdateNginxAgentConfigurationRequest {
// Meta-information associated with a message
mpi.v1.MessageMeta message_meta = 1;
// the NGINX Agent configuration to update
AgentConfig agent_config = 2;
}

message UpdateNginxAgentConfigurationResponse {
// The success or failure of the UpdateNginxAgentConfigurationRequest
mpi.v1.CommandResponse response = 1;
}

message InstanceHealth {
// Health status enum
enum InstanceHealthStatus {
Expand Down Expand Up @@ -171,6 +183,8 @@ message ManagementPlaneRequest {
APIActionRequest action_request = 7;
// triggers a DataPlaneResponse with a command_response for a particular correlation_id
CommandStatusRequest command_status_request = 8;
// triggers an UpdateNginxAgentConfiguration rpc, returning an UpdateNginxAgentConfigurationResponse
UpdateNginxAgentConfigurationRequest update_nginx_agent_configuration_request = 9;
}
}

Expand Down Expand Up @@ -387,6 +401,26 @@ message AgentConfig {
string message_buffer_size = 6;
// Auxiliary Command server settings
AuxiliaryCommandServer auxiliary_command = 7;
// Log settings
Log log = 8;
}

// The log settings associated with NGINX Agent
message Log {
LogLevel log_level = 1;
enum LogLevel {
// Unspecified log level
LOG_LEVEL_UNSPECIFIED = 0;
// Error log level
LOG_LEVEL_ERROR = 1;
// Warning log level
LOG_LEVEL_WARN = 2;
// Info log level
LOG_LEVEL_INFO = 3;
// Debug log level
LOG_LEVEL_DEBUG = 4;
}
string log_path = 2;
}

// The command server settings, associated with messaging from an external source
Expand Down
68 changes: 68 additions & 0 deletions docs/proto/protos.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
- [InstanceHealth](#mpi-v1-InstanceHealth)
- [InstanceMeta](#mpi-v1-InstanceMeta)
- [InstanceRuntime](#mpi-v1-InstanceRuntime)
- [Log](#mpi-v1-Log)
- [ManagementPlaneRequest](#mpi-v1-ManagementPlaneRequest)
- [MetricsServer](#mpi-v1-MetricsServer)
- [NGINXAppProtectRuntimeInfo](#mpi-v1-NGINXAppProtectRuntimeInfo)
Expand All @@ -81,10 +82,13 @@
- [UpdateDataPlaneStatusRequest](#mpi-v1-UpdateDataPlaneStatusRequest)
- [UpdateDataPlaneStatusResponse](#mpi-v1-UpdateDataPlaneStatusResponse)
- [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers)
- [UpdateNginxAgentConfigurationRequest](#mpi-v1-UpdateNginxAgentConfigurationRequest)
- [UpdateNginxAgentConfigurationResponse](#mpi-v1-UpdateNginxAgentConfigurationResponse)
- [UpdateStreamServers](#mpi-v1-UpdateStreamServers)

- [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus)
- [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType)
- [Log.LogLevel](#mpi-v1-Log-LogLevel)

- [CommandService](#mpi-v1-CommandService)

Expand Down Expand Up @@ -718,6 +722,7 @@ This contains a series of NGINX Agent configurations
| features | [string](#string) | repeated | A list of features that the NGINX Agent has |
| message_buffer_size | [string](#string) | | Message buffer size, maximum not acknowledged messages from the subscribe perspective |
| auxiliary_command | [AuxiliaryCommandServer](#mpi-v1-AuxiliaryCommandServer) | | Auxiliary Command server settings |
| log | [Log](#mpi-v1-Log) | | Log settings |



Expand Down Expand Up @@ -1049,6 +1054,22 @@ Meta-information relating to the reported instance



<a name="mpi-v1-Log"></a>

### Log
The log settings associated with NGINX Agent


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| log_level | [Log.LogLevel](#mpi-v1-Log-LogLevel) | | |
| log_path | [string](#string) | | |






<a name="mpi-v1-ManagementPlaneRequest"></a>

### ManagementPlaneRequest
Expand All @@ -1064,6 +1085,7 @@ A Management Plane request for information, triggers an associated rpc on the Da
| config_upload_request | [ConfigUploadRequest](#mpi-v1-ConfigUploadRequest) | | triggers a series of rpc UpdateFile(File) for that instances |
| action_request | [APIActionRequest](#mpi-v1-APIActionRequest) | | triggers a DataPlaneResponse with a command_response for a particular action |
| command_status_request | [CommandStatusRequest](#mpi-v1-CommandStatusRequest) | | triggers a DataPlaneResponse with a command_response for a particular correlation_id |
| update_nginx_agent_configuration_request | [UpdateNginxAgentConfigurationRequest](#mpi-v1-UpdateNginxAgentConfigurationRequest) | | triggers an UpdateNginxAgentConfiguration rpc, returning an UpdateNginxAgentConfigurationResponse |



Expand Down Expand Up @@ -1271,6 +1293,37 @@ Update HTTP Upstream Servers for an instance



<a name="mpi-v1-UpdateNginxAgentConfigurationRequest"></a>

### UpdateNginxAgentConfigurationRequest



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| message_meta | [MessageMeta](#mpi-v1-MessageMeta) | | Meta-information associated with a message |
| agent_config | [AgentConfig](#mpi-v1-AgentConfig) | | the NGINX Agent configuration to update |






<a name="mpi-v1-UpdateNginxAgentConfigurationResponse"></a>

### UpdateNginxAgentConfigurationResponse



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| response | [CommandResponse](#mpi-v1-CommandResponse) | | The success or failure of the UpdateNginxAgentConfigurationRequest |






<a name="mpi-v1-UpdateStreamServers"></a>

### UpdateStreamServers
Expand Down Expand Up @@ -1318,6 +1371,21 @@ the types of instances possible
| INSTANCE_TYPE_NGINX_APP_PROTECT | 5 | NGINX App Protect |



<a name="mpi-v1-Log-LogLevel"></a>

### Log.LogLevel


| Name | Number | Description |
| ---- | ------ | ----------- |
| LOG_LEVEL_UNSPECIFIED | 0 | Unspecified log level |
| LOG_LEVEL_ERROR | 1 | Error log level |
| LOG_LEVEL_WARN | 2 | Warning log level |
| LOG_LEVEL_INFO | 3 | Info log level |
| LOG_LEVEL_DEBUG | 4 | Debug log level |





Expand Down
1 change: 1 addition & 0 deletions internal/bus/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ const (
DataPlaneHealthRequestTopic = "data-plane-health-request"
DataPlaneHealthResponseTopic = "data-plane-health-response"
APIActionRequestTopic = "api-action-request"
AgentConfigUpdateTopic = "agent-config-update"
)
14 changes: 14 additions & 0 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (oc *Collector) Process(ctx context.Context, msg *bus.Message) {
oc.handleNginxConfigUpdate(ctx, msg)
case bus.ResourceUpdateTopic:
oc.handleResourceUpdate(ctx, msg)
case bus.AgentConfigUpdateTopic:
oc.handleAgentConfigUpdate(ctx, msg)
default:
slog.DebugContext(ctx, "OTel collector plugin unknown topic", "topic", msg.Topic)
}
Expand All @@ -202,6 +204,7 @@ func (oc *Collector) Subscriptions() []string {
return []string{
bus.ResourceUpdateTopic,
bus.NginxConfigUpdateTopic,
bus.AgentConfigUpdateTopic,
}
}

Expand Down Expand Up @@ -329,6 +332,17 @@ func (oc *Collector) handleResourceUpdate(ctx context.Context, msg *bus.Message)
}
}

func (oc *Collector) handleAgentConfigUpdate(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "OTel collector plugin received agent config update message")
agentConfig, ok := msg.Data.(*config.Config)
if !ok {
slog.ErrorContext(ctx, "Unable to cast message payload to *config.Config", "payload", msg.Data)
return
}

oc.config = agentConfig
}

func (oc *Collector) updateResourceProcessor(resourceUpdateContext *v1.Resource) bool {
resourceProcessorUpdated := false

Expand Down
66 changes: 66 additions & 0 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type (
Subscribe(ctx context.Context)
IsConnected() bool
CreateConnection(ctx context.Context, resource *mpi.Resource) (*mpi.CreateConnectionResponse, error)
UpdateAgentConfig(ctx context.Context, request *mpi.AgentConfig) (*config.Config, error)
}

CommandPlugin struct {
Expand Down Expand Up @@ -180,6 +181,24 @@ func (cp *CommandPlugin) createConnection(ctx context.Context, resource *mpi.Res
Topic: bus.ConnectionCreatedTopic,
Data: createConnectionResponse,
})

if createConnectionResponse.GetAgentConfig() != nil {
newAgentConfig, updateConfigError := cp.commandService.UpdateAgentConfig(
ctx,
createConnectionResponse.GetAgentConfig(),
)
if updateConfigError != nil {
slog.ErrorContext(ctx, "Unable to update agent configuration", "error", updateConfigError)
} else {
slog.DebugContext(
ctx, "Notifying other plugins of agent configuration update from create connection response",
)
cp.messagePipe.Process(ctx, &bus.Message{
Topic: bus.AgentConfigUpdateTopic,
Data: newAgentConfig,
})
}
}
}
}

Expand Down Expand Up @@ -305,6 +324,9 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
}
slog.InfoContext(ctx, "Received management plane action request")
cp.handleAPIActionRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_UpdateNginxAgentConfigurationRequest:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Auxiliary vs Command sever matter for the agent config ?

slog.InfoContext(ctx, "Received management plane request - update agent configuration")
cp.handleAgentConfigUpdateRequest(newCtx, message)
default:
slog.DebugContext(newCtx, "Management plane request not implemented yet")
}
Expand Down Expand Up @@ -408,6 +430,50 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
}
}

func (cp *CommandPlugin) handleAgentConfigUpdateRequest(ctx context.Context, request *mpi.ManagementPlaneRequest) {
newAgentConfig, err := cp.commandService.UpdateAgentConfig(
ctx,
request.GetUpdateNginxAgentConfigurationRequest().GetAgentConfig(),
)
if err != nil {
slog.ErrorContext(ctx, "Command service was unable to update agent configuration", "error", err)

responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: id.GenerateMessageID(),
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
Message: "Failed to update agent configuration",
},
})

if responseError != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError)
}
} else {
cp.Process(ctx, &bus.Message{Topic: bus.AgentConfigUpdateTopic, Data: newAgentConfig})

responseError := cp.commandService.SendDataPlaneResponse(ctx, &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: id.GenerateMessageID(),
CorrelationId: request.GetMessageMeta().GetCorrelationId(),
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
Message: "Successfully updated agent configuration",
},
})

if responseError != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", responseError)
}
}
}

func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, err string,
) *mpi.DataPlaneResponse {
Expand Down
19 changes: 16 additions & 3 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,20 @@ func TestCommandPlugin_Init(t *testing.T) {

func TestCommandPlugin_createConnection(t *testing.T) {
ctx := context.Background()
response := &mpi.CreateConnectionResponse{
Response: &mpi.CommandResponse{
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
Message: "Connection created successfully",
},
AgentConfig: &mpi.AgentConfig{
Log: &mpi.Log{
LogLevel: mpi.Log_LOG_LEVEL_DEBUG,
},
},
}

commandService := &commandfakes.FakeCommandService{}
commandService.CreateConnectionReturns(&mpi.CreateConnectionResponse{}, nil)
commandService.CreateConnectionReturns(response, nil)
messagePipe := busfakes.NewFakeMessagePipe()

commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
Expand All @@ -98,14 +110,15 @@ func TestCommandPlugin_createConnection(t *testing.T) {

assert.Eventually(
t,
func() bool { return len(messagePipe.Messages()) == 1 },
func() bool { return len(messagePipe.Messages()) == 2 },
2*time.Second,
10*time.Millisecond,
)

messages := messagePipe.Messages()
assert.Len(t, messages, 1)
assert.Len(t, messages, 2)
assert.Equal(t, bus.ConnectionCreatedTopic, messages[0].Topic)
assert.Equal(t, bus.AgentConfigUpdateTopic, messages[1].Topic)
}

func TestCommandPlugin_Process(t *testing.T) {
Expand Down
27 changes: 27 additions & 0 deletions internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"log/slog"
"reflect"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -182,6 +183,32 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
)
}

func (cs *CommandService) UpdateAgentConfig(
ctx context.Context,
mpiConfig *mpi.AgentConfig,
) (*config.Config, error) {
if !cs.isConnected.Load() {
return nil, errors.New("command service client not connected yet")
}
slog.InfoContext(ctx, "Updating agent configuration", "config", mpiConfig)

updatedLog := config.FromAgentConfigLogProto(mpiConfig.GetLog())

if mpiConfig.GetLog() != nil && !reflect.DeepEqual(cs.agentConfig.Log, updatedLog) {
slog.InfoContext(ctx, "Updating log configuration", "log", updatedLog)
cs.agentConfig.Log = updatedLog

slogger := logger.New(
cs.agentConfig.Log.Path,
cs.agentConfig.Log.Level,
)
slog.SetDefault(slogger)
}

return cs.agentConfig, nil
}

// Subscribe to the Management Plane for incoming commands.
func (cs *CommandService) Subscribe(ctx context.Context) {
commonSettings := &config.BackOff{
InitialInterval: cs.agentConfig.Client.Backoff.InitialInterval,
Expand Down
Loading
Loading