Skip to content

Commit 266ad1f

Browse files
authored
Merge pull request #6809 from onflow/tim/6807-AsyncUploader-engine.Unit-refactor
Refactor AsyncUploader to replace Engine.Unit with ComponentManager
2 parents 2e6fc67 + 955373a commit 266ad1f

File tree

5 files changed

+119
-58
lines changed

5 files changed

+119
-58
lines changed

engine/execution/ingestion/uploader/retryable_uploader_wrapper.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/onflow/flow-go/engine/execution"
1111
"github.com/onflow/flow-go/model/flow"
1212
"github.com/onflow/flow-go/module"
13+
"github.com/onflow/flow-go/module/component"
1314
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
1415
"github.com/onflow/flow-go/module/mempool/entity"
1516
"github.com/onflow/flow-go/storage"
@@ -34,6 +35,7 @@ type BadgerRetryableUploaderWrapper struct {
3435
results storage.ExecutionResults
3536
transactionResults storage.TransactionResults
3637
uploadStatusStore storage.ComputationResultUploadStatus
38+
component.Component
3739
}
3840

3941
func NewBadgerRetryableUploaderWrapper(
@@ -99,17 +101,10 @@ func NewBadgerRetryableUploaderWrapper(
99101
results: results,
100102
transactionResults: transactionResults,
101103
uploadStatusStore: uploadStatusStore,
104+
Component: uploader, // delegate to the AsyncUploader
102105
}
103106
}
104107

