Skip to content

Commit 290d64e

Browse files
committed
publishing blockchain data
1 parent 1f71d1d commit 290d64e

File tree

8 files changed

+507
-19
lines changed

8 files changed

+507
-19
lines changed

cmd/root.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,20 @@ func init() {
101101
rootCmd.PersistentFlags().String("api-basicAuth-username", "", "API basic auth username")
102102
rootCmd.PersistentFlags().String("api-basicAuth-password", "", "API basic auth password")
103103
rootCmd.PersistentFlags().String("api-thirdweb-clientId", "", "Thirdweb client id")
104+
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
105+
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
106+
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
107+
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
108+
rootCmd.PersistentFlags().Bool("publisher-transactions-enabled", false, "Toggle transaction publisher")
109+
rootCmd.PersistentFlags().String("publisher-transactions-topicName", "", "Kafka topic name for transactions")
110+
rootCmd.PersistentFlags().String("publisher-transactions-toFilter", "", "Filter transactions by to address")
111+
rootCmd.PersistentFlags().String("publisher-transactions-fromFilter", "", "Filter transactions by from address")
112+
rootCmd.PersistentFlags().Bool("publisher-traces-enabled", false, "Toggle trace publisher")
113+
rootCmd.PersistentFlags().String("publisher-traces-topicName", "", "Kafka topic name for traces")
114+
rootCmd.PersistentFlags().Bool("publisher-events-enabled", false, "Toggle event publisher")
115+
rootCmd.PersistentFlags().String("publisher-events-topicName", "", "Kafka topic name for events")
116+
rootCmd.PersistentFlags().String("publisher-events-addressFilter", "", "Filter events by address")
117+
rootCmd.PersistentFlags().String("publisher-events-topic0Filter", "", "Filter events by topic0")
104118
viper.BindPFlag("rpc.url", rootCmd.PersistentFlags().Lookup("rpc-url"))
105119
viper.BindPFlag("rpc.blocks.blocksPerRequest", rootCmd.PersistentFlags().Lookup("rpc-blocks-blocksPerRequest"))
106120
viper.BindPFlag("rpc.blocks.batchDelay", rootCmd.PersistentFlags().Lookup("rpc-blocks-batchDelay"))
@@ -166,6 +180,20 @@ func init() {
166180
viper.BindPFlag("api.basicAuth.username", rootCmd.PersistentFlags().Lookup("api-basicAuth-username"))
167181
viper.BindPFlag("api.basicAuth.password", rootCmd.PersistentFlags().Lookup("api-basicAuth-password"))
168182
viper.BindPFlag("api.thirdweb.clientId", rootCmd.PersistentFlags().Lookup("api-thirdweb-clientId"))
183+
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
184+
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
185+
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
186+
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))
187+
viper.BindPFlag("publisher.transactions.enabled", rootCmd.PersistentFlags().Lookup("publisher-transactions-enabled"))
188+
viper.BindPFlag("publisher.transactions.topicName", rootCmd.PersistentFlags().Lookup("publisher-transactions-topicName"))
189+
viper.BindPFlag("publisher.transactions.toFilter", rootCmd.PersistentFlags().Lookup("publisher-transactions-toFilter"))
190+
viper.BindPFlag("publisher.transactions.fromFilter", rootCmd.PersistentFlags().Lookup("publisher-transactions-fromFilter"))
191+
viper.BindPFlag("publisher.traces.enabled", rootCmd.PersistentFlags().Lookup("publisher-traces-enabled"))
192+
viper.BindPFlag("publisher.traces.topicName", rootCmd.PersistentFlags().Lookup("publisher-traces-topicName"))
193+
viper.BindPFlag("publisher.events.enabled", rootCmd.PersistentFlags().Lookup("publisher-events-enabled"))
194+
viper.BindPFlag("publisher.events.topicName", rootCmd.PersistentFlags().Lookup("publisher-events-topicName"))
195+
viper.BindPFlag("publisher.events.addressFilter", rootCmd.PersistentFlags().Lookup("publisher-events-addressFilter"))
196+
viper.BindPFlag("publisher.events.topic0Filter", rootCmd.PersistentFlags().Lookup("publisher-events-topic0Filter"))
169197
rootCmd.AddCommand(orchestratorCmd)
170198
rootCmd.AddCommand(apiCmd)
171199
}

