@@ -47,6 +47,9 @@ const (
47
47
// This is temporary solution. It will be removed in future versions.
48
48
maxSubmitAttempts = 30
49
49
50
+ // Key for storing namespace migration state in the store
51
+ namespaceMigrationKey = "namespace_migration_completed"
52
+
50
53
// Applies to the headerInCh and dataInCh, 10000 is a large enough number for headers per DA block.
51
54
eventInChLength = 10000
52
55
)
@@ -168,6 +171,10 @@ type Manager struct {
168
171
// validatorHasherProvider is used to provide the validator hash for the header.
169
172
// It is used to set the validator hash in the header.
170
173
validatorHasherProvider types.ValidatorHasherProvider
174
+
175
+ // namespaceMigrationCompleted tracks whether we have completed the migration
176
+ // from legacy namespace to separate header/data namespaces
177
+ namespaceMigrationCompleted * atomic.Bool
171
178
}
172
179
173
180
// getInitialState tries to load lastState from Store, and if it's not available it reads genesis.
@@ -375,38 +382,44 @@ func NewManager(
375
382
headerBroadcaster : headerBroadcaster ,
376
383
dataBroadcaster : dataBroadcaster ,
377
384
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
378
- headerInCh : make (chan NewHeaderEvent , eventInChLength ),
379
- dataInCh : make (chan NewDataEvent , eventInChLength ),
380
- headerStoreCh : make (chan struct {}, 1 ),
381
- dataStoreCh : make (chan struct {}, 1 ),
382
- headerStore : headerStore ,
383
- dataStore : dataStore ,
384
- lastStateMtx : new (sync.RWMutex ),
385
- lastBatchData : lastBatchData ,
386
- headerCache : cache .NewCache [types.SignedHeader ](),
387
- dataCache : cache .NewCache [types.Data ](),
388
- retrieveCh : make (chan struct {}, 1 ),
389
- daIncluderCh : make (chan struct {}, 1 ),
390
- logger : logger ,
391
- txsAvailable : false ,
392
- pendingHeaders : pendingHeaders ,
393
- pendingData : pendingData ,
394
- metrics : seqMetrics ,
395
- sequencer : sequencer ,
396
- exec : exec ,
397
- da : da ,
398
- gasPrice : gasPrice ,
399
- gasMultiplier : gasMultiplier ,
400
- txNotifyCh : make (chan struct {}, 1 ), // Non-blocking channel
401
- signaturePayloadProvider : managerOpts .SignaturePayloadProvider ,
402
- validatorHasherProvider : managerOpts .ValidatorHasherProvider ,
385
+ headerInCh : make (chan NewHeaderEvent , eventInChLength ),
386
+ dataInCh : make (chan NewDataEvent , eventInChLength ),
387
+ headerStoreCh : make (chan struct {}, 1 ),
388
+ dataStoreCh : make (chan struct {}, 1 ),
389
+ headerStore : headerStore ,
390
+ dataStore : dataStore ,
391
+ lastStateMtx : new (sync.RWMutex ),
392
+ lastBatchData : lastBatchData ,
393
+ headerCache : cache .NewCache [types.SignedHeader ](),
394
+ dataCache : cache .NewCache [types.Data ](),
395
+ retrieveCh : make (chan struct {}, 1 ),
396
+ daIncluderCh : make (chan struct {}, 1 ),
397
+ logger : logger ,
398
+ txsAvailable : false ,
399
+ pendingHeaders : pendingHeaders ,
400
+ pendingData : pendingData ,
401
+ metrics : seqMetrics ,
402
+ sequencer : sequencer ,
403
+ exec : exec ,
404
+ da : da ,
405
+ gasPrice : gasPrice ,
406
+ gasMultiplier : gasMultiplier ,
407
+ txNotifyCh : make (chan struct {}, 1 ), // Non-blocking channel
408
+ signaturePayloadProvider : managerOpts .SignaturePayloadProvider ,
409
+ validatorHasherProvider : managerOpts .ValidatorHasherProvider ,
410
+ namespaceMigrationCompleted : & atomic.Bool {},
403
411
}
404
412
405
413
// initialize da included height
406
414
if height , err := m .store .GetMetadata (ctx , storepkg .DAIncludedHeightKey ); err == nil && len (height ) == 8 {
407
415
m .daIncludedHeight .Store (binary .LittleEndian .Uint64 (height ))
408
416
}
409
417
418
+ // initialize namespace migration state
419
+ if migrationData , err := m .store .GetMetadata (ctx , namespaceMigrationKey ); err == nil && len (migrationData ) > 0 {
420
+ m .namespaceMigrationCompleted .Store (migrationData [0 ] == 1 )
421
+ }
422
+
410
423
// Set the default publishBlock implementation
411
424
m .publishBlock = m .publishBlockInternal
412
425
@@ -418,6 +431,24 @@ func NewManager(
418
431
return m , nil
419
432
}
420
433
434
+ // setNamespaceMigrationCompleted marks the namespace migration as completed and persists it to disk
435
+ func (m * Manager ) setNamespaceMigrationCompleted (ctx context.Context ) error {
436
+ m .namespaceMigrationCompleted .Store (true )
437
+ return m .store .SetMetadata (ctx , namespaceMigrationKey , []byte {1 })
438
+ }
439
+
440
+ // loadNamespaceMigrationState loads the namespace migration state from persistent storage
441
+ func (m * Manager ) loadNamespaceMigrationState (ctx context.Context ) (bool , error ) {
442
+ migrationData , err := m .store .GetMetadata (ctx , namespaceMigrationKey )
443
+ if err != nil {
444
+ if errors .Is (err , ds .ErrNotFound ) {
445
+ return false , nil // Migration not completed
446
+ }
447
+ return false , fmt .Errorf ("failed to load migration state: %w" , err )
448
+ }
449
+ return len (migrationData ) > 0 && migrationData [0 ] == 1 , nil
450
+ }
451
+
421
452
// PendingHeaders returns the pending headers.
422
453
func (m * Manager ) PendingHeaders () * PendingHeaders {
423
454
return m .pendingHeaders
0 commit comments