105-
func (b *BadgerRetryableUploaderWrapper) Ready() <-chan struct{} {
106-
return b.uploader.Ready()
107-
}
108-
109-
func (b *BadgerRetryableUploaderWrapper) Done() <-chan struct{} {
110-
return b.uploader.Done()
111-
}
112-
113108
func (b *BadgerRetryableUploaderWrapper) Upload(computationResult *execution.ComputationResult) error {
114109
if computationResult == nil || computationResult.ExecutableBlock == nil ||
115110
computationResult.ExecutableBlock.Block == nil {

engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package uploader
22

33
import (
4+
"context"
45
"sync"
56
"testing"
67
"time"
@@ -11,6 +12,7 @@ import (
1112
"github.com/onflow/flow-go/model/flow"
1213
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
1314
executionDataMock "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock"
15+
"github.com/onflow/flow-go/module/irrecoverable"
1416
"github.com/onflow/flow-go/module/mempool/entity"
1517
"github.com/onflow/flow-go/module/metrics"
1618

@@ -26,6 +28,8 @@ import (
2628
)
2729

2830
func Test_Upload_invoke(t *testing.T) {
31+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
32+
defer cancel()
2933
wg := sync.WaitGroup{}
3034
uploaderCalled := false
3135

@@ -40,7 +44,7 @@ func Test_Upload_invoke(t *testing.T) {
4044
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})
4145

4246
testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
43-
defer testRetryableUploaderWrapper.Done()
47+
testRetryableUploaderWrapper.Start(ctx)
4448

4549
// nil input - no call to Upload()
4650
err := testRetryableUploaderWrapper.Upload(nil)
@@ -58,6 +62,8 @@ func Test_Upload_invoke(t *testing.T) {
5862
}
5963

6064
func Test_RetryUpload(t *testing.T) {
65+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
66+
defer cancel()
6167
wg := sync.WaitGroup{}
6268
wg.Add(1)
6369
uploaderCalled := false
@@ -72,7 +78,7 @@ func Test_RetryUpload(t *testing.T) {
7278
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})
7379

7480
testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
75-
defer testRetryableUploaderWrapper.Done()
81+
testRetryableUploaderWrapper.Start(ctx)
7682

7783
err := testRetryableUploaderWrapper.RetryUpload()
7884
wg.Wait()
@@ -82,6 +88,8 @@ func Test_RetryUpload(t *testing.T) {
8288
}
8389

8490
func Test_AsyncUploaderCallback(t *testing.T) {
91+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
92+
defer cancel()
8593
wgUploadCalleded := sync.WaitGroup{}
8694
wgUploadCalleded.Add(1)
8795

@@ -95,7 +103,7 @@ func Test_AsyncUploaderCallback(t *testing.T) {
95103
1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{})
96104

97105
testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader)
98-
defer testRetryableUploaderWrapper.Done()
106+
testRetryableUploaderWrapper.Start(ctx)
99107

100108
testComputationResult := createTestComputationResult()
101109
err := testRetryableUploaderWrapper.Upload(testComputationResult)

engine/execution/ingestion/uploader/uploader.go

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ import (
66

77
"github.com/rs/zerolog"
88

9-
"github.com/onflow/flow-go/engine"
109
"github.com/onflow/flow-go/engine/execution"
1110
"github.com/onflow/flow-go/module"
11+
"github.com/onflow/flow-go/module/component"
12+
"github.com/onflow/flow-go/module/irrecoverable"
1213
"github.com/onflow/flow-go/utils/logging"
1314

1415
"github.com/sethvargo/go-retry"
@@ -26,74 +27,102 @@ func NewAsyncUploader(uploader Uploader,
2627
maxRetryNumber uint64,
2728
log zerolog.Logger,
2829
metrics module.ExecutionMetrics) *AsyncUploader {
29-
return &AsyncUploader{
30-
unit: engine.NewUnit(),
30+
a := &AsyncUploader{
3131
uploader: uploader,
3232
log: log.With().Str("component", "block_data_uploader").Logger(),
3333
metrics: metrics,
3434
retryInitialTimeout: retryInitialTimeout,
3535
maxRetryNumber: maxRetryNumber,
36+
// we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full,
37+
// but it is not acceptable to skip uploading an execution result
38+
queue: make(chan *execution.ComputationResult, 20000),
3639
}
40+
builder := component.NewComponentManagerBuilder()
41+
for i := 0; i < 10; i++ {
42+
builder.AddWorker(a.UploadWorker)
43+
}
44+
a.cm = builder.Build()
45+
a.Component = a.cm
46+
return a
3747
}
3848

3949
// AsyncUploader wraps up another Uploader instance and make its upload asynchronous
4050
type AsyncUploader struct {
41-
module.ReadyDoneAware
42-
unit *engine.Unit
4351
uploader Uploader
4452
log zerolog.Logger
4553
metrics module.ExecutionMetrics
4654
retryInitialTimeout time.Duration
4755
maxRetryNumber uint64
4856
onComplete OnCompleteFunc // callback function called after Upload is completed
57+
queue chan *execution.ComputationResult
58+
cm *component.ComponentManager
59+
component.Component
4960
}
5061

51-
func (a *AsyncUploader) Ready() <-chan struct{} {
52-
return a.unit.Ready()
53-
}
54-
55-
func (a *AsyncUploader) Done() <-chan struct{} {
56-
return a.unit.Done()
62+
// UploadWorker implements a component worker which asynchronously uploads computation results
63+
// from the execution node (after a block is executed) to storage such as a GCP bucket or S3 bucket.
64+
func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
65+
ready()
66+
67+
done := ctx.Done()
68+
for {
69+
select {
70+
case <-done:
71+
return
72+
case computationResult := <-a.queue:
73+
a.UploadTask(ctx, computationResult)
74+
}
75+
}
5776
}
5877

5978
func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) {
6079
a.onComplete = onComplete
6180
}
6281

82+
// Upload adds the computation result to a queue to be processed asynchronously by workers,
83+
// ensuring that multiple uploads can be run in parallel.
84+
// No errors expected during normal operation.
6385
func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error {
86+
a.queue <- computationResult
87+
return nil
88+
}
6489

90+
// UploadTask implements retrying for uploading computation results.
91+
// When the upload is complete, the callback will be called with the result (for example,
92+
// to record that the upload was successful) and any error.
93+
// No errors expected during normal operation.
94+
func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execution.ComputationResult) {
6595
backoff := retry.NewFibonacci(a.retryInitialTimeout)
6696
backoff = retry.WithMaxRetries(a.maxRetryNumber, backoff)
6797

68-
a.unit.Launch(func() {
69-
a.metrics.ExecutionBlockDataUploadStarted()
70-
start := time.Now()
71-
72-
a.log.Debug().Msgf("computation result of block %s is being uploaded",
73-
computationResult.ExecutableBlock.ID().String())
98+
a.metrics.ExecutionBlockDataUploadStarted()
99+
start := time.Now()
74100

75-
err := retry.Do(a.unit.Ctx(), backoff, func(ctx context.Context) error {
76-
err := a.uploader.Upload(computationResult)
77-
if err != nil {
78-
a.log.Warn().Err(err).Msg("error while uploading block data, retrying")
79-
}
80-
return retry.RetryableError(err)
81-
})
101+
a.log.Debug().Msgf("computation result of block %s is being uploaded",
102+
computationResult.ExecutableBlock.ID().String())
82103

104+
err := retry.Do(ctx, backoff, func(ctx context.Context) error {
105+
err := a.uploader.Upload(computationResult)
83106
if err != nil {
84-
a.log.Error().Err(err).
85-
Hex("block_id", logging.Entity(computationResult.ExecutableBlock)).
86-
Msg("failed to upload block data")
87-
} else {
88-
a.log.Debug().Msgf("computation result of block %s was successfully uploaded",
89-
computationResult.ExecutableBlock.ID().String())
107+
a.log.Warn().Err(err).Msg("error while uploading block data, retrying")
90108
}
109+
return retry.RetryableError(err)
110+
})
91111

92-
a.metrics.ExecutionBlockDataUploadFinished(time.Since(start))
112+
// We only log upload errors here because the errors originate from an external cloud provider
113+
// and the upload success is not critical to correct continued operation of the node
114+
if err != nil {
115+
a.log.Error().Err(err).
116+
Hex("block_id", logging.Entity(computationResult.ExecutableBlock)).
117+
Msg("failed to upload block data")
118+
} else {
119+
a.log.Debug().Msgf("computation result of block %s was successfully uploaded",
120+
computationResult.ExecutableBlock.ID().String())
121+
}
93122

94-
if a.onComplete != nil {
95-
a.onComplete(computationResult, err)
96-
}
97-
})
98-
return nil
123+
a.metrics.ExecutionBlockDataUploadFinished(time.Since(start))
124+
125+
if a.onComplete != nil {
126+
a.onComplete(computationResult, err)
127+
}
99128
}

engine/execution/ingestion/uploader/uploader_test.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package uploader
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"runtime/debug"
78
"sync"
@@ -13,17 +14,17 @@ import (
1314
"go.uber.org/atomic"
1415

1516
"github.com/onflow/flow-go/engine/execution"
16-
"github.com/onflow/flow-go/engine/execution/state/unittest"
17+
exeunittest "github.com/onflow/flow-go/engine/execution/state/unittest"
18+
"github.com/onflow/flow-go/module/irrecoverable"
1719
"github.com/onflow/flow-go/module/metrics"
18-
testutils "github.com/onflow/flow-go/utils/unittest"
19-
unittest2 "github.com/onflow/flow-go/utils/unittest"
20+
"github.com/onflow/flow-go/utils/unittest"
2021
)
2122

2223
func Test_AsyncUploader(t *testing.T) {
2324

24-
computationResult := unittest.ComputationResultFixture(
25+
computationResult := exeunittest.ComputationResultFixture(
2526
t,
26-
testutils.IdentifierFixture(),
27+
unittest.IdentifierFixture(),
2728
nil)
2829

2930
t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) {
@@ -46,6 +47,8 @@ func Test_AsyncUploader(t *testing.T) {
4647

4748
metrics := &DummyCollector{}
4849
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 1, zerolog.Nop(), metrics)
50+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
51+
async.Start(ctx)
4952

5053
err := async.Upload(computationResult)
5154
require.NoError(t, err)
@@ -63,6 +66,8 @@ func Test_AsyncUploader(t *testing.T) {
6366
wgContinueUpload.Done() //release all
6467

6568
// shut down component
69+
cancel()
70+
unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader did not finish in time")
6671
<-async.Done()
6772

6873
require.Equal(t, int64(0), metrics.Counter.Load())
@@ -89,6 +94,9 @@ func Test_AsyncUploader(t *testing.T) {
8994
}
9095

9196
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{})
97+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
98+
async.Start(ctx)
99+
defer cancel()
92100

93101
err := async.Upload(computationResult)
94102
require.NoError(t, err)
@@ -107,7 +115,7 @@ func Test_AsyncUploader(t *testing.T) {
107115
// 2. shut down async uploader right after upload initiated (not completed)
108116
// 3. assert that upload called only once even when trying to use retry mechanism
109117
t.Run("stopping component stops retrying", func(t *testing.T) {
110-
testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky")
118+
unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky")
111119

112120
callCount := 0
113121
t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1]))
@@ -151,6 +159,8 @@ func Test_AsyncUploader(t *testing.T) {
151159
}
152160
t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1]))
153161
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{})
162+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
163+
async.Start(ctx)
154164
t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1]))
155165
err := async.Upload(computationResult) // doesn't matter what we upload
156166
require.NoError(t, err)
@@ -163,11 +173,11 @@ func Test_AsyncUploader(t *testing.T) {
163173

164174
// stop component and check that it's fully stopped
165175
t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1]))
166-
c := async.Done()
176+
cancel()
167177
t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1]))
168178
wgShutdownStarted.Done()
169179
t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1]))
170-
unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time")
180+
unittest.RequireCloseBefore(t, async.Done(), 1*time.Second, "async uploader not closed in time")
171181

