Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions llo/plugin_outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"sort"

"github.com/goccy/go-json"
"github.com/smartcontractkit/libocr/offchainreporting2/types"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

Expand Down Expand Up @@ -400,10 +401,8 @@ func (out *Outcome) GenRetirementReport(protocolVersion uint32) RetirementReport
// ObservationTimestampNanoseconds > ValidAfterNanoseconds(previous observation timestamp)+MinReportInterval

// Indicates whether a report can be generated for the given channel.
// Returns nil if channel is reportable
// NOTE: A channel is still reportable even if missing some or all stream
// values. The report codec is expected to handle nils and act accordingly
// (e.g. some values may be optional).
// Checks if channel is retired, tombstoned, has missing stream values, and if ValidAfterNanoseconds is set.
// Returns nil if channel is reportable.
func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion uint32, minReportInterval uint64) *UnreportableChannelError {
if out.LifeCycleStage == LifeCycleStageRetired {
return &UnreportableChannelError{nil, "IsReportable=false; retired channel", channelID}
Expand All @@ -419,6 +418,14 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u
return &UnreportableChannelError{nil, "IsReportable=false; tombstone channel", channelID}
}

if !nilStreamValuesEnabled(cd.Opts) {
for _, strm := range cd.Streams {
if out.StreamAggregates[strm.StreamID][strm.Aggregator] == nil {
return &UnreportableChannelError{nil, fmt.Sprintf("IsReportable=false; missing stream value for streamID=%d aggregator=%q", strm.StreamID, strm.Aggregator), channelID}
}
}
}

validAfterNanos, ok := out.ValidAfterNanoseconds[channelID]
if !ok {
// No ValidAfterNanoseconds entry yet, this must be a new channel.
Expand Down Expand Up @@ -456,6 +463,26 @@ func (out *Outcome) IsReportable(channelID llotypes.ChannelID, protocolVersion u
return nil
}

// nilStreamValuesEnabled reports whether nil stream values are permitted
// for a channel, based on its opts JSON. The default is true: a channel
// remains reportable even when some of its stream aggregate values are
// missing. It returns false only when the opts explicitly set
// "enableNilStreamValues" to false, in which case channels with missing
// stream values are treated as unreportable. Empty opts, malformed JSON,
// and an absent field all yield the permissive default.
func nilStreamValuesEnabled(opts []byte) bool {
	if len(opts) == 0 {
		return true
	}

	// Loosely decode only the one field we care about, so this package
	// takes no dependency on the codec package's full opts schema.
	var parsed struct {
		EnableNilStreamValues *bool `json:"enableNilStreamValues"`
	}
	if json.Unmarshal(opts, &parsed) != nil {
		// Malformed opts fall back to the permissive default.
		return true
	}
	if parsed.EnableNilStreamValues == nil {
		return true
	}
	return *parsed.EnableNilStreamValues
}

func IsSecondsResolution(reportFormat llotypes.ReportFormat) bool {
switch reportFormat {
// TODO: Might be cleaner to expose a TimeResolution() uint64 field on the
Expand Down
230 changes: 213 additions & 17 deletions llo/plugin_outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func Test_Outcome_GoldenFiles(t *testing.T) {
DefaultMinReportIntervalNanoseconds: 1,
}.GetOutcomeCodec()
p := &Plugin{
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
DefaultMinReportIntervalNanoseconds: 1,
}
// Minimal observations (timestamp only) so the plugin advances from previous outcome without new channel defs or stream values.
Expand Down Expand Up @@ -91,13 +91,13 @@ func Test_Outcome_EncodedMatchesGolden(t *testing.T) {
DefaultMinReportIntervalNanoseconds: 1,
}.GetOutcomeCodec()
p := &Plugin{
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
Config: Config{true},
OutcomeCodec: codec,
Logger: logger.Test(t),
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
ProtocolVersion: 1,
DefaultMinReportIntervalNanoseconds: 1,
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func Test_Outcome_EncodedMatchesGolden(t *testing.T) {
}
outcome, err = p.Outcome(ctx, ocr3types.OutcomeContext{
PreviousOutcome: fullGolden,
SeqNr: 2,
SeqNr: 2,
}, types.Query{}, aos)
require.NoError(t, err)
default:
Expand All @@ -165,14 +165,14 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
ObservationCodec: obsCodec,
DonID: 10000043,
ConfigDigest: types.ConfigDigest{1, 2, 3, 4},
F: 1,
}
testStartTS := time.Now()
testStartNanos := uint64(testStartTS.UnixNano()) //nolint:gosec // safe cast in tests

t.Run("if number of observers < 2f+1, errors", func(t *testing.T) {
_, err := p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{})
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 0)")
p.F = 1
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 0 (f: 1)")
_, err = p.Outcome(ctx, ocr3types.OutcomeContext{SeqNr: 1}, types.Query{}, []types.AttributedObservation{{}, {}})
require.EqualError(t, err, "invariant violation: expected at least 2f+1 attributed observations, got 2 (f: 1)")
})
Expand Down Expand Up @@ -298,6 +298,10 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
channelID: testStartNanos, // Channel is reportable
},
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(100))},
2: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(200))},
},
}

