Skip to content

Commit 57ffd1b

Browse files
authored
calculated streams are always updated in channel definitions and always have stream aggregates (#219)
1 parent b46caf8 commit 57ffd1b

File tree

2 files changed

+163
-18
lines changed

2 files changed

+163
-18
lines changed

llo/stream_calculated.go

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -547,23 +547,6 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
547547

548548
var err error
549549
env := NewEnv(outcome)
550-
for _, stream := range cd.Streams {
551-
if stream.Aggregator == llotypes.AggregatorCalculated {
552-
continue
553-
}
554-
555-
p.Logger.Debugw("setting stream value", "channelID", cid, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
556-
557-
if err = env.SetStreamValue(stream.StreamID, outcome.StreamAggregates[stream.StreamID][stream.Aggregator]); err != nil {
558-
p.Logger.Errorw("failed to set stream value", "channelID", cid, "error", err, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
559-
env.release()
560-
break
561-
}
562-
}
563-
564-
if err != nil {
565-
continue
566-
}
567550

568551
abiEntries, abiErr := p.getCalculatedStreamABI(cid, cd)
569552
if abiErr != nil {
@@ -580,10 +563,33 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
580563
StreamID: abi.ExpressionStreamID,
581564
Aggregator: llotypes.AggregatorCalculated,
582565
})
566+
567+
// update the outcome with the new stream aggregate
568+
outcome.StreamAggregates[abi.ExpressionStreamID] = map[llotypes.Aggregator]StreamValue{
569+
llotypes.AggregatorCalculated: nil,
570+
}
583571
}
584572
outcome.ChannelDefinitions[cid] = cd
585573
}
586574

575+
for _, stream := range cd.Streams {
576+
if stream.Aggregator == llotypes.AggregatorCalculated {
577+
continue
578+
}
579+
580+
p.Logger.Debugw("setting stream value", "channelID", cid, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
581+
582+
if err = env.SetStreamValue(stream.StreamID, outcome.StreamAggregates[stream.StreamID][stream.Aggregator]); err != nil {
583+
p.Logger.Errorw("failed to set stream value", "channelID", cid, "error", err, "streamID", stream.StreamID, "aggregator", stream.Aggregator)
584+
env.release()
585+
break
586+
}
587+
}
588+
589+
if err != nil {
590+
continue
591+
}
592+
587593
if err := p.evalCalculatedExpression(abiEntries, cid, env, outcome); err != nil {
588594
p.Logger.Errorw("failed to process expression", "channelID", cid, "error", err)
589595
}
@@ -634,7 +640,12 @@ func (p *Plugin) evalCalculatedExpression(abiEntries []CalculatedStreamABI, cid
634640
cid, abi.ExpressionStreamID)
635641
}
636642

