diff --git a/state/process.go b/state/process.go index 90c799a..da8111c 100644 --- a/state/process.go +++ b/state/process.go @@ -41,6 +41,7 @@ func (i *Indexer) processBlock(rctx context.Context, spork *config.Spork, height slowPath := false unsealed := false var span trace.Span + defer func() { span.End() }() outer: for { if span != nil { @@ -127,19 +128,7 @@ outer: for _, col := range block.CollectionGuarantees { colData := &collectionData{} cols = append(cols, colData) - initialCol := true for { - select { - case <-ctx.Done(): - span.End() - return - default: - } - if initialCol { - initialCol = false - } else { - time.Sleep(time.Second) - } client := spork.AccessNodes.Client() info, err := client.CollectionByID(ctx, col.CollectionId) if err != nil { @@ -147,25 +136,17 @@ outer: "Failed to fetch collection %x in block %x at height %d: %s", col.CollectionId, hash, height, err, ) + if status.Code(err) == codes.Canceled { + return + } + time.Sleep(time.Second) continue } tctx := ctx for _, txnHash := range info.TransactionIds { ctx = tctx - initialTxn := true txnIndex++ for { - select { - case <-ctx.Done(): - span.End() - return - default: - } - if initialTxn { - initialTxn = false - } else { - time.Sleep(time.Second) - } client := spork.AccessNodes.Client() info, err := client.Transaction(ctx, txnHash) if err != nil { @@ -173,18 +154,29 @@ outer: "Failed to fetch transaction %x in block %x at height %d: %s", txnHash, hash, height, err, ) + if status.Code(err) == codes.Canceled { + return + } + time.Sleep(time.Second) continue } - client = spork.AccessNodes.Client() + colData.txns = append(colData.txns, info) + break + } + for { + client := spork.AccessNodes.Client() txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) if err != nil { log.Errorf( "Failed to fetch transaction result for %x in block %x at height %d: %s", txnHash, hash, height, err, ) + if status.Code(err) == codes.Canceled { + return + } + time.Sleep(time.Second) continue } - colData.txns = append(colData.txns, info) colData.txnResults = append(colData.txnResults, txnResult) break } @@ -201,17 +193,9 @@ outer: // the self-sealed root block of a spork (where a ZeroID is used for // the event collection hash). if !(spork.Prev != nil && height == spork.RootBlock) { - // TODO(tav): We check for just one transaction in the system - // collection. If this changes in the future, we will need to - // update the logic here to speculatively fetch more transaction - // results. + // We always retrieve the first transaction of the system collection. txnIndex++ for { - select { - case <-ctx.Done(): - return - default: - } client := spork.AccessNodes.Client() txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) if err != nil { @@ -219,12 +203,72 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) + if status.Code(err) == codes.Canceled { + return + } time.Sleep(time.Second) continue } col.txnResults = append(col.txnResults, txnResult) break } + if spork.Version >= 8 { + // check if the first tx in the system collection contains events indicating scheduled transactions were run + // We are already on the slow path where GetTransactionsByBlockID/GetTransactionResultsByBlockID has failed + systemTxEvents := col.txnResults[0].Events + scheduledTxs := 0 + for _, event := range systemTxEvents { + if strings.HasSuffix(event.Type, ".FlowTransactionScheduler.PendingExecution") { + scheduledTxs++ + } + } + // Request scheduled transactions and the final system transaction + fetchSystemTxs: + for range scheduledTxs + 1 { + txnIndex++ + for { + client := spork.AccessNodes.Client() + txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) + if err != nil { + log.Errorf( + "Failed to fetch transaction result at index %d in block %x at height %d: %s", + txnIndex, hash, height, err, + ) + if status.Code(err) == codes.Canceled { + return + } + if status.Code(err) == codes.NotFound { + // no more transactions available + break fetchSystemTxs + } + time.Sleep(time.Second) + continue + } + col.txnResults = append(col.txnResults, txnResult) + break + } + } + } + // we have the transaction results for the system collection; now fetch transaction infos + for _, result := range col.txnResults { + for { + client := spork.AccessNodes.Client() + info, err := client.Transaction(ctx, result.TransactionId) + if err != nil { + log.Errorf( + "Failed to fetch transaction %x in block %x at height %d: %s", + result.TransactionId, hash, height, err, + ) + if status.Code(err) == codes.Canceled { + return + } + time.Sleep(time.Second) + continue + } + col.txns = append(col.txns, info) + break + } + } } } else { col := &collectionData{} @@ -378,15 +422,9 @@ outer: newCounter := 0 transfers := 0 for _, col := range cols { - // TODO(tav): We may want to index events from the system collection - // at some point in the future. - if col.system { - continue - } for idx, txnResult := range col.txnResults { select { case <-ctx.Done(): - span.End() return default: }