Skip to content

Commit 117b91d

Browse files
authored
Merge pull request #8201 from onflow/peter/refactor-pipeline
[DataAvailability] Refactor optimistic sync pipeline
2 parents 956162d + 4c42dec commit 117b91d

File tree

8 files changed

+912
-896
lines changed

8 files changed

+912
-896
lines changed

module/executiondatasync/optimistic_sync/core.go

Lines changed: 2 additions & 320 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,12 @@ package optimistic_sync
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"sync"
8-
"time"
9-
10-
"github.com/rs/zerolog"
11-
"golang.org/x/sync/errgroup"
12-
13-
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
14-
"github.com/onflow/flow-go/model/flow"
15-
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
16-
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/persisters"
17-
"github.com/onflow/flow-go/module/executiondatasync/optimistic_sync/persisters/stores"
18-
"github.com/onflow/flow-go/module/state_synchronization/indexer"
19-
"github.com/onflow/flow-go/module/state_synchronization/requester"
20-
"github.com/onflow/flow-go/storage"
21-
"github.com/onflow/flow-go/storage/inmemory"
226
)
237

24-
// DefaultTxResultErrMsgsRequestTimeout is the default timeout for requesting transaction result error messages.
25-
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
26-
27-
// errResultAbandoned is returned when calling one of the methods after the result has been abandoned.
8+
// ErrResultAbandoned is returned when calling one of the methods after the result has been abandoned.
289
// Not exported because this is not an expected error condition.
29-
var errResultAbandoned = fmt.Errorf("result abandoned")
10+
var ErrResultAbandoned = fmt.Errorf("result abandoned")
3011

