feat(chainstate): Add json persister #2495
Conversation
Adds a JSON persister for the chainstate indexer, intended to periodically persist state to disk in a JSON file. The amount of data we are storing with the indexer does not justify more complex storage (e.g., ~1.8 KB per operator). Storing in memory and persisting to disk for recovery is enough.
Codecov Report

❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

@@ Coverage Diff @@
## indexer-memstore #2495 +/- ##
====================================================
- Coverage 39.25% 39.19% -0.06%
====================================================
Files 555 556 +1
Lines 51359 51399 +40
====================================================
- Hits 20162 20147 -15
- Misses 28650 28701 +51
- Partials 2547 2551 +4
Pull request overview
This pull request adds a JSON-based persistence mechanism for the chainstate indexer, which periodically saves the in-memory store state to disk. This is the fifth PR in a series building out the chainstate indexer functionality.
Changes:
- Introduces a JSONPersister struct to handle periodic state persistence to disk
- Implements atomic file operations using a temp file write + rename pattern
- Provides Load/Save operations and background periodic saving with graceful shutdown
```go
tmpPath := p.path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
	return fmt.Errorf("failed to write temp file: %w", err)
}

if err := os.Rename(tmpPath, p.path); err != nil {
	return fmt.Errorf("failed to rename temp file: %w", err)
}
```
The codebase has an established AtomicWrite utility function at litt/util/file_utils.go that provides better atomic file writing guarantees than the current implementation. It handles fsync of both the file and parent directory, and uses proper cleanup of temporary files. Consider using this utility instead of manually implementing atomic writes to maintain consistency across the codebase and improve durability guarantees.
```go
}

tmpPath := p.path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
```
The file permissions 0644 allow any user on the system to read the chainstate data. If this data contains sensitive information about operators or the network state, consider using more restrictive permissions like 0600 (read/write only for the owner) to limit access to the file owner only.
Suggested change:
```diff
-if err := os.WriteFile(tmpPath, data, 0644); err != nil {
+if err := os.WriteFile(tmpPath, data, 0600); err != nil {
```
```go
	return fmt.Errorf("failed to write temp file: %w", err)
}

if err := os.Rename(tmpPath, p.path); err != nil {
```
If the temporary file write succeeds but the rename operation fails, the temporary file will be left behind. Consider adding cleanup logic to remove the temporary file on error, or document that orphaned .tmp files may need to be cleaned up after failures.
Suggested change:
```diff
 if err := os.Rename(tmpPath, p.path); err != nil {
+	// Best-effort cleanup of the temporary file if rename fails.
+	if removeErr := os.Remove(tmpPath); removeErr != nil && !os.IsNotExist(removeErr) {
+		p.logger.Error("Failed to remove temp state file after rename error", "path", tmpPath, "error", removeErr)
+	}
```
```go
package store

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Layr-Labs/eigensdk-go/logging"
)

// JSONPersister handles periodic persistence of store state to a JSON file.
type JSONPersister struct {
	store  Store
	path   string
	logger logging.Logger
}

// NewJSONPersister creates a new JSON persister for the given store.
func NewJSONPersister(store Store, path string, logger logging.Logger) *JSONPersister {
	return &JSONPersister{
		store:  store,
		path:   path,
		logger: logger,
	}
}

// Save persists the current store state to the configured JSON file.
// It uses atomic file operations (write to temp, then rename) to ensure consistency.
func (p *JSONPersister) Save(ctx context.Context) error {
	data, err := p.store.Snapshot()
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	tmpPath := p.path + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("failed to write temp file: %w", err)
	}

	if err := os.Rename(tmpPath, p.path); err != nil {
		return fmt.Errorf("failed to rename temp file: %w", err)
	}

	p.logger.Info("State persisted", "path", p.path, "size_bytes", len(data))
	return nil
}

// Load restores the store state from the configured JSON file.
// If the file doesn't exist, it returns without error (fresh start).
func (p *JSONPersister) Load(ctx context.Context) error {
	data, err := os.ReadFile(p.path)
	if err != nil {
		if os.IsNotExist(err) {
			p.logger.Info("No existing state file, starting fresh", "path", p.path)
			return nil
		}
		return fmt.Errorf("failed to read state file: %w", err)
	}

	if err := p.store.Restore(data); err != nil {
		return fmt.Errorf("failed to restore state: %w", err)
	}

	p.logger.Info("State restored", "path", p.path, "size_bytes", len(data))
	return nil
}

// StartPeriodicSave starts a background goroutine that periodically saves the store state.
// It also performs a final save when the context is cancelled.
func (p *JSONPersister) StartPeriodicSave(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := p.Save(ctx); err != nil {
				p.logger.Error("Failed to persist state", "error", err)
			}
		case <-ctx.Done():
			// Perform final save before shutdown
			p.logger.Info("Context cancelled, performing final state save")
			if err := p.Save(context.Background()); err != nil {
				p.logger.Error("Failed final state save", "error", err)
			}
			return
		}
	}
}
```
There are no tests for the JSONPersister implementation. The repository has comprehensive test coverage across other modules. Consider adding tests to cover: successful Save/Load operations, handling of non-existent files in Load, atomic write behavior, StartPeriodicSave functionality with context cancellation, and error cases like write failures or corrupt JSON data during restoration.
```go
func (p *JSONPersister) Save(ctx context.Context) error {
	data, err := p.store.Snapshot()
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	tmpPath := p.path + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("failed to write temp file: %w", err)
	}

	if err := os.Rename(tmpPath, p.path); err != nil {
		return fmt.Errorf("failed to rename temp file: %w", err)
	}

	p.logger.Info("State persisted", "path", p.path, "size_bytes", len(data))
	return nil
}
```
The context parameter passed to Save is not used. If the intention is to respect context cancellation during the save operation, consider checking ctx.Done() before performing the save, or remove the parameter if it's not needed. The final save in StartPeriodicSave uses context.Background(), which bypasses cancellation that might be in progress.
```go
func (p *JSONPersister) Load(ctx context.Context) error {
	data, err := os.ReadFile(p.path)
	if err != nil {
		if os.IsNotExist(err) {
			p.logger.Info("No existing state file, starting fresh", "path", p.path)
			return nil
		}
		return fmt.Errorf("failed to read state file: %w", err)
	}

	if err := p.store.Restore(data); err != nil {
		return fmt.Errorf("failed to restore state: %w", err)
	}

	p.logger.Info("State restored", "path", p.path, "size_bytes", len(data))
	return nil
}
```
The context parameter passed to Load is not used within the function. Consider either using it to respect cancellation (e.g., checking ctx.Done() before the restore operation) or removing it if context support isn't needed for this operation.
```go
func (p *JSONPersister) StartPeriodicSave(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := p.Save(ctx); err != nil {
				p.logger.Error("Failed to persist state", "error", err)
			}
		case <-ctx.Done():
			// Perform final save before shutdown
			p.logger.Info("Context cancelled, performing final state save")
			if err := p.Save(context.Background()); err != nil {
				p.logger.Error("Failed final state save", "error", err)
			}
			return
		}
	}
}
```
StartPeriodicSave returns immediately without providing a way for the caller to know when the background goroutine has finished or to wait for the final save to complete. Consider returning a channel or using a wait group to allow the caller to synchronize with the completion of the final save operation before shutdown completes.
Suggested change:
```go
// The returned channel is closed when the background goroutine has finished,
// allowing callers to wait for the final save to complete before shutdown.
func (p *JSONPersister) StartPeriodicSave(ctx context.Context, interval time.Duration) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				if err := p.Save(ctx); err != nil {
					p.logger.Error("Failed to persist state", "error", err)
				}
			case <-ctx.Done():
				// Perform final save before shutdown
				p.logger.Info("Context cancelled, performing final state save")
				if err := p.Save(context.Background()); err != nil {
					p.logger.Error("Failed final state save", "error", err)
				}
				return
			}
		}
	}()
	return done
}
```
How would this work for deployment? Is it guaranteed that the existing volume gets reused in the new instance?
```go
)

// JSONPersister handles periodic persistence of store state to a JSON file.
type JSONPersister struct {
```
isn't a JSON persister just a snapshot persister?
We would need to set up a persistent volume on the infra side, but it should get reused by the new instance. Also, this indexer wouldn't run more than one replica, as far as I understand.
```go
	return fmt.Errorf("failed to rename temp file: %w", err)
}

p.logger.Info("State persisted", "path", p.path, "size_bytes", len(data))
```
nit: Can we make this a debug log, and emit an info log only on the final save?
```go
}

// NewJSONPersister creates a new JSON persister for the given store.
func NewJSONPersister(store Store, path string, logger logging.Logger) *JSONPersister {
```
Should we call os.MkdirAll on the path here? Otherwise I think the WriteFile command will fail if the parent directories don't already exist
Alternatively this is a problem on the infra side to make sure directories are setup properly. I'm okay with either way.