Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions llo/plugin_outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (p *Plugin) outcome(outctx ocr3types.OutcomeContext, query types.Query, aos
}
// previous outcome did not report; keep the same ValidAfterNanoseconds
outcome.ValidAfterNanoseconds[channelID] = previousValidAfterNanoseconds
} else if err4 := previousOutcome.IsEncodable(channelID, p.ReportCodecs); err4 != nil {
if p.Config.VerboseLogging {
p.Logger.Debugw("Channel is not encodable", "channelID", channelID, "err", err4, "stage", "Outcome", "seqNr", outctx.SeqNr)
}
// previous outcome would have failed to generate a report; keep the same ValidAfterNanoseconds
outcome.ValidAfterNanoseconds[channelID] = previousValidAfterNanoseconds
} else {
// previous outcome reported; update ValidAfterNanoseconds to the previousObservationTimestamp
outcome.ValidAfterNanoseconds[channelID] = previousOutcome.ObservationTimestampNanoseconds
Expand Down Expand Up @@ -456,6 +462,38 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u
return nil
}

// IsEncodable checks if a channel can be encoded into a report.
// It simulates the report encoding step using the outcome stream values to check if the channel is encodable.
// IsEncodable reports whether the given channel's data in this outcome could
// be successfully encoded into a report. It performs a dry-run of the report
// encoding step using the outcome's aggregated stream values, returning nil
// on success or a descriptive error if encoding would fail.
func (out *Outcome) IsEncodable(channelID llotypes.ChannelID, reportCodecs map[llotypes.ReportFormat]ReportCodec) error {
	chDef, ok := out.ChannelDefinitions[channelID]
	if !ok {
		return fmt.Errorf("no channel definition with this ID")
	}
	codec, ok := reportCodecs[chDef.ReportFormat]
	if !ok {
		return fmt.Errorf("codec missing for ReportFormat=%q", chDef.ReportFormat)
	}

	// Gather the aggregated value for every stream the channel requires.
	// A missing aggregate yields a nil StreamValue, which the codec is
	// expected to reject during the encode attempt below.
	streamValues := make([]StreamValue, 0, len(chDef.Streams))
	for _, s := range chDef.Streams {
		streamValues = append(streamValues, out.StreamAggregates[s.StreamID][s.Aggregator])
	}

	// Assemble a simulated report mirroring what report generation would build.
	simulated := Report{
		ChannelID:                       channelID,
		ValidAfterNanoseconds:           out.ValidAfterNanoseconds[channelID],
		ObservationTimestampNanoseconds: out.ObservationTimestampNanoseconds,
		Values:                          streamValues,
		Specimen:                        out.LifeCycleStage != LifeCycleStageProduction,
	}

	// Any encode failure means the channel is not encodable.
	if _, err := codec.Encode(simulated, chDef); err != nil {
		return fmt.Errorf("report encoding simulation failed for channelID=%d, reportFormat=%q: %w", channelID, chDef.ReportFormat, err)
	}
	return nil
}

func IsSecondsResolution(reportFormat llotypes.ReportFormat) bool {
switch reportFormat {
// TODO: Might be cleaner to expose a TimeResolution() uint64 field on the
Expand Down
183 changes: 164 additions & 19 deletions llo/plugin_outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ func Test_Outcome_GoldenFiles(t *testing.T) {
DefaultMinReportIntervalNanoseconds: 1,
}.GetOutcomeCodec()
p := &Plugin{
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
ReportCodecs: map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
llotypes.ReportFormatEVMPremiumLegacy: JSONReportCodec{},
},
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
DefaultMinReportIntervalNanoseconds: 1,
}
// Minimal observations (timestamp only) so the plugin advances from previous outcome without new channel defs or stream values.
Expand Down Expand Up @@ -91,13 +95,17 @@ func Test_Outcome_EncodedMatchesGolden(t *testing.T) {
DefaultMinReportIntervalNanoseconds: 1,
}.GetOutcomeCodec()
p := &Plugin{
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
ReportCodecs: map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
llotypes.ReportFormatEVMPremiumLegacy: JSONReportCodec{},
},
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
DefaultMinReportIntervalNanoseconds: 1,
}

Expand Down Expand Up @@ -140,7 +148,7 @@ func Test_Outcome_EncodedMatchesGolden(t *testing.T) {
}
outcome, err = p.Outcome(ctx, ocr3types.OutcomeContext{
PreviousOutcome: fullGolden,
SeqNr: 2,
SeqNr: 2,
}, types.Query{}, aos)
require.NoError(t, err)
default:
Expand All @@ -163,16 +171,20 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
OutcomeCodec: outcomeCodec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ReportCodecs: map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
llotypes.ReportFormatEVMPremiumLegacy: JSONReportCodec{},
},
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
F: 1,
}
testStartTS := time.Now()
testStartNanos := uint64(testStartTS.UnixNano()) //nolint:gosec // safe cast in tests

