Skip to content

Commit 486b248

Browse files
committed
moved dfprocessor to chainlink-evm
1 parent 2b6f782 commit 486b248

File tree

2 files changed

+59
-2
lines changed

2 files changed

+59
-2
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package processor
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/protobuf/proto"
8+
9+
"github.com/smartcontractkit/chainlink-evm/pkg/report/monitor"
10+
"github.com/smartcontractkit/chainlink-evm/pkg/report/pb/data-feeds/on-chain/registry"
11+
wt "github.com/smartcontractkit/chainlink-evm/pkg/report/pb/platform"
12+
)
13+
14+
// EVM Data-Feeds specific processor decodes writes as 'data-feeds.registry.FeedUpdated' messages + metrics
15+
type dataFeedsProcessor struct {
16+
emitter monitor.ProtoEmitter
17+
metrics *registry.Metrics
18+
}
19+
20+
func NewDataFeedsProcessor(metrics *registry.Metrics) *dataFeedsProcessor {
21+
return &dataFeedsProcessor{
22+
metrics: metrics,
23+
}
24+
}
25+
26+
func (p *dataFeedsProcessor) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
27+
// Switch on the type of the proto.Message
28+
switch msg := m.(type) {
29+
case *wt.WriteConfirmed:
30+
// TODO: fallthrough if not a write containing a DF report
31+
// https://smartcontract-it.atlassian.net/browse/NONEVM-818
32+
// Notice: we assume all writes are Data-Feeds (static schema) writes for now
33+
34+
// Decode as an array of 'data-feeds.registry.FeedUpdated' messages
35+
updates, err := registry.DecodeAsFeedUpdated(msg)
36+
if err != nil {
37+
return fmt.Errorf("failed to decode as 'data-feeds.registry.FeedUpdated': %w", err)
38+
}
39+
// Emit the 'data-feeds.registry.FeedUpdated' messages
40+
for _, update := range updates {
41+
err = p.emitter.EmitWithLog(ctx, update, attrKVs...)
42+
if err != nil {
43+
return fmt.Errorf("failed to emit with log: %w", err)
44+
}
45+
// Process emit and derive metrics
46+
err = p.metrics.OnFeedUpdated(ctx, update, attrKVs...)
47+
if err != nil {
48+
return fmt.Errorf("failed to publish feed updated metrics: %w", err)
49+
}
50+
}
51+
return nil
52+
default:
53+
return nil // fallthrough
54+
}
55+
}
56+
57+
func (p *dataFeedsProcessor) SetEmitter(e monitor.ProtoEmitter) {
58+
p.emitter = e
59+
}

pkg/writetarget/write_target_monitor.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ const (
2222
schemaBasePath = repoCLLCommon + "/" + versionRefsDevelop + "/pkg/capabilities/writetarget/pb"
2323
)
2424

25-
// TODO: unsure if this is the right way to do this, but I believe all processors need to share the same emitter
26-
// so it may be correct.
2725
type emitterAwareProcessor interface {
2826
SetEmitter(e monitor.ProtoEmitter)
2927
}

0 commit comments

Comments
 (0)