Skip to content

Commit 8cfb974

Browse files
authored
llo: only preallocate StreamAggregates, not empty values to avoid unused outcome space (#221)
1 parent 57ffd1b commit 8cfb974

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

llo/stream_calculated.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -563,15 +563,15 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
563563
StreamID: abi.ExpressionStreamID,
564564
Aggregator: llotypes.AggregatorCalculated,
565565
})
566-
567-
// update the outcome with the new stream aggregate
568-
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{
569-
llotypes.AggregatorCalculated: nil,
570-
}
571566
}
572567
outcome.ChannelDefinitions[cid] = cd
573568
}
574569

570+
for _, abi := range abiEntries {
571+
// update the outcome with the new stream aggregate
572+
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{}
573+
}
574+
575575
for _, stream := range cd.Streams {
576576
if stream.Aggregator == llotypes.AggregatorCalculated {
577577
continue

llo/stream_calculated_test.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1976,15 +1976,6 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
19761976
expressionStreamID, llotypes.AggregatorCalculated, cd.Streams)
19771977
}
19781978

1979-
assertHasNilCalculatedAggregate := func(t *testing.T, outcome Outcome, expressionStreamID llotypes.StreamID) {
1980-
t.Helper()
1981-
require.Contains(t, outcome.StreamAggregates, expressionStreamID, "expected StreamAggregates to contain streamID=%d", expressionStreamID)
1982-
aggs := outcome.StreamAggregates[expressionStreamID]
1983-
v, ok := aggs[llotypes.AggregatorCalculated]
1984-
require.True(t, ok, "expected StreamAggregates[%d] to contain AggregatorCalculated", expressionStreamID)
1985-
assert.Nil(t, v, "expected nil calculated aggregate for streamID=%d", expressionStreamID)
1986-
}
1987-
19881979
t.Run("env population failure still appends expressionStreamID streams", func(t *testing.T) {
19891980
outcome := Outcome{
19901981
ObservationTimestampNanoseconds: 1750169759775700000,
@@ -2009,8 +2000,15 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
20092000
cd := outcome.ChannelDefinitions[1]
20102001
assertHasCalculatedStream(t, cd, 2)
20112002
assertHasCalculatedStream(t, cd, 3)
2012-
assertHasNilCalculatedAggregate(t, outcome, 2)
2013-
assertHasNilCalculatedAggregate(t, outcome, 3)
2003+
2004+
// The implementation pre-creates map entries, but since env population failed, the values should be nil.
2005+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
2006+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")
2007+
2008+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
2009+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
2010+
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when env population fails")
2011+
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when env population fails")
20142012
})
20152013

20162014
t.Run("expression evaluation failure still appends expressionStreamID streams", func(t *testing.T) {
@@ -2038,9 +2036,14 @@ func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
20382036
assertHasCalculatedStream(t, cd, 2)
20392037
assertHasCalculatedStream(t, cd, 3)
20402038

2041-
// And since eval failed, we should not have calculated values set.
2042-
assertHasNilCalculatedAggregate(t, outcome, 2)
2043-
assertHasNilCalculatedAggregate(t, outcome, 3)
2039+
// The implementation pre-creates map entries, but since eval failed, the values should be nil.
2040+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
2041+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")
2042+
2043+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
2044+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
2045+
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when expression evaluation fails")
2046+
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when expression evaluation fails")
20442047
})
20452048

20462049
t.Run("success appends expressionStreamID streams and sets calculated aggregates", func(t *testing.T) {

0 commit comments

Comments
 (0)