Skip to content

Commit c47f6f5

Browse files
committed
Merge branch 'master' into bastian/computation-profiling
2 parents 4b21755 + a539468 commit c47f6f5

File tree

20 files changed

+1594
-196
lines changed

20 files changed

+1594
-196
lines changed

adapters/access.go

Lines changed: 352 additions & 26 deletions
Large diffs are not rendered by default.
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Flow Emulator
3+
*
4+
* Copyright Flow Foundation
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package adapters
20+
21+
import (
22+
"context"
23+
"testing"
24+
"time"
25+
26+
"github.com/onflow/flow-go/engine/access/subscription"
27+
accessmodel "github.com/onflow/flow-go/model/access"
28+
flowgo "github.com/onflow/flow-go/model/flow"
29+
"github.com/onflow/flow/protobuf/go/flow/entities"
30+
"github.com/rs/zerolog"
31+
"github.com/stretchr/testify/assert"
32+
"github.com/stretchr/testify/require"
33+
34+
"github.com/onflow/flow-emulator/emulator"
35+
"github.com/onflow/flow-emulator/storage/memstore"
36+
)
37+
38+
func setupRealBlockchain(t *testing.T) (*emulator.Blockchain, *AccessAdapter) {
39+
store := memstore.New()
40+
logger := zerolog.Nop()
41+
42+
blockchain, err := emulator.New(
43+
emulator.WithStore(store),
44+
emulator.WithServerLogger(logger),
45+
)
46+
require.NoError(t, err)
47+
48+
adapter := NewAccessAdapter(&logger, blockchain)
49+
return blockchain, adapter
50+
}
51+
52+
func TestStreamingBlocks_Integration(t *testing.T) {
53+
if testing.Short() {
54+
t.Skip("skipping integration test")
55+
}
56+
57+
blockchain, adapter := setupRealBlockchain(t)
58+
59+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
60+
defer cancel()
61+
62+
sub := adapter.SubscribeBlocksFromStartHeight(ctx, 0, flowgo.BlockStatusSealed)
63+
require.NotNil(t, sub)
64+
require.NoError(t, sub.Err())
65+
66+
ch := sub.Channel()
67+
require.NotNil(t, ch)
68+
69+
select {
70+
case data := <-ch:
71+
block, ok := data.(*flowgo.Block)
72+
require.True(t, ok, "expected *flowgo.Block, got %T", data)
73+
assert.Equal(t, uint64(0), block.Height)
74+
case <-time.After(2 * time.Second):
75+
t.Fatal("timeout waiting for genesis block")
76+
}
77+
78+
newBlock, _, err := blockchain.ExecuteAndCommitBlock()
79+
require.NoError(t, err)
80+
81+
select {
82+
case data := <-ch:
83+
block, ok := data.(*flowgo.Block)
84+
require.True(t, ok, "expected *flowgo.Block, got %T", data)
85+
assert.Equal(t, uint64(1), block.Height)
86+
assert.Equal(t, newBlock.ID(), block.ID())
87+
case <-time.After(2 * time.Second):
88+
t.Fatal("timeout waiting for new block")
89+
}
90+
}
91+
92+
func TestStreamingBlockHeaders_Integration(t *testing.T) {
93+
if testing.Short() {
94+
t.Skip("skipping integration test")
95+
}
96+
97+
blockchain, adapter := setupRealBlockchain(t)
98+
99+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
100+
defer cancel()
101+
102+
genesisBlock, err := blockchain.GetBlockByHeight(0)
103+
require.NoError(t, err)
104+
105+
sub := adapter.SubscribeBlockHeadersFromStartBlockID(ctx, genesisBlock.ID(), flowgo.BlockStatusSealed)
106+
require.NotNil(t, sub)
107+
require.NoError(t, sub.Err())
108+
109+
ch := sub.Channel()
110+
111+
select {
112+
case data := <-ch:
113+
header, ok := data.(*flowgo.Header)
114+
require.True(t, ok, "expected *flowgo.Header, got %T", data)
115+
assert.Equal(t, uint64(0), header.Height)
116+
assert.Equal(t, genesisBlock.ID(), header.ID())
117+
case <-time.After(2 * time.Second):
118+
t.Fatal("timeout waiting for genesis header")
119+
}
120+
}
121+
122+
func TestStreamingBlockDigests_Integration(t *testing.T) {
123+
if testing.Short() {
124+
t.Skip("skipping integration test")
125+
}
126+
127+
blockchain, adapter := setupRealBlockchain(t)
128+
129+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
130+
defer cancel()
131+
132+
sub := adapter.SubscribeBlockDigestsFromLatest(ctx, flowgo.BlockStatusSealed)
133+
require.NotNil(t, sub)
134+
require.NoError(t, sub.Err())
135+
136+
ch := sub.Channel()
137+
138+
select {
139+
case data := <-ch:
140+
digest, ok := data.(*flowgo.BlockDigest)
141+
require.True(t, ok, "expected *flowgo.BlockDigest, got %T", data)
142+
assert.Equal(t, uint64(0), digest.Height)
143+
case <-time.After(2 * time.Second):
144+
t.Fatal("timeout waiting for genesis digest")
145+
}
146+
147+
newBlock, _, err := blockchain.ExecuteAndCommitBlock()
148+
require.NoError(t, err)
149+
150+
select {
151+
case data := <-ch:
152+
digest, ok := data.(*flowgo.BlockDigest)
153+
require.True(t, ok, "expected *flowgo.BlockDigest, got %T", data)
154+
assert.Equal(t, uint64(1), digest.Height)
155+
assert.Equal(t, newBlock.ID(), digest.BlockID)
156+
case <-time.After(2 * time.Second):
157+
t.Fatal("timeout waiting for new digest")
158+
}
159+
}
160+
161+
func TestStreamingTransactionStatuses_Integration(t *testing.T) {
162+
if testing.Short() {
163+
t.Skip("skipping integration test")
164+
}
165+
166+
blockchain, adapter := setupRealBlockchain(t)
167+
168+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
169+
defer cancel()
170+
171+
serviceKey := blockchain.ServiceKey()
172+
latestBlock, err := blockchain.GetLatestBlock()
173+
require.NoError(t, err)
174+
175+
serviceAddress := flowgo.BytesToAddress(serviceKey.Address.Bytes())
176+
177+
txBuilder := flowgo.NewTransactionBodyBuilder().
178+
SetScript([]byte(`
179+
transaction {
180+
prepare(signer: &Account) {
181+
log("Transaction executed successfully")
182+
}
183+
}
184+
`)).
185+
SetReferenceBlockID(latestBlock.ID()).
186+
SetProposalKey(serviceAddress, serviceKey.Index, serviceKey.SequenceNumber).
187+
SetPayer(serviceAddress).
188+
AddAuthorizer(serviceAddress)
189+
190+
tx, err := txBuilder.Build()
191+
require.NoError(t, err)
192+
193+
sub := adapter.SendAndSubscribeTransactionStatuses(ctx, tx, entities.EventEncodingVersion_CCF_V0)
194+
require.NotNil(t, sub)
195+
require.NoError(t, sub.Err())
196+
197+
ch := sub.Channel()
198+
199+
go func() {
200+
time.Sleep(100 * time.Millisecond)
201+
_, _, _ = blockchain.ExecuteAndCommitBlock()
202+
}()
203+
204+
statusesReceived := make(map[flowgo.TransactionStatus]bool)
205+
timeout := time.After(5 * time.Second)
206+
207+
for {
208+
select {
209+
case data := <-ch:
210+
if data == nil {
211+
continue
212+
}
213+
214+
if sub.Err() != nil {
215+
if sub.Err() == subscription.ErrEndOfData {
216+
goto done
217+
}
218+
continue
219+
}
220+
221+
results, ok := data.([]*accessmodel.TransactionResult)
222+
if !ok {
223+
continue
224+
}
225+
226+
for _, result := range results {
227+
if result.TransactionID != flowgo.ZeroID {
228+
assert.Equal(t, tx.ID(), result.TransactionID)
229+
}
230+
statusesReceived[result.Status] = true
231+
232+
if result.Status == flowgo.TransactionStatusSealed {
233+
goto done
234+
}
235+
}
236+
237+
case <-timeout:
238+
t.Fatalf("timeout waiting for transaction statuses. Received: %v", statusesReceived)
239+
}
240+
}
241+
242+
done:
243+
assert.True(t, statusesReceived[flowgo.TransactionStatusSealed],
244+
"should receive sealed status. Got: %v", statusesReceived)
245+
}
246+
247+
func TestStreamingMultipleBlocks_Integration(t *testing.T) {
248+
if testing.Short() {
249+
t.Skip("skipping integration test")
250+
}
251+
252+
blockchain, adapter := setupRealBlockchain(t)
253+
254+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
255+
defer cancel()
256+
257+
sub := adapter.SubscribeBlocksFromStartHeight(ctx, 0, flowgo.BlockStatusSealed)
258+
require.NotNil(t, sub)
259+
require.NoError(t, sub.Err())
260+
261+
ch := sub.Channel()
262+
blocksReceived := 0
263+
expectedBlocks := 5
264+
265+
select {
266+
case data := <-ch:
267+
block := data.(*flowgo.Block)
268+
assert.Equal(t, uint64(0), block.Height)
269+
blocksReceived++
270+
case <-time.After(2 * time.Second):
271+
t.Fatal("timeout waiting for genesis block")
272+
}
273+
274+
for i := 0; i < expectedBlocks-1; i++ {
275+
_, _, err := blockchain.ExecuteAndCommitBlock()
276+
require.NoError(t, err)
277+
278+
select {
279+
case data := <-ch:
280+
receivedBlock := data.(*flowgo.Block)
281+
assert.Equal(t, uint64(i+1), receivedBlock.Height)
282+
blocksReceived++
283+
case <-time.After(2 * time.Second):
284+
t.Fatalf("timeout waiting for block %d", i+1)
285+
}
286+
}
287+
288+
assert.Equal(t, expectedBlocks, blocksReceived)
289+
}

convert/emu.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func ToStorableResult(
2929
output fvm.ProcedureOutput,
3030
blockID flowgo.Identifier,
3131
blockHeight uint64,
32+
collectionID flowgo.Identifier,
3233
) (
3334
types.StorableTransactionResult,
3435
error,
@@ -48,5 +49,6 @@ func ToStorableResult(
4849
ErrorMessage: errorMessage,
4950
Logs: output.Logs,
5051
Events: output.Events,
52+
CollectionID: collectionID,
5153
}, nil
5254
}

0 commit comments

Comments
 (0)