Skip to content

Commit 614468a

Browse files
authored
[Exec] Adding collection result consumer type (#4049)
1 parent 68bdd7a commit 614468a

File tree

16 files changed

+183
-18
lines changed

16 files changed

+183
-18
lines changed

engine/execution/computation/computer/computer.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/onflow/flow-go/crypto/hash"
1313
"github.com/onflow/flow-go/engine/execution"
14+
"github.com/onflow/flow-go/engine/execution/computation/result"
1415
"github.com/onflow/flow-go/engine/execution/state/delta"
1516
"github.com/onflow/flow-go/engine/execution/utils"
1617
"github.com/onflow/flow-go/fvm"
@@ -126,6 +127,7 @@ type blockComputer struct {
126127
signer module.Local
127128
spockHasher hash.Hasher
128129
receiptHasher hash.Hasher
130+
colResCons []result.ExecutedCollectionConsumer
129131
}
130132

131133
func SystemChunkContext(vmCtx fvm.Context, logger zerolog.Logger) fvm.Context {
@@ -152,6 +154,7 @@ func NewBlockComputer(
152154
committer ViewCommitter,
153155
signer module.Local,
154156
executionDataProvider *provider.Provider,
157+
colResCons []result.ExecutedCollectionConsumer,
155158
) (BlockComputer, error) {
156159
systemChunkCtx := SystemChunkContext(vmCtx, logger)
157160
vmCtx = fvm.NewContextFromParent(
@@ -170,6 +173,7 @@ func NewBlockComputer(
170173
signer: signer,
171174
spockHasher: utils.NewSPOCKHasher(),
172175
receiptHasher: utils.NewExecutionReceiptHasher(),
176+
colResCons: colResCons,
173177
}, nil
174178
}
175179

@@ -303,7 +307,8 @@ func (e *blockComputer) executeBlock(
303307
e.receiptHasher,
304308
parentBlockExecutionResultID,
305309
block,
306-
len(transactions))
310+
len(transactions),
311+
e.colResCons)
307312
defer collector.Stop()
308313

309314
stateView := delta.NewDeltaView(snapshot)

engine/execution/computation/computer/computer_test.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
181181
zerolog.Nop(),
182182
committer,
183183
me,
184-
prov)
184+
prov,
185+
nil)
185186
require.NoError(t, err)
186187

187188
// create a block with 1 collection with 2 transactions
@@ -311,7 +312,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
311312
zerolog.Nop(),
312313
committer,
313314
me,
314-
prov)
315+
prov,
316+
nil)
315317
require.NoError(t, err)
316318

317319
// create an empty block
@@ -401,7 +403,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
401403
zerolog.Nop(),
402404
comm,
403405
me,
404-
prov)
406+
prov,
407+
nil)
405408
require.NoError(t, err)
406409

407410
// create an empty block
@@ -450,7 +453,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
450453
zerolog.Nop(),
451454
committer,
452455
me,
453-
prov)
456+
prov,
457+
nil)
454458
require.NoError(t, err)
455459

456460
collectionCount := 2
@@ -636,7 +640,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
636640
zerolog.Nop(),
637641
committer.NewNoopViewCommitter(),
638642
me,
639-
prov)
643+
prov,
644+
nil)
640645
require.NoError(t, err)
641646

642647
result, err := exe.ExecuteBlock(
@@ -722,7 +727,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
722727
zerolog.Nop(),
723728
committer.NewNoopViewCommitter(),
724729
me,
725-
prov)
730+
prov,
731+
nil)
726732
require.NoError(t, err)
727733

728734
const collectionCount = 2
@@ -823,7 +829,8 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
823829
zerolog.Nop(),
824830
committer.NewNoopViewCommitter(),
825831
me,
826-
prov)
832+
prov,
833+
nil)
827834
require.NoError(t, err)
828835

829836
block := generateBlock(collectionCount, transactionCount, rag)
@@ -1045,7 +1052,8 @@ func Test_AccountStatusRegistersAreIncluded(t *testing.T) {
10451052
zerolog.Nop(),
10461053
committer.NewNoopViewCommitter(),
10471054
me,
1048-
prov)
1055+
prov,
1056+
nil)
10491057
require.NoError(t, err)
10501058