637-
if len(outcome.StreamAggregates[abi.ExpressionStreamID]) > 0 {
643+
// Prevent overwriting an existing, already-computed calculated aggregate.
644+
// Note: `ProcessCalculatedStreams` intentionally may pre-create a placeholder like:
645+
// outcome.StreamAggregates[exprID][AggregatorCalculated] = nil
646+
// so that failures still leave a nil value behind. That placeholder should NOT
647+
// block evaluation.
648+
if value := outcome.StreamAggregates[abi.ExpressionStreamID][llotypes.AggregatorCalculated]; value != nil {
638649
return fmt.Errorf(
639650
"calculated stream aggregate ID already exists, channelID: %d, expressionStreamID: %d, expression: %s",
640651
cid, abi.ExpressionStreamID, abi.Expression)

llo/stream_calculated_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,6 +1711,7 @@ func TestProcessStreamCalculated(t *testing.T) {
17111711
Opts: []byte(`{"abi":[{"type":"int256","expression":"Sum(s1)","expressionStreamID":2}]}`),
17121712
},
17131713
},
1714+
StreamAggregates: StreamAggregates{},
17141715
},
17151716
},
17161717
{
@@ -1725,6 +1726,7 @@ func TestProcessStreamCalculated(t *testing.T) {
17251726
Opts: []byte(`{"abi":[{"type":"int256","expression":"","expressionStreamID":2}]}`),
17261727
},
17271728
},
1729+
StreamAggregates: StreamAggregates{},
17281730
},
17291731
},
17301732
{
@@ -1739,6 +1741,7 @@ func TestProcessStreamCalculated(t *testing.T) {
17391741
Opts: []byte(`{"abi":[{"type":"int256","expression":"s1","expressionStreamID":0}]}`),
17401742
},
17411743
},
1744+
StreamAggregates: StreamAggregates{},
17421745
},
17431746
},
17441747
}
@@ -1946,3 +1949,134 @@ func TestProcessCalculatedStreamsDryRun(t *testing.T) {
19461949
})
19471950
}
19481951
}
1952+
1953+
func TestProcessCalculatedStreams_AppendsExpressionStreamIDs(t *testing.T) {
1954+
lggr, err := logger.New()
1955+
require.NoError(t, err)
1956+
1957+
codec := mockCalculatedStreamCodec{}
1958+
cache := NewChannelDefinitionOptsCache()
1959+
1960+
p := &Plugin{
1961+
Logger: lggr,
1962+
ReportCodecs: map[llotypes.ReportFormat]ReportCodec{
1963+
llotypes.ReportFormatEVMABIEncodeUnpackedExpr: codec,
1964+
},
1965+
ChannelDefinitionOptsCache: cache,
1966+
}
1967+
1968+
assertHasCalculatedStream := func(t *testing.T, cd llotypes.ChannelDefinition, expressionStreamID llotypes.StreamID) {
1969+
t.Helper()
1970+
for _, s := range cd.Streams {
1971+
if s.StreamID == expressionStreamID && s.Aggregator == llotypes.AggregatorCalculated {
1972+
return
1973+
}
1974+
}
1975+
t.Fatalf("expected channel definition streams to include calculated streamID=%d aggregator=%v; got streams=%v",
1976+
expressionStreamID, llotypes.AggregatorCalculated, cd.Streams)
1977+
}
1978+
1979+
assertHasNilCalculatedAggregate := func(t *testing.T, outcome Outcome, expressionStreamID llotypes.StreamID) {
1980+
t.Helper()
1981+
require.Contains(t, outcome.StreamAggregates, expressionStreamID, "expected StreamAggregates to contain streamID=%d", expressionStreamID)
1982+
aggs := outcome.StreamAggregates[expressionStreamID]
1983+
v, ok := aggs[llotypes.AggregatorCalculated]
1984+
require.True(t, ok, "expected StreamAggregates[%d] to contain AggregatorCalculated", expressionStreamID)
1985+
assert.Nil(t, v, "expected nil calculated aggregate for streamID=%d", expressionStreamID)
1986+
}
1987+
1988+
t.Run("env population failure still appends expressionStreamID streams", func(t *testing.T) {
1989+
outcome := Outcome{
1990+
ObservationTimestampNanoseconds: 1750169759775700000,
1991+
ChannelDefinitions: llotypes.ChannelDefinitions{
1992+
1: {
1993+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
1994+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
1995+
Opts: []byte(`{"abi":[` +
1996+
`{"type":"int256","expression":"Sum(s1, 1)","expressionStreamID":2},` +
1997+
`{"type":"int256","expression":"Sum(s1, 2)","expressionStreamID":3}` +
1998+
`]}`),
1999+
},
2000+
},
2001+
StreamAggregates: StreamAggregates{
2002+
// Intentionally omit stream 1 aggregate so env.SetStreamValue fails with "stream value is nil"
2003+
},
2004+
}
2005+
2006+
populateCache(t, cache, 1, outcome.ChannelDefinitions[1], p.ReportCodecs)
2007+
p.ProcessCalculatedStreams(&outcome)
2008+
2009+
cd := outcome.ChannelDefinitions[1]
2010+
assertHasCalculatedStream(t, cd, 2)
2011+
assertHasCalculatedStream(t, cd, 3)
2012+
assertHasNilCalculatedAggregate(t, outcome, 2)
2013+
assertHasNilCalculatedAggregate(t, outcome, 3)
2014+
})
2015+
2016+
t.Run("expression evaluation failure still appends expressionStreamID streams", func(t *testing.T) {
2017+
outcome := Outcome{
2018+
ObservationTimestampNanoseconds: 1750169759775700000,
2019+
ChannelDefinitions: llotypes.ChannelDefinitions{
2020+
1: {
2021+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
2022+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
2023+
Opts: []byte(`{"abi":[` +
2024+
`{"type":"int256","expression":"NonExistentFunc(s1)","expressionStreamID":2},` +
2025+
`{"type":"int256","expression":"NonExistentFunc(s1)","expressionStreamID":3}` +
2026+
`]}`),
2027+
},
2028+
},
2029+
StreamAggregates: StreamAggregates{
2030+
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(123))},
2031+
},
2032+
}
2033+
2034+
populateCache(t, cache, 1, outcome.ChannelDefinitions[1], p.ReportCodecs)
2035+
p.ProcessCalculatedStreams(&outcome)
2036+
2037+
cd := outcome.ChannelDefinitions[1]
2038+
assertHasCalculatedStream(t, cd, 2)
2039+
assertHasCalculatedStream(t, cd, 3)
2040+
2041+
// And since eval failed, we should not have calculated values set.
2042+
assertHasNilCalculatedAggregate(t, outcome, 2)
2043+
assertHasNilCalculatedAggregate(t, outcome, 3)
2044+
})
2045+
2046+
t.Run("success appends expressionStreamID streams and sets calculated aggregates", func(t *testing.T) {
2047+
outcome := Outcome{
2048+
ObservationTimestampNanoseconds: 1750169759775700000,
2049+
ChannelDefinitions: llotypes.ChannelDefinitions{
2050+
1: {
2051+
ReportFormat: llotypes.ReportFormatEVMABIEncodeUnpackedExpr,
2052+
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}},
2053+
Opts: []byte(`{"abi":[` +
2054+
`{"type":"int256","expression":"Sum(s1, 1)","expressionStreamID":2},` +
2055+
`{"type":"int256","expression":"Sum(s1, 2)","expressionStreamID":3}` +
2056+
`]}`),
2057+
},
2058+
},
2059+
StreamAggregates: StreamAggregates{
2060+
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(10))},
2061+
},
2062+
}
2063+
2064+
populateCache(t, cache, 1, outcome.ChannelDefinitions[1], p.ReportCodecs)
2065+
p.ProcessCalculatedStreams(&outcome)
2066+
2067+
cd := outcome.ChannelDefinitions[1]
2068+
assertHasCalculatedStream(t, cd, 2)
2069+
assertHasCalculatedStream(t, cd, 3)
2070+
2071+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(2))
2072+
require.Contains(t, outcome.StreamAggregates, llotypes.StreamID(3))
2073+
2074+
got2 := outcome.StreamAggregates[2][llotypes.AggregatorCalculated]
2075+
got3 := outcome.StreamAggregates[3][llotypes.AggregatorCalculated]
2076+
require.NotNil(t, got2)
2077+
require.NotNil(t, got3)
2078+
2079+
assert.Equal(t, ToDecimal(decimal.NewFromInt(11)), got2)
2080+
assert.Equal(t, ToDecimal(decimal.NewFromInt(12)), got3)
2081+
})
2082+
}

0 commit comments

Comments
 (0)