
Commit 0bf3097

Update schema payload
1 parent: 20dc471

2 files changed: +117 −56 lines


internal/storage/kafka_publisher.go

Lines changed: 102 additions & 26 deletions
@@ -5,8 +5,6 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
-	"hash/fnv"
-	"math"
 	"net"
 	"strings"
 	"sync"
@@ -25,12 +23,39 @@ type KafkaPublisher struct {
 	chainID string
 }
 
-type PublishableBlockMessage struct {
+type MessageType string
+
+type PublishableData interface {
+	GetType() MessageType
+}
+
+type PublishableMessagePayload struct {
+	Data      PublishableData `json:"data"`
+	Type      MessageType     `json:"type"`
+	Timestamp time.Time       `json:"timestamp"`
+}
+
+type PublishableMessageBlockData struct {
 	common.BlockData
 	Sign            int8      `json:"sign"`
 	InsertTimestamp time.Time `json:"insert_timestamp"`
 }
 
+type PublishableMessageRevert struct {
+	ChainId         uint64    `json:"chain_id"`
+	BlockNumber     uint64    `json:"block_number"`
+	Sign            int8      `json:"sign"`
+	InsertTimestamp time.Time `json:"insert_timestamp"`
+}
+
+func (b PublishableMessageBlockData) GetType() MessageType {
+	return "block_data"
+}
+
+func (b PublishableMessageRevert) GetType() MessageType {
+	return "revert"
+}
+
 // NewKafkaPublisher method for storage connector (public)
 func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 	brokers := strings.Split(cfg.Brokers, ",")
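
Every record value is now wrapped in an envelope (`data`, `type`, `timestamp`), so a consumer can unmarshal the wrapper first and then decode `data` based on `type`. A minimal decoding sketch under that assumption; the `Envelope` and `RevertData` types below merely mirror the structs in this hunk and are illustrative, not part of the commit:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Envelope mirrors PublishableMessagePayload, keeping Data raw so it can
// be decoded once the type is known.
type Envelope struct {
	Data      json.RawMessage `json:"data"`
	Type      string          `json:"type"`
	Timestamp time.Time       `json:"timestamp"`
}

// RevertData mirrors PublishableMessageRevert.
type RevertData struct {
	ChainId         uint64    `json:"chain_id"`
	BlockNumber     uint64    `json:"block_number"`
	Sign            int8      `json:"sign"`
	InsertTimestamp time.Time `json:"insert_timestamp"`
}

func handleMessage(value []byte) error {
	var env Envelope
	if err := json.Unmarshal(value, &env); err != nil {
		return fmt.Errorf("failed to decode envelope: %w", err)
	}
	switch env.Type {
	case "revert":
		var r RevertData
		if err := json.Unmarshal(env.Data, &r); err != nil {
			return err
		}
		fmt.Printf("revert chain %d back to block %d\n", r.ChainId, r.BlockNumber)
	case "block_data":
		// Decode env.Data into the consumer's block data representation,
		// including the sign and insert_timestamp fields shown above.
	default:
		return fmt.Errorf("unknown message type %q", env.Type)
	}
	return nil
}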
@@ -91,6 +116,12 @@ func (p *KafkaPublisher) PublishBlockData(blockData []common.BlockData) error {
 }
 
 func (p *KafkaPublisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error {
+	newHead := newData[0].Block.Number.Uint64()
+	// Publish a revert down to new head - 1, so that the updated block data can be re-processed
+	if err := p.publishBlockRevert(newData[0].ChainId, newHead-1); err != nil {
+		return fmt.Errorf("failed to revert: %v", err)
+	}
+
 	if err := p.publishBlockData(oldData, true); err != nil {
 		return fmt.Errorf("failed to publish old block data: %v", err)
 	}
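
A hedged producer-side illustration of this hunk: for a reorg whose new canonical range starts at block N, the call below first emits a "revert" message for N-1 and then, via publishBlockData with isDeleted set, republishes the old rows with a negative sign. Variable names are illustrative:

// oldBlocks/newBlocks are assumed to cover the same reorged range, with
// newBlocks[0] holding the new canonical head block.
if err := publisher.PublishReorg(oldBlocks, newBlocks); err != nil {
	return fmt.Errorf("failed to publish reorg: %w", err)
}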
@@ -149,6 +180,27 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re
 	return nil
 }
 
+func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) error {
+	publishStart := time.Now()
+
+	// Prepare the single revert message for this block
+	blockMessages := make([]*kgo.Record, 1)
+
+	// Block message
+	if blockMsg, err := p.createBlockRevertMessage(chainId, blockNumber); err == nil {
+		blockMessages[0] = blockMsg
+	} else {
+		return fmt.Errorf("failed to create block revert message: %v", err)
+	}
+
+	if err := p.publishMessages(context.Background(), blockMessages); err != nil {
+		return fmt.Errorf("failed to publish block revert messages: %v", err)
+	}
+
+	log.Debug().Str("metric", "publish_duration").Msgf("Publisher.publishBlockRevert duration: %f", time.Since(publishStart).Seconds())
+	return nil
+}
+
 func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isDeleted bool) error {
 	if len(blockData) == 0 {
 		return nil
@@ -176,47 +228,71 @@ func (p *KafkaPublisher) publishBlockData(blockData []common.BlockData, isDelete
 	return nil
 }
 
-func (p *KafkaPublisher) createBlockDataMessage(data common.BlockData, isDeleted bool) (*kgo.Record, error) {
-	insertTimestamp := time.Now()
-	msg := PublishableBlockMessage{
-		BlockData:       data.Serialize(),
+func (p *KafkaPublisher) createBlockDataMessage(block common.BlockData, isDeleted bool) (*kgo.Record, error) {
+	timestamp := time.Now()
+
+	data := PublishableMessageBlockData{
+		BlockData:       block,
 		Sign:            1,
-		InsertTimestamp: insertTimestamp,
+		InsertTimestamp: timestamp,
 	}
 	if isDeleted {
-		msg.Sign = -1 // Indicate deletion with a negative sign
+		data.Sign = -1
 	}
+
+	msg := PublishableMessagePayload{
+		Data:      data,
+		Type:      data.GetType(),
+		Timestamp: timestamp,
+	}
+
 	msgJson, err := json.Marshal(msg)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal block data: %v", err)
 	}
 
-	// Determine partition based on chainID
-	var partition int32
-	if data.ChainId <= math.MaxInt32 {
-		// Direct assignment for chain IDs that fit in int32
-		partition = int32(data.ChainId)
-	} else {
-		// Hash for larger chain IDs to avoid overflow
-		h := fnv.New32a()
-		fmt.Fprintf(h, "%d", data.ChainId)
-		partition = int32(h.Sum32() & 0x7FFFFFFF) // Ensure positive
+	return p.createRecord(data.GetType(), block.ChainId, block.Block.Number.Uint64(), timestamp, msgJson)
+}
+
+func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) {
+	timestamp := time.Now()
+
+	data := PublishableMessageRevert{
+		ChainId:         chainId,
+		BlockNumber:     blockNumber,
+		Sign:            1,
+		InsertTimestamp: timestamp,
+	}
+
+	msg := PublishableMessagePayload{
+		Data:      data,
+		Type:      data.GetType(),
+		Timestamp: timestamp,
 	}
 
+	msgJson, err := json.Marshal(msg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal revert message: %v", err)
+	}
+
+	return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, msgJson)
+}
+
+func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) {
 	// Create headers with metadata
 	headers := []kgo.RecordHeader{
-		{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", data.ChainId))},
-		{Key: "block_number", Value: []byte(fmt.Sprintf("%d", data.Block.Number))},
-		{Key: "sign", Value: []byte(fmt.Sprintf("%d", msg.Sign))},
-		{Key: "insert_timestamp", Value: []byte(insertTimestamp.Format(time.RFC3339Nano))},
+		{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))},
+		{Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))},
+		{Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))},
+		{Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))},
 		{Key: "schema_version", Value: []byte("1")},
 	}
 
 	return &kgo.Record{
-		Topic:     "insight.commit.blocks",
-		Key:       []byte(fmt.Sprintf("blockdata-%d-%d-%s-%d", data.ChainId, data.Block.Number, data.Block.Hash, msg.Sign)),
+		Topic:     fmt.Sprintf("insight.commit.blocks.%d", chainId),
+		Key:       []byte(fmt.Sprintf("%d:%s:%d", chainId, msgType, blockNumber)),
 		Value:     msgJson,
 		Headers:   headers,
-		Partition: partition,
+		Partition: 0,
 	}, nil
 }
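
With records now written to a per-chain topic (insight.commit.blocks.<chainId>) and tagged with type and schema_version headers, a downstream consumer can subscribe per chain and dispatch on the header before touching the JSON body. A rough franz-go sketch; the broker address, chain ID, and consumer group name are hypothetical and not part of this commit:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Topic naming follows createRecord above: one topic per chain.
	topic := fmt.Sprintf("insight.commit.blocks.%d", 1)

	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),          // assumed broker address
		kgo.ConsumerGroup("insight-consumer-demo"), // hypothetical group name
		kgo.ConsumeTopics(topic),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			log.Printf("fetch errors: %v", errs)
		}
		fetches.EachRecord(func(r *kgo.Record) {
			// Headers are set in createRecord: chain_id, block_number,
			// type, timestamp, schema_version.
			var msgType string
			for _, h := range r.Headers {
				if h.Key == "type" {
					msgType = string(h.Value)
				}
			}
			fmt.Printf("key=%s type=%s bytes=%d\n", r.Key, msgType, len(r.Value))
		})
	}
}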

internal/storage/kafka_redis.go

Lines changed: 15 additions & 30 deletions
@@ -184,27 +184,12 @@ func (kr *KafkaRedisConnector) ReplaceBlockData(data []common.BlockData) ([]comm
 
 	oldBlocks := []common.BlockData{}
 
-	// Publish reorg event to Kafka
-	// TODO: Publish new blocks (the reorg handler will mark old ones as reverted)
-	if err := kr.kafkaPublisher.PublishBlockData(data); err != nil {
+	// TODO: We need to fetch the old blocks from the primary data store
+	if err := kr.kafkaPublisher.PublishReorg(data, data); err != nil {
 		return nil, fmt.Errorf("failed to publish reorg blocks to kafka: %w", err)
 	}
 
-	// Update cursor to track the highest block number
-	if len(data) > 0 {
-		var maxBlock *big.Int
-		for _, blockData := range data {
-			if maxBlock == nil || blockData.Block.Number.Cmp(maxBlock) > 0 {
-				maxBlock = blockData.Block.Number
-			}
-		}
-		if maxBlock != nil {
-			if err := kr.SetLastPublishedBlockNumber(data[0].Block.ChainId, maxBlock); err != nil {
-				return nil, fmt.Errorf("failed to update published block cursor: %w", err)
-			}
-		}
-	}
-
+	// save cursor
 	return oldBlocks, nil
 }
 
@@ -250,48 +235,48 @@ func (kr *KafkaRedisConnector) GetMaxBlockNumberInRange(chainId *big.Int, startB
 }
 
 func (kr *KafkaRedisConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error) {
-	return []common.BlockHeader{}, nil
+	return nil, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) {
-	return QueryResult[common.TokenBalance]{Data: []common.TokenBalance{}}, nil
+	return QueryResult[common.TokenBalance]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) {
-	return QueryResult[common.TokenTransfer]{Data: []common.TokenTransfer{}}, nil
+	return QueryResult[common.TokenTransfer]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]common.BlockData, error) {
-	return []common.BlockData{}, nil
+	return nil, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]*big.Int, error) {
-	return []*big.Int{}, nil
+	return nil, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error) {
-	return []common.BlockData{}, nil
+	return nil, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
-// Query methods return empty results as this connector uses Kafka for data delivery
+// Query methods return errors as this is a write-only connector for streaming
 func (kr *KafkaRedisConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error) {
-	return QueryResult[common.Block]{Data: []common.Block{}}, nil
+	return QueryResult[common.Block]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error) {
-	return QueryResult[common.Transaction]{Data: []common.Transaction{}}, nil
+	return QueryResult[common.Transaction]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error) {
-	return QueryResult[common.Log]{Data: []common.Log{}}, nil
+	return QueryResult[common.Log]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error) {
-	return QueryResult[common.Trace]{Data: []common.Trace{}}, nil
+	return QueryResult[common.Trace]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 func (kr *KafkaRedisConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) {
-	return QueryResult[interface{}]{Aggregates: []map[string]interface{}{}}, nil
+	return QueryResult[interface{}]{}, fmt.Errorf("query operations are not supported with KafkaRedis connector - this is a write-only connector for streaming")
 }
 
 // Close closes the Redis connection
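
Because every query method now fails instead of returning an empty result, callers that previously treated an empty slice as "no data" must check the error explicitly. A small illustrative caller-side check; the names are taken from this diff and the surrounding code is hypothetical:

// kr is a *KafkaRedisConnector; qf is whatever QueryFilter the caller built.
result, err := kr.GetBlocks(qf)
if err != nil {
	// With this commit the KafkaRedis connector always takes this branch
	// for query methods; previously it returned an empty QueryResult and
	// a nil error, which could be mistaken for an empty chain.
	return fmt.Errorf("storage backend cannot serve queries: %w", err)
}
_ = result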
