@@ -14,14 +14,14 @@ import (
1414
1515 "github.com/ethereum/go-ethereum/common"
1616 "github.com/pkg/errors"
17- "google.golang.org/protobuf/proto"
1817
1918 chainselectors "github.com/smartcontractkit/chain-selectors"
2019
2120 dftypes "github.com/smartcontractkit/chainlink-evm/pkg/report/datafeeds"
21+ processor "github.com/smartcontractkit/chainlink-evm/pkg/report/datafeeds/processor"
22+
2223 "github.com/smartcontractkit/chainlink-evm/pkg/report/monitor"
2324 "github.com/smartcontractkit/chainlink-evm/pkg/report/pb/data-feeds/on-chain/registry"
24- wt "github.com/smartcontractkit/chainlink-evm/pkg/report/pb/platform"
2525 "github.com/smartcontractkit/chainlink-evm/pkg/writetarget"
2626
2727 "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -34,47 +34,6 @@ import (
3434 relayevmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
3535)
3636
37- // EVM Data-Feeds specific processor decodes writes as 'data-feeds.registry.FeedUpdated' messages + metrics
38- type dataFeedsProcessor struct {
39- emitter monitor.ProtoEmitter
40- metrics * registry.Metrics
41- }
42-
43- func (p * dataFeedsProcessor ) Process (ctx context.Context , m proto.Message , attrKVs ... any ) error {
44- // Switch on the type of the proto.Message
45- switch msg := m .(type ) {
46- case * wt.WriteConfirmed :
47- // TODO: fallthrough if not a write containing a DF report
48- // https://smartcontract-it.atlassian.net/browse/NONEVM-818
49- // Notice: we assume all writes are Data-Feeds (static schema) writes for now
50-
51- // Decode as an array of 'data-feeds.registry.FeedUpdated' messages
52- updates , err := registry .DecodeAsFeedUpdated (msg )
53- if err != nil {
54- return fmt .Errorf ("failed to decode as 'data-feeds.registry.FeedUpdated': %w" , err )
55- }
56- // Emit the 'data-feeds.registry.FeedUpdated' messages
57- for _ , update := range updates {
58- err = p .emitter .EmitWithLog (ctx , update , attrKVs ... )
59- if err != nil {
60- return fmt .Errorf ("failed to emit with log: %w" , err )
61- }
62- // Process emit and derive metrics
63- err = p .metrics .OnFeedUpdated (ctx , update , attrKVs ... )
64- if err != nil {
65- return fmt .Errorf ("failed to publish feed updated metrics: %w" , err )
66- }
67- }
68- return nil
69- default :
70- return nil // fallthrough
71- }
72- }
73-
74- func (c * dataFeedsProcessor ) SetEmitter (e monitor.ProtoEmitter ) {
75- c .emitter = e
76- }
77-
7837func NewWriteTarget (ctx context.Context , relayer * Relayer , chain legacyevm.Chain , gasLimitDefault uint64 , lggr logger.Logger ) (capabilities.TargetCapability , error ) {
7938 // generate ID based on chain selector
8039 id := GenerateWriteTargetName (chain .ID ().Uint64 ())
@@ -150,9 +109,7 @@ func NewWriteTarget(ctx context.Context, relayer *Relayer, chain legacyevm.Chain
150109 return nil , fmt .Errorf ("failed to create new registry metrics: %w" , err )
151110 }
152111
153- dfProcessor := & dataFeedsProcessor {
154- metrics : registryMetrics ,
155- }
112+ dfProcessor := processor .NewDataFeedsProcessor (registryMetrics )
156113
157114 beholder , err := writetarget .NewMonitor (lggr , []monitor.ProtoProcessor {dfProcessor })
158115 if err != nil {
0 commit comments