Skip to content

Commit ae97812

Browse files
authored
fix: set the otlpbridge port based on config (#247)
1 parent 8c9cdb7 commit ae97812

File tree

5 files changed

+222
-5
lines changed

5 files changed

+222
-5
lines changed

agent/agent.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,12 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
198198
}
199199

200200
if a.config.OrbAgent.ConfigManager.Active == "fleet" {
201-
const otlpBridgeEndpoint = "grpc://localhost:4317"
201+
// Get gRPC port from config, defaulting to 4318 if not specified
202+
grpcPort := 4318
203+
if a.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort != nil {
204+
grpcPort = *a.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort
205+
}
206+
otlpBridgeEndpoint := fmt.Sprintf("grpc://localhost:%d", grpcPort)
202207
if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists {
203208
if commonMap, ok := commonBackend.(map[string]any); ok {
204209
if otlpSection, ok := commonMap["otlp"].(map[string]any); ok {

agent/agent_test.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) {
126126

127127
// Verify the config was modified by checking backendsCommon which is set in startBackends
128128
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
129-
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
129+
// Default port is 4318
130+
assert.Equal(t, "grpc://localhost:4318", orbAgent.backendsCommon.Otlp.Grpc)
130131
}
131132

132133
func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) {
@@ -165,7 +166,8 @@ func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) {
165166

166167
// Verify the config was modified by checking backendsCommon which is set in startBackends
167168
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
168-
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
169+
// Default port is 4318
170+
assert.Equal(t, "grpc://localhost:4318", orbAgent.backendsCommon.Otlp.Grpc)
169171
}
170172

171173
func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) {
@@ -200,7 +202,8 @@ func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) {
200202

201203
// Verify the config was modified by checking backendsCommon which is set in startBackends
202204
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
203-
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
205+
// Default port is 4318
206+
assert.Equal(t, "grpc://localhost:4318", orbAgent.backendsCommon.Otlp.Grpc)
204207
}
205208

206209
func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) {
@@ -244,3 +247,44 @@ func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) {
244247
// For non-fleet config, the original value should remain
245248
assert.Equal(t, originalGrpcURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should remain unchanged for non-fleet config")
246249
}
250+
251+
func TestStart_FleetConfig_UsesConfiguredGRPCPort(t *testing.T) {
252+
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
253+
repo, err := policies.NewMemRepo()
254+
require.NoError(t, err)
255+
256+
customPort := 9999
257+
cfg := config.Config{
258+
OrbAgent: config.OrbAgent{
259+
Backends: map[string]any{},
260+
ConfigManager: config.ManagerConfig{
261+
Active: "fleet",
262+
Sources: config.Sources{
263+
Fleet: config.FleetManager{
264+
OTLPBridgeGRPCPort: &customPort,
265+
},
266+
},
267+
},
268+
SecretsManager: config.ManagerSecrets{
269+
Active: "",
270+
},
271+
},
272+
}
273+
274+
agent, err := New(logger, cfg)
275+
require.NoError(t, err)
276+
277+
orbAgent := agent.(*orbAgent)
278+
orbAgent.secretsManager = &mockSecretsManager{}
279+
orbAgent.policyManager = &mockPolicyManager{repo: repo}
280+
281+
ctx, cancel := context.WithCancel(context.Background())
282+
defer cancel()
283+
284+
err = orbAgent.Start(ctx, cancel)
285+
require.Error(t, err) // Expected to fail when starting backends
286+
287+
// Verify the config was modified with the custom port
288+
expectedURL := "grpc://localhost:9999"
289+
assert.Equal(t, expectedURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should use configured port")
290+
}

agent/config/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type FleetManager struct {
4343
ClientSecret string `yaml:"client_secret"`
4444
TokenExpiryCheckInterval *int `yaml:"token_expiry_check_interval,omitempty"` // Check interval in seconds (default: 30)
4545
TokenReconnectBuffer *int `yaml:"token_reconnect_buffer,omitempty"` // Reconnect buffer in seconds before expiry (default: 120)
46+
OTLPBridgeGRPCPort *int `yaml:"otlp_bridge_grpc_port,omitempty"` // GRPC port for the OTLP bridge (default: 4318)
4647
}
4748

4849
// Sources represents the configuration for manager sources, including cloud, local and git.

agent/configmgr/fleet.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,13 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
171171
fleetManager.connection.AddOnReadyHook(func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
172172
if fleetManager.otlpBridge == nil {
173173
fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge")
174+
// Get gRPC port from config, defaulting to 4318 if not specified
175+
grpcPort := 4318
176+
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort != nil {
177+
grpcPort = *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort
178+
}
174179
bridgeConfig := otlpbridge.BridgeConfig{
175-
ListenAddr: ":4317",
180+
ListenAddr: fmt.Sprintf(":%d", grpcPort),
176181
Encoding: "json",
177182
}
178183
var err error
@@ -185,6 +190,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
185190
fleetManager.logger.Error("failed to start OTLP bridge", slog.Any("error", err))
186191
return
187192
}
193+
fleetManager.logger.Info("OTLP bridge server started", slog.Int("grpc_port", grpcPort))
188194
} else {
189195
fleetManager.logger.Info("OTLP bridge already initialized, skipping initialization")
190196
}

agent/configmgr/fleet_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,3 +849,164 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin
849849
_ = fleetManager.otlpBridge.Stop(context.Background())
850850
}
851851
}
852+
853+
func TestFleetConfigManager_OnReadyHook_UsesConfiguredGRPCPort(t *testing.T) {
854+
// Test that OnReadyHook uses the configured gRPC port from config
855+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
856+
mockPMgr := &mockPolicyManagerForFleet{}
857+
mockPMgr.On("GetRepo").Return(nil)
858+
fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{})
859+
860+
// Create config with custom gRPC port
861+
customPort := 9999
862+
cfg := config.Config{
863+
OrbAgent: config.OrbAgent{
864+
ConfigManager: config.ManagerConfig{
865+
Sources: config.Sources{
866+
Fleet: config.FleetManager{
867+
OTLPBridgeGRPCPort: &customPort,
868+
},
869+
},
870+
},
871+
},
872+
}
873+
fleetManager.config = cfg
874+
875+
// Create the hook function that uses config (simulating what Start does)
876+
hookFunc := func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
877+
if fleetManager.otlpBridge == nil {
878+
fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge")
879+
// Get gRPC port from config, defaulting to 4318 if not specified
880+
grpcPort := 4318
881+
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort != nil {
882+
grpcPort = *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort
883+
}
884+
bridgeConfig := otlpbridge.BridgeConfig{
885+
ListenAddr: fmt.Sprintf(":%d", grpcPort),
886+
Encoding: "json",
887+
}
888+
var err error
889+
fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger)
890+
if err != nil {
891+
fleetManager.logger.Error("failed to create OTLP bridge", slog.Any("error", err))
892+
return
893+
}
894+
if err := fleetManager.otlpBridge.Start(context.Background()); err != nil {
895+
fleetManager.logger.Error("failed to start OTLP bridge", slog.Any("error", err))
896+
return
897+
}
898+
} else {
899+
fleetManager.logger.Info("OTLP bridge already initialized, skipping initialization")
900+
}
901+
902+
pub := otlpbridge.NewCMAdapterPublisher(cm)
903+
fleetManager.otlpBridge.SetPublisher(pub)
904+
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
905+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
906+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
907+
slog.String("ingest_topic", topics.Ingest),
908+
slog.String("telemetry_topic", topics.Telemetry))
909+
}
910+
911+
// Register the hook
912+
fleetManager.connection.AddOnReadyHook(hookFunc)
913+
914+
// Simulate connection ready event
915+
topics := fleet.TokenResponseTopics{
916+
Ingest: "test/otlp/topic",
917+
Telemetry: "test/telemetry/topic",
918+
}
919+
920+
// Call the hook manually
921+
hookFunc(nil, topics)
922+
923+
// Verify bridge was initialized
924+
require.NotNil(t, fleetManager.otlpBridge, "bridge should be initialized")
925+
// Verify the bridge is listening on the configured port
926+
// We can't directly check the port, but we can verify the bridge started successfully
927+
// The actual port verification would require inspecting the listener, which is not exposed
928+
// So we just verify the bridge exists and started without error
929+
930+
// Cleanup
931+
if fleetManager.otlpBridge != nil {
932+
_ = fleetManager.otlpBridge.Stop(context.Background())
933+
}
934+
}
935+
936+
func TestFleetConfigManager_OnReadyHook_UsesDefaultGRPCPort(t *testing.T) {
937+
// Test that OnReadyHook uses the default gRPC port (4318) when not configured
938+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError}))
939+
mockPMgr := &mockPolicyManagerForFleet{}
940+
mockPMgr.On("GetRepo").Return(nil)
941+
fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{})
942+
943+
// Create config without gRPC port configured (should use default)
944+
cfg := config.Config{
945+
OrbAgent: config.OrbAgent{
946+
ConfigManager: config.ManagerConfig{
947+
Sources: config.Sources{
948+
Fleet: config.FleetManager{
949+
// OTLPBridgeGRPCPort is nil, should use default 4318
950+
},
951+
},
952+
},
953+
},
954+
}
955+
fleetManager.config = cfg
956+
957+
// Create the hook function that uses config (simulating what Start does)
958+
hookFunc := func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) {
959+
if fleetManager.otlpBridge == nil {
960+
fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge")
961+
// Get gRPC port from config, defaulting to 4318 if not specified
962+
grpcPort := 4318
963+
if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort != nil {
964+
grpcPort = *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.OTLPBridgeGRPCPort
965+
}
966+
bridgeConfig := otlpbridge.BridgeConfig{
967+
ListenAddr: fmt.Sprintf(":%d", grpcPort),
968+
Encoding: "json",
969+
}
970+
var err error
971+
fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger)
972+
if err != nil {
973+
fleetManager.logger.Error("failed to create OTLP bridge", slog.Any("error", err))
974+
return
975+
}
976+
if err := fleetManager.otlpBridge.Start(context.Background()); err != nil {
977+
fleetManager.logger.Error("failed to start OTLP bridge", slog.Any("error", err))
978+
return
979+
}
980+
} else {
981+
fleetManager.logger.Info("OTLP bridge already initialized, skipping initialization")
982+
}
983+
984+
pub := otlpbridge.NewCMAdapterPublisher(cm)
985+
fleetManager.otlpBridge.SetPublisher(pub)
986+
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
987+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
988+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
989+
slog.String("ingest_topic", topics.Ingest),
990+
slog.String("telemetry_topic", topics.Telemetry))
991+
}
992+
993+
// Register the hook
994+
fleetManager.connection.AddOnReadyHook(hookFunc)
995+
996+
// Simulate connection ready event
997+
topics := fleet.TokenResponseTopics{
998+
Ingest: "test/otlp/topic",
999+
Telemetry: "test/telemetry/topic",
1000+
}
1001+
1002+
// Call the hook manually
1003+
hookFunc(nil, topics)
1004+
1005+
// Verify bridge was initialized (should use default port 4318)
1006+
require.NotNil(t, fleetManager.otlpBridge, "bridge should be initialized with default port")
1007+
1008+
// Cleanup
1009+
if fleetManager.otlpBridge != nil {
1010+
_ = fleetManager.otlpBridge.Stop(context.Background())
1011+
}
1012+
}

0 commit comments

Comments
 (0)