Skip to content

Commit 6f2b72d

Browse files
committed
Merge branch 'main' into jl/commit-kafka
2 parents 0bf3097 + 3a03423 commit 6f2b72d

File tree

4 files changed

+82
-15
lines changed

4 files changed

+82
-15
lines changed

cmd/root.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ func init() {
8484
rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-disableTLS", false, "Clickhouse disableTLS for orchestrator storage")
8585
rootCmd.PersistentFlags().Bool("storage-orchestrator-clickhouse-enableParallelViewProcessing", false, "Clickhouse enableParallelViewProcessing for orchestrator storage")
8686
rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxQueryTime", 60, "Clickhouse max query time for orchestrator storage")
87+
rootCmd.PersistentFlags().Int("storage-orchestrator-clickhouse-maxMemoryUsage", 1000000000, "Clickhouse max memory usage in bytes for orchestrator storage")
8788
rootCmd.PersistentFlags().String("storage-orchestrator-postgres-host", "", "PostgreSQL host for orchestrator storage")
8889
rootCmd.PersistentFlags().Int("storage-orchestrator-postgres-port", 5432, "PostgreSQL port for orchestrator storage")
8990
rootCmd.PersistentFlags().String("storage-orchestrator-postgres-username", "", "PostgreSQL username for orchestrator storage")
@@ -105,6 +106,7 @@ func init() {
105106
rootCmd.PersistentFlags().Bool("storage-main-clickhouse-disableTLS", false, "Clickhouse disableTLS for main storage")
106107
rootCmd.PersistentFlags().Bool("storage-main-clickhouse-enableParallelViewProcessing", false, "Clickhouse enableParallelViewProcessing for main storage")
107108
rootCmd.PersistentFlags().Int("storage-main-clickhouse-maxQueryTime", 60, "Clickhouse max query time for main storage")
109+
rootCmd.PersistentFlags().Int("storage-main-clickhouse-maxMemoryUsage", 1000000000, "Clickhouse max memory usage in bytes for main storage")
108110
rootCmd.PersistentFlags().String("storage-staging-clickhouse-username", "", "Clickhouse username for staging storage")
109111
rootCmd.PersistentFlags().String("storage-staging-clickhouse-password", "", "Clickhouse password for staging storage")
110112
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-asyncInsert", false, "Clickhouse async insert for staging storage")
@@ -114,6 +116,7 @@ func init() {
114116
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-disableTLS", false, "Clickhouse disableTLS for staging storage")
115117
rootCmd.PersistentFlags().Bool("storage-staging-clickhouse-enableParallelViewProcessing", false, "Clickhouse enableParallelViewProcessing for staging storage")
116118
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-maxQueryTime", 60, "Clickhouse max query time for staging storage")
119+
rootCmd.PersistentFlags().Int("storage-staging-clickhouse-maxMemoryUsage", 1000000000, "Clickhouse max memory usage in bytes for staging storage")
117120
rootCmd.PersistentFlags().String("storage-staging-postgres-host", "", "PostgreSQL host for staging storage")
118121
rootCmd.PersistentFlags().Int("storage-staging-postgres-port", 5432, "PostgreSQL port for staging storage")
119122
rootCmd.PersistentFlags().String("storage-staging-postgres-username", "", "PostgreSQL username for staging storage")
@@ -211,6 +214,7 @@ func init() {
211214
viper.BindPFlag("storage.staging.clickhouse.disableTLS", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-disableTLS"))
212215
viper.BindPFlag("storage.staging.clickhouse.enableParallelViewProcessing", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-enableParallelViewProcessing"))
213216
viper.BindPFlag("storage.staging.clickhouse.maxQueryTime", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-maxQueryTime"))
217+
viper.BindPFlag("storage.staging.clickhouse.maxMemoryUsage", rootCmd.PersistentFlags().Lookup("storage-staging-clickhouse-maxMemoryUsage"))
214218
viper.BindPFlag("storage.main.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-database"))
215219
viper.BindPFlag("storage.main.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-host"))
216220
viper.BindPFlag("storage.main.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-port"))
@@ -223,6 +227,7 @@ func init() {
223227
viper.BindPFlag("storage.main.clickhouse.disableTLS", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-disableTLS"))
224228
viper.BindPFlag("storage.main.clickhouse.enableParallelViewProcessing", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-enableParallelViewProcessing"))
225229
viper.BindPFlag("storage.main.clickhouse.maxQueryTime", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-maxQueryTime"))
230+
viper.BindPFlag("storage.main.clickhouse.maxMemoryUsage", rootCmd.PersistentFlags().Lookup("storage-main-clickhouse-maxMemoryUsage"))
226231
viper.BindPFlag("storage.orchestrator.clickhouse.database", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-database"))
227232
viper.BindPFlag("storage.orchestrator.clickhouse.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-host"))
228233
viper.BindPFlag("storage.orchestrator.clickhouse.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-port"))
@@ -235,6 +240,7 @@ func init() {
235240
viper.BindPFlag("storage.orchestrator.clickhouse.disableTLS", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-disableTLS"))
236241
viper.BindPFlag("storage.orchestrator.clickhouse.enableParallelViewProcessing", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-enableParallelViewProcessing"))
237242
viper.BindPFlag("storage.orchestrator.clickhouse.maxQueryTime", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxQueryTime"))
243+
viper.BindPFlag("storage.orchestrator.clickhouse.maxMemoryUsage", rootCmd.PersistentFlags().Lookup("storage-orchestrator-clickhouse-maxMemoryUsage"))
238244
viper.BindPFlag("storage.orchestrator.postgres.host", rootCmd.PersistentFlags().Lookup("storage-orchestrator-postgres-host"))
239245
viper.BindPFlag("storage.orchestrator.postgres.port", rootCmd.PersistentFlags().Lookup("storage-orchestrator-postgres-port"))
240246
viper.BindPFlag("storage.orchestrator.postgres.username", rootCmd.PersistentFlags().Lookup("storage-orchestrator-postgres-username"))

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ type ClickhouseConfig struct {
8686
ChainBasedConfig map[string]TableOverrideConfig `mapstructure:"chainBasedConfig"`
8787
EnableParallelViewProcessing bool `mapstructure:"enableParallelViewProcessing"`
8888
MaxQueryTime int `mapstructure:"maxQueryTime"`
89+
MaxMemoryUsage int `mapstructure:"maxMemoryUsage"`
8990
}
9091

9192
type PostgresConfig struct {

internal/orchestrator/committer.go

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (c *Committer) Start(ctx context.Context) {
9696
c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64())
9797
}
9898

99+
// Initialize publisher position - always use max(lastPublished, lastCommitted) to prevent double publishing
99100
lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID)
100101
if err != nil {
101102
// It's okay to fail silently here; it's only used for staging cleanup and will be
@@ -128,6 +129,54 @@ func (c *Committer) Start(ctx context.Context) {
128129
c.lastPublishedBlock.Store(c.lastCommittedBlock.Load())
129130
}
130131

132+
// Determine the correct publish position - always take the maximum to avoid going backwards
133+
var targetPublishBlock *big.Int
134+
135+
if lastPublished == nil || lastPublished.Sign() == 0 {
136+
// No previous publish position
137+
if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 {
138+
// Start from committed position
139+
targetPublishBlock = latestCommittedBlockNumber
140+
} else if c.commitFromBlock.Sign() > 0 {
141+
// Start from configured position minus 1 (since we publish from next block)
142+
targetPublishBlock = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
143+
} else {
144+
// Start from 0
145+
targetPublishBlock = big.NewInt(0)
146+
}
147+
148+
log.Info().
149+
Str("target_publish_block", targetPublishBlock.String()).
150+
Msg("No previous publish position, initializing publisher cursor")
151+
} else {
152+
// We have a previous position
153+
targetPublishBlock = lastPublished
154+
}
155+
156+
// Only update storage if we're changing the position
157+
if lastPublished == nil || targetPublishBlock.Cmp(lastPublished) != 0 {
158+
if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, targetPublishBlock); err != nil {
159+
log.Error().Err(err).Msg("Failed to update published block number in storage")
160+
// If we can't update storage, use what was there originally to avoid issues
161+
if lastPublished != nil {
162+
targetPublishBlock = lastPublished
163+
}
164+
}
165+
}
166+
167+
// Store in memory for quick acess
168+
c.lastPublishedBlock.Store(targetPublishBlock.Uint64())
169+
170+
log.Info().
171+
Str("publish_from", targetPublishBlock.String()).
172+
Str("committed_at", func() string {
173+
if latestCommittedBlockNumber != nil {
174+
return latestCommittedBlockNumber.String()
175+
}
176+
return "0"
177+
}()).
178+
Msg("Publisher initialized")
179+
131180
c.cleanupProcessedStagingBlocks()
132181

133182
if config.Cfg.Publisher.Mode == "parallel" {
@@ -290,25 +339,27 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er
290339
}
291340

292341
func (c *Committer) getBlockNumbersToPublish(ctx context.Context) ([]*big.Int, error) {
293-
lastestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
294-
log.Debug().Msgf("Committer found this last published block number in staging storage: %s", lastestPublishedBlockNumber.String())
342+
// Get the last published block from storage (which was already corrected in Start)
343+
latestPublishedBlockNumber, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(c.rpc.GetChainID())
295344
if err != nil {
296-
return nil, err
345+
return nil, fmt.Errorf("failed to get last published block number: %v", err)
297346
}
298347

299-
if lastestPublishedBlockNumber.Sign() == 0 {
300-
// If no blocks have been committed yet, start from the fromBlock specified in the config
301-
lastestPublishedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1))
302-
} else {
303-
lastPublished := new(big.Int).SetUint64(c.lastPublishedBlock.Load())
304-
if lastestPublishedBlockNumber.Cmp(lastPublished) < 0 {
305-
log.Warn().Msgf("Max block in storage (%s) is less than last published block in memory (%s).", lastestPublishedBlockNumber.String(), lastPublished.String())
306-
return []*big.Int{}, nil
307-
}
348+
// This should never happen after Start() has run, but handle it defensively
349+
if latestPublishedBlockNumber == nil || latestPublishedBlockNumber.Sign() == 0 {
350+
// Fall back to in-memory value which was set during Start
351+
latestPublishedBlockNumber = new(big.Int).SetUint64(c.lastPublishedBlock.Load())
352+
log.Warn().
353+
Str("fallback_value", latestPublishedBlockNumber.String()).
354+
Msg("Storage returned nil/0 for last published block, using in-memory value")
308355
}
309356

310-
startBlock := new(big.Int).Add(lastestPublishedBlockNumber, big.NewInt(1))
311-
endBlock, err := c.getBlockToCommitUntil(ctx, lastestPublishedBlockNumber)
357+
log.Debug().
358+
Str("last_published", latestPublishedBlockNumber.String()).
359+
Msg("Determining blocks to publish")
360+
361+
startBlock := new(big.Int).Add(latestPublishedBlockNumber, big.NewInt(1))
362+
endBlock, err := c.getBlockToCommitUntil(ctx, latestPublishedBlockNumber)
312363
if err != nil {
313364
return nil, fmt.Errorf("error getting block to commit until: %v", err)
314365
}

internal/storage/clickhouse.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,8 +670,17 @@ func (c *ClickHouseConnector) addPostQueryClauses(query string, qf QueryFilter)
670670
}
671671

672672
// Add settings at the very end
673+
// Build settings string for ClickHouse query optimization
674+
var settings []string
673675
if c.cfg.MaxQueryTime > 0 {
674-
query += fmt.Sprintf(" SETTINGS max_execution_time = %d", c.cfg.MaxQueryTime)
676+
settings = append(settings, fmt.Sprintf("max_execution_time = %d", c.cfg.MaxQueryTime))
677+
}
678+
if c.cfg.MaxMemoryUsage > 0 {
679+
settings = append(settings, fmt.Sprintf("max_memory_usage = %d", c.cfg.MaxMemoryUsage))
680+
}
681+
682+
if len(settings) > 0 {
683+
query += " SETTINGS " + strings.Join(settings, ", ")
675684
}
676685

677686
return query

0 commit comments

Comments
 (0)