// Verify channel is reportable before tombstoning
Expand Down Expand Up @@ -675,6 +679,139 @@ func testOutcome(t *testing.T, outcomeCodec OutcomeCodec) {
llotypes.AggregatorQuote: &Quote{Bid: decimal.NewFromInt(320), Benchmark: decimal.NewFromInt(330), Ask: decimal.NewFromInt(340)},
}, decoded.StreamAggregates[3])
})
t.Run("(required for safe rollout) when enableNilStreamValues is undefined, ValidAfterNanoseconds still updates to previous ObservationTimestamp if previous outcome has missing stream values", func(t *testing.T) {
channelDef := map[llotypes.ChannelID]llotypes.ChannelDefinition{
1: { // requires streams 1 and 2
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}},
},
2: { // requires streams 2 and 3; nil stream values disabled so missing stream 3 makes it unreportable
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}},
},
}
// previous outcome - partially successful report generation
// channel 1 would have been able to generate a report
// channel 2 would not have generated a report because of the missing stream value 3
previousOutcome := Outcome{
LifeCycleStage: llotypes.LifeCycleStage("test"),
ObservationTimestampNanoseconds: uint64(101 * time.Second),
ChannelDefinitions: channelDef,
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(100 * time.Second),
2: uint64(100 * time.Second),
},
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)),
},
// missing stream value 3
},
}
encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome)
require.NoError(t, err)
outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}

// generate attributed observations for next round
aos := []types.AttributedObservation{}
for i := 0; i < 4; i++ {
obs := Observation{
UnixTimestampNanoseconds: uint64(102 * time.Second),
StreamValues: map[llotypes.StreamID]StreamValue{
1: ToDecimal(decimal.NewFromInt(122)),
2: ToDecimal(decimal.NewFromInt(222)),
3: &Quote{Bid: decimal.NewFromInt(int64(322)), Benchmark: decimal.NewFromInt(int64(332)), Ask: decimal.NewFromInt(int64(342))},
}}
encoded, err2 := p.ObservationCodec.Encode(obs)
require.NoError(t, err2)
aos = append(aos,
types.AttributedObservation{
Observation: encoded,
Observer: commontypes.OracleID(i), //nolint:gosec // will never be > 4
})
}
outcome1, err := p.Outcome(ctx, outctx, types.Query{}, aos)
require.NoError(t, err)

decoded, err := p.OutcomeCodec.Decode(outcome1)
require.NoError(t, err)

