Skip to content

Commit fe27dc1

Browse files
committed
enable committer message compression per record
1 parent 714b211 commit fe27dc1

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

internal/storage/kafka_publisher.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/DataDog/zstd"
1314
"github.com/rs/zerolog/log"
1415
config "github.com/thirdweb-dev/indexer/configs"
1516
"github.com/thirdweb-dev/indexer/internal/common"
@@ -306,19 +307,27 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui
306307
}
307308

308309
func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) {
309-
// Create headers with metadata
310+
// Compress the JSON data using zstd
311+
compressedData, err := zstd.Compress(nil, msgJson)
312+
if err != nil {
313+
return nil, fmt.Errorf("failed to compress message data: %v", err)
314+
}
315+
316+
// Create headers with metadata including compression info
310317
headers := []kgo.RecordHeader{
311318
{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))},
312319
{Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))},
313320
{Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))},
314321
{Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))},
315322
{Key: "schema_version", Value: []byte("1")},
323+
{Key: "original_size", Value: []byte(fmt.Sprintf("%d", len(msgJson)))},
324+
{Key: "content-encoding", Value: []byte("zstd")},
316325
}
317326

318327
return &kgo.Record{
319328
Topic: fmt.Sprintf("insight.commit.blocks.%d", chainId),
320329
Key: []byte(fmt.Sprintf("%d:%s:%d", chainId, msgType, blockNumber)),
321-
Value: msgJson,
330+
Value: compressedData,
322331
Headers: headers,
323332
Partition: 0,
324333
}, nil

0 commit comments

Comments
 (0)