state/process.go: 96 changes (58 additions, 38 deletions)
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"strings"
"time"

@@ -127,52 +128,38 @@ outer:
for _, col := range block.CollectionGuarantees {
colData := &collectionData{}
cols = append(cols, colData)
initialCol := true
for {
select {
case <-ctx.Done():
span.End()
return
default:
}
if initialCol {
initialCol = false
} else {
time.Sleep(time.Second)
}
client := spork.AccessNodes.Client()
info, err := client.CollectionByID(ctx, col.CollectionId)
if err != nil {
log.Errorf(
"Failed to fetch collection %x in block %x at height %d: %s",
col.CollectionId, hash, height, err,
)
if errors.Is(err, context.Canceled) {
span.End()
Collaborator:

Is this something that could be called in a defer, or are there cases where we return without calling it?

Contributor Author:

I avoided using a defer because trace.EndSpanOk() is called when the function is successful, but it seems that span.End should be idempotent (so it would be OK to defer). I don't know of a case where we would want to return without calling it, and I assumed there isn't one.

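An illustrative sketch (not code from this PR) of the idempotency point: the fakeSpan type and process function below are hypothetical, but they show why an End guarded by sync.Once is safe to defer even when a success path also ends the span.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// fakeSpan stands in for the real tracing span; only the idempotent End matters here.
type fakeSpan struct {
	once sync.Once
}

// End is idempotent: only the first call has any effect.
func (s *fakeSpan) End() {
	s.once.Do(func() { fmt.Println("span ended") })
}

func process(span *fakeSpan, fail bool) error {
	defer span.End() // runs on every return path, including early error returns
	if fail {
		return errors.New("fetch failed")
	}
	span.End() // explicit end on the success path; the deferred call becomes a no-op
	return nil
}

func main() {
	_ = process(&fakeSpan{}, true)
	_ = process(&fakeSpan{}, false)
}
```
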
return
}
time.Sleep(time.Second)
continue
}
tctx := ctx
for _, txnHash := range info.TransactionIds {
ctx = tctx
initialTxn := true
txnIndex++
for {
select {
case <-ctx.Done():
span.End()
return
default:
}
if initialTxn {
initialTxn = false
} else {
time.Sleep(time.Second)
Collaborator:

This is different: we're no longer sleeping for each request, only for failing requests. I think that's OK, since we will receive an error if the request is rate limited, which will cause it to sleep.

Contributor Author:

In each of these cases there's only one request, and the sleep only occurred on retries. Notice that initialTxn := true was set for every transaction ID (i.e. for every unique request that is made). As far as I can tell, the updated code has the same behaviour: no sleep on the first attempt, and a sleep before each retry.

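An illustrative sketch (not code from this PR) of the retry shape both comments describe, with hypothetical fetch and fetchWithRetry helpers standing in for the Access API calls: the first attempt runs immediately, and the one-second sleep only happens after a failed attempt, before the retry.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetch is a hypothetical stand-in for a single Access API call; it fails twice
// before succeeding, to exercise the retry path.
func fetch(attempt int) (string, error) {
	if attempt < 2 {
		return "", errors.New("temporarily unavailable")
	}
	return "collection data", nil
}

func fetchWithRetry(ctx context.Context) (string, error) {
	for attempt := 0; ; attempt++ {
		if err := ctx.Err(); err != nil {
			return "", err // stop retrying once the context is canceled
		}
		res, err := fetch(attempt)
		if err == nil {
			return res, nil
		}
		// Sleep only after a failed attempt, never before the first one.
		time.Sleep(time.Second)
	}
}

func main() {
	res, err := fetchWithRetry(context.Background())
	fmt.Println(res, err)
}
```
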
}
client := spork.AccessNodes.Client()
info, err := client.Transaction(ctx, txnHash)
if err != nil {
log.Errorf(
"Failed to fetch transaction %x in block %x at height %d: %s",
txnHash, hash, height, err,
)
if errors.Is(err, context.Canceled) {
span.End()
return
}
time.Sleep(time.Second)
continue
}
client = spork.AccessNodes.Client()
@@ -182,6 +169,11 @@
"Failed to fetch transaction result for %x in block %x at height %d: %s",
txnHash, hash, height, err,
)
if errors.Is(err, context.Canceled) {
span.End()
return
}
time.Sleep(time.Second)
continue
}
colData.txns = append(colData.txns, info)
@@ -201,30 +193,63 @@ outer:
// the self-sealed root block of a spork (where a ZeroID is used for
// the event collection hash).
if !(spork.Prev != nil && height == spork.RootBlock) {
// TODO(tav): We check for just one transaction in the system
// collection. If this changes in the future, we will need to
// update the logic here to speculatively fetch more transaction
// results.
// We always retrieve the first transaction of the system collection.
txnIndex++
for {
select {
case <-ctx.Done():
return
default:
}
client := spork.AccessNodes.Client()
txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex))
if err != nil {
log.Errorf(
"Failed to fetch transaction result at index %d in block %x at height %d: %s",
txnIndex, hash, height, err,
)
if errors.Is(err, context.Canceled) {
span.End()
Collaborator:

Handling a canceled context previously didn't call End. Is it OK to add it here?

Contributor Author:

In the case of a canceled context, we will be returning anyway and abandoning the processing of the block (this was already the case). I think the previous code was incorrect in not calling End, or deferring a call to it, as the span would be going out of scope.

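An illustrative sketch (not code from this PR) of that pattern, with hypothetical fetchResult and endSpan stand-ins for the calls above: transient errors back off and retry, while a canceled context ends the span once and abandons the remaining work.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchResult is a hypothetical stand-in for a transaction result fetch; it
// fails with context.Canceled once the context has been canceled.
func fetchResult(ctx context.Context) error {
	return ctx.Err()
}

// retryUntilDone retries transient failures, but on a canceled context it ends
// the span exactly once and gives up on the remaining work.
func retryUntilDone(ctx context.Context, endSpan func()) {
	for {
		err := fetchResult(ctx)
		if err == nil {
			break
		}
		if errors.Is(err, context.Canceled) {
			endSpan()
			return
		}
		time.Sleep(time.Second) // transient error: back off before retrying
	}
	endSpan()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	retryUntilDone(ctx, func() { fmt.Println("span ended") })
}
```
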
return
}
time.Sleep(time.Second)
continue
}
col.txnResults = append(col.txnResults, txnResult)
break
}
if spork.Version >= 8 {
// check if the first tx in the system collection contains events indicating scheduled transactions were run
// We are already on the slow path where GetTransactionsByBlockID/GetTransactionResultsByBlockID has failed
systemTxEvents := col.txnResults[0].Events
scheduledTxs := 0
for _, event := range systemTxEvents {
if strings.HasSuffix(event.Type, ".FlowTransactionScheduler.PendingExecution") {
scheduledTxs++
}
}
// Request scheduled transactions and the final system transaction
for range scheduledTxs + 1 {
txnIndex++
for {
client := spork.AccessNodes.Client()
txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex))
if err != nil {
log.Errorf(
"Failed to fetch transaction result at index %d in block %x at height %d: %s",
txnIndex, hash, height, err,
)
if errors.Is(err, context.Canceled) {
span.End()
return
}
if status.Code(err) == codes.NotFound {
// ensure we don't continuously request a nonexistent transaction
break
}
time.Sleep(time.Second)
continue
}
col.txnResults = append(col.txnResults, txnResult)
break
}
}
}
}
} else {
col := &collectionData{}
@@ -378,11 +403,6 @@ outer:
newCounter := 0
transfers := 0
for _, col := range cols {
// TODO(tav): We may want to index events from the system collection
// at some point in the future.
if col.system {
continue
}
for idx, txnResult := range col.txnResults {
select {
case <-ctx.Done():