// validAfterNanoseconds for channel `1` should update to previous ObservationTimestampNanoseconds
assert.Equal(t, uint64(101*time.Second), decoded.ValidAfterNanoseconds[1])
// validAfterNanoseconds for channel `2` should still update to previous ObservationTimestampNanoseconds
// note: this is technically a bug, but we're keeping the same behaviour for backwards compatibility during rollout
// to ensure we don't break the outcome generation.
assert.Equal(t, uint64(101*time.Second), decoded.ValidAfterNanoseconds[2])
})
t.Run("when enableNilStreamValues is false, ValidAfterNanoseconds should not update to previous ObservationTimestamp if previous outcome has missing stream values", func(t *testing.T) {
channelDef := map[llotypes.ChannelID]llotypes.ChannelDefinition{
1: { // requires streams 1 and 2
ReportFormat: llotypes.ReportFormatJSON,
Streams: []llotypes.Stream{{StreamID: 1, Aggregator: llotypes.AggregatorMedian}, {StreamID: 2, Aggregator: llotypes.AggregatorMedian}},
},
2: { // requires streams 2 and 3; nil stream values disabled so missing stream 3 makes it unreportable
ReportFormat: llotypes.ReportFormatEVMPremiumLegacy,
Streams: []llotypes.Stream{{StreamID: 2, Aggregator: llotypes.AggregatorMedian}, {StreamID: 3, Aggregator: llotypes.AggregatorQuote}},
Opts: []byte(`{"enableNilStreamValues":false}`),
},
}
// previous outcome - partially successful report generation
// channel 1 would have been able to generate a report
// channel 2 would not have generated a report because of the missing stream value 3
previousOutcome := Outcome{
LifeCycleStage: llotypes.LifeCycleStage("test"),
ObservationTimestampNanoseconds: uint64(101 * time.Second),
ChannelDefinitions: channelDef,
ValidAfterNanoseconds: map[llotypes.ChannelID]uint64{
1: uint64(100 * time.Second),
2: uint64(100 * time.Second),
},
StreamAggregates: map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(120)),
},
2: {
llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(220)),
},
// missing stream value 3
},
}
encodedPreviousOutcome, err := p.OutcomeCodec.Encode(previousOutcome)
require.NoError(t, err)
outctx := ocr3types.OutcomeContext{SeqNr: 2, PreviousOutcome: encodedPreviousOutcome}

// generate attributed observations for next round
aos := []types.AttributedObservation{}
for i := 0; i < 4; i++ {
obs := Observation{
UnixTimestampNanoseconds: uint64(102 * time.Second),
StreamValues: map[llotypes.StreamID]StreamValue{
1: ToDecimal(decimal.NewFromInt(122)),
2: ToDecimal(decimal.NewFromInt(222)),
3: &Quote{Bid: decimal.NewFromInt(int64(322)), Benchmark: decimal.NewFromInt(int64(332)), Ask: decimal.NewFromInt(int64(342))},
}}
encoded, err2 := p.ObservationCodec.Encode(obs)
require.NoError(t, err2)
aos = append(aos,
types.AttributedObservation{
Observation: encoded,
Observer: commontypes.OracleID(i), //nolint:gosec // will never be > 4
})
}
outcome1, err := p.Outcome(ctx, outctx, types.Query{}, aos)
require.NoError(t, err)

decoded, err := p.OutcomeCodec.Decode(outcome1)
require.NoError(t, err)

