Skip to content

Commit 96a0627

Browse files
authored
Merge pull request #8010 from onflow/peter/refactor-opsync-indexing
[DataAvailability] Refactor pipeline core logic storage
2 parents c6967df + 4d81c35 commit 96a0627

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2343
-2470
lines changed

module/executiondatasync/optimistic_sync/core.go

Lines changed: 186 additions & 132 deletions
Large diffs are not rendered by default.

module/executiondatasync/optimistic_sync/core_impl_test.go

Lines changed: 359 additions & 176 deletions
Large diffs are not rendered by default.

module/executiondatasync/optimistic_sync/mock/core.go

Lines changed: 2 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

module/executiondatasync/optimistic_sync/persisters/block.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,17 @@ import (
1313
"github.com/onflow/flow-go/utils/logging"
1414
)
1515

16-
// BlockPersister handles persisting of execution data for all PersisterStore-s to database.
17-
// Each BlockPersister instance is created for ONE specific block
16+
// BlockPersister stores execution data for a single execution result into the database.
17+
// It uses the set of [stores.PersisterStore] to persist the data with a single atomic batch operation.
18+
// To ensure the database only contains data certified by the protocol, the block persister must
19+
// only be called for sealed execution results.
1820
type BlockPersister struct {
1921
log zerolog.Logger
2022

2123
persisterStores []stores.PersisterStore
2224
protocolDB storage.DB
2325
lockManager lockctx.Manager
2426
executionResult *flow.ExecutionResult
25-
header *flow.Header
2627
}
2728

2829
// NewBlockPersister creates a new block persister.
@@ -31,33 +32,30 @@ func NewBlockPersister(
3132
protocolDB storage.DB,
3233
lockManager lockctx.Manager,
3334
executionResult *flow.ExecutionResult,
34-
header *flow.Header,
3535
persisterStores []stores.PersisterStore,
3636
) *BlockPersister {
3737
log = log.With().
3838
Str("component", "block_persister").
39+
Hex("execution_result_id", logging.ID(executionResult.ID())).
3940
Hex("block_id", logging.ID(executionResult.BlockID)).
40-
Uint64("height", header.Height).
4141
Logger()
4242

43-
persister := &BlockPersister{
43+
log.Info().
44+
Int("batch_persisters_count", len(persisterStores)).
45+
Msg("block persisters initialized")
46+
47+
return &BlockPersister{
4448
log: log,
4549
persisterStores: persisterStores,
4650
protocolDB: protocolDB,
4751
executionResult: executionResult,
48-
header: header,
4952
lockManager: lockManager,
5053
}
51-
52-
persister.log.Info().
53-
Int("batch_persisters_count", len(persisterStores)).
54-
Msg("block persisters initialized")
55-
56-
return persister
5754
}
5855

59-
// Persist save data in provided persisted stores and commit updates to the database.
60-
// No errors are expected during normal operations
56+
// Persist atomically stores all data into the database using the configured persister stores.
57+
//
58+
// No error returns are expected during normal operations
6159
func (p *BlockPersister) Persist() error {
6260
p.log.Debug().Msg("started to persist execution data")
6361
start := time.Now()

0 commit comments

Comments
 (0)