Skip to content

Commit f24e297

Browse files
committed
extracted write target monitor from aptos
1 parent beacbab commit f24e297

File tree

3 files changed

+264
-1
lines changed

3 files changed

+264
-1
lines changed

capabilities/go.mod

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,20 @@ require (
77
github.com/stretchr/testify v1.10.0
88
go.opentelemetry.io/otel v1.30.0
99
go.opentelemetry.io/otel/trace v1.30.0
10+
google.golang.org/protobuf v1.36.6
1011
)
1112

1213
require (
1314
github.com/bahlo/generic-list-go v0.2.0 // indirect
1415
github.com/beorn7/perks v1.0.1 // indirect
16+
github.com/btcsuite/btcd v0.22.0-beta // indirect
17+
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
1518
github.com/buger/jsonparser v1.1.1 // indirect
1619
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
1720
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1821
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
22+
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
23+
github.com/ethereum/go-ethereum v1.14.11 // indirect
1924
github.com/go-logr/logr v1.4.2 // indirect
2025
github.com/go-logr/stdr v1.2.2 // indirect
2126
github.com/go-playground/locales v0.13.0 // indirect
@@ -26,6 +31,7 @@ require (
2631
github.com/google/go-cmp v0.6.0 // indirect
2732
github.com/google/uuid v1.6.0 // indirect
2833
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
34+
github.com/holiman/uint256 v1.3.1 // indirect
2935
github.com/invopop/jsonschema v0.12.0 // indirect
3036
github.com/jpillora/backoff v1.0.0 // indirect
3137
github.com/leodido/go-urn v1.2.0 // indirect
@@ -69,6 +75,5 @@ require (
6975
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
7076
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
7177
google.golang.org/grpc v1.67.1 // indirect
72-
google.golang.org/protobuf v1.36.6 // indirect
7378
gopkg.in/yaml.v3 v3.0.1 // indirect
7479
)

capabilities/go.sum

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,42 @@
1+
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
12
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
23
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
34
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
45
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
6+
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
57
github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo=
8+
github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA=
69
github.com/btcsuite/btcd/btcec/v2 v2.3.4 h1:3EJjcN70HCu/mwqlUsGK8GcNVyLVxFDlWurTXGPFfiQ=
710
github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
11+
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
12+
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
13+
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce/go.mod h1:0DVlHczLPewLcPGEIeUEzfOJhqGPQ0mJJRDBtD307+o=
14+
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
15+
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
16+
github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I=
17+
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
18+
github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
19+
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
20+
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
821
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
922
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
1023
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
1124
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
1225
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
1326
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
27+
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1428
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1529
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1630
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
1731
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
32+
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
33+
github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
1834
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnNEcHYvcCuK6dPZSg=
1935
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
36+
github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218=
2037
github.com/ethereum/go-ethereum v1.14.11 h1:8nFDCUUE67rPc6AKxFj7JKaOa2W/W1Rse3oS6LvvxEY=
2138
github.com/ethereum/go-ethereum v1.14.11/go.mod h1:+l/fr42Mma+xBnhefL/+z11/hcmJ2egl+ScIVPjhc7E=
39+
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
2240
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
2341
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
2442
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -39,17 +57,24 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
3957
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
4058
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
4159
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
60+
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
61+
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
4262
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
4363
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
4464
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys=
4565
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I=
4666
github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs=
4767
github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
68+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
4869
github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI=
4970
github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
71+
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
72+
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
5073
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
5174
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
5275
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
76+
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
77+
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
5378
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
5479
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
5580
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -62,6 +87,10 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk
6287
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
6388
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
6489
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
90+
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
91+
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
92+
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
93+
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
6594
github.com/pelletier/go-toml/v2 v2.2.0 h1:QLgLl2yMN7N+ruc31VynXs1vhMZa7CeHHejIeBAsoHo=
6695
github.com/pelletier/go-toml/v2 v2.2.0/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
6796
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -143,16 +172,23 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
143172
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
144173
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
145174
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
175+
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
146176
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
177+
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
178+
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
147179
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
148180
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
149181
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
150182
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
151183
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
184+
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
185+
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
152186
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
153187
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
154188
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
189+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
155190
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
191+
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
156192
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
157193
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
158194
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
@@ -173,6 +209,9 @@ google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/
173209
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
174210
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
175211
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
212+
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
213+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
214+
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
176215
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
177216
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
178217
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
package writetarget
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/protobuf/proto"
8+
9+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
12+
"github.com/smartcontractkit/chainlink-common/pkg/beholder/monitor"
13+
14+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/write_target/pb/data-feeds/on-chain/registry"
15+
wt "github.com/smartcontractkit/chainlink-common/pkg/capabilities/write_target/pb/platform"
16+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/write_target/pb/platform/on-chain/forwarder"
17+
)
18+
19+
const (
20+
repoCLLCommon = "https://raw.githubusercontent.com/smartcontractkit/chainlink-common"
21+
// TODO: replace with main when merged
22+
versionRefsDevelop = "refs/heads/generalized-monitoring-extraction"
23+
schemaBasePath = repoCLLCommon + "/" + versionRefsDevelop + "/pkg/capabilities/write_target/pb"
24+
)
25+
26+
// NewWriteTargetMonitor initializes a Beholder client for the Write Target
27+
//
28+
// The client is initialized as a BeholderClient extension with a custom ProtoEmitter.
29+
// The ProtoEmitter is proxied with additional processing for emitted messages. This processing
30+
// includes decoding messages as specific types and deriving metrics based on the decoded messages.
31+
func NewWriteTargetMonitor(ctx context.Context, lggr logger.Logger) (*monitor.BeholderClient, error) {
32+
// Initialize the Beholder client with a local logger a custom Emitter
33+
client := beholder.GetClient().ForPackage("write_target")
34+
35+
registryMetrics, err := registry.NewMetrics()
36+
if err != nil {
37+
return nil, fmt.Errorf("failed to create new registry metrics: %w", err)
38+
}
39+
40+
forwarderMetrics, err := forwarder.NewMetrics()
41+
if err != nil {
42+
return nil, fmt.Errorf("failed to create new forwarder metrics: %w", err)
43+
}
44+
45+
wtMetrics, err := wt.NewMetrics()
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to create new write target metrics: %w", err)
48+
}
49+
50+
// Underlying ProtoEmitter
51+
emitter := monitor.NewProtoEmitter(lggr, &client, schemaBasePath)
52+
53+
// Proxy ProtoEmitter with additional processing
54+
protoEmitterProxy := protoEmitter{
55+
lggr: lggr,
56+
emitter: emitter,
57+
processors: []monitor.ProtoProcessor{
58+
&wtProcessor{wtMetrics},
59+
&keystoneProcessor{emitter, forwarderMetrics},
60+
&dataFeedsProcessor{emitter, registryMetrics},
61+
},
62+
}
63+
return &monitor.BeholderClient{Client: &client, ProtoEmitter: &protoEmitterProxy}, nil
64+
}
65+
66+
// ProtoEmitter proxy specific to the WT
67+
type protoEmitter struct {
68+
lggr logger.Logger
69+
emitter monitor.ProtoEmitter
70+
processors []monitor.ProtoProcessor
71+
}
72+
73+
// Emit emits a proto.Message and runs additional processing
74+
func (e *protoEmitter) Emit(ctx context.Context, m proto.Message, attrKVs ...any) error {
75+
err := e.emitter.Emit(ctx, m, attrKVs...)
76+
if err != nil {
77+
return fmt.Errorf("failed to emit: %w", err)
78+
}
79+
80+
// Notice: we skip processing errors (and continue) so this will never error
81+
return e.Process(ctx, m, attrKVs...)
82+
}
83+
84+
// EmitWithLog emits a proto.Message and runs additional processing
85+
func (e *protoEmitter) EmitWithLog(ctx context.Context, m proto.Message, attrKVs ...any) error {
86+
err := e.emitter.EmitWithLog(ctx, m, attrKVs...)
87+
if err != nil {
88+
return fmt.Errorf("failed to emit with log: %w", err)
89+
}
90+
91+
// Notice: we skip processing errors (and continue) so this will never error
92+
return e.Process(ctx, m, attrKVs...)
93+
}
94+
95+
// Process aggregates further processing for emitted messages
96+
func (e *protoEmitter) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
97+
// Further processing for emitted messages
98+
for _, p := range e.processors {
99+
err := p.Process(ctx, m, attrKVs...)
100+
if err != nil {
101+
// Notice: we swallow and log processing errors
102+
// These should be investigated and fixed, but are not critical to product runtime,
103+
// and shouldn't block further processing of the emitted message.
104+
e.lggr.Errorw("failed to process emitted message", "err", err)
105+
return nil
106+
}
107+
}
108+
return nil
109+
}
110+
111+
// Write-Target specific processor decodes write messages to derive metrics
112+
type wtProcessor struct {
113+
metrics *wt.Metrics
114+
}
115+
116+
func (p *wtProcessor) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
117+
// Switch on the type of the proto.Message
118+
switch msg := m.(type) {
119+
case *wt.WriteInitiated:
120+
err := p.metrics.OnWriteInitiated(ctx, msg, attrKVs...)
121+
if err != nil {
122+
return fmt.Errorf("failed to publish write initiated metrics: %w", err)
123+
}
124+
return nil
125+
case *wt.WriteError:
126+
err := p.metrics.OnWriteError(ctx, msg, attrKVs...)
127+
if err != nil {
128+
return fmt.Errorf("failed to publish write error metrics: %w", err)
129+
}
130+
return nil
131+
case *wt.WriteSent:
132+
err := p.metrics.OnWriteSent(ctx, msg, attrKVs...)
133+
if err != nil {
134+
return fmt.Errorf("failed to publish write sent metrics: %w", err)
135+
}
136+
return nil
137+
case *wt.WriteConfirmed:
138+
err := p.metrics.OnWriteConfirmed(ctx, msg, attrKVs...)
139+
if err != nil {
140+
return fmt.Errorf("failed to publish write confirmed metrics: %w", err)
141+
}
142+
return nil
143+
default:
144+
return nil // fallthrough
145+
}
146+
}
147+
148+
// Keystone specific processor decodes writes as 'platform.forwarder.ReportProcessed' messages + metrics
149+
type keystoneProcessor struct {
150+
emitter monitor.ProtoEmitter
151+
metrics *forwarder.Metrics
152+
}
153+
154+
func (p *keystoneProcessor) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
155+
// Switch on the type of the proto.Message
156+
switch msg := m.(type) {
157+
case *wt.WriteConfirmed:
158+
// TODO: detect the type of write payload (support more than one type of write, first multiple Keystone report versions)
159+
// https://smartcontract-it.atlassian.net/browse/NONEVM-817
160+
// Q: Will this msg ever contain different (non-Keystone) types of writes? Hmm.
161+
// Notice: we assume all writes are Keystone (v1) writes for now
162+
163+
// Decode as a 'platform.forwarder.ReportProcessed' message
164+
reportProcessed, err := forwarder.DecodeAsReportProcessed(msg)
165+
if err != nil {
166+
return fmt.Errorf("failed to decode as 'platform.forwarder.ReportProcessed': %w", err)
167+
}
168+
// Emit the 'platform.forwarder.ReportProcessed' message
169+
err = p.emitter.EmitWithLog(ctx, reportProcessed, attrKVs...)
170+
if err != nil {
171+
return fmt.Errorf("failed to emit with log: %w", err)
172+
}
173+
// Process emit and derive metrics
174+
err = p.metrics.OnReportProcessed(ctx, reportProcessed, attrKVs...)
175+
if err != nil {
176+
return fmt.Errorf("failed to publish report processed metrics: %w", err)
177+
}
178+
return nil
179+
default:
180+
return nil // fallthrough
181+
}
182+
}
183+
184+
// Data-Feeds specific processor decodes writes as 'data-feeds.registry.FeedUpdated' messages + metrics
185+
type dataFeedsProcessor struct {
186+
emitter monitor.ProtoEmitter
187+
metrics *registry.Metrics
188+
}
189+
190+
func (p *dataFeedsProcessor) Process(ctx context.Context, m proto.Message, attrKVs ...any) error {
191+
// Switch on the type of the proto.Message
192+
switch msg := m.(type) {
193+
case *wt.WriteConfirmed:
194+
// TODO: fallthrough if not a write containing a DF report
195+
// https://smartcontract-it.atlassian.net/browse/NONEVM-818
196+
// Notice: we assume all writes are Data-Feeds (static schema) writes for now
197+
198+
// Decode as an array of 'data-feeds.registry.FeedUpdated' messages
199+
updates, err := registry.DecodeAsFeedUpdated(msg)
200+
if err != nil {
201+
return fmt.Errorf("failed to decode as 'data-feeds.registry.FeedUpdated': %w", err)
202+
}
203+
// Emit the 'data-feeds.registry.FeedUpdated' messages
204+
for _, update := range updates {
205+
err = p.emitter.EmitWithLog(ctx, update, attrKVs...)
206+
if err != nil {
207+
return fmt.Errorf("failed to emit with log: %w", err)
208+
}
209+
// Process emit and derive metrics
210+
err = p.metrics.OnFeedUpdated(ctx, update, attrKVs...)
211+
if err != nil {
212+
return fmt.Errorf("failed to publish feed updated metrics: %w", err)
213+
}
214+
}
215+
return nil
216+
default:
217+
return nil // fallthrough
218+
}
219+
}

0 commit comments

Comments
 (0)