// validAfterNanoseconds for channel `1` should update to previous ObservationTimestampNanoseconds
assert.Equal(t, uint64(101*time.Second), decoded.ValidAfterNanoseconds[1])
// validAfterNanoseconds for channel `2` should **not** update to previous ObservationTimestampNanoseconds
assert.Equal(t, uint64(100*time.Second), decoded.ValidAfterNanoseconds[2])
})
t.Run("sends outcome telemetry if channel is specified", func(t *testing.T) {
ch := make(chan *LLOOutcomeTelemetry, 10000)
p.OutcomeTelemetryCh = ch
Expand Down Expand Up @@ -974,6 +1111,44 @@ func Test_Outcome_Methods(t *testing.T) {
outcome.ChannelDefinitions = map[llotypes.ChannelID]llotypes.ChannelDefinition{}
require.EqualError(t, outcome.IsReportable(cid, 1, defaultMinReportInterval), "ChannelID: 1; Reason: IsReportable=false; no channel definition with this ID")

// Missing stream aggregate value; IsReportable=false only when enableNilStreamValues=false
outcome.ChannelDefinitions[cid] = llotypes.ChannelDefinition{
Streams: []llotypes.Stream{
{StreamID: 1, Aggregator: llotypes.AggregatorMedian},
{StreamID: 2, Aggregator: llotypes.AggregatorQuote},
},
Opts: []byte(`{"enableNilStreamValues":false}`),
}
outcome.StreamAggregates = map[llotypes.StreamID]map[llotypes.Aggregator]StreamValue{
1: {llotypes.AggregatorMedian: ToDecimal(decimal.NewFromInt(100))},
// stream 2 quote is missing
}
require.EqualError(t, outcome.IsReportable(cid, 1, defaultMinReportInterval), `ChannelID: 1; Reason: IsReportable=false; missing stream value for streamID=2 aggregator="quote"`)

// Missing stream aggregate value; IsReportable=true when enableNilStreamValues is absent (default=true)
outcome.ChannelDefinitions[cid] = llotypes.ChannelDefinition{
Streams: []llotypes.Stream{
{StreamID: 1, Aggregator: llotypes.AggregatorMedian},
{StreamID: 2, Aggregator: llotypes.AggregatorQuote},
},
// no Opts: missing stream values are allowed by default
}
outcome.ValidAfterNanoseconds = map[llotypes.ChannelID]uint64{cid: obsTSNanos - uint64(100*time.Millisecond)}
require.Nil(t, outcome.IsReportable(cid, 1, defaultMinReportInterval))

// Missing stream aggregate value; IsReportable=true when enableNilStreamValues=true
outcome.ChannelDefinitions[cid] = llotypes.ChannelDefinition{
Streams: []llotypes.Stream{
{StreamID: 1, Aggregator: llotypes.AggregatorMedian},
{StreamID: 2, Aggregator: llotypes.AggregatorQuote},
},
Opts: []byte(`{"enableNilStreamValues":true}`),
}
require.Nil(t, outcome.IsReportable(cid, 1, defaultMinReportInterval))

// Reset for remaining tests
outcome.ValidAfterNanoseconds = nil

// No ValidAfterNanoseconds yet
outcome.ChannelDefinitions[cid] = llotypes.ChannelDefinition{}
require.EqualError(t, outcome.IsReportable(cid, 1, defaultMinReportInterval), "ChannelID: 1; Reason: IsReportable=false; no ValidAfterNanoseconds entry yet, this must be a new channel")
Expand Down Expand Up @@ -1051,3 +1226,24 @@ func Test_Outcome_Methods(t *testing.T) {
})
})
}

// Test_nilStreamValuesEnabled verifies the default-true semantics of
// nilStreamValuesEnabled: only an explicit `"enableNilStreamValues":false`
// disables nil stream values; every other input (nil, empty, unrelated
// fields, invalid JSON, explicit true) keeps the permissive default.
func Test_nilStreamValuesEnabled(t *testing.T) {
	cases := []struct {
		name string
		opts []byte
		want bool
	}{
		{"nil opts returns true (default)", nil, true},
		{"empty opts returns true (default)", []byte{}, true},
		{"opts without the field returns true (default)", []byte(`{"someOtherField":true}`), true},
		{"invalid JSON returns true (default)", []byte(`not json`), true},
		{"enableNilStreamValues=true returns true", []byte(`{"enableNilStreamValues":true}`), true},
		{"enableNilStreamValues=false returns false", []byte(`{"enableNilStreamValues":false}`), false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.want, nilStreamValuesEnabled(tc.opts))
		})
	}
}
Loading
Loading