Skip to content

Commit 4f554aa

Browse files
committed
llo/calculated: ensure stream dependencies and aggregates are present
1 parent c69ea27 commit 4f554aa

File tree

4 files changed

+21
-3
lines changed

4 files changed

+21
-3
lines changed

llo/plugin_reports.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,11 @@ func (p *Plugin) reports(ctx context.Context, seqNr uint64, rawOutcome ocr3types
5555
cd := outcome.ChannelDefinitions[cid]
5656
values := make([]StreamValue, 0, len(cd.Streams))
5757
for _, strm := range cd.Streams {
58-
values = append(values, outcome.StreamAggregates[strm.StreamID][strm.Aggregator])
58+
if aggregates, ok := outcome.StreamAggregates[strm.StreamID]; !ok {
59+
values = append(values, nil)
60+
} else {
61+
values = append(values, aggregates[strm.Aggregator])
62+
}
5963
}
6064

6165
report := Report{

llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,16 @@ func (r ReportCodecEVMABIEncodeUnpackedExpr) Encode(report llo.Report, cd llotyp
4646
// as Opts is small.
4747
opts := ReportFormatEVMABIEncodeOpts{}
4848
if err = (&opts).Decode(cd.Opts); err != nil {
49-
return nil, fmt.Errorf("failed to decode opts; got: '%s'; %w", cd.Opts, err)
49+
return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr failed to decode opts; got: '%s'; %w", cd.Opts, err)
50+
}
51+
52+
if len(opts.ABI) < 1 {
53+
return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr no expressions found in channel definition")
54+
}
55+
56+
// not enough streams for calculated feed
57+
if cd.Streams[len(cd.Streams)-1].StreamID != opts.ABI[len(opts.ABI)-1].encoders[0].ExpressionStreamID {
58+
return nil, fmt.Errorf("ReportCodecEVMABIEncodeUnpackedExpr not enough streams for calculated streams; expected: %d, got: %d", opts.ABI[len(opts.ABI)-1].encoders[0].ExpressionStreamID, len(cd.Streams))
5059
}
5160

5261
validAfter := ConvertTimestamp(report.ValidAfterNanoseconds, opts.TimestampPrecision)

llo/reportcodecs/evm/report_codec_evm_abi_encode_unpacked_expr_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestReportCodecEVMABIEncodeUnpackedExpr_Encode(t *testing.T) {
7474
codec := ReportCodecEVMABIEncodeUnpackedExpr{}
7575
_, err = codec.Encode(report, cd)
7676
require.Error(t, err)
77-
assert.Contains(t, err.Error(), "ABI and values length mismatch")
77+
assert.Contains(t, err.Error(), "ReportCodecEVMABIEncodeUnpackedExpr no expressions found in channel definition")
7878
})
7979

8080
t.Run("DEX-based asset schema example", func(t *testing.T) {

llo/stream_calculated.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,11 @@ func (p *Plugin) ProcessCalculatedStreams(outcome *Outcome) {
585585

586586
}
587587

588+
if len(cd.Streams) < 1 {
589+
p.Logger.Errorw("no streams or expressions found in channel definition", "channelID", cid)
590+
continue
591+
}
592+
588593
// channel definitions are inherited from the previous outcome,
589594
// so we only update the channel definition streams if we haven't done it before
590595
if cd.Streams[len(cd.Streams)-1].StreamID != copt.ABI[len(copt.ABI)-1].ExpressionStreamID {

0 commit comments

Comments
 (0)