172182
t.Log("about to check if callCount is 1 grID:", string(bytes.Fields(debug.Stack())[1]))
173183
require.Equal(t, 1, callCount)
@@ -190,12 +200,15 @@ func Test_AsyncUploader(t *testing.T) {
190200
async.SetOnCompleteCallback(func(computationResult *execution.ComputationResult, err error) {
191201
onCompleteCallbackCalled = true
192202
})
203+
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background())
204+
async.Start(ctx)
193205

194206
err := async.Upload(computationResult)
195207
require.NoError(t, err)
196208

197-
wgUploadCalleded.Wait()
198-
<-async.Done()
209+
unittest.AssertReturnsBefore(t, wgUploadCalleded.Wait, time.Second)
210+
cancel()
211+
unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time")
199212

200213
require.True(t, onCompleteCallbackCalled)
201214
})

module/component/component.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,25 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down")
1919
// channels that close when startup and shutdown have completed.
2020
// Once Start has been called, the channel returned by Done must close eventually,
2121
// whether that be because of a graceful shutdown or an irrecoverable error.
22+
// See also ComponentManager below.
2223
type Component interface {
2324
module.Startable
24-
module.ReadyDoneAware
25+
// Ready returns a ready channel that is closed once startup has completed.
26+
// Unlike the previous [module.ReadyDoneAware] interface, Ready does not start the component,
27+
// but only exposes information about whether the component has completed startup.
28+
// To start the component, instead use the Start() method.
29+
// Note that the ready channel may never close if errors are encountered during startup,
30+
// or if shutdown has already commenced before startup is complete.
31+
// This should be an idempotent method.
32+
Ready() <-chan struct{}
33+
34+
// Done returns a done channel that is closed once shutdown has completed.
35+
// Unlike the previous [module.ReadyDoneAware] interface, Done does not shut down the component,
36+
// but only exposes information about whether the component has shut down yet.
37+
// To shutdown the component, instead cancel the context that was passed to Start().
38+
// Implementations must close the done channel even if errors are encountered during shutdown.
39+
// This should be an idempotent method.
40+
Done() <-chan struct{}
2541
}
2642

2743
type ComponentFactory func() (Component, error)

0 commit comments

Comments
 (0)