3112
// <component_spec>
3213
// Core defines the interface for pipelined execution result processing. There are 3 main steps which
@@ -69,302 +50,3 @@ type Core interface {
6950
// the caller should cancel its context to ensure the operation completes in a timely manner.
7051
Abandon()
7152
}
72-
73-
// workingData encapsulates all components and temporary storage
74-
// involved in processing a single block's execution data. When processing
75-
// is complete or abandoned, the entire workingData can be discarded.
76-
type workingData struct {
77-
protocolDB storage.DB
78-
lockManager storage.LockManager
79-
80-
persistentRegisters storage.RegisterIndex
81-
persistentEvents storage.Events
82-
persistentCollections storage.Collections
83-
persistentResults storage.LightTransactionResults
84-
persistentTxResultErrMsgs storage.TransactionResultErrorMessages
85-
latestPersistedSealedResult storage.LatestPersistedSealedResult
86-
87-
inmemRegisters *inmemory.RegistersReader
88-
inmemEvents *inmemory.EventsReader
89-
inmemCollections *inmemory.CollectionsReader
90-
inmemTransactions *inmemory.TransactionsReader
91-
inmemResults *inmemory.LightTransactionResultsReader
92-
inmemTxResultErrMsgs *inmemory.TransactionResultErrorMessagesReader
93-
94-
// Active processing components
95-
execDataRequester requester.ExecutionDataRequester
96-
txResultErrMsgsRequester tx_error_messages.Requester
97-
txResultErrMsgsRequestTimeout time.Duration
98-
99-
// Working data
100-
executionData *execution_data.BlockExecutionData
101-
txResultErrMsgsData []flow.TransactionResultErrorMessage
102-
indexerData *indexer.IndexerData
103-
persisted bool
104-
}
105-
106-
var _ Core = (*CoreImpl)(nil)
107-
108-
// CoreImpl implements the Core interface for processing execution data.
109-
// It coordinates the download, indexing, and persisting of execution data.
110-
//
111-
// Safe for concurrent use.
112-
type CoreImpl struct {
113-
log zerolog.Logger
114-
mu sync.Mutex
115-
116-
workingData *workingData
117-
118-
executionResult *flow.ExecutionResult
119-
block *flow.Block
120-
}
121-
122-
// NewCoreImpl creates a new CoreImpl with all necessary dependencies
123-
// Safe for concurrent use.
124-
//
125-
// No error returns are expected during normal operations
126-
func NewCoreImpl(
127-
logger zerolog.Logger,
128-
executionResult *flow.ExecutionResult,
129-
block *flow.Block,
130-
execDataRequester requester.ExecutionDataRequester,
131-
txResultErrMsgsRequester tx_error_messages.Requester,
132-
txResultErrMsgsRequestTimeout time.Duration,
133-
persistentRegisters storage.RegisterIndex,
134-
persistentEvents storage.Events,
135-
persistentCollections storage.Collections,
136-
persistentResults storage.LightTransactionResults,
137-
persistentTxResultErrMsg storage.TransactionResultErrorMessages,
138-
latestPersistedSealedResult storage.LatestPersistedSealedResult,
139-
protocolDB storage.DB,
140-
lockManager storage.LockManager,
141-
) (*CoreImpl, error) {
142-
if block.ID() != executionResult.BlockID {
143-
return nil, fmt.Errorf("header ID and execution result block ID must match")
144-
}
145-
146-
coreLogger := logger.With().
147-
Str("component", "execution_data_core").
148-
Str("execution_result_id", executionResult.ID().String()).
149-
Str("block_id", executionResult.BlockID.String()).
150-
Uint64("height", block.Height).
151-
Logger()
152-
153-
return &CoreImpl{
154-
log: coreLogger,
155-
block: block,
156-
executionResult: executionResult,
157-
workingData: &workingData{
158-
protocolDB: protocolDB,
159-
lockManager: lockManager,
160-
161-
execDataRequester: execDataRequester,
162-
txResultErrMsgsRequester: txResultErrMsgsRequester,
163-
txResultErrMsgsRequestTimeout: txResultErrMsgsRequestTimeout,
164-
165-
persistentRegisters: persistentRegisters,
166-
persistentEvents: persistentEvents,
167-
persistentCollections: persistentCollections,
168-
persistentResults: persistentResults,
169-
persistentTxResultErrMsgs: persistentTxResultErrMsg,
170-
latestPersistedSealedResult: latestPersistedSealedResult,
171-
},
172-
}, nil
173-
}
174-
175-
// Download retrieves all necessary data for processing from the network.
176-
// Download will block until the data is successfully downloaded, and has not internal timeout.
177-
// When Aboandon is called, the caller must cancel the context passed in to shutdown the operation
178-
// otherwise it may block indefinitely.
179-
//
180-
// The method may only be called once. Calling it multiple times will return an error.
181-
// Calling Download after Abandon is called will return an error.
182-
//
183-
// Expected error returns during normal operation:
184-
// - [context.Canceled]: if the provided context was canceled before completion
185-
func (c *CoreImpl) Download(ctx context.Context) error {
186-
c.mu.Lock()
187-
defer c.mu.Unlock()
188-
if c.workingData == nil {
189-
return errResultAbandoned
190-
}
191-
if c.workingData.executionData != nil {
192-
return fmt.Errorf("already downloaded")
193-
}
194-
195-
c.log.Debug().Msg("downloading execution data")
196-
197-
g, gCtx := errgroup.WithContext(ctx)
198-
199-
var executionData *execution_data.BlockExecutionData
200-
g.Go(func() error {
201-
var err error
202-
executionData, err = c.workingData.execDataRequester.RequestExecutionData(gCtx)
203-
if err != nil {
204-
return fmt.Errorf("failed to request execution data: %w", err)
205-
}
206-
207-
return nil
208-
})
209-
210-
var txResultErrMsgsData []flow.TransactionResultErrorMessage
211-
g.Go(func() error {
212-
timeoutCtx, cancel := context.WithTimeout(gCtx, c.workingData.txResultErrMsgsRequestTimeout)
213-
defer cancel()
214-
215-
var err error
216-
txResultErrMsgsData, err = c.workingData.txResultErrMsgsRequester.Request(timeoutCtx)
217-
if err != nil {
218-
// transaction error messages are downloaded from execution nodes over grpc and have no
219-
// protocol guarantees for delivery or correctness. Therefore, we attempt to download them
220-
// on a best-effort basis, and give up after a reasonable timeout to avoid blocking the
221-
// main indexing process. Missing error messages are handled gracefully by the rest of
222-
// the system, and can be retried or backfilled as needed later.
223-
if errors.Is(err, context.DeadlineExceeded) {
224-
c.log.Debug().
225-
Dur("timeout", c.workingData.txResultErrMsgsRequestTimeout).
226-
Msg("transaction result error messages request timed out")
227-
return nil
228-
}
229-
230-
return fmt.Errorf("failed to request transaction result error messages data: %w", err)
231-
}
232-
return nil
233-
})
234-
235-
if err := g.Wait(); err != nil {
236-
return err
237-
}
238-
239-
c.workingData.executionData = executionData
240-
c.workingData.txResultErrMsgsData = txResultErrMsgsData
241-
242-
c.log.Debug().Msg("successfully downloaded execution data")
243-
244-
return nil
245-
}
246-
247-
// Index processes the downloaded data and stores it into in-memory indexes.
248-
// Must be called after Download.
249-
//
250-
// The method may only be called once. Calling it multiple times will return an error.
251-
// Calling Index after Abandon is called will return an error.
252-
//
253-
// No error returns are expected during normal operations
254-
func (c *CoreImpl) Index() error {
255-
c.mu.Lock()
256-
defer c.mu.Unlock()
257-
if c.workingData == nil {
258-
return errResultAbandoned
259-
}
260-
if c.workingData.executionData == nil {
261-
return fmt.Errorf("downloading is not complete")
262-
}
263-
if c.workingData.indexerData != nil {
264-
return fmt.Errorf("already indexed")
265-
}
266-
267-
c.log.Debug().Msg("indexing execution data")
268-
269-
indexerComponent, err := indexer.NewInMemoryIndexer(c.log, c.block, c.executionResult)
270-
if err != nil {
271-
return fmt.Errorf("failed to create indexer: %w", err)
272-
}
273-
274-
indexerData, err := indexerComponent.IndexBlockData(c.workingData.executionData)
275-
if err != nil {
276-
return fmt.Errorf("failed to index execution data: %w", err)
277-
}
278-
279-
if c.workingData.txResultErrMsgsData != nil {
280-
err = indexer.ValidateTxErrors(indexerData.Results, c.workingData.txResultErrMsgsData)
281-
if err != nil {
282-
return fmt.Errorf("failed to validate transaction result error messages: %w", err)
283-
}
284-
}
285-
286-
blockID := c.executionResult.BlockID
287-
288-
c.workingData.indexerData = indexerData
289-
c.workingData.inmemCollections = inmemory.NewCollections(indexerData.Collections)
290-
c.workingData.inmemTransactions = inmemory.NewTransactions(indexerData.Transactions)
291-
c.workingData.inmemTxResultErrMsgs = inmemory.NewTransactionResultErrorMessages(blockID, c.workingData.txResultErrMsgsData)
292-
c.workingData.inmemEvents = inmemory.NewEvents(blockID, indexerData.Events)
293-
c.workingData.inmemResults = inmemory.NewLightTransactionResults(blockID, indexerData.Results)
294-
c.workingData.inmemRegisters = inmemory.NewRegisters(c.block.Height, indexerData.Registers)
295-
296-
c.log.Debug().Msg("successfully indexed execution data")
297-
298-
return nil
299-
}
300-
301-
// Persist stores the indexed data in permanent storage.
302-
// Must be called after Index.
303-
//
304-
// The method may only be called once. Calling it multiple times will return an error.
305-
// Calling Persist after Abandon is called will return an error.
306-
//
307-
// No error returns are expected during normal operations
308-
func (c *CoreImpl) Persist() error {
309-
c.mu.Lock()
310-
defer c.mu.Unlock()
311-
if c.workingData == nil {
312-
return errResultAbandoned
313-
}
314-
if c.workingData.persisted {
315-
return fmt.Errorf("already persisted")
316-
}
317-
if c.workingData.indexerData == nil {
318-
return fmt.Errorf("indexing is not complete")
319-
}
320-
321-
c.log.Debug().Msg("persisting execution data")
322-
323-
indexerData := c.workingData.indexerData
324-
325-
// the BlockPersister updates the latest persisted sealed result within the batch operation, so
326-
// all other updates must be done before the batch is committed to ensure the state remains
327-
// consistent. The registers db allows repeated indexing of the most recent block's registers,
328-
// so it is safe to persist them before the block persister.
329-
registerPersister := persisters.NewRegistersPersister(indexerData.Registers, c.workingData.persistentRegisters, c.block.Height)
330-
if err := registerPersister.Persist(); err != nil {
331-
return fmt.Errorf("failed to persist registers: %w", err)
332-
}
333-
334-
persisterStores := []stores.PersisterStore{
335-
stores.NewEventsStore(indexerData.Events, c.workingData.persistentEvents, c.executionResult.BlockID),
336-
stores.NewResultsStore(indexerData.Results, c.workingData.persistentResults, c.executionResult.BlockID),
337-
stores.NewCollectionsStore(indexerData.Collections, c.workingData.persistentCollections),
338-
stores.NewTxResultErrMsgStore(c.workingData.txResultErrMsgsData, c.workingData.persistentTxResultErrMsgs, c.executionResult.BlockID, c.workingData.lockManager),
339-
stores.NewLatestSealedResultStore(c.workingData.latestPersistedSealedResult, c.executionResult.ID(), c.block.Height),
340-
}
341-
blockPersister := persisters.NewBlockPersister(
342-
c.log,
343-
c.workingData.protocolDB,
344-
c.workingData.lockManager,
345-
c.executionResult,
346-
persisterStores,
347-
)
348-
if err := blockPersister.Persist(); err != nil {
349-
return fmt.Errorf("failed to persist block data: %w", err)
350-
}
351-
352-
// reset the indexer data to prevent multiple calls to Persist
353-
c.workingData.indexerData = nil
354-
c.workingData.persisted = true
355-
356-
return nil
357-
}
358-
359-
// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
360-
// and any data dropped.
361-
// This method will block until other in-progress operations are complete. If Download is in progress,
362-
// the caller should cancel its context to ensure the operation completes in a timely manner.
363-
//
364-
// The method is idempotent. Calling it multiple times has no effect.
365-
func (c *CoreImpl) Abandon() {
366-
c.mu.Lock()
367-
defer c.mu.Unlock()
368-
369-
c.workingData = nil
370-
}

0 commit comments

Comments
 (0)