Skip to content

Commit 3e3f37a

Browse files
authored
feat: send diode data over MQTT (#222)
1 parent 2418e40 commit 3e3f37a

File tree

11 files changed

+387
-42
lines changed

11 files changed

+387
-42
lines changed

agent/agent.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,37 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
197197
return err
198198
}
199199

200+
if a.config.OrbAgent.ConfigManager.Active == "fleet" {
201+
const otlpBridgeEndpoint = "grpc://localhost:4317"
202+
if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists {
203+
if commonMap, ok := commonBackend.(map[string]any); ok {
204+
if otlpSection, ok := commonMap["otlp"].(map[string]any); ok {
205+
grpcURL, _ := otlpSection["grpc"].(string)
206+
if grpcURL != "" {
207+
a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL)
208+
}
209+
otlpSection["grpc"] = otlpBridgeEndpoint
210+
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint)
211+
212+
} else {
213+
// otlp section doesn't exist, create it
214+
commonMap["otlp"] = map[string]any{
215+
"grpc": otlpBridgeEndpoint,
216+
}
217+
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint)
218+
}
219+
}
220+
} else {
221+
// common backend doesn't exist, create it with otlp config
222+
a.config.OrbAgent.Backends["common"] = map[string]any{
223+
"otlp": map[string]any{
224+
"grpc": otlpBridgeEndpoint,
225+
},
226+
}
227+
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", otlpBridgeEndpoint)
228+
}
229+
}
230+
200231
if err = a.startBackends(agentCtx, a.config.OrbAgent.Backends, a.config.OrbAgent.Labels); err != nil {
201232
return err
202233
}

agent/agent_test.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import (
77
"testing"
88

99
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
1011

1112
"github.com/netboxlabs/orb-agent/agent/backend"
1213
"github.com/netboxlabs/orb-agent/agent/config"
1314
"github.com/netboxlabs/orb-agent/agent/configmgr"
15+
"github.com/netboxlabs/orb-agent/agent/policies"
1416
)
1517