configs/config.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,39 @@ type APIConfig struct {
130130
Thirdweb ThirdwebConfig `mapstructure:"thirdweb"`
131131
}
132132

133+
type BlockPublisherConfig struct {
134+
Enabled bool `mapstructure:"enabled"`
135+
TopicName string `mapstructure:"topicName"`
136+
}
137+
138+
type TransactionPublisherConfig struct {
139+
Enabled bool `mapstructure:"enabled"`
140+
TopicName string `mapstructure:"topicName"`
141+
ToFilter []string `mapstructure:"toFilter"`
142+
FromFilter []string `mapstructure:"fromFilter"`
143+
}
144+
145+
type TracePublisherConfig struct {
146+
Enabled bool `mapstructure:"enabled"`
147+
TopicName string `mapstructure:"topicName"`
148+
}
149+
150+
type EventPublisherConfig struct {
151+
Enabled bool `mapstructure:"enabled"`
152+
TopicName string `mapstructure:"topicName"`
153+
AddressFilter []string `mapstructure:"addressFilter"`
154+
Topic0Filter []string `mapstructure:"topic0Filter"`
155+
}
156+
157+
type PublisherConfig struct {
158+
Enabled bool `mapstructure:"enabled"`
159+
Brokers string `mapstructure:"brokers"`
160+
Blocks BlockPublisherConfig `mapstructure:"blocks"`
161+
Transactions TransactionPublisherConfig `mapstructure:"transactions"`
162+
Traces TracePublisherConfig `mapstructure:"traces"`
163+
Events EventPublisherConfig `mapstructure:"events"`
164+
}
165+
133166
type Config struct {
134167
RPC RPCConfig `mapstructure:"rpc"`
135168
Log LogConfig `mapstructure:"log"`
@@ -139,6 +172,7 @@ type Config struct {
139172
ReorgHandler ReorgHandlerConfig `mapstructure:"reorgHandler"`
140173
Storage StorageConfig `mapstructure:"storage"`
141174
API APIConfig `mapstructure:"api"`
175+
Publisher PublisherConfig `mapstructure:"publisher"`
142176
}
143177

144178
var Cfg Config

go.mod

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/swaggo/files v1.0.1
1818
github.com/swaggo/gin-swagger v1.6.0
1919
github.com/swaggo/swag v1.16.3
20+
github.com/twmb/franz-go v1.18.1
2021
)
2122