t.Run("if number of observers < 2f+1, errors", func(t *testing.T) {
_, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{})
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)")
p.F = 1
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 1)")
_, err = p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{{}, {}})
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 2 (f: 1)")
})
Expand Down Expand Up @@ -675,6 +687,71 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)},
}, decoded.StreamAggregates[3])
})
t.Run("ValidAfterNanoseconds should not update to previous ObservationTimestamp if previous outcome fails report encoding simulation", func(t *testing.T) {
channelDef := map[llotypes.ChannelID]llotypes.ChannelDefinition{
1: { // requires streams 1 and 2
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}},
},
2: { // requires streams 2 and 3
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}},
},
}
// previous outcome - partially successful report generation
// channel 1 would have been able to generate a report
// channel 2 would not have generated a report because of the missing stream value 3
previousOutcome := Outcome{
LifeCycleStage: llotypes.LifeCycleStage("test"),
ObservationTimestampNanoseconds: uint64(101 * time.Second),
ChannelDefinitions: channelDef,
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(100 * time.Second),
2: uint64(100 * time.Second),
},
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)),
},
// missing stream value 3
},
}
encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome)
require.NoError(t, err)
outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}

// generate attributed observations for next round
aos := []types.AttributedObservation{}
for i := 0; i < 4; i++ {
obs := Observation{
UnixTimestampNanoseconds: uint64(102 * time.Second),
StreamValues: map[llotypes.StreamID]StreamValue{
1: ToDecimal(decimal.NewFromInt(122)),
2: ToDecimal(decimal.NewFromInt(222)),
3: &Quote{Bid: decimal.NewFromInt(int64(322)), Benchmark: decimal.NewFromInt(int64(332)), Ask: decimal.NewFromInt(int64(342))},
}}
encoded, err2 := p.ObservationCodec.Encode(obs)
require.NoError(t, err2)
aos = append(aos,
types.AttributedObservation{
Observation: encoded,
Observer: commontypes.OracleID(i), //nolint:gosec // will never be > 4
})
}
outcome1, err := p.Outcome(ctx, outctx, types.Query{}, aos)
require.NoError(t, err)

decoded, err := p.OutcomeCodec.Decode(outcome1)
require.NoError(t, err)

// validAfterNanoseconds for channel `1` should update to previous ObservationTimestampNanoseconds
assert.Equal(t, uint64(101*time.Second), decoded.ValidAfterNanoseconds[1])
// validAfterNanoseconds for channel `2` should **not** update to previous ObservationTimestampNanoseconds
assert.Equal(t, uint64(100*time.Second), decoded.ValidAfterNanoseconds[2])
})
t.Run("sends outcome telemetry if channel is specified", func(t *testing.T) {
ch := make(chan *LLOOutcomeTelemetry, 10000)
p.OutcomeTelemetryCh = ch
Expand Down Expand Up @@ -1050,4 +1127,72 @@ func Test_Outcome_Methods(t *testing.T) {
assert.Equal(t, "ChannelID: 2; Reason: IsReportable=false; no ValidAfterNanoseconds entry yet, this must be a new channel", unreportable[0].Error())
})
})

t.Run("IsEncodable", func(t *testing.T) {
cid := llotypes.ChannelID(1)
baseOutcome := Outcome{
LifeCycleStage: LifeCycleStageProduction,
ObservationTimestampNanoseconds: uint64(1726670490 * time.Second),
ChannelDefinitions: map[llotypes.ChannelID]llotypes.ChannelDefinition{
cid: {
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{
{StreamID: 1, Aggregator: llotypes.AggregatorMedian},
{StreamID: 2, Aggregator: llotypes.AggregatorQuote},
},
},
},
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
cid: uint64(1726670480 * time.Second),
},
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(123)),
},
2: {
llotypes.AggregatorQuote: &Quote{
Bid: decimal.NewFromInt(100),
Benchmark: decimal.NewFromInt(101),
Ask: decimal.NewFromInt(102),
},
},
},
}

t.Run("returns nil if report simulation encodes successfully", func(t *testing.T) {
err := baseOutcome.IsEncodable(cid, map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
})
require.NoError(t, err)
})

t.Run("returns error if channel definition is missing", func(t *testing.T) {
err := baseOutcome.IsEncodable(999, map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
})
require.EqualError(t, err, "no channel definition with this ID")
})

t.Run("returns error if codec for report format is missing", func(t *testing.T) {
err := baseOutcome.IsEncodable(cid, map[llotypes.ReportFormat]ReportCodec{})
require.EqualError(t, err, "codec missing for ReportFormat=\"json\"")
})

t.Run("returns wrapped encoding error if stream aggregate is missing", func(t *testing.T) {
outcomeWithMissingAggregate := baseOutcome
outcomeWithMissingAggregate.StreamAggregates = map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(123)),
},
// stream 2 quote is missing
}