1618
// mockConfigManager implements configmgr.Manager for testing Stop delegation
@@ -40,3 +42,205 @@ func TestAgentStop_DelegatesToConfigManagerStop(t *testing.T) {
4042

4143
assert.True(t, mockMgr.stopCalled, "expected configManager.Stop to be called")
4244
}
45+
46+
// mockPolicyManager implements policymgr.PolicyManager for testing
47+
type mockPolicyManager struct {
48+
repo policies.PolicyRepo
49+
}
50+
51+
func (m *mockPolicyManager) ManagePolicy(_ config.PolicyPayload) {}
52+
func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Backend) {}
53+
func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) {
54+
return nil, nil
55+
}
56+
57+
func (m *mockPolicyManager) GetRepo() policies.PolicyRepo {
58+
return m.repo
59+
}
60+
61+
func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error {
62+
return nil
63+
}
64+
65+
func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error {
66+
return nil
67+
}
68+
69+
func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error {
70+
return nil
71+
}
72+
73+
// mockSecretsManager implements secretsmgr.Manager for testing
74+
type mockSecretsManager struct{}
75+
76+
func (m *mockSecretsManager) Start(_ context.Context) error {
77+
return nil
78+
}
79+
func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bool)) {}
80+
func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) {
81+
return payload, nil
82+
}
83+
84+
func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) {
85+
return backends, configManager, nil
86+
}
87+
88+
func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) {
89+
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
90+
repo, err := policies.NewMemRepo()
91+
require.NoError(t, err)
92+
93+
cfg := config.Config{
94+
OrbAgent: config.OrbAgent{
95+
Backends: map[string]any{
96+
"common": map[string]any{
97+
"otlp": map[string]any{
98+
"grpc": "original:4317",
99+
},
100+
},
101+
},
102+
ConfigManager: config.ManagerConfig{
103+
Active: "fleet",
104+
},
105+
SecretsManager: config.ManagerSecrets{
106+
Active: "",
107+
},
108+
},
109+
}
110+
111+
agent, err := New(logger, cfg)
112+
require.NoError(t, err)
113+
114+
orbAgent := agent.(*orbAgent)
115+
orbAgent.secretsManager = &mockSecretsManager{}
116+
orbAgent.policyManager = &mockPolicyManager{repo: repo}
117+
118+
ctx, cancel := context.WithCancel(context.Background())
119+
defer cancel()
120+
121+
// Start will fail when trying to start backends, but we can check the config before that
122+
err = orbAgent.Start(ctx, cancel)
123+
// We expect an error because there are no actual backends configured
124+
// But the important thing is that the config was modified
125+
require.Error(t, err)
126+
127+
// Verify the config was modified by checking backendsCommon which is set in startBackends
128+
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
129+
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
130+
}
131+
132+
func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) {
133+
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
134+
repo, err := policies.NewMemRepo()
135+
require.NoError(t, err)
136+
137+
cfg := config.Config{
138+
OrbAgent: config.OrbAgent{
139+
Backends: map[string]any{
140+
"common": map[string]any{
141+
"other": "value",
142+
},
143+
},
144+
ConfigManager: config.ManagerConfig{
145+
Active: "fleet",
146+
},
147+
SecretsManager: config.ManagerSecrets{
148+
Active: "",
149+
},
150+
},
151+
}
152+
153+
agent, err := New(logger, cfg)
154+
require.NoError(t, err)
155+
156+
orbAgent := agent.(*orbAgent)
157+
orbAgent.secretsManager = &mockSecretsManager{}
158+
orbAgent.policyManager = &mockPolicyManager{repo: repo}
159+
160+
ctx, cancel := context.WithCancel(context.Background())
161+
defer cancel()
162+
163+
err = orbAgent.Start(ctx, cancel)
164+
require.Error(t, err) // Expected to fail when starting backends
165+
166+
// Verify the config was modified by checking backendsCommon which is set in startBackends
167+
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
168+
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
169+
}
170+
171+
func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) {
172+
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
173+
repo, err := policies.NewMemRepo()
174+
require.NoError(t, err)
175+
176+
cfg := config.Config{
177+
OrbAgent: config.OrbAgent{
178+
Backends: map[string]any{},
179+
ConfigManager: config.ManagerConfig{
180+
Active: "fleet",
181+
},
182+
SecretsManager: config.ManagerSecrets{
183+
Active: "",
184+
},
185+
},
186+
}
187+
188+
agent, err := New(logger, cfg)
189+
require.NoError(t, err)
190+
191+
orbAgent := agent.(*orbAgent)
192+
orbAgent.secretsManager = &mockSecretsManager{}
193+
orbAgent.policyManager = &mockPolicyManager{repo: repo}
194+
195+
ctx, cancel := context.WithCancel(context.Background())
196+
defer cancel()
197+
198+
err = orbAgent.Start(ctx, cancel)
199+
require.Error(t, err) // Expected to fail when starting backends
200+
201+
// Verify the config was modified by checking backendsCommon which is set in startBackends
202+
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
203+
assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc)
204+
}
205+
206+
func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) {
207+
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
208+
repo, err := policies.NewMemRepo()
209+
require.NoError(t, err)
210+
211+
originalGrpcURL := "original:4317"
212+
cfg := config.Config{
213+
OrbAgent: config.OrbAgent{
214+
Backends: map[string]any{
215+
"common": map[string]any{
216+
"otlp": map[string]any{
217+
"grpc": originalGrpcURL,
218+
},
219+
},
220+
},
221+
ConfigManager: config.ManagerConfig{
222+
Active: "local", // Not fleet
223+
},
224+
SecretsManager: config.ManagerSecrets{
225+
Active: "",
226+
},
227+
},
228+
}
229+
230+
agent, err := New(logger, cfg)
231+
require.NoError(t, err)
232+
233+
orbAgent := agent.(*orbAgent)
234+
orbAgent.secretsManager = &mockSecretsManager{}
235+
orbAgent.policyManager = &mockPolicyManager{repo: repo}
236+
237+
ctx, cancel := context.WithCancel(context.Background())
238+
defer cancel()
239+
240+
err = orbAgent.Start(ctx, cancel)
241+
require.Error(t, err) // Expected to fail when starting backends
242+
243+
// Verify the config was NOT modified by checking backendsCommon which is set in startBackends
244+
// For non-fleet config, the original value should remain
245+
assert.Equal(t, originalGrpcURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should remain unchanged for non-fleet config")
246+
}

agent/configmgr/fleet.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
117117
"capabilities_topic", topics.Capabilities,
118118
"inbox_topic", topics.Inbox,
119119
"outbox_topic", topics.Outbox,
120-
"otlp_topic", topics.Ingest)
120+
"otlp_topic", topics.Ingest,
121+
"telemetry_topic", topics.Telemetry)
121122