2223
require (
@@ -77,7 +78,7 @@ require (
7778
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
7879
github.com/paulmach/orb v0.11.1 // indirect
7980
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
80-
github.com/pierrec/lz4/v4 v4.1.21 // indirect
81+
github.com/pierrec/lz4/v4 v4.1.22 // indirect
8182
github.com/pkg/errors v0.9.1 // indirect
8283
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
8384
github.com/prometheus/client_model v0.6.1 // indirect
@@ -98,19 +99,20 @@ require (
9899
github.com/tklauser/go-sysconf v0.3.12 // indirect
99100
github.com/tklauser/numcpus v0.6.1 // indirect
100101
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
102+
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
101103
github.com/ugorji/go/codec v1.2.12 // indirect
102104
github.com/urfave/cli/v2 v2.27.4 // indirect
103105
github.com/yusufpapurcu/wmi v1.2.3 // indirect
104106
go.opentelemetry.io/otel v1.26.0 // indirect
105107
go.opentelemetry.io/otel/trace v1.26.0 // indirect
106108
go.uber.org/multierr v1.11.0 // indirect
107109
golang.org/x/arch v0.12.0 // indirect
108-
golang.org/x/crypto v0.32.0 // indirect
110+
golang.org/x/crypto v0.33.0 // indirect
109111
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
110-
golang.org/x/net v0.34.0 // indirect
111-
golang.org/x/sync v0.10.0 // indirect
112-
golang.org/x/sys v0.29.0 // indirect
113-
golang.org/x/text v0.21.0 // indirect
112+
golang.org/x/net v0.35.0 // indirect
113+
golang.org/x/sync v0.11.0 // indirect
114+
golang.org/x/sys v0.30.0 // indirect
115+
golang.org/x/text v0.22.0 // indirect
114116
golang.org/x/tools v0.25.0 // indirect
115117
google.golang.org/protobuf v1.36.1 // indirect
116118
gopkg.in/ini.v1 v1.67.0 // indirect

go.sum

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,8 @@ github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/En
225225
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
226226
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
227227
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
228-
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
229-
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
228+
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
229+
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
230230
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
231231
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
232232
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -307,6 +307,10 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F
307307
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
308308
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
309309
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
310+
github.com/twmb/franz-go v1.18.1 h1:D75xxCDyvTqBSiImFx2lkPduE39jz1vaD7+FNc+vMkc=
311+
github.com/twmb/franz-go v1.18.1/go.mod h1:Uzo77TarcLTUZeLuGq+9lNpSkfZI+JErv7YJhlDjs9M=
312+
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
313+
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
310314
github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8=
311315
github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U=
312316
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
@@ -340,8 +344,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
340344
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
341345
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
342346
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
343-
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
344-
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
347+
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
348+
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
345349
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
346350
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
347351
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -357,15 +361,15 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
357361
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
358362
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
359363
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
360-
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
361-
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
364+
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
365+
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
362366
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
363367
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
364368
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
365369
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
366370
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
367-
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
368-
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
371+
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
372+
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
369373
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
370374
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
371375
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -382,8 +386,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
382386
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
383387
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
384388
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
385-
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
386-
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
389+
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
390+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
387391
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
388392
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
389393
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
@@ -392,8 +396,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
392396
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
393397
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
394398
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
395-
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
396-
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
399+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
400+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
397401
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
398402
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
399403
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

internal/common/trace.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,52 @@ type Trace struct {
3232
}
3333

3434
type RawTraces = []map[string]interface{}
35+
36+
type TraceModel struct {
37+
ChainId string `json:"chain_id"`
38+
BlockNumber uint64 `json:"block_number"`
39+
BlockHash string `json:"block_hash"`
40+
BlockTimestamp uint64 `json:"block_timestamp"`
41+
TransactionHash string `json:"transaction_hash"`
42+
TransactionIndex uint64 `json:"transaction_index"`
43+
Subtraces int64 `json:"subtraces"`
44+
TraceAddress []uint64 `json:"trace_address"`
45+
TraceType string `json:"trace_type"`
46+
CallType string `json:"call_type"`
47+
Error string `json:"error"`
48+
FromAddress string `json:"from_address"`
49+
ToAddress string `json:"to_address"`
50+
Gas uint64 `json:"gas"`
51+
GasUsed uint64 `json:"gas_used"`
52+
Input string `json:"input"`
53+
Output string `json:"output"`
54+
Value uint64 `json:"value"`
55+
Author string `json:"author"`
56+
RewardType string `json:"reward_type"`
57+
RefundAddress string `json:"refund_address"`
58+
}
59+
60+
func (t *Trace) Serialize() TraceModel {
61+
return TraceModel{
62+
ChainId: t.ChainID.String(),
63+
BlockNumber: t.BlockNumber.Uint64(),
64+
BlockHash: t.BlockHash,
65+
TransactionHash: t.TransactionHash,
66+
TransactionIndex: t.TransactionIndex,
67+
Subtraces: t.Subtraces,
68+
TraceAddress: t.TraceAddress,
69+
TraceType: t.TraceType,
70+
CallType: t.CallType,
71+
Error: t.Error,
72+
FromAddress: t.FromAddress,
73+
ToAddress: t.ToAddress,
74+
Gas: t.Gas.Uint64(),
75+
GasUsed: t.GasUsed.Uint64(),
76+
Input: t.Input,
77+
Output: t.Output,
78+
Value: t.Value.Uint64(),
79+
Author: t.Author,
80+
RewardType: t.RewardType,
81+
RefundAddress: t.RefundAddress,
82+
}
83+
}

internal/orchestrator/committer.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
config "github.com/thirdweb-dev/indexer/configs"
1212
"github.com/thirdweb-dev/indexer/internal/common"
1313
"github.com/thirdweb-dev/indexer/internal/metrics"
14+
"github.com/thirdweb-dev/indexer/internal/publisher"
1415
"github.com/thirdweb-dev/indexer/internal/rpc"
1516
"github.com/thirdweb-dev/indexer/internal/storage"
1617
)
@@ -25,6 +26,7 @@ type Committer struct {
2526
commitFromBlock *big.Int
2627
rpc rpc.IRPCClient
2728
lastCommittedBlock *big.Int
29+
publisher *publisher.Publisher
2830
}
2931

3032
func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
@@ -45,6 +47,7 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage) *Committer {
4547
commitFromBlock: commitFromBlock,
4648
rpc: rpc,
4749
lastCommittedBlock: commitFromBlock,
50+
publisher: publisher.GetInstance(),
4851
}
4952
}
5053

