Skip to content

Commit 0b2194d

Browse files
Fix non-otel component status being incorrectly deleted (#8435) (#8582)
* Fix non-otel component status being incorrectly deleted Otel component status processing needs to emit fake STOPPED events for components which don't exist anymore. This was incorrectly taking place for components running as beats processes, resulting in them incorrectly reporting a STOPPED state. * Fix incorrect timeout for coordinator test * Add a more detailed comment about the test logic (cherry picked from commit 70f2450) Co-authored-by: Mikołaj Świątek <[email protected]>
1 parent 5875336 commit 0b2194d

File tree

4 files changed

+152
-67
lines changed

4 files changed

+152
-67
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -654,49 +654,47 @@ func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error {
654654
// watchRuntimeComponents listens for state updates from the runtime
655655
// manager, logs them, and forwards them to CoordinatorState.
656656
// Runs in its own goroutine created in Coordinator.Run.
657-
func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
658-
state := make(map[string]runtime.ComponentState)
657+
func (c *Coordinator) watchRuntimeComponents(
658+
ctx context.Context,
659+
runtimeComponentStates <-chan runtime.ComponentComponentState,
660+
otelStatuses <-chan *status.AggregateStatus,
661+
) {
662+
// We need to track otel component state separately because otel components may not always get a STOPPED status
663+
// If we receive an otel status without the state of a component we're tracking, we need to emit a fake STOPPED
664+
// status for it. Process component states should not be affected by this logic.
665+
processState := make(map[string]runtime.ComponentState)
666+
otelState := make(map[string]runtime.ComponentState)
659667

660-
var subChan <-chan runtime.ComponentComponentState
661-
var otelChan <-chan *status.AggregateStatus
662-
// A real Coordinator will always have a runtime manager, but unit tests
663-
// may not initialize all managers -- in that case we leave subChan nil,
664-
// and just idle until Coordinator shuts down.
665-
if c.runtimeMgr != nil {
666-
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
667-
}
668-
if c.otelMgr != nil {
669-
otelChan = c.otelMgr.Watch()
670-
}
671668
for {
672669
select {
673670
case <-ctx.Done():
674671
return
675-
case componentState := <-subChan:
676-
logComponentStateChange(c.logger, state, &componentState)
672+
case componentState := <-runtimeComponentStates:
673+
logComponentStateChange(c.logger, processState, &componentState)
677674
// Forward the final changes back to Coordinator, unless our context
678675
// has ended.
679676
select {
680677
case c.managerChans.runtimeManagerUpdate <- componentState:
681678
case <-ctx.Done():
682679
return
683680
}
684-
case otelStatus := <-otelChan:
681+
case otelStatus := <-otelStatuses:
685682
// We don't break on errors here, because we want to forward the status
686683
// even if there was an error, and the rest of the code gracefully handles componentStates being nil
687684
componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel)
688685
if err != nil {
689686
c.setOTelError(err)
690687
}
691-
err = translate.DropComponentStateFromOtelStatus(otelStatus)
688+
finalOtelStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus)
692689
if err != nil {
693690
c.setOTelError(err)
691+
finalOtelStatus = otelStatus
694692
}
695693

696694
// forward the remaining otel status
697695
// TODO: Implement subscriptions for otel manager status to avoid the need for this
698696
select {
699-
case c.managerChans.otelManagerUpdate <- otelStatus:
697+
case c.managerChans.otelManagerUpdate <- finalOtelStatus:
700698
case <-ctx.Done():
701699
return
702700
}
@@ -707,7 +705,7 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
707705
for _, componentState := range componentStates {
708706
componentIds[componentState.Component.ID] = true
709707
}
710-
for id := range state {
708+
for id := range otelState {
711709
if _, ok := componentIds[id]; !ok {
712710
// this component is not in the configuration anymore, emit a fake STOPPED state
713711
componentStates = append(componentStates, runtime.ComponentComponentState{
@@ -722,7 +720,7 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
722720
}
723721
// now handle the component states
724722
for _, componentState := range componentStates {
725-
logComponentStateChange(c.logger, state, &componentState)
723+
logComponentStateChange(c.logger, otelState, &componentState)
726724
// Forward the final changes back to Coordinator, unless our context
727725
// has ended.
728726
select {
@@ -813,7 +811,19 @@ func (c *Coordinator) Run(ctx context.Context) error {
813811
// log all changes in the state of the runtime and update the coordinator state
814812
watchCtx, watchCanceller := context.WithCancel(ctx)
815813
defer watchCanceller()
816-
go c.watchRuntimeComponents(watchCtx)
814+
815+
var subChan <-chan runtime.ComponentComponentState
816+
var otelChan <-chan *status.AggregateStatus
817+
// A real Coordinator will always have a runtime manager, but unit tests
818+
// may not initialize all managers -- in that case we leave subChan nil,
819+
// and just idle until Coordinator shuts down.
820+
if c.runtimeMgr != nil {
821+
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
822+
}
823+
if c.otelMgr != nil {
824+
otelChan = c.otelMgr.Watch()
825+
}
826+
go c.watchRuntimeComponents(watchCtx, subChan, otelChan)
817827

818828
// Close the state broadcaster on finish, but leave it running in the
819829
// background until all subscribers have read the final values or their

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,42 +1341,43 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
13411341
logger := logp.NewLogger("testing")
13421342

13431343
statusChan := make(chan *status.AggregateStatus)
1344-
otelManager := &fakeOTelManager{
1345-
statusChan: statusChan,
1346-
}
1347-
1348-
componentModel := []component.Component{
1349-
{
1350-
ID: "filestream-default",
1351-
InputType: "filestream",
1352-
OutputType: "elasticsearch",
1353-
RuntimeManager: component.OtelRuntimeManager,
1354-
InputSpec: &component.InputRuntimeSpec{
1355-
BinaryName: "agentbeat",
1356-
Spec: component.InputSpec{
1357-
Command: &component.CommandSpec{
1358-
Args: []string{"filebeat"},
1359-
},
1344+
1345+
runtimeStateChan := make(chan runtime.ComponentComponentState)
1346+
1347+
otelComponent := component.Component{
1348+
ID: "filestream-default",
1349+
InputType: "filestream",
1350+
OutputType: "elasticsearch",
1351+
RuntimeManager: component.OtelRuntimeManager,
1352+
InputSpec: &component.InputRuntimeSpec{
1353+
BinaryName: "agentbeat",
1354+
Spec: component.InputSpec{
1355+
Command: &component.CommandSpec{
1356+
Args: []string{"filebeat"},
13601357
},
13611358
},
1362-
Units: []component.Unit{
1363-
{
1364-
ID: "filestream-unit",
1365-
Type: client.UnitTypeInput,
1366-
Config: &proto.UnitExpectedConfig{
1367-
Streams: []*proto.Stream{
1368-
{Id: "test-1"},
1369-
{Id: "test-2"},
1370-
},
1359+
},
1360+
Units: []component.Unit{
1361+
{
1362+
ID: "filestream-unit",
1363+
Type: client.UnitTypeInput,
1364+
Config: &proto.UnitExpectedConfig{
1365+
Streams: []*proto.Stream{
1366+
{Id: "test-1"},
1367+
{Id: "test-2"},
13711368
},
13721369
},
1373-
{
1374-
ID: "filestream-default",
1375-
Type: client.UnitTypeOutput,
1376-
},
1370+
},
1371+
{
1372+
ID: "filestream-default",
1373+
Type: client.UnitTypeOutput,
13771374
},
13781375
},
13791376
}
1377+
processComponent := otelComponent
1378+
processComponent.RuntimeManager = component.ProcessRuntimeManager
1379+
processComponent.ID = "filestream-process"
1380+
13801381
otelStatus := &status.AggregateStatus{
13811382
Event: componentstatus.NewEvent(componentstatus.StatusOK),
13821383
ComponentStatusMap: map[string]*status.AggregateStatus{
@@ -1411,17 +1412,17 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14111412
otelManagerUpdate: make(chan *status.AggregateStatus),
14121413
runtimeManagerUpdate: make(chan runtime.ComponentComponentState),
14131414
},
1414-
otelMgr: otelManager,
1415-
state: State{},
1416-
componentModel: componentModel,
1415+
state: State{},
14171416
}
14181417

14191418
// start runtime status watching
1420-
go coord.watchRuntimeComponents(ctx)
1419+
go coord.watchRuntimeComponents(ctx, runtimeStateChan, statusChan)
14211420

14221421
// no component status
14231422
assert.Empty(t, coord.state.Components)
14241423

1424+
coord.componentModel = []component.Component{otelComponent}
1425+
14251426
// push the status into the coordinator
14261427
select {
14271428
case statusChan <- otelStatus:
@@ -1446,8 +1447,57 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14461447

14471448
assert.Len(t, coord.state.Components, 1)
14481449

1450+
// Add both a process component and an otel component, in that order. Both should appear in the state.
1451+
coord.componentModel = []component.Component{otelComponent, processComponent}
1452+
1453+
// push the process component state into the coordinator
1454+
select {
1455+
case runtimeStateChan <- runtime.ComponentComponentState{
1456+
Component: processComponent,
1457+
State: runtime.ComponentState{
1458+
State: client.UnitStateHealthy,
1459+
},
1460+
}:
1461+
case <-ctx.Done():
1462+
t.Fatal("timeout waiting for coordinator to receive status")
1463+
}
1464+
1465+
select {
1466+
case componentState := <-coord.managerChans.runtimeManagerUpdate:
1467+
coord.applyComponentState(componentState)
1468+
case <-ctx.Done():
1469+
t.Fatal("timeout waiting for coordinator to receive status")
1470+
}
1471+
1472+
// push the otel status into the coordinator
1473+
select {
1474+
case statusChan <- otelStatus:
1475+
case <-ctx.Done():
1476+
t.Fatal("timeout waiting for coordinator to receive status")
1477+
}
1478+
1479+
select {
1480+
case finalOtelStatus := <-coord.managerChans.otelManagerUpdate:
1481+
// we shouldn't have any status remaining for the otel collector, as the status we've pushed earlier only
1482+
// contains beats receiver status for the "filestream-default" component
1483+
// this status is removed from the otel collector status, because it's reported as component state instead
1484+
assert.Empty(t, finalOtelStatus.ComponentStatusMap)
1485+
case <-ctx.Done():
1486+
t.Fatal("timeout waiting for coordinator to receive status")
1487+
}
1488+
1489+
select {
1490+
case componentState := <-coord.managerChans.runtimeManagerUpdate:
1491+
coord.applyComponentState(componentState)
1492+
case <-ctx.Done():
1493+
t.Fatal("timeout waiting for coordinator to receive status")
1494+
}
1495+
1496+
assert.Len(t, coord.state.Components, 2)
1497+
14491498
// Now, we remove the component and resend the same status. The component state should be deleted.
14501499
coord.componentModel = []component.Component{}
1500+
coord.state = State{}
14511501
select {
14521502
case statusChan <- otelStatus:
14531503
case <-ctx.Done():
@@ -1471,7 +1521,7 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14711521

14721522
assert.Empty(t, coord.state.Components)
14731523

1474-
// Push an invalid status, there should be no component state, but there should be an otel status
1524+
// Push an invalid status, there should be no otel component state, but there should be an otel status
14751525
select {
14761526
case statusChan <- invalidOtelStatus:
14771527
case <-ctx.Done():

internal/pkg/otel/translate/status.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,28 @@ func GetAllComponentStates(otelStatus *status.AggregateStatus, components []comp
5656

5757
// DropComponentStateFromOtelStatus removes the statuses of otel pipelines representing runtime components from the
5858
// given status.
59-
func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) error {
59+
func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) (*status.AggregateStatus, error) {
6060
if otelStatus == nil {
61-
return nil
61+
return nil, nil
6262
}
63-
for pipelineStatusId := range otelStatus.ComponentStatusMap {
63+
64+
newStatus := deepCopyStatus(otelStatus)
65+
for pipelineStatusId := range newStatus.ComponentStatusMap {
6466
pipelineId := &pipeline.ID{}
6567
componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId)
6668
if componentKind != "pipeline" {
67-
return fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
69+
return nil, fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
6870
}
6971
err := pipelineId.UnmarshalText([]byte(pipelineIdStr)) // there's no ergonomic way to do this conversion
7072
if err != nil {
71-
return err
73+
return nil, err
7274
}
7375
if strings.HasPrefix(pipelineId.Name(), OtelNamePrefix) {
74-
delete(otelStatus.ComponentStatusMap, pipelineStatusId)
76+
delete(newStatus.ComponentStatusMap, pipelineStatusId)
7577
}
7678
}
7779

78-
return nil
80+
return newStatus, nil
7981
}
8082

8183
// getOtelRuntimePipelineStatuses finds otel pipeline statuses belonging to runtime components and returns them as a map
@@ -273,3 +275,24 @@ func parseEntityStatusId(id string) (kind string, entityId string) {
273275
}
274276
return parts[0], parts[1]
275277
}
278+
279+
// deepCopyStatus makes a deep copy of the status.
280+
func deepCopyStatus(otelStatus *status.AggregateStatus) *status.AggregateStatus {
281+
if otelStatus == nil {
282+
return nil
283+
}
284+
285+
newStatus := &status.AggregateStatus{
286+
Event: otelStatus.Event,
287+
}
288+
if otelStatus.ComponentStatusMap == nil {
289+
return newStatus
290+
}
291+
292+
newStatus.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(otelStatus.ComponentStatusMap))
293+
for k, v := range otelStatus.ComponentStatusMap {
294+
newStatus.ComponentStatusMap[k] = deepCopyStatus(v)
295+
}
296+
297+
return newStatus
298+
}

internal/pkg/otel/translate/status_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ func TestGetAllComponentState(t *testing.T) {
180180

181181
func TestDropComponentStateFromOtelStatus(t *testing.T) {
182182
t.Run("empty", func(t *testing.T) {
183-
err := DropComponentStateFromOtelStatus(nil)
183+
s, err := DropComponentStateFromOtelStatus(nil)
184184
require.NoError(t, err)
185+
require.Nil(t, s)
185186
})
186187

187188
t.Run("drop non otel", func(t *testing.T) {
@@ -195,10 +196,10 @@ func TestDropComponentStateFromOtelStatus(t *testing.T) {
195196
},
196197
},
197198
}
198-
err := DropComponentStateFromOtelStatus(otelStatus)
199+
s, err := DropComponentStateFromOtelStatus(otelStatus)
199200
require.NoError(t, err)
200-
assert.Len(t, otelStatus.ComponentStatusMap, 1)
201-
assert.Contains(t, otelStatus.ComponentStatusMap, "pipeline:logs")
201+
assert.Len(t, s.ComponentStatusMap, 1)
202+
assert.Contains(t, s.ComponentStatusMap, "pipeline:logs")
202203
})
203204

204205
t.Run("invalid status", func(t *testing.T) {
@@ -209,8 +210,9 @@ func TestDropComponentStateFromOtelStatus(t *testing.T) {
209210
},
210211
},
211212
}
212-
err := DropComponentStateFromOtelStatus(otelStatus)
213+
s, err := DropComponentStateFromOtelStatus(otelStatus)
213214
require.Error(t, err)
215+
require.Nil(t, s)
214216
assert.Equal(t, "pipeline status id logs is not a pipeline", err.Error())
215217
})
216218
}

0 commit comments

Comments
 (0)