122123
connectionDetails := fleet.ConnectionDetails{
123124
MQTTURL: jwtClaims.MqttURL,
@@ -172,7 +173,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
172173
fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge")
173174
bridgeConfig := otlpbridge.BridgeConfig{
174175
ListenAddr: ":4317",
175-
Encoding: "protobuf",
176+
Encoding: "json",
176177
}
177178
var err error
178179
fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger)
@@ -192,7 +193,10 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
192193
pub := otlpbridge.NewCMAdapterPublisher(cm)
193194
fleetManager.otlpBridge.SetPublisher(pub)
194195
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
195-
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest))
196+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
197+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
198+
slog.String("ingest_topic", topics.Ingest),
199+
slog.String("telemetry_topic", topics.Telemetry))
196200
})
197201

198202
// Start goroutine to handle reconnect requests (JWT refresh)

agent/configmgr/fleet/connection_hooks_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) {
5656
MQTTURL: "mqtt://localhost:1883",
5757
Token: "",
5858
AgentID: "agent-1",
59-
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x"},
59+
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x", Telemetry: "telemetry/x"},
6060
ClientID: "client-1",
6161
Zone: "zone-a",
6262
}

agent/configmgr/fleet/jwt_claims_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
173173
Capabilities: "orgs/test-org/agents/test-client-123/capabilities",
174174
Inbox: "orgs/test-org/agents/test-client-123/inbox",
175175
Outbox: "orgs/test-org/agents/test-client-123/outbox",
176+
Ingest: "orgs/test-org/agents/test-client-123/ingest",
177+
Telemetry: "orgs/test-org/agents/test-client-123/telemetry",
176178
},
177179
},
178180
{
@@ -184,6 +186,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
184186
Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities",
185187
Inbox: "orgs/prod-company/agents/test-agent-123/inbox",
186188
Outbox: "orgs/prod-company/agents/test-agent-123/outbox",
189+
Ingest: "orgs/prod-company/agents/test-agent-123/ingest",
190+
Telemetry: "orgs/prod-company/agents/test-agent-123/telemetry",
187191
},
188192
},
189193
}

agent/configmgr/fleet/topics.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox"
1919
outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox"
2020
ingestTemplate = "orgs/{org_id}/agents/{agent_id}/ingest"
21+
telemetryTemplate = "orgs/{org_id}/agents/{agent_id}/telemetry"
2122

2223
groupsTemplate = "orgs/{org_id}/groups/{group_id}"
2324
)
@@ -29,6 +30,7 @@ type TokenResponseTopics struct {
2930
Inbox string `json:"inbox"`
3031
Outbox string `json:"outbox"`
3132
Ingest string `json:"ingest"`
33+
Telemetry string `json:"telemetry"`
3234
}
3335

3436
// GenerateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id
@@ -39,6 +41,7 @@ func GenerateTopicsFromTemplate(jwtClaims *JWTClaims) (*TokenResponseTopics, err
3941
Inbox: fillTopicTemplate(inboxTemplate, jwtClaims),
4042
Outbox: fillTopicTemplate(outboxTemplate, jwtClaims),
4143
Ingest: fillTopicTemplate(ingestTemplate, jwtClaims),
44+
Telemetry: fillTopicTemplate(telemetryTemplate, jwtClaims),
4245
}, nil
4346
}
4447

agent/configmgr/fleet/topics_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,15 @@ func TestGenerateTopicsFromTemplate_IncludesIngest(t *testing.T) {
1313
t.Fatalf("expected ingest topic %q, got %q", expected, topics.Ingest)
1414
}
1515
}
16+
17+
func TestGenerateTopicsFromTemplate_IncludesTelemetry(t *testing.T) {
18+
claims := &JWTClaims{OrgID: "org-123", AgentID: "agent-abc"}
19+
topics, err := GenerateTopicsFromTemplate(claims)
20+
if err != nil {
21+
t.Fatalf("unexpected error: %v", err)
22+
}
23+
expected := "orgs/org-123/agents/agent-abc/telemetry"
24+
if topics.Telemetry != expected {
25+
t.Fatalf("expected telemetry topic %q, got %q", expected, topics.Telemetry)
26+
}
27+
}

0 commit comments

Comments
 (0)