Skip to content

Commit efbdc81

Browse files
authored
migration boundry fix (#288)
* validation migration boundry fix * s3 flush timeout 5mins
1 parent bd3eca1 commit efbdc81

File tree

3 files changed

+16
-5
lines changed

3 files changed

+16
-5
lines changed

cmd/migrate_valid.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,12 +348,19 @@ func (m *Migrator) DetermineMigrationBoundaries(targetStartBlock, targetEndBlock
348348
if err != nil {
349349
log.Fatal().Err(err).Msg("Failed to get latest block from main storage")
350350
}
351+
latestBlockRPC, err := m.rpcClient.GetLatestBlockNumber(context.Background())
352+
if err != nil {
353+
log.Fatal().Err(err).Msg("Failed to get latest block from RPC")
354+
}
351355
log.Info().Msgf("Latest block in main storage: %d", latestBlockStored)
352356

353357
endBlock := latestBlockStored
354-
if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockStored) < 0 {
358+
if targetEndBlock.Sign() > 0 && targetEndBlock.Cmp(latestBlockRPC) <= 0 {
355359
endBlock = targetEndBlock
356360
}
361+
if targetEndBlock.Uint64() == 0 {
362+
endBlock = latestBlockRPC
363+
}
357364

358365
startBlock := targetStartBlock
359366

configs/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ type S3StorageConfig struct {
9393
BufferSize int64 `mapstructure:"bufferSizeMB"` // Target buffer size in MB before flush
9494
BufferTimeout int `mapstructure:"bufferTimeoutSeconds"` // Max time in seconds before flush
9595
MaxBlocksPerFile int `mapstructure:"maxBlocksPerFile"` // Max blocks per parquet file (0 = no limit, only size/timeout triggers)
96+
FlushTimeout int `mapstructure:"flushTimeoutSeconds"` // Timeout in seconds for flush operations (default: 60)
9697
}
9798

9899
type ParquetConfig struct {

internal/storage/s3.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,9 @@ func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error) {
103103
if cfg.BufferTimeout == 0 {
104104
cfg.BufferTimeout = 1 * 60 * 60 // 1 hour in seconds default
105105
}
106+
if cfg.FlushTimeout == 0 {
107+
cfg.FlushTimeout = 300 // 5 mins default
108+
}
106109

107110
// Create formatter based on format
108111
var formatter DataFormatter
@@ -309,17 +312,17 @@ func (s *S3Connector) Flush() error {
309312
select {
310313
case <-s.flushDoneCh:
311314
return nil
312-
case <-time.After(60 * time.Second):
313-
return fmt.Errorf("flush timeout after 60 seconds")
315+
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
316+
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
314317
}
315318
default:
316319
// Flush channel is full, likely a flush is already in progress
317320
// Wait for it to complete
318321
select {
319322
case <-s.flushDoneCh:
320323
return nil
321-
case <-time.After(60 * time.Second):
322-
return fmt.Errorf("flush timeout after 60 seconds")
324+
case <-time.After(time.Duration(s.config.FlushTimeout) * time.Second):
325+
return fmt.Errorf("flush timeout after %d seconds", s.config.FlushTimeout)
323326
}
324327
}
325328
}

0 commit comments

Comments
 (0)