|
6 | 6 | "encoding/hex" |
7 | 7 | "encoding/json" |
8 | 8 | "fmt" |
9 | | - "strconv" |
10 | 9 | "strings" |
11 | 10 | "time" |
12 | 11 | "zenGate-Global/merkle-oracle-node/internal/cloud" |
@@ -292,7 +291,7 @@ type ChainEventProcessorActor struct { |
292 | 291 | lastSucessfullyIndexedSlot uint64 |
293 | 292 | lastSucessfullyIndexedBlockHash string |
294 | 293 |
|
295 | | - processedBlockMap map[string]bool |
| 294 | + processedBlockMap map[BlockKey]bool |
296 | 295 |
|
297 | 296 | blockTransactionCountMap map[uint64]uint64 |
298 | 297 | } |
@@ -322,19 +321,61 @@ func NewChainEventProcessorStrategy( |
322 | 321 | trieRootMismatchCount: 0, |
323 | 322 | lastSucessfullyIndexedSlot: cfg.StartBlockSlot, |
324 | 323 | lastSucessfullyIndexedBlockHash: cfg.StartBlockHash, |
325 | | - processedBlockMap: make(map[string]bool), |
| 324 | + processedBlockMap: make(map[BlockKey]bool), |
326 | 325 | blockTransactionCountMap: make(map[uint64]uint64), |
327 | 326 | } |
328 | 327 | } |
329 | 328 | } |
330 | 329 |
|
331 | | -// createBlockKey creates a composite key from block number and block hash |
| 330 | +// BlockKey represents a composite key for identifying blocks uniquely |
| 331 | +// This ensures we can distinguish between different blocks with the same number due to rollbacks |
| 332 | +// and enables garbage collection based on slot numbers |
| 333 | +type BlockKey struct { |
| 334 | + BlockNumber uint64 `json:"blockNumber"` |
| 335 | + BlockHash string `json:"blockHash"` |
| 336 | + SlotNumber uint64 `json:"slotNumber"` |
| 337 | +} |
| 338 | + |
| 339 | +// String returns a string representation of the BlockKey for logging purposes |
| 340 | +func (bk BlockKey) String() string { |
| 341 | + return fmt.Sprintf("%d:%s:%d", bk.BlockNumber, bk.BlockHash, bk.SlotNumber) |
| 342 | +} |
| 343 | + |
| 344 | +// createBlockKey creates a composite key from block number, block hash, and slot number |
332 | 345 | // This ensures we can distinguish between different blocks with the same number due to rollbacks |
333 | 346 | func (s *ChainEventProcessorActor) createBlockKey( |
334 | 347 | blockNumber uint64, |
335 | 348 | blockHash string, |
336 | | -) string { |
337 | | - return fmt.Sprintf("%d:%s", blockNumber, blockHash) |
| 349 | + slotNumber uint64, |
| 350 | +) BlockKey { |
| 351 | + return BlockKey{ |
| 352 | + BlockNumber: blockNumber, |
| 353 | + BlockHash: blockHash, |
| 354 | + SlotNumber: slotNumber, |
| 355 | + } |
| 356 | +} |
| 357 | + |
| 358 | +// garbageCollectOldBlocks removes processed blocks with slot numbers less than N |
| 359 | +func (s *ChainEventProcessorActor) garbageCollectOldBlocks(cutoffSlot uint64) { |
| 360 | + var removedBlocks []BlockKey |
| 361 | + for blockKey := range s.processedBlockMap { |
| 362 | + if blockKey.SlotNumber < cutoffSlot { |
| 363 | + delete(s.processedBlockMap, blockKey) |
| 364 | + removedBlocks = append(removedBlocks, blockKey) |
| 365 | + } |
| 366 | + } |
| 367 | + |
| 368 | + if len(removedBlocks) > 0 { |
| 369 | + s.logger.Debugf( |
| 370 | + "Garbage collected %d old processed blocks (cutoff slot: %d)", |
| 371 | + len(removedBlocks), |
| 372 | + cutoffSlot, |
| 373 | + ) |
| 374 | + s.logger.Debugf( |
| 375 | + "Remaining processed blocks count: %d", |
| 376 | + len(s.processedBlockMap), |
| 377 | + ) |
| 378 | + } |
338 | 379 | } |
339 | 380 |
|
340 | 381 | func (s *ChainEventProcessorActor) Receive(c *actor.Context) { |
@@ -511,13 +552,13 @@ func (s *ChainEventProcessorActor) processBlockEvent( |
511 | 552 | ) error { |
512 | 553 | blockNumber := event.EventContext.BlockNumber |
513 | 554 | blockHash := event.EventTransaction.BlockHash |
514 | | - blockKey := s.createBlockKey(blockNumber, blockHash) |
| 555 | + slotNumber := event.EventContext.SlotNumber |
| 556 | + blockKey := s.createBlockKey(blockNumber, blockHash, slotNumber) |
515 | 557 |
|
516 | 558 | if _, exists := s.processedBlockMap[blockKey]; exists { |
517 | 559 | s.logger.Infof( |
518 | | - "Block %d:%s already processed, skipping block", |
519 | | - blockNumber, |
520 | | - blockHash, |
| 560 | + "Block %s already processed, skipping block", |
| 561 | + blockKey.String(), |
521 | 562 | ) |
522 | 563 | return nil |
523 | 564 | } |
@@ -574,6 +615,9 @@ func (s *ChainEventProcessorActor) processBlockTipReached( |
574 | 615 | s.logger.Debugw("Current pending transactions", "count", c) |
575 | 616 | } |
576 | 617 |
|
| 618 | + currentSlot := uint64(blockEvent.EventContext.SlotNumber) |
| 619 | + s.garbageCollectOldBlocks(currentSlot - uint64(config.FinalitySlotsNeeded)) |
| 620 | + |
577 | 621 | memTrie, err := s.db.GetInMemoryTrie() |
578 | 622 | if err != nil { |
579 | 623 | return fmt.Errorf("get in-memory trie: %w", err) |
@@ -740,7 +784,6 @@ func (s *ChainEventProcessorActor) processBlockTipReached( |
740 | 784 | uploadPayload.TrieData.Deletions = dels |
741 | 785 |
|
742 | 786 | // apply ops to local trie |
743 | | - currentSlot := uint64(blockEvent.EventContext.SlotNumber) |
744 | 787 | if err := s.applyTrieOperations(memTrie, ins, ups, dels, currentSlot); err != nil { |
745 | 788 | return err |
746 | 789 | } |
@@ -1248,24 +1291,24 @@ func (s *ChainEventProcessorActor) processRollbackEvent( |
1248 | 1291 | s.logger.Debugf("Trie state unchanged after rollback (root: %x)", originalRoot) |
1249 | 1292 | } |
1250 | 1293 |
|
1251 | | - var deletedBlocks []string |
| 1294 | + var deletedBlocks []BlockKey |
1252 | 1295 | for blockKey := range s.processedBlockMap { |
1253 | | - // extract block number from composite key (format: "blockNumber:blockHash") |
1254 | | - if colonIndex := strings.Index(blockKey, ":"); colonIndex != -1 { |
1255 | | - if blockNumberStr := blockKey[:colonIndex]; blockNumberStr != "" { |
1256 | | - if blockNumber, err := strconv.ParseUint(blockNumberStr, 10, 64); err == nil { |
1257 | | - if blockNumber > rollbackEvent.SlotNumber { |
1258 | | - delete(s.processedBlockMap, blockKey) |
1259 | | - deletedBlocks = append(deletedBlocks, blockKey) |
1260 | | - } |
1261 | | - } |
1262 | | - } |
| 1296 | + // Check if block slot number is greater than rollback slot number |
| 1297 | + if blockKey.SlotNumber > rollbackEvent.SlotNumber { |
| 1298 | + delete(s.processedBlockMap, blockKey) |
| 1299 | + deletedBlocks = append(deletedBlocks, blockKey) |
1263 | 1300 | } |
1264 | 1301 | } |
1265 | 1302 |
|
1266 | 1303 | if len(deletedBlocks) > 0 { |
1267 | | - s.logger.Debugf("Removed %d processed block entries for blocks > %d", |
1268 | | - len(deletedBlocks), rollbackEvent.SlotNumber) |
| 1304 | + s.logger.Debugf( |
| 1305 | + "Removed %d processed block entries for blocks with slot > %d", |
| 1306 | + len(deletedBlocks), |
| 1307 | + rollbackEvent.SlotNumber, |
| 1308 | + ) |
| 1309 | + for _, deletedBlock := range deletedBlocks { |
| 1310 | + s.logger.Debugf("Removed block: %s", deletedBlock.String()) |
| 1311 | + } |
1269 | 1312 | } |
1270 | 1313 |
|
1271 | 1314 | return nil |
|
0 commit comments