@@ -56,6 +59,7 @@ func (c *Committer) Start(ctx context.Context) {
5659
select {
5760
case <-ctx.Done():
5861
log.Info().Msg("Committer shutting down")
62+
c.publisher.Close()
5963
return
6064
default:
6165
time.Sleep(interval)
@@ -164,6 +168,12 @@ func (c *Committer) commit(blockData []common.BlockData) error {
164168
}
165169
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
166170

171+
go func() {
172+
if err := c.publisher.PublishBlockData(blockData); err != nil {
173+
log.Error().Err(err).Msg("Failed to publish block data to kafka")
174+
}
175+
}()
176+
167177
// TODO if next parts (saving or deleting) fail, we'll have to do a rollback
168178
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
169179
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)

internal/orchestrator/reorg_handler.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
config "github.com/thirdweb-dev/indexer/configs"
1313
"github.com/thirdweb-dev/indexer/internal/common"
1414
"github.com/thirdweb-dev/indexer/internal/metrics"
15+
"github.com/thirdweb-dev/indexer/internal/publisher"
1516
"github.com/thirdweb-dev/indexer/internal/rpc"
1617
"github.com/thirdweb-dev/indexer/internal/storage"
1718
"github.com/thirdweb-dev/indexer/internal/worker"
@@ -24,6 +25,7 @@ type ReorgHandler struct {
2425
blocksPerScan int
2526
lastCheckedBlock *big.Int
2627
worker *worker.Worker
28+
publisher *publisher.Publisher
2729
}
2830

2931
const DEFAULT_REORG_HANDLER_INTERVAL = 1000
@@ -45,6 +47,7 @@ func NewReorgHandler(rpc rpc.IRPCClient, storage storage.IStorage) *ReorgHandler
4547
triggerInterval: triggerInterval,
4648
blocksPerScan: blocksPerScan,
4749
lastCheckedBlock: getInitialCheckedBlockNumber(storage, rpc.GetChainID()),
50+
publisher: publisher.GetInstance(),
4851
}
4952
}
5053

@@ -77,6 +80,7 @@ func (rh *ReorgHandler) Start(ctx context.Context) {
7780
select {
7881
case <-ctx.Done():
7982
log.Info().Msg("Reorg handler shutting down")
83+
rh.publisher.Close()
8084
return
8185
case <-ticker.C:
8286
mostRecentBlockChecked, err := rh.RunFromBlock(rh.lastCheckedBlock)
@@ -278,11 +282,21 @@ func (rh *ReorgHandler) handleReorg(reorgedBlockNumbers []*big.Int) error {
278282
blocksToDelete = append(blocksToDelete, result.BlockNumber)
279283
}
280284
// TODO make delete and insert atomic
281-
if _, err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete); err != nil {
285+
deletedBlockData, err := rh.storage.MainStorage.DeleteBlockData(rh.rpc.GetChainID(), blocksToDelete)
286+
if err != nil {
282287
return fmt.Errorf("error deleting data for blocks %v: %w", blocksToDelete, err)
283288
}
284289
if err := rh.storage.MainStorage.InsertBlockData(data); err != nil {
285290
return fmt.Errorf("error saving data to main storage: %w", err)
286291
}
292+
if rh.publisher != nil {
293+
// Publish block data asynchronously
294+
go func() {
295+
// TODO: get actual deleted data here
296+
if err := rh.publisher.PublishReorg(deletedBlockData, data); err != nil {
297+
log.Error().Err(err).Msg("Failed to publish reorg data to kafka")
298+
}
299+
}()
300+
}
287301
return nil
288302
}

0 commit comments

Comments
 (0)