Skip to content

Commit 817658d

Browse files
committed
llo: only preallocate StreamAggregates, not empty values to avoid unused outcome space (#221)
calculated streams are always updated in channel definitions and always have stream aggregates (#219)
1 parent bcdf1d7 commit 817658d

File tree

4 files changed

+162
-24
lines changed

4 files changed

+162
-24
lines changed

llo/plugin_outcome_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
174174
}
175175

176176
// Verify channel is reportable before tombstoning
177-
require.Nil(t, previousOutcome.IsReportable(channelID, 1, uint64(100*time.Millisecond), p.ReportCodecs, p.ChannelDefinitionOptsCache))
178-
reportable, _ := previousOutcome.ReportableChannels(1, uint64(100*time.Millisecond), p.ReportCodecs, p.ChannelDefinitionOptsCache)
177+
require.Nil(t, previousOutcome.IsReportable(channelID, 1, uint64(100*time.Millisecond)))
178+
reportable, _ := previousOutcome.ReportableChannels(1, uint64(100*time.Millisecond))
179179
assert.Contains(t, reportable, channelID)
180180

181181
// Encode previous outcome
@@ -215,12 +215,12 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
215215
assert.Equal(t, tombstonedCd, decoded.ChannelDefinitions[channelID])
216216

217217
// Verify channel is no longer reportable
218-
err = decoded.IsReportable(channelID, 1, uint64(100*time.Millisecond), p.ReportCodecs, p.ChannelDefinitionOptsCache)
218+
err = decoded.IsReportable(channelID, 1, uint64(100*time.Millisecond))
219219
require.NotNil(t, err)
220220
assert.Contains(t, err.Error(), "tombstone channel")
221221

222222
// Verify ReportableChannels excludes the tombstoned channel
223-
reportable, unreportable := decoded.ReportableChannels(1, uint64(100*time.Millisecond), p.ReportCodecs, p.ChannelDefinitionOptsCache)
223+
reportable, unreportable := decoded.ReportableChannels(1, uint64(100*time.Millisecond))
224224
assert.NotContains(t, reportable, channelID, "Tombstoned channel should not be in reportable list")
225225
require.Len(t, unreportable, 1)
226226
assert.Equal(t, channelID, unreportable[0].ChannelID)

llo/plugin_reports_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,12 @@ func testReports(t *testing.T, outcomeCodec OutcomeCodec) {
199199
}
200200

201201
// Verify tombstoned channel is not reportable
202-
unreportableErr := outcome.IsReportable(1, protocolVersion, uint64(minReportInterval), p.ReportCodecs, p.ChannelDefinitionOptsCache)
202+
unreportableErr := outcome.IsReportable(1, protocolVersion, uint64(minReportInterval))
203203
require.NotNil(t, unreportableErr)
204204
assert.Contains(t, unreportableErr.Error(), "tombstone channel")
205205

206206
// Verify non-tombstoned channel is reportable
207-
require.Nil(t, outcome.IsReportable(2, protocolVersion, uint64(minReportInterval), p.ReportCodecs, p.ChannelDefinitionOptsCache))
207+
require.Nil(t, outcome.IsReportable(2, protocolVersion, uint64(minReportInterval)))
208208

209209
encoded, err := p.OutcomeCodec.Encode(outcome)
210210
require.NoError(t, err)

llo/stream_calculated.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -549,23 +549,6 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
549549

550550
var err error
551551
env := NewEnv(outcome)
552-
for _, stream := range cd.Streams {
553-
if stream.Aggregator == llotypes.AggregatorCalculated {
554-
continue
555-
}
556-
557-
p.Logger.Debugw("setting stream value", "channelID", cid, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
558-
559-
if err = env.SetStreamValue(stream.StreamID, outcome.StreamAggregates[stream.StreamID][stream.Aggregator]); err != nil {
560-
p.Logger.Errorw("failed to set stream value", "channelID", cid, "error", err, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
561-
env.release()
562-
break
563-
}
564-
}
565-
566-
if err != nil {
567-
continue
568-
}
569552

570553
// TODO: we can potentially cache the opts for each channel definition
571554
// and avoid unmarshalling the options on outcome.
@@ -597,6 +580,29 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
597580
outcome.ChannelDefinitions[cid] = cd
598581
}
599582

583+
for _, abi := range copt.ABI {
584+
// update the outcome with the new stream aggregate
585+
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{}
586+
}
587+
588+
for _, stream := range cd.Streams {
589+
if stream.Aggregator == llotypes.AggregatorCalculated {
590+
continue
591+
}
592+
593+
p.Logger.Debugw("setting stream value", "channelID", cid, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
594+
595+
if err = env.SetStreamValue(stream.StreamID, outcome.StreamAggregates[stream.StreamID][stream.Aggregator]); err != nil {
596+
p.Logger.Errorw("failed to set stream value", "channelID", cid, "error", err, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
597+
env.release()
598+
break
599+
}
600+
}
601+
602+
if err != nil {
603+
continue
604+
}
605+
600606
if err := p.evalExpression(&copt, cid, env, outcome); err != nil {
601607
p.Logger.Errorw("failed to process expression", "channelID", cid, "error", err)
602608
}
@@ -617,7 +623,12 @@ func (p *Plugin) evalExpression(o *opts, cid llotypes.ChannelID, env environment
617623
cid, abi.ExpressionStreamID)
618624
}
619625

620-
if len(outcome.StreamAggregates[abi.ExpressionStreamID]) > 0 {
626+
// Prevent overwriting an existing, already-computed calculated aggregate.
627+
// Note: `ProcessCalculatedStreams` intentionally may pre-create a placeholder like:
628+
// outcome.StreamAggregates[exprID][AggregatorCalculated] = nil
629+
// so that failures still leave a nil value behind. That placeholder should NOT
630+
// block evaluation.
631+
if value := outcome.StreamAggregates[abi.ExpressionStreamID][llotypes.AggregatorCalculated]; value != nil {
621632
return fmt.Errorf(
622633
"calculated stream aggregate ID already exists, channelID: %d, expressionStreamID: %d, expression: %s",
623634
cid, abi.ExpressionStreamID, abi.Expression)

llo/stream_calculated_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,6 +1680,7 @@ func TestProcessStreamCalculated(t *testing.T) {
16801680
Opts: []byte(`{"abi":[{"type":"int256","expression":"Sum(s1)","expressionStreamID":2}]}`),
16811681
},
16821682
},
1683+
StreamAggregates: StreamAggregates{},
16831684
},
16841685
},
16851686
{
@@ -1694,6 +1695,7 @@ func TestProcessStreamCalculated(t *testing.T) {
16941695
Opts: []byte(`{"abi":[{"type":"int256","expression":"","expressionStreamID":2}]}`),
16951696
},
16961697
},
1698+
StreamAggregates: StreamAggregates{},
16971699
},
16981700
},
16991701
{
@@ -1708,6 +1710,7 @@ func TestProcessStreamCalculated(t *testing.T) {
17081710
Opts: []byte(`{"abi":[{"type":"int256","expression":"s1","expressionStreamID":0}]}`),
17091711
},
17101712
},
1713+
StreamAggregates: StreamAggregates{},
17111714
},
17121715
},
17131716
{
@@ -1837,3 +1840,127 @@ func TestProcessCalculatedStreamsDryRun(t *testing.T) {
18371840
})
18381841
}
18391842
}
1843+
1844+
func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
1845+
lggr, err := logger.New()
1846+
require.NoError(t, err)
1847+
1848+
p := &Plugin{
1849+
Logger: lggr,
1850+
}
1851+
1852+
assertHasCalculatedStream := func(t *testing.T, cd llotypes.ChannelDefinition, expressionStreamID llotypes.StreamID) {
1853+
t.Helper()
1854+
for _, s := range cd.Streams {
1855+
if s.StreamID == expressionStreamID && s.Aggregator == llotypes.AggregatorCalculated {
1856+
return
1857+
}
1858+
}
1859+
t.Fatalf("expected channel definition streams to include calculated streamID=%d aggregator=%v; got streams=%v",
1860+
expressionStreamID, llotypes.AggregatorCalculated, cd.Streams)
1861+
}
1862+
1863+
t.Run("env population failure still appends expressionStreamID streams", func(t *testing.T) {
1864+
outcome := Outcome{
1865+
ObservationTimestampNanoseconds: 1750169759775700000,
1866+
ChannelDefinitions: llotypes.ChannelDefinitions{
1867+
1: {
1868+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
1869+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
1870+
Opts: []byte(`{"abi":[` +
1871+
`{"type":"int256","expression":"Sum(s1, 1)","expressionStreamID":2},` +
1872+
`{"type":"int256","expression":"Sum(s1, 2)","expressionStreamID":3}` +
1873+
`]}`),
1874+
},
1875+
},
1876+
StreamAggregates: StreamAggregates{
1877+
// Intentionally omit stream 1 aggregate so env.SetStreamValue fails with "stream value is nil"
1878+
},
1879+
}
1880+
1881+
p.ProcessCalculatedStreams(&outcome)
1882+
1883+
cd := outcome.ChannelDefinitions[1]
1884+
assertHasCalculatedStream(t, cd, 2)
1885+
assertHasCalculatedStream(t, cd, 3)
1886+
1887+
// The implementation pre-creates map entries, but since env population failed, the values should be nil.
1888+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
1889+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")
1890+
1891+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
1892+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
1893+
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when env population fails")
1894+
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when env population fails")
1895+
})
1896+
1897+
t.Run("expression evaluation failure still appends expressionStreamID streams", func(t *testing.T) {
1898+
outcome := Outcome{
1899+
ObservationTimestampNanoseconds: 1750169759775700000,
1900+
ChannelDefinitions: llotypes.ChannelDefinitions{
1901+
1: {
1902+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
1903+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
1904+
Opts: []byte(`{"abi":[` +
1905+
`{"type":"int256","expression":"NonExistentFunc(s1)","expressionStreamID":2},` +
1906+
`{"type":"int256","expression":"NonExistentFunc(s1)","expressionStreamID":3}` +
1907+
`]}`),
1908+
},
1909+
},
1910+
StreamAggregates: StreamAggregates{
1911+
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(123))},
1912+
},
1913+
}
1914+
1915+
p.ProcessCalculatedStreams(&outcome)
1916+
1917+
cd := outcome.ChannelDefinitions[1]
1918+
assertHasCalculatedStream(t, cd, 2)
1919+
assertHasCalculatedStream(t, cd, 3)
1920+
1921+
// The implementation pre-creates map entries, but since eval failed, the values should be nil.
1922+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2), "map entry should be pre-created")
1923+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3), "map entry should be pre-created")
1924+
1925+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
1926+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
1927+
assert.Nil(t, got2, "expected nil calculated aggregate for streamID=2 when expression evaluation fails")
1928+
assert.Nil(t, got3, "expected nil calculated aggregate for streamID=3 when expression evaluation fails")
1929+
})
1930+
1931+
t.Run("success appends expressionStreamID streams and sets calculated aggregates", func(t *testing.T) {
1932+
outcome := Outcome{
1933+
ObservationTimestampNanoseconds: 1750169759775700000,
1934+
ChannelDefinitions: llotypes.ChannelDefinitions{
1935+
1: {
1936+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
1937+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
1938+
Opts: []byte(`{"abi":[` +
1939+
`{"type":"int256","expression":"Sum(s1, 1)","expressionStreamID":2},` +
1940+
`{"type":"int256","expression":"Sum(s1, 2)","expressionStreamID":3}` +
1941+
`]}`),
1942+
},
1943+
},
1944+
StreamAggregates: StreamAggregates{
1945+
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(10))},
1946+
},
1947+
}
1948+
1949+
p.ProcessCalculatedStreams(&outcome)
1950+
1951+
cd := outcome.ChannelDefinitions[1]
1952+
assertHasCalculatedStream(t, cd, 2)
1953+
assertHasCalculatedStream(t, cd, 3)
1954+
1955+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2))
1956+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3))
1957+
1958+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
1959+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
1960+
require.NotNil(t, got2)
1961+
require.NotNil(t, got3)
1962+
1963+
assert.Equal(t, ToDecimal(decimal.NewFromInt(11)), got2)
1964+
assert.Equal(t, ToDecimal(decimal.NewFromInt(12)), got3)
1965+
})
1966+
}

0 commit comments

Comments
 (0)