Skip to content

Commit 4c5320e

Browse files
committed
fix: refine publisher pre-commit flag handling
1 parent 7bfa6de commit 4c5320e

File tree

10 files changed

+282
-15
lines changed

10 files changed

+282
-15
lines changed

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func init() {
135135
rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request")
136136
rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request")
137137
rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher")
138+
rootCmd.PersistentFlags().String("publisher-mode", "post-commit", "Publisher mode: pre-commit or post-commit")
138139
rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers")
139140
rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher")
140141
rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks")
@@ -250,6 +251,7 @@ func init() {
250251
viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression"))
251252
viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout"))
252253
viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled"))
254+
viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode"))
253255
viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers"))
254256
viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled"))
255257
viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName"))

configs/config.example.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,11 @@ api:
190190
publisher:
191191
# Whether the publisher is enabled
192192
enabled: true
193+
# Publisher mode: "pre-commit" publishes before writing to storage, "post-commit" publishes after commit
194+
mode: post-commit
193195
# Kafka broker addresses (comma-separated)
194196
brokers: localhost:9092
195-
197+
196198
# Block publishing configuration
197199
blocks:
198200
# Whether to publish block data

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ type EventPublisherConfig struct {
172172

173173
type PublisherConfig struct {
174174
Enabled bool `mapstructure:"enabled"`
175+
Mode string `mapstructure:"mode"`
175176
Brokers string `mapstructure:"brokers"`
176177
Username string `mapstructure:"username"`
177178
Password string `mapstructure:"password"`

configs/test_config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ api:
6464

6565
publisher:
6666
enabled: false
67+
mode: post-commit
6768

6869
validation:
6970
mode: minimal

internal/orchestrator/committer.go

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,39 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
295295

296296
func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error {
297297
blockNumbers := make([]*big.Int, len(blockData))
298+
highestBlock := blockData[0].Block
298299
for i, block := range blockData {
299300
blockNumbers[i] = block.Block.Number
301+
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
302+
highestBlock = block.Block
303+
}
300304
}
301305
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
302306

307+
shouldPostCommitPublish := true
308+
309+
if config.Cfg.Publisher.Mode == "pre-commit" {
310+
chainID := c.rpc.GetChainID()
311+
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
312+
if err != nil {
313+
log.Error().Err(err).Msg("Failed to get last published block number, falling back to post-commit")
314+
} else if lastPublished == nil || lastPublished.Cmp(highestBlock.Number) < 0 {
315+
go func() {
316+
if err := c.publisher.PublishBlockData(blockData); err != nil {
317+
log.Error().Err(err).Msg("Failed to publish block data to kafka")
318+
return
319+
}
320+
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highestBlock.Number); err != nil {
321+
log.Error().Err(err).Msg("Failed to set last published block number")
322+
}
323+
}()
324+
shouldPostCommitPublish = false
325+
} else {
326+
log.Debug().Msgf("Skipping publish, latest published block %s >= current %s", lastPublished.String(), highestBlock.Number.String())
327+
shouldPostCommitPublish = false
328+
}
329+
}
330+
303331
mainStorageStart := time.Now()
304332
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
305333
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
@@ -308,11 +336,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
308336
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
309337
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
310338

311-
go func() {
312-
if err := c.publisher.PublishBlockData(blockData); err != nil {
313-
log.Error().Err(err).Msg("Failed to publish block data to kafka")
314-
}
315-
}()
339+
if shouldPostCommitPublish {
340+
go func() {
341+
if err := c.publisher.PublishBlockData(blockData); err != nil {
342+
log.Error().Err(err).Msg("Failed to publish block data to kafka")
343+
}
344+
}()
345+
}
316346

317347
if c.workMode == WorkModeBackfill {
318348
go func() {
@@ -325,13 +355,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
325355
}()
326356
}
327357

328-
// Find highest block number from committed blocks
329-
highestBlock := blockData[0].Block
330-
for _, block := range blockData {
331-
if block.Block.Number.Cmp(highestBlock.Number) > 0 {
332-
highestBlock = block.Block
333-
}
334-
}
335358
c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number)
336359

337360
// Update metrics for successful commits

internal/orchestrator/committer_test.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package orchestrator
22

33
import (
44
"context"
5+
"errors"
56
"math/big"
67
"testing"
78
"time"
@@ -324,9 +325,10 @@ func TestCommit(t *testing.T) {
324325
committer := NewCommitter(mockRPC, mockStorage)
325326
committer.workMode = WorkModeBackfill
326327

328+
chainID := big.NewInt(1)
327329
blockData := []common.BlockData{
328-
{Block: common.Block{Number: big.NewInt(101)}},
329-
{Block: common.Block{Number: big.NewInt(102)}},
330+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
331+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
330332
}
331333

332334
// Create a channel to signal when DeleteStagingData is called
@@ -350,6 +352,81 @@ func TestCommit(t *testing.T) {
350352
}
351353
}
352354

355+
func TestCommitPreCommitPublisherMode(t *testing.T) {
356+
defer func() { config.Cfg = config.Config{} }()
357+
config.Cfg.Publisher.Mode = "pre-commit"
358+
359+
mockRPC := mocks.NewMockIRPCClient(t)
360+
mockMainStorage := mocks.NewMockIMainStorage(t)
361+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
362+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
363+
mockStorage := storage.IStorage{
364+
MainStorage: mockMainStorage,
365+
StagingStorage: mockStagingStorage,
366+
OrchestratorStorage: mockOrchestratorStorage,
367+
}
368+
committer := NewCommitter(mockRPC, mockStorage)
369+
committer.workMode = WorkModeLive
370+
371+
chainID := big.NewInt(1)
372+
blockData := []common.BlockData{
373+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
374+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
375+
}
376+
377+
publishDone := make(chan struct{})
378+
379+
mockRPC.EXPECT().GetChainID().Return(chainID)
380+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil)
381+
mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error {
382+
close(publishDone)
383+
return nil
384+
})
385+
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
386+
387+
err := committer.commit(context.Background(), blockData)
388+
assert.NoError(t, err)
389+
390+
select {
391+
case <-publishDone:
392+
case <-time.After(2 * time.Second):
393+
t.Fatal("SetLastPublishedBlockNumber was not called")
394+
}
395+
}
396+
397+
func TestCommitPreCommitPublisherModeFallback(t *testing.T) {
398+
defer func() { config.Cfg = config.Config{} }()
399+
config.Cfg.Publisher.Mode = "pre-commit"
400+
401+
mockRPC := mocks.NewMockIRPCClient(t)
402+
mockMainStorage := mocks.NewMockIMainStorage(t)
403+
mockStagingStorage := mocks.NewMockIStagingStorage(t)
404+
mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t)
405+
mockStorage := storage.IStorage{
406+
MainStorage: mockMainStorage,
407+
StagingStorage: mockStagingStorage,
408+
OrchestratorStorage: mockOrchestratorStorage,
409+
}
410+
committer := NewCommitter(mockRPC, mockStorage)
411+
committer.workMode = WorkModeLive
412+
413+
chainID := big.NewInt(1)
414+
blockData := []common.BlockData{
415+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}},
416+
{Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}},
417+
}
418+
419+
mockRPC.EXPECT().GetChainID().Return(chainID)
420+
mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(nil, errors.New("boom"))
421+
mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil)
422+
423+
err := committer.commit(context.Background(), blockData)
424+
assert.NoError(t, err)
425+
426+
time.Sleep(100 * time.Millisecond)
427+
mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything)
428+
}
429+
353430
func TestHandleGap(t *testing.T) {
354431
mockRPC := mocks.NewMockIRPCClient(t)
355432
mockMainStorage := mocks.NewMockIMainStorage(t)

internal/storage/clickhouse.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error {
10751075
return batch.Send()
10761076
}
10771077

1078+
func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
1079+
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database)
1080+
if chainId.Sign() > 0 {
1081+
query += fmt.Sprintf(" AND chain_id = %s", chainId.String())
1082+
}
1083+
var blockNumberString string
1084+
err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString)
1085+
if err != nil {
1086+
if err == sql.ErrNoRows {
1087+
return big.NewInt(0), nil
1088+
}
1089+
return nil, err
1090+
}
1091+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
1092+
if !ok {
1093+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
1094+
}
1095+
return blockNumber, nil
1096+
}
1097+
1098+
func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
1099+
query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String())
1100+
return c.conn.Exec(context.Background(), query)
1101+
}
1102+
10781103
func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) {
10791104
query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database)
10801105
if chainId.Sign() > 0 {

internal/storage/connector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type IStagingStorage interface {
8383
GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
8484
DeleteStagingData(data []common.BlockData) error
8585
GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
86+
GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
87+
SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
8688
DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
8789
}
8890

internal/storage/postgres.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error {
344344
return err
345345
}
346346

347+
func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) {
348+
query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1`
349+
350+
var blockNumberString string
351+
err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString)
352+
if err != nil {
353+
if err == sql.ErrNoRows {
354+
return big.NewInt(0), nil
355+
}
356+
return nil, err
357+
}
358+
359+
blockNumber, ok := new(big.Int).SetString(blockNumberString, 10)
360+
if !ok {
361+
return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString)
362+
}
363+
return blockNumber, nil
364+
}
365+
366+
func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error {
367+
query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value)
368+
VALUES ($1, 'publish', $2)
369+
ON CONFLICT (chain_id, cursor_type)
370+
DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()`
371+
372+
_, err := p.db.Exec(query, chainId.String(), blockNumber.String())
373+
return err
374+
}
375+
347376
func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) {
348377
query := `SELECT MAX(block_number) FROM block_data WHERE 1=1`
349378

0 commit comments

Comments
 (0)