Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions llo/stream_calculated.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,15 +563,15 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
StreamID: abi.ExpressionStreamID,
Aggregator: llotypes.AggregatorCalculated,
})

// update the outcome with the new stream aggregate
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{
llotypes.AggregatorCalculated: nil,
}
}
outcome.ChannelDefinitions[cid] = cd
}

for _, abi := range abiEntries {
// update the outcome with the new stream aggregate
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{}
}

for _, stream := range cd.Streams {
if stream.Aggregator == llotypes.AggregatorCalculated {
continue
Expand Down
31 changes: 17 additions & 14 deletions llo/stream_calculated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,15 +1976,6 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
expressionStreamID, llotypes.AggregatorCalculated, cd.Streams)
}

assertHasNilCalculatedAggregate := func(t *testing.T, outcome Outcome, expressionStreamID llotypes.StreamID) {
t.Helper()
require.Contains(t, outcome.StreamAggregates, expressionStreamID, "expected StreamAggregates to contain streamID=%d", expressionStreamID)
aggs := outcome.StreamAggregates[expressionStreamID]
v, ok := aggs[llotypes.AggregatorCalculated]
require.True(t, ok, "expected StreamAggregates[%d] to contain AggregatorCalculated", expressionStreamID)
assert.Nil(t, v, "expected nil calculated aggregate for streamID=%d", expressionStreamID)
}

t.Run("env population failure still appends expressionStreamID streams", func(t *testing.T) {
outcome := Outcome{
ObservationTimestampNanoseconds: 1750169759775700000,
Expand All @@ -2009,8 +2000,15 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
cd := outcome.ChannelDefinitions[1]
assertHasCalculatedStream(t, cd, 2)
assertHasCalculatedStream(t, cd, 3)
assertHasNilCalculatedAggregate(t, outcome, 2)
assertHasNilCalculatedAggregate(t, outcome, 3)

// The implementation pre-creates map entries, but since env population failed, the values should be nil.
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")

got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when env population fails")
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when env population fails")
})

t.Run("expression evaluation failure still appends expressionStreamID streams", func(t *testing.T) {
Expand Down Expand Up @@ -2038,9 +2036,14 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
assertHasCalculatedStream(t, cd, 2)
assertHasCalculatedStream(t, cd, 3)

// And since eval failed, we should not have calculated values set.
assertHasNilCalculatedAggregate(t, outcome, 2)
assertHasNilCalculatedAggregate(t, outcome, 3)
// The implementation pre-creates map entries, but since eval failed, the values should be nil.
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")

got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when expression evaluation fails")
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when expression evaluation fails")
})

t.Run("success appends expressionStreamID streams and sets calculated aggregates", func(t *testing.T) {
Expand Down
Loading