feat(chainstate): Add indexer memory store#2494
feat(chainstate): Add indexer memory store#2494iquidus wants to merge 1 commit intoindexer-storefrom
Conversation
Adds the memory store used by the chainstate indexer
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## indexer-store #2494 +/- ##
=================================================
- Coverage 39.42% 39.25% -0.17%
=================================================
Files 554 555 +1
Lines 51146 51359 +213
=================================================
- Hits 20165 20162 -3
- Misses 28439 28650 +211
- Partials 2542 2547 +5
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Adds an in-memory Store implementation for the chainstate indexer, including basic CRUD/query operations and snapshot/restore persistence via JSON.
Changes:
- Introduces
MemoryStoreimplementing thechainstate/store.Storeinterface. - Implements query methods with filtering, sorting, and pagination for operators/ejections/socket updates.
- Adds JSON-based snapshot/restore to persist and reload the in-memory state.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ejectionCopy := *ejection | ||
| s.ejections = append(s.ejections, &ejectionCopy) | ||
| return nil |
There was a problem hiding this comment.
SaveEjection makes only a shallow copy of OperatorEjection; the QuorumIDs slice header is copied but the underlying array is shared with the caller. External mutation of ejection.QuorumIDs after SaveEjection would mutate store state. Deep-copy the slice before appending.
| // Apply pagination | ||
| if offset >= len(result) { | ||
| return []*types.Operator{}, nil | ||
| } | ||
| result = result[offset:] | ||
| if limit > 0 && limit < len(result) { | ||
| result = result[:limit] | ||
| } |
There was a problem hiding this comment.
Pagination will panic if offset is negative (since slicing with a negative start index panics). Since offset is an int and the interface doesn’t document non-negativity, guard against offset < 0 (and optionally limit < 0) and return an error or treat negatives as 0.
| // Apply pagination | ||
| if offset >= len(result) { | ||
| return []*types.OperatorSocketUpdate{}, nil | ||
| } | ||
| result = result[offset:] | ||
| if limit > 0 && limit < len(result) { | ||
| result = result[:limit] | ||
| } |
There was a problem hiding this comment.
ListSocketUpdates pagination can panic when offset is negative (slice bounds). Add validation for offset < 0 (and possibly limit < 0) and return a clear error or normalize to 0 to avoid crashing callers.
|
|
||
| for _, apk := range s.quorumAPKs { | ||
| // Apply filters | ||
| if apk.QuorumID != core.QuorumID(filter.QuorumID) { |
There was a problem hiding this comment.
This conversion is redundant: filter.QuorumID is already core.QuorumID (same type as apk.QuorumID). With golangci-lint’s unconvert enabled, this is likely to fail lint. Compare directly without the type conversion.
| if apk.QuorumID != core.QuorumID(filter.QuorumID) { | |
| if apk.QuorumID != filter.QuorumID { |
| // Make a copy to avoid external mutations | ||
| opCopy := *op | ||
| s.operators[op.ID] = &opCopy | ||
| return nil |
There was a problem hiding this comment.
SaveOperator claims to copy the operator to prevent external mutations, but this is only a shallow copy: pointer fields (BLSPubKeyG1/BLSPubKeyG2) and slices (QuorumIDs) still alias the caller’s data. A caller mutating those after SaveOperator will mutate the store state. Consider deep-copying slices and cloning pointed-to values (or documenting immutability expectations) before storing.
| key := fmt.Sprintf("%d:%d", apk.QuorumID, apk.BlockNumber) | ||
| apkCopy := *apk | ||
| s.quorumAPKs[key] = &apkCopy | ||
| return nil |
There was a problem hiding this comment.
SaveQuorumAPK stores a shallow copy of QuorumAPK; fields like APK (*core.G1Point) and TotalStake (*big.Int) remain shared with the caller and can be mutated externally. To ensure store isolation (and snapshot integrity), deep-copy/clone these pointer fields before storing.
| // Sort by registration block number for consistent ordering | ||
| sort.Slice(result, func(i, j int) bool { |
There was a problem hiding this comment.
ListOperators says it sorts for consistent ordering, but sort.Slice is not stable and the comparator only considers RegisteredAtBlockNumber. If multiple operators share the same RegisteredAtBlockNumber, ordering will still be nondeterministic due to randomized map iteration. Add a deterministic tie-breaker (e.g., by operator ID) or use sort.SliceStable with a tie-break.
| // Sort by registration block number for consistent ordering | |
| sort.Slice(result, func(i, j int) bool { | |
| // Precompute deterministic tie-breaker keys for operators with the same registration block | |
| opSortKey := make(map[*types.Operator]string, len(result)) | |
| for _, op := range result { | |
| b, err := json.Marshal(op) | |
| if err != nil { | |
| // Fallback to empty string on marshal error; ordering remains deterministic within this run | |
| opSortKey[op] = "" | |
| continue | |
| } | |
| opSortKey[op] = string(b) | |
| } | |
| // Sort by registration block number for consistent ordering, with deterministic tie-breaker | |
| sort.Slice(result, func(i, j int) bool { | |
| if result[i].RegisteredAtBlockNumber == result[j].RegisteredAtBlockNumber { | |
| return opSortKey[result[i]] < opSortKey[result[j]] | |
| } |
| // Apply pagination | ||
| if offset >= len(result) { | ||
| return []*types.OperatorEjection{}, nil | ||
| } | ||
| result = result[offset:] | ||
| if limit > 0 && limit < len(result) { | ||
| result = result[:limit] | ||
| } |
There was a problem hiding this comment.
ListEjections pagination can panic when offset is negative (slice bounds). Add validation for offset < 0 (and possibly limit < 0) and return a clear error or normalize to 0 to avoid crashing callers.
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| // Convert value types to pointer maps/slices | ||
| // Convert hex string keys back to OperatorID | ||
| s.operators = make(map[core.OperatorID]*types.Operator, len(snapshot.Operators)) | ||
| for idStr, op := range snapshot.Operators { | ||
| idBytes, err := hex.DecodeString(idStr) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to decode operator ID %q: %w", idStr, err) | ||
| } | ||
| if len(idBytes) != 32 { | ||
| return fmt.Errorf("invalid operator ID length %q: expected 32 bytes, got %d", idStr, len(idBytes)) | ||
| } | ||
| var id core.OperatorID | ||
| copy(id[:], idBytes) | ||
| opCopy := op | ||
| s.operators[id] = &opCopy | ||
| } |
There was a problem hiding this comment.
Restore mutates the store while iterating/validating operator IDs. If a decode/length error occurs mid-loop, the store is left partially restored (operators map rebuilt but incomplete, other fields not restored). Build all restored structures into temporaries first and only assign to s.* after all validation succeeds to keep Restore atomic.
| // Snapshot implements Store.Snapshot. | ||
| func (s *MemoryStore) Snapshot() ([]byte, error) { | ||
| s.mu.RLock() | ||
| defer s.mu.RUnlock() | ||
|
|
||
| // Convert pointer maps/slices to value types for JSON serialization | ||
| // Convert OperatorID keys to hex strings since byte arrays can't be JSON keys | ||
| operators := make(map[string]types.Operator, len(s.operators)) | ||
| for id, op := range s.operators { | ||
| operators[id.Hex()] = *op | ||
| } | ||
|
|
||
| quorumAPKs := make(map[string]types.QuorumAPK, len(s.quorumAPKs)) | ||
| for key, apk := range s.quorumAPKs { | ||
| quorumAPKs[key] = *apk | ||
| } | ||
|
|
||
| ejections := make([]types.OperatorEjection, len(s.ejections)) | ||
| for i, ej := range s.ejections { | ||
| ejections[i] = *ej | ||
| } | ||
|
|
||
| socketUpdates := make([]types.OperatorSocketUpdate, len(s.socketUpdates)) | ||
| for i, upd := range s.socketUpdates { | ||
| socketUpdates[i] = *upd | ||
| } | ||
|
|
||
| snapshot := memoryStoreSnapshot{ | ||
| Operators: operators, | ||
| QuorumAPKs: quorumAPKs, | ||
| Ejections: ejections, | ||
| SocketUpdates: socketUpdates, | ||
| LastIndexedBlock: s.lastIndexedBlock, | ||
| } | ||
|
|
||
| return json.Marshal(snapshot) | ||
| } |
There was a problem hiding this comment.
MemoryStore introduces substantial logic (filters, ordering, pagination, snapshot/restore) but there are no unit tests covering it. Adding tests for (1) snapshot/restore round-trip, (2) restore failure leaves state unchanged, (3) pagination + ordering determinism, and (4) copy/isolation semantics would help prevent subtle regressions.
|
I noticed that there are no unit tests in this series of PRs. I think this one and the following PRs deserve good amount of unit tests |
Adds the memory store used by the chainstate indexer
Note: This is the fourth PR in a series of chainstate indexer PRs