-
Notifications
You must be signed in to change notification settings - Fork 207
Expand file tree
/
Copy pathengine.go
More file actions
410 lines (357 loc) · 14.5 KB
/
engine.go
File metadata and controls
410 lines (357 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package ingestion
import (
"context"
"errors"
"fmt"
"github.com/jordanschalm/lockctx"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/ingestion/collections"
"github.com/onflow/flow-go/engine/access/ingestion/tx_error_messages"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/jobqueue"
"github.com/onflow/flow-go/module/util"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
const (
	// defaultQueueCapacity is the capacity of the FIFO queue buffering inbound
	// execution receipt messages before they are consumed by the worker.
	defaultQueueCapacity = 10_000

	// processFinalizedBlocksWorkersCount defines the number of workers that
	// concurrently process finalized blocks in the job queue.
	processFinalizedBlocksWorkersCount = 1

	// searchAhead limits how far beyond the last processed height the job
	// consumer may look; 1 ensures blocks are processed sequentially by jobqueue.
	searchAhead = 1
)
// Engine represents the ingestion engine, used to funnel data from other nodes
// to a centralized location that can be queried by a user
//
// No errors are expected during normal operation.
type Engine struct {
	*component.ComponentManager

	// messageHandler routes inbound network messages into the receipt queue.
	messageHandler            *engine.MessageHandler
	executionReceiptsNotifier engine.Notifier
	executionReceiptsQueue    engine.MessageStore

	// Job queue consuming finalized blocks sequentially by height.
	finalizedBlockConsumer *jobqueue.ComponentConsumer
	// Notifier for queue consumer; signaled on every block-finalized event.
	finalizedBlockNotifier engine.Notifier

	// txResultErrorMessagesChan is used to fetch and store transaction result error messages for blocks
	txResultErrorMessagesChan chan flow.Identifier

	log   zerolog.Logger // used to log relevant actions with context
	state protocol.State // used to access the protocol state
	me    module.Local   // used to access local node information

	// storage
	// FIX: remove direct DB access by substituting indexer module
	db                storage.DB
	lockManager       storage.LockManager
	blocks            storage.Blocks
	executionReceipts storage.ExecutionReceipts
	maxReceiptHeight  uint64
	executionResults  storage.ExecutionResults

	collectionSyncer  *collections.Syncer
	collectionIndexer *collections.Indexer

	// TODO: There's still a need for this metric to be in the ingestion engine rather than collection syncer.
	// Maybe it is a good idea to split it up?
	collectionExecutedMetric module.CollectionExecutedMetric
	accessMetrics            module.AccessMetrics

	// txErrorMessagesCore is optional; when nil, error-message processing is disabled.
	txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore
}

// Engine must be usable as a network message processor for the receipts channel.
var _ network.MessageProcessor = (*Engine)(nil)
// New creates a new access ingestion engine.
//
// It wires together:
//   - a FIFO queue + message handler for inbound execution receipts,
//   - a jobqueue consumer that processes finalized blocks sequentially,
//   - workers for collection syncing/indexing and (optionally) transaction
//     result error message fetching,
// and registers the engine on the receipts network channel and with the
// finalization registrar.
//
// No errors are expected during normal operation.
func New(
	log zerolog.Logger,
	net network.EngineRegistry,
	state protocol.State,
	me module.Local,
	lockManager storage.LockManager,
	db storage.DB,
	blocks storage.Blocks,
	executionResults storage.ExecutionResults,
	executionReceipts storage.ExecutionReceipts,
	finalizedProcessedHeight storage.ConsumerProgress,
	collectionSyncer *collections.Syncer,
	collectionIndexer *collections.Indexer,
	collectionExecutedMetric module.CollectionExecutedMetric,
	accessMetrics module.AccessMetrics,
	txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
	registrar hotstuff.FinalizationRegistrar,
) (*Engine, error) {
	// Bounded queue protecting the engine from unbounded inbound receipt bursts.
	executionReceiptsRawQueue, err := fifoqueue.NewFifoQueue(defaultQueueCapacity)
	if err != nil {
		return nil, fmt.Errorf("could not create execution receipts queue: %w", err)
	}

	executionReceiptsQueue := &engine.FifoMessageStore{FifoQueue: executionReceiptsRawQueue}

	// Only *flow.ExecutionReceipt payloads are accepted; all other message types
	// are rejected by the handler.
	messageHandler := engine.NewMessageHandler(
		log,
		engine.NewNotifier(),
		engine.Pattern{
			Match: func(msg *engine.Message) bool {
				_, ok := msg.Payload.(*flow.ExecutionReceipt)
				return ok
			},
			Store: executionReceiptsQueue,
		},
	)

	// initialize the propagation engine with its dependencies
	e := &Engine{
		log:                      log.With().Str("engine", "ingestion").Logger(),
		state:                    state,
		me:                       me,
		lockManager:              lockManager,
		db:                       db,
		blocks:                   blocks,
		executionResults:         executionResults,
		executionReceipts:        executionReceipts,
		maxReceiptHeight:         0,
		collectionExecutedMetric: collectionExecutedMetric,
		accessMetrics:            accessMetrics,
		finalizedBlockNotifier:   engine.NewNotifier(),

		// queue / notifier for execution receipts
		executionReceiptsNotifier: engine.NewNotifier(),
		// buffered so the receipt worker is not blocked while the error-message
		// worker is busy with the previous block
		txResultErrorMessagesChan: make(chan flow.Identifier, 1),
		executionReceiptsQueue:    executionReceiptsQueue,
		messageHandler:            messageHandler,
		txErrorMessagesCore:       txErrorMessagesCore,
		collectionSyncer:          collectionSyncer,
		collectionIndexer:         collectionIndexer,
	}

	// jobqueue Jobs object that tracks finalized blocks by height. This is used by the finalizedBlockConsumer
	// to get a sequential list of finalized blocks.
	finalizedBlockReader := jobqueue.NewFinalizedBlockReader(state, blocks)

	// create a jobqueue that will process new available finalized block. The `finalizedBlockNotifier` is used to
	// signal new work, which is being triggered on the `processFinalizedBlockJob` handler.
	e.finalizedBlockConsumer, err = jobqueue.NewComponentConsumer(
		e.log.With().Str("module", "ingestion_block_consumer").Logger(),
		e.finalizedBlockNotifier.Channel(),
		finalizedProcessedHeight,
		finalizedBlockReader,
		e.processFinalizedBlockJob,
		processFinalizedBlocksWorkersCount,
		searchAhead,
	)
	if err != nil {
		return nil, fmt.Errorf("error creating finalizedBlock jobqueue: %w", err)
	}

	// Add workers
	builder := component.NewComponentManagerBuilder().
		AddWorker(e.collectionSyncer.WorkerLoop).
		AddWorker(e.collectionIndexer.WorkerLoop).
		AddWorker(e.processExecutionReceipts).
		AddWorker(e.runFinalizedBlockConsumer)

	// If txErrorMessagesCore is provided, add a worker responsible for processing
	// transaction result error messages by receipts. This worker listens for blocks
	// containing execution receipts and processes any associated transaction result
	// error messages. The worker is added only when error message processing is enabled.
	if txErrorMessagesCore != nil {
		builder.AddWorker(e.processTransactionResultErrorMessagesByReceipts)
	}

	e.ComponentManager = builder.Build()

	// register engine with the execution receipt provider
	_, err = net.Register(channels.ReceiveReceipts, e)
	if err != nil {
		return nil, fmt.Errorf("could not register for results: %w", err)
	}

	registrar.AddOnBlockFinalizedConsumer(e.onFinalizedBlock)

	return e, nil
}
// runFinalizedBlockConsumer drives the finalizedBlockConsumer component:
// it starts the consumer, signals readiness once the consumer is ready
// (unless the context was cancelled first), and blocks until the consumer
// has fully shut down.
func (e *Engine) runFinalizedBlockConsumer(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	e.finalizedBlockConsumer.Start(ctx)
	// WaitClosed returns nil only when Ready() closed before ctx was cancelled.
	if util.WaitClosed(ctx, e.finalizedBlockConsumer.Ready()) == nil {
		ready()
	}
	<-e.finalizedBlockConsumer.Done()
}
// processFinalizedBlockJob is a handler function for processing finalized block jobs.
// It converts the job to a block, processes the block, and throws an irrecoverable
// error on any failure. `done` is invoked only after the block was processed
// successfully, so the jobqueue does not advance past a failed block.
func (e *Engine) processFinalizedBlockJob(ctx irrecoverable.SignalerContext, job module.Job, done func()) {
	block, err := jobqueue.JobToBlock(job)
	if err != nil {
		ctx.Throw(fmt.Errorf("failed to convert job to block: %w", err))
		// return defensively: if Throw does not halt execution, continuing
		// would dereference a nil block below (mirrors the error path beneath)
		return
	}

	err = e.processFinalizedBlock(block)
	if err != nil {
		ctx.Throw(
			fmt.Errorf(
				"fatal error when ingestion building col->block index for finalized block (job: %s, height: %v): %w",
				job.ID(), block.Height, err))
		return
	}

	done()
}
// processExecutionReceipts is a component worker that drains the execution
// receipt queue whenever the receipts notifier fires. It signals readiness
// immediately and runs until the context is cancelled; any processing error
// is escalated as irrecoverable.
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	wake := e.executionReceiptsNotifier.Channel()
	for {
		select {
		case <-ctx.Done():
			return
		case <-wake:
			// drain everything currently queued; any error here is unexpected
			// and fatal for the engine
			if err := e.processAvailableExecutionReceipts(ctx); err != nil {
				ctx.Throw(err)
				return
			}
		}
	}
}
// processAvailableExecutionReceipts processes available execution receipts in the queue and handles it.
// It continues processing until the queue is empty or the context is canceled.
//
// No errors are expected during normal operation.
func (e *Engine) processAvailableExecutionReceipts(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		msg, ok := e.executionReceiptsQueue.Get()
		if !ok {
			// queue drained
			return nil
		}

		// The message handler's Match pattern only admits *flow.ExecutionReceipt
		// payloads, but use a checked assertion so an unexpected payload surfaces
		// as an error instead of a panic.
		receipt, ok := msg.Payload.(*flow.ExecutionReceipt)
		if !ok {
			return fmt.Errorf("unexpected payload type in execution receipts queue (%T)", msg.Payload)
		}

		if err := e.handleExecutionReceipt(msg.OriginID, receipt); err != nil {
			return err
		}

		// Notify to fetch and store transaction result error messages for the block.
		// If txErrorMessagesCore is enabled, the receipt's BlockID is sent to trigger
		// transaction error message processing. This step is skipped if error message
		// storage is not enabled.
		if e.txErrorMessagesCore != nil {
			e.txResultErrorMessagesChan <- receipt.BlockID
		}
	}
}
// processTransactionResultErrorMessagesByReceipts is a component worker that
// fetches transaction result error messages for blocks whose IDs arrive on
// txResultErrorMessagesChan. It signals readiness immediately and runs until
// the context is cancelled.
//
// No errors are expected during normal operation.
func (e *Engine) processTransactionResultErrorMessagesByReceipts(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()

	for {
		select {
		case <-ctx.Done():
			return
		case blockID := <-e.txResultErrorMessagesChan:
			if err := e.txErrorMessagesCore.FetchErrorMessages(ctx, blockID); err != nil {
				// TODO: we should revisit error handling here.
				// Errors that come from querying the EN and possibly ExecutionNodesForBlockID should be logged and
				// retried later, while others should cause an exception.
				e.log.Error().
					Err(err).
					Msg("error encountered while processing transaction result error messages by receipts")
			}
		}
	}
}
// process handles a single inbound ingestion event originating from the node
// with the given origin ID. Only *flow.ExecutionReceipt events are accepted;
// anything else yields an error. Returns component.ErrComponentShutdown once
// the engine has begun shutting down.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
	// reject new work once shutdown has started
	select {
	case <-e.ComponentManager.ShutdownSignal():
		return component.ErrComponentShutdown
	default:
	}

	if _, ok := event.(*flow.ExecutionReceipt); !ok {
		return fmt.Errorf("invalid event type (%T)", event)
	}

	// enqueue the receipt, then wake the receipt-processing worker
	err := e.messageHandler.Process(originID, event)
	e.executionReceiptsNotifier.Notify()
	return err
}
// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
// The channel argument is ignored: this engine only registers on the receipts
// channel, so the event type alone determines the handling.
func (e *Engine) Process(_ channels.Channel, originID flow.Identifier, event interface{}) error {
	return e.process(originID, event)
}
// onFinalizedBlock is called by the follower engine after a block has been finalized and the state has been updated.
// Receives block finalized events from the finalization registrar and forwards them to the finalizedBlockConsumer.
// The block itself is ignored: the jobqueue consumer re-reads finalized blocks
// from storage by height, so only the wake-up signal matters.
func (e *Engine) onFinalizedBlock(*model.Block) {
	e.finalizedBlockNotifier.Notify()
}
// processFinalizedBlock handles an incoming finalized block.
// It processes the block, indexes it for further processing, and requests missing collections if necessary.
// If the block is already indexed (storage.ErrAlreadyExists), it logs a warning and continues processing.
//
// Expected errors during normal operation:
//   - storage.ErrNotFound - if last full block height does not exist in the database.
//   - generic error in case of unexpected failure from the database layer, or failure
//     to decode an existing database value.
func (e *Engine) processFinalizedBlock(block *flow.Block) error {
	// FIX: we can't index guarantees here, as we might have more than one block
	// with the same collection as long as it is not finalized
	// TODO: substitute an indexer module as layer between engine and storage

	// index the block storage with each of the collection guarantee
	// Both indexing operations run in a single batch write under the
	// finalizing-block lock group, so they are committed atomically.
	err := storage.WithLocks(e.lockManager, storage.LockGroupAccessFinalizingBlock, func(lctx lockctx.Context) error {
		return e.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
			// requires [storage.LockIndexBlockByPayloadGuarantees] lock
			err := e.blocks.BatchIndexBlockContainingCollectionGuarantees(lctx, rw, block.ID(), flow.GetIDs(block.Payload.Guarantees))
			if err != nil {
				return fmt.Errorf("could not index block for collections: %w", err)
			}

			// loop through seals and index ID -> result ID
			for _, seal := range block.Payload.Seals {
				// requires [storage.LockIndexExecutionResult] lock
				err := e.executionResults.BatchIndex(lctx, rw, seal.BlockID, seal.ResultID)
				if err != nil {
					return fmt.Errorf("could not index block for execution result: %w", err)
				}
			}

			return nil
		})
	})
	if err != nil {
		if !errors.Is(err, storage.ErrAlreadyExists) {
			return fmt.Errorf("could not index block for collections: %w", err)
		}

		// the job queue processed index is updated in a separate db update, so it's possible that the above index
		// has been built, but the jobqueue index has not been updated yet. In this case, we can safely skip processing.
		e.log.Warn().
			Uint64("height", block.Height).
			Str("block_id", block.ID().String()).
			Msg("block already indexed, skipping indexing")
	}

	// queue download of any collections referenced by this block that are not
	// yet in local storage
	err = e.collectionSyncer.RequestCollectionsForBlock(block.Height, block.Payload.Guarantees)
	if err != nil {
		return fmt.Errorf("could not request collections for block: %w", err)
	}

	e.collectionExecutedMetric.BlockFinalized(block)
	e.accessMetrics.UpdateIngestionFinalizedBlockHeight(block.Height)

	return nil
}
// handleExecutionReceipt persists the execution receipt locally and updates
// the collection-executed metric. The origin identifier is unused. Storing
// the receipt also indexes it.
//
// No errors are expected during normal operation.
func (e *Engine) handleExecutionReceipt(_ flow.Identifier, r *flow.ExecutionReceipt) error {
	// persist the execution receipt locally, storing will also index the receipt
	if err := e.executionReceipts.Store(r); err != nil {
		return fmt.Errorf("failed to store execution receipt: %w", err)
	}

	e.collectionExecutedMetric.ExecutionReceiptReceived(r)
	return nil
}