@@ -14,12 +14,14 @@ import (
1414
1515 "github.com/ethereum/go-ethereum/common"
1616 "github.com/pkg/errors"
17+ "google.golang.org/protobuf/proto"
1718
1819 chainselectors "github.com/smartcontractkit/chain-selectors"
1920
2021 dftypes "github.com/smartcontractkit/chainlink-evm/pkg/report/datafeeds"
21- dfevm "github.com/smartcontractkit/chainlink-evm/pkg/report/datafeeds/evm"
2222 "github.com/smartcontractkit/chainlink-evm/pkg/report/monitor"
23+ "github.com/smartcontractkit/chainlink-evm/pkg/report/pb/data-feeds/on-chain/registry"
24+ wt "github.com/smartcontractkit/chainlink-evm/pkg/report/pb/platform"
2325 "github.com/smartcontractkit/chainlink-evm/pkg/writetarget"
2426
2527 "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -32,6 +34,47 @@ import (
3234 relayevmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
3335)
3436
37+ // EVM Data-Feeds specific processor decodes writes as 'data-feeds.registry.FeedUpdated' messages + metrics
38+ type dataFeedsProcessor struct {
39+ emitter monitor.ProtoEmitter
40+ metrics * registry.Metrics
41+ }
42+
43+ func (p * dataFeedsProcessor ) Process (ctx context.Context , m proto.Message , attrKVs ... any ) error {
44+ // Switch on the type of the proto.Message
45+ switch msg := m .(type ) {
46+ case * wt.WriteConfirmed :
47+ // TODO: fallthrough if not a write containing a DF report
48+ // https://smartcontract-it.atlassian.net/browse/NONEVM-818
49+ // Notice: we assume all writes are Data-Feeds (static schema) writes for now
50+
51+ // Decode as an array of 'data-feeds.registry.FeedUpdated' messages
52+ updates , err := registry .DecodeAsFeedUpdated (msg )
53+ if err != nil {
54+ return fmt .Errorf ("failed to decode as 'data-feeds.registry.FeedUpdated': %w" , err )
55+ }
56+ // Emit the 'data-feeds.registry.FeedUpdated' messages
57+ for _ , update := range updates {
58+ err = p .emitter .EmitWithLog (ctx , update , attrKVs ... )
59+ if err != nil {
60+ return fmt .Errorf ("failed to emit with log: %w" , err )
61+ }
62+ // Process emit and derive metrics
63+ err = p .metrics .OnFeedUpdated (ctx , update , attrKVs ... )
64+ if err != nil {
65+ return fmt .Errorf ("failed to publish feed updated metrics: %w" , err )
66+ }
67+ }
68+ return nil
69+ default :
70+ return nil // fallthrough
71+ }
72+ }
73+
74+ func (c * dataFeedsProcessor ) SetEmitter (e monitor.ProtoEmitter ) {
75+ c .emitter = e
76+ }
77+
3578func NewWriteTarget (ctx context.Context , relayer * Relayer , chain legacyevm.Chain , gasLimitDefault uint64 , lggr logger.Logger ) (capabilities.TargetCapability , error ) {
3679 // generate ID based on chain selector
3780 id := GenerateWriteTargetName (chain .ID ().Uint64 ())
@@ -102,7 +145,16 @@ func NewWriteTarget(ctx context.Context, relayer *Relayer, chain legacyevm.Chain
102145 return nil , fmt .Errorf ("failed to get chain info: %w" , err )
103146 }
104147
105- beholder , err := writetarget .NewMonitor (ctx , lggr , dfevm .DecodeAsFeedUpdated )
148+ registryMetrics , err := registry .NewMetrics ()
149+ if err != nil {
150+ return nil , fmt .Errorf ("failed to create new registry metrics: %w" , err )
151+ }
152+
153+ dfProcessor := & dataFeedsProcessor {
154+ metrics : registryMetrics ,
155+ }
156+
157+ beholder , err := writetarget .NewMonitor (lggr , []monitor.ProtoProcessor {dfProcessor })
106158 if err != nil {
107159 return nil , fmt .Errorf ("failed to create Aptos WT monitor client: %+w" , err )
108160 }
0 commit comments