err := outcomeWithMissingAggregate.IsEncodable(cid, map[llotypes.ReportFormat]ReportCodec{
llotypes.ReportFormatJSON: JSONReportCodec{},
})
require.Error(t, err)
assert.Contains(t, err.Error(), "report encoding simulation failed for channelID=1")
assert.Contains(t, err.Error(), "nil stream value")
})
})
}
100 changes: 100 additions & 0 deletions llo/plugin_reports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,106 @@ func testReports(t *testing.T, outcomeCodec OutcomeCodec) {
require.Empty(t, rwis)
})

t.Run("(bug demo) results in a report gap due to validAfter being updated even if previous report was dropped", func(t *testing.T) {
// Round 1: all streams present → reports generated covering [100s, 200s].
outcome1 := Outcome{
ObservationTimestampNanoseconds: uint64(200 * time.Second),
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(100 * time.Second),
2: uint64(100 * time.Second),
},
ChannelDefinitions: smallDefinitions,
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)),
},
3: {
llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)},
},
4: {
llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(6.6), Benchmark: decimal.NewFromFloat(7.7), Bid: decimal.NewFromFloat(8.8)},
},
},
}
encoded1, err := p.OutcomeCodec.Encode(outcome1)
require.NoError(t, err)
rwis1, err := p.Reports(ctx, 2, encoded1)
require.NoError(t, err)
require.Len(t, rwis1, 2)
assert.Contains(t, string(rwis1[0].ReportWithInfo.Report), `"ChannelID":1,"ValidAfterNanoseconds":100000000000,"ObservationTimestampNanoseconds":200000000000`)
assert.Contains(t, string(rwis1[1].ReportWithInfo.Report), `"ChannelID":2,"ValidAfterNanoseconds":100000000000,"ObservationTimestampNanoseconds":200000000000`)

// Round 2: stream 4 is missing from aggregates.
// Report generated for channel 1 covering [200s, 300s].
// Report for channel 2 is dropped because of the missing stream value.
outcome2 := Outcome{
ObservationTimestampNanoseconds: uint64(300 * time.Second),
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(200 * time.Second),
2: uint64(200 * time.Second),
},
ChannelDefinitions: smallDefinitions,
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)),
},
3: {
llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)},
},
},
}
encoded2, err := p.OutcomeCodec.Encode(outcome2)
require.NoError(t, err)
rwis2, err := p.Reports(ctx, 2, encoded2)
require.NoError(t, err)
require.Len(t, rwis2, 1)
assert.Contains(t, string(rwis2[0].ReportWithInfo.Report), `"ChannelID":1,"ValidAfterNanoseconds":200000000000,"ObservationTimestampNanoseconds":300000000000`)

// Round 3: all streams present again.
// Report generated for channel 1 covering [300s, 400s].
// Report generated for channel 2 covering [300s, 400s].
outcome3 := Outcome{
ObservationTimestampNanoseconds: uint64(400 * time.Second),
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(300 * time.Second),
2: uint64(300 * time.Second), // <-- this is incorrectly updated to previous ObservationTimestampNanoseconds, should be 200s
},
ChannelDefinitions: smallDefinitions,
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(1.1)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromFloat(2.2)),
},
3: {
llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(3.3), Benchmark: decimal.NewFromFloat(4.4), Bid: decimal.NewFromFloat(5.5)},
},
4: {
llotypes.AggregatorQuote: &Quote{Ask: decimal.NewFromFloat(6.6), Benchmark: decimal.NewFromFloat(7.7), Bid: decimal.NewFromFloat(8.8)},
},
},
}
encoded3, err := p.OutcomeCodec.Encode(outcome3)
require.NoError(t, err)
rwis3, err := p.Reports(ctx, 2, encoded3)
require.NoError(t, err)
require.Len(t, rwis3, 2)
assert.Contains(t, string(rwis3[0].ReportWithInfo.Report), `"ChannelID":1,"ValidAfterNanoseconds":300000000000,"ObservationTimestampNanoseconds":400000000000`)
assert.Contains(t, string(rwis3[1].ReportWithInfo.Report), `"ChannelID":2,"ValidAfterNanoseconds":300000000000,"ObservationTimestampNanoseconds":400000000000`)

// validAfterNanoseconds has advanced to 300s (outcome2's obsTs) because outcome2 passed the IsReportable
// timing check despite its report being dropped. The new report covers [300s, 400s] — leaving [200s, 300s]
// unaccounted for = report gap. The expected behaviour is that validAfterNanoseconds should not update to
// previous ObservationTimestampNanoseconds and we produce reports for ranges [100s, 200s] and [200s, 400s].
})

t.Run("does not generate reports for tombstoned channels", func(t *testing.T) {
tombstonedDefinitions := map[llotypes.ChannelID]llotypes.ChannelDefinition{
1: {
Expand Down
Loading