55 "crypto/tls"
66 "encoding/json"
77 "fmt"
8+ "hash/fnv"
9+ "math"
810 "net"
911 "strings"
1012 "sync"
@@ -18,28 +20,37 @@ import (
 )
 
 type KafkaPublisher struct {
-	client *kgo.Client
-	mu     sync.RWMutex
+	client  *kgo.Client
+	mu      sync.RWMutex
+	chainID string
 }
 
-type PublishableMessage[T common.BlockData] struct {
-	Data   T      `json:"data"`
-	Status string `json:"status"`
+type PublishableBlockMessage struct {
+	common.BlockData
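+	// Sign marks the row as an insert (1) or a retraction (-1); see createBlockDataMessage.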
+	Sign            int8      `json:"sign"`
+	InsertTimestamp time.Time `json:"insert_timestamp"`
 }
 
 // NewKafkaPublisher method for storage connector (public)
 func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 	brokers := strings.Split(cfg.Brokers, ",")
+	chainID := config.Cfg.RPC.ChainID
+
 	opts := []kgo.Opt{
 		kgo.SeedBrokers(brokers...),
 		kgo.AllowAutoTopicCreation(),
-		kgo.ProducerBatchCompression(kgo.SnappyCompression()),
-		kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s", config.Cfg.RPC.ChainID)),
+		kgo.ProducerBatchCompression(kgo.ZstdCompression()),
+		kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s", chainID)),
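+		// Transactional producer: publishMessages wraps each batch in Begin/EndTransaction so a batch lands atomically.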
+		kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)),
+		kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB
 		kgo.MaxBufferedRecords(1_000_000),
 		kgo.ProducerBatchMaxBytes(16_000_000),
-		kgo.RecordPartitioner(kgo.UniformBytesPartitioner(1_000_000, false, false, nil)),
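+		// Manual partitioning: each record carries an explicit Partition, computed from the chain ID in createBlockDataMessage.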
+		kgo.RecordPartitioner(kgo.ManualPartitioner()),
+		kgo.ProduceRequestTimeout(30 * time.Second),
 		kgo.MetadataMaxAge(60 * time.Second),
 		kgo.DialTimeout(10 * time.Second),
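+		// Transactions imply idempotent producing, which requires acks from all in-sync replicas.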
+		kgo.RequiredAcks(kgo.AllISRAcks()),
+		kgo.RequestRetries(5),
 	}
 
 	if cfg.Username != "" && cfg.Password != "" {
@@ -68,8 +79,10 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 	}
 
 	publisher := &KafkaPublisher{
-		client: client,
+		client:  client,
+		chainID: chainID,
 	}
+
 	return publisher, nil
 }
 
@@ -78,7 +91,6 @@ func (p *KafkaPublisher) PublishBlockData(blockData []common.BlockData) error {
 }
 
 func (p *KafkaPublisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error {
-	// TODO: need to revisit how reorg blocks get published to downstream
 	if err := p.publishBlockData(oldData, true); err != nil {
 		return fmt.Errorf("failed to publish old block data: %v", err)
 	}
@@ -105,30 +117,39 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 		return nil
 	}
 
-	p.mu.RLock()
-	defer p.mu.RUnlock()
+	// Lock for the entire transaction lifecycle to ensure thread safety
+	p.mu.Lock()
+	defer p.mu.Unlock()
 
 	if p.client == nil {
-		return nil // Skip if no client configured
+		return fmt.Errorf("no kafka client configured")
+	}
+
+	// Start a new transaction
+	if err := p.client.BeginTransaction(); err != nil {
+		return fmt.Errorf("failed to begin transaction: %v", err)
 	}
 
-	var wg sync.WaitGroup
-	wg.Add(len(messages))
-	// Publish to all configured producers
+	// Produce all messages in the transaction
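+	// A nil promise skips per-record callbacks; failures surface when the batch is flushed and committed below.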
 	for _, msg := range messages {
-		p.client.Produce(ctx, msg, func(_ *kgo.Record, err error) {
-			defer wg.Done()
-			if err != nil {
-				log.Error().Err(err).Msg("Failed to publish message to Kafka")
-			}
-		})
+		p.client.Produce(ctx, msg, nil)
+	}
+
+	// Flush all messages
+	if err := p.client.Flush(ctx); err != nil {
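+		// Best-effort abort: the abort error is deliberately dropped so the flush error is the one reported.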
+		p.client.EndTransaction(ctx, kgo.TryAbort)
+		return fmt.Errorf("failed to flush messages: %v", err)
+	}
+
+	// Commit the transaction
+	if err := p.client.EndTransaction(ctx, kgo.TryCommit); err != nil {
+		return fmt.Errorf("failed to commit transaction: %v", err)
 	}
-	wg.Wait()
 
 	return nil
 }
 
-func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isReorg bool) error {
+func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isDeleted bool) error {
 	if len(blockData) == 0 {
 		return nil
 	}
@@ -138,15 +159,9 @@ func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isReorg
 	// Prepare messages for blocks, events, transactions and traces
 	blockMessages := make([]*kgo.Record, len(blockData))
 
-	// TODO: handle reorg
-	status := "new"
-	if isReorg {
-		status = "reverted"
-	}
-
 	for i, data := range blockData {
 		// Block message
-		if blockMsg, err := p.createBlockDataMessage(data, status); err == nil {
+		if blockMsg, err := p.createBlockDataMessage(data, isDeleted); err == nil {
 			blockMessages[i] = blockMsg
 		} else {
 			return fmt.Errorf("failed to create block message: %v", err)
@@ -161,27 +176,47 @@ func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isReorg
 	return nil
 }
 
-func (p *KafkaPublisher) createBlockDataMessage(data common.BlockData, status string) (*kgo.Record, error) {
-	msg := PublishableMessage[common.BlockData]{
-		Data:   data,
-		Status: status,
+func (p *KafkaPublisher) createBlockDataMessage(data common.BlockData, isDeleted bool) (*kgo.Record, error) {
+	insertTimestamp := time.Now()
+	msg := PublishableBlockMessage{
+		BlockData:       data.Serialize(),
+		Sign:            1,
+		InsertTimestamp: insertTimestamp,
+	}
+	if isDeleted {
+		msg.Sign = -1 // Indicate deletion with a negative sign
 	}
 	msgJson, err := json.Marshal(msg)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal block data: %v", err)
 	}
-	return &kgo.Record{
-		Topic: p.getTopicName("commit", data.ChainId),
-		Key:   []byte(fmt.Sprintf("block-%s-%d-%s", status, data.ChainId, data.Block.Hash)),
-		Value: msgJson,
-	}, nil
-}
 
-func (p *KafkaPublisher) getTopicName(entity string, chainId uint64) string {
-	switch entity {
-	case "commit":
-		return fmt.Sprintf("insight.commit.blocks.%d", chainId)
-	default:
-		panic(fmt.Errorf("unknown topic entity: %s", entity))
+	// Determine partition based on chainID
+	var partition int32
+	if data.ChainId <= math.MaxInt32 {
+		// Direct assignment for chain IDs that fit in int32
+		partition = int32(data.ChainId)
+	} else {
+		// Hash for larger chain IDs to avoid overflow
+		h := fnv.New32a()
+		fmt.Fprintf(h, "%d", data.ChainId)
+		partition = int32(h.Sum32() & 0x7FFFFFFF) // Ensure positive
 	}
+
+	// Create headers with metadata
+	headers := []kgo.RecordHeader{
+		{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", data.ChainId))},
+		{Key: "block_number", Value: []byte(fmt.Sprintf("%d", data.Block.Number))},
+		{Key: "sign", Value: []byte(fmt.Sprintf("%d", msg.Sign))},
+		{Key: "insert_timestamp", Value: []byte(insertTimestamp.Format(time.RFC3339Nano))},
+		{Key: "schema_version", Value: []byte("1")},
+	}
+
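+	// Include the sign in the key so insert and delete records for the same block stay distinct.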
+	return &kgo.Record{
+		Topic:     "insight.commit.blocks",
+		Key:       []byte(fmt.Sprintf("blockdata-%d-%d-%s-%d", data.ChainId, data.Block.Number, data.Block.Hash, msg.Sign)),
+		Value:     msgJson,
+		Headers:   headers,
+		Partition: partition,
+	}, nil
 }