10511059
block := generateBlockWithVisitor(1, 1, fag, func(txBody *flow.TransactionBody) {
@@ -1156,7 +1164,8 @@ func Test_ExecutingSystemCollection(t *testing.T) {
11561164
zerolog.Nop(),
11571165
committer,
11581166
me,
1159-
prov)
1167+
prov,
1168+
nil)
11601169
require.NoError(t, err)
11611170

11621171
// create empty block, it will have system collection attached while executing

engine/execution/computation/computer/result_collector.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/onflow/flow-go/crypto"
1212
"github.com/onflow/flow-go/crypto/hash"
1313
"github.com/onflow/flow-go/engine/execution"
14+
"github.com/onflow/flow-go/engine/execution/computation/result"
1415
"github.com/onflow/flow-go/engine/execution/state/delta"
1516
"github.com/onflow/flow-go/fvm/state"
1617
"github.com/onflow/flow-go/ledger"
@@ -41,6 +42,7 @@ type transactionResult struct {
4142
*state.ExecutionSnapshot
4243
}
4344

45+
// TODO(ramtin): move committer and other folks to consumers layer
4446
type resultCollector struct {
4547
tracer module.Tracer
4648
blockSpan otelTrace.Span
@@ -62,7 +64,8 @@ type resultCollector struct {
6264

6365
parentBlockExecutionResultID flow.Identifier
6466

65-
result *execution.ComputationResult
67+
result *execution.ComputationResult
68+
consumers []result.ExecutedCollectionConsumer
6669

6770
chunks []*flow.Chunk
6871
spockSignatures []crypto.Signature
@@ -88,6 +91,7 @@ func newResultCollector(
8891
parentBlockExecutionResultID flow.Identifier,
8992
block *entity.ExecutableBlock,
9093
numTransactions int,
94+
consumers []result.ExecutedCollectionConsumer,
9195
) *resultCollector {
9296
numCollections := len(block.Collections()) + 1
9397
now := time.Now()
@@ -104,6 +108,7 @@ func newResultCollector(
104108
executionDataProvider: executionDataProvider,
105109
parentBlockExecutionResultID: parentBlockExecutionResultID,
106110
result: execution.NewEmptyComputationResult(block),
111+
consumers: consumers,
107112
chunks: make([]*flow.Chunk, 0, numCollections),
108113
spockSignatures: make([]crypto.Signature, 0, numCollections),
109114
blockStartTime: now,
@@ -226,6 +231,13 @@ func (collector *resultCollector) commitCollection(
226231
NumberOfCollections: 1,
227232
}
228233

234+
for _, consumer := range collector.consumers {
235+
err = consumer.OnExecutedCollection(collector.result.CollectionResult(collection.collectionIndex))
236+
if err != nil {
237+
return fmt.Errorf("consumer failed: %w", err)
238+
}
239+
}
240+
229241
return nil
230242
}
231243

engine/execution/computation/execution_verification_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,8 @@ func executeBlockAndVerifyWithParameters(t *testing.T,
701701
logger,
702702
ledgerCommiter,
703703
me,
704-
prov)
704+
prov,
705+
nil)
705706
require.NoError(t, err)
706707

707708
executableBlock := unittest.ExecutableBlockFromTransactions(chain.ChainID(), txs)

engine/execution/computation/manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func New(
130130
committer,
131131
me,
132132
executionDataProvider,
133+
nil, // TODO(ramtin): update me with proper consumers
133134
)
134135

135136
if err != nil {

engine/execution/computation/manager_benchmark_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ func BenchmarkComputeBlock(b *testing.B) {
153153
zerolog.Nop(),
154154
committer.NewNoopViewCommitter(),
155155
me,
156-
prov)
156+
prov,
157+
nil)
157158
require.NoError(b, err)
158159

159160
derivedChainData, err := derived.NewDerivedChainData(

engine/execution/computation/manager_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ func TestComputeBlockWithStorage(t *testing.T) {
139139
zerolog.Nop(),
140140
committer.NewNoopViewCommitter(),
141141
me,
142-
prov)
142+
prov,
143+
nil)
143144
require.NoError(t, err)
144145

145146
derivedChainData, err := derived.NewDerivedChainData(10)
@@ -777,6 +778,7 @@ func Test_EventEncodingFailsOnlyTxAndCarriesOn(t *testing.T) {
777778
committer.NewNoopViewCommitter(),
778779
me,
779780
prov,
781+
nil,
780782
)
781783
require.NoError(t, err)
782784

engine/execution/computation/programs_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ func TestPrograms_TestContractUpdates(t *testing.T) {
135135
zerolog.Nop(),
136136
committer.NewNoopViewCommitter(),
137137
me,
138-
prov)
138+
prov,
139+
nil)
139140
require.NoError(t, err)
140141

141142
derivedChainData, err := derived.NewDerivedChainData(10)
@@ -246,7 +247,8 @@ func TestPrograms_TestBlockForks(t *testing.T) {
246247
zerolog.Nop(),
247248
committer.NewNoopViewCommitter(),
248249
me,
249-
prov)
250+
prov,
251+
nil)
250252
require.NoError(t, err)
251253

252254
derivedChainData, err := derived.NewDerivedChainData(10)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package result
2+
3+
import (
4+
"github.com/onflow/flow-go/model/flow"
5+
)
6+
7+
// ExecutedCollection holds results of a collection execution
8+
type ExecutedCollection interface {
9+
// BlockHeader returns the block header in which collection was included
10+
BlockHeader() *flow.Header
11+
12+
// Collection returns the content of the collection
13+
Collection() *flow.Collection
14+
15+
// RegisterUpdates returns all registers that were updated during collection execution
16+
UpdatedRegisters() flow.RegisterEntries
17+
18+
// ReadRegisterIDs returns all registers that has been read during collection execution
19+
ReadRegisterIDs() flow.RegisterIDs
20+
21+
// EmittedEvents returns a list of events emitted during collection execution
22+
EmittedEvents() flow.EventsList
23+
24+
// TransactionResults returns a list of transaction results
25+
TransactionResults() flow.TransactionResults
26+
}
27+
28+
// ExecutedCollectionConsumer consumes ExecutedCollections
29+
type ExecutedCollectionConsumer interface {
30+
OnExecutedCollection(ec ExecutedCollection) error
31+
}

engine/execution/messages.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,61 @@ func NewEmptyComputationResult(
5252
},
5353
}
5454
}
55+
56+
func (cr ComputationResult) transactionResultsByCollectionIndex(colIndex int) []flow.TransactionResult {
57+
var startTxnIndex int
58+
if colIndex > 0 {
59+
startTxnIndex = cr.TransactionResultIndex[colIndex-1]
60+
}
61+
endTxnIndex := cr.TransactionResultIndex[colIndex]
62+
return cr.TransactionResults[startTxnIndex:endTxnIndex]
63+
}
64+
65+
func (cr *ComputationResult) CollectionResult(colIndex int) *ColResSnapshot {
66+
if colIndex < 0 && colIndex > len(cr.CompleteCollections) {
67+
return nil
68+
}
69+
return &ColResSnapshot{
70+
blockHeader: cr.Block.Header,
71+
collection: &flow.Collection{
72+
Transactions: cr.CollectionAt(colIndex).Transactions,
73+
},
74+
updatedRegisters: cr.StateSnapshots[colIndex].UpdatedRegisters(),
75+
readRegisterIDs: cr.StateSnapshots[colIndex].ReadRegisterIDs(),
76+
emittedEvents: cr.Events[colIndex],
77+
transactionResults: cr.transactionResultsByCollectionIndex(colIndex),
78+
}
79+
}
80+
81+
type ColResSnapshot struct {
82+
blockHeader *flow.Header
83+
collection *flow.Collection
84+
updatedRegisters flow.RegisterEntries
85+
readRegisterIDs flow.RegisterIDs
86+
emittedEvents flow.EventsList
87+
transactionResults flow.TransactionResults
88+
}
89+
90+
func (c *ColResSnapshot) BlockHeader() *flow.Header {
91+
return c.blockHeader
92+
}
93+
94+
func (c *ColResSnapshot) Collection() *flow.Collection {
95+
return c.collection
96+
}
97+
98+
func (c *ColResSnapshot) UpdatedRegisters() flow.RegisterEntries {
99+
return c.updatedRegisters
100+
}
101+
102+
func (c *ColResSnapshot) ReadRegisterIDs() flow.RegisterIDs {
103+
return c.readRegisterIDs
104+
}
105+
106+
func (c *ColResSnapshot) EmittedEvents() flow.EventsList {
107+
return c.emittedEvents
108+
}
109+
110+
func (c *ColResSnapshot) TransactionResults() flow.TransactionResults {
111+
return c.transactionResults
112+
}

0 commit comments

Comments
 (0)