Skip to content

Commit 32a6ad1

Browse files
committed
Fixup: Fix synchronization issues in the test
Maybe I will replace this with e2e alltogether. To be consistent I should probably extend other p2p clients. On the other hand I dislike methods for testing only.
1 parent ebac4e2 commit 32a6ad1

File tree

4 files changed

+56
-6
lines changed

4 files changed

+56
-6
lines changed

go/worker/storage/p2p/checkpointsync/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type Client interface {
3333
request *GetCheckpointChunkRequest,
3434
cp *Checkpoint,
3535
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error)
36+
37+
// IsReady is true when protocol client is aware of at least one remote peer.
38+
IsReady() bool
3639
}
3740

3841
// Checkpoint contains checkpoint metadata together with peer information.
@@ -114,6 +117,10 @@ func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID {
114117
return append(c.mgr.GetBestPeers(opts...), c.fallbackMgr.GetBestPeers(opts...)...)
115118
}
116119

120+
func (c *client) IsReady() bool {
121+
return len(c.getBestPeers()) > 0
122+
}
123+
117124
// NewClient creates a new checkpoint sync protocol client.
118125
//
119126
// Previously, it was part of the storage sync protocol. To enable seamless rolling

go/worker/storage/p2p/diffsync/client.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package diffsync
33
import (
44
"context"
55

6+
"github.com/libp2p/go-libp2p/core"
67
"github.com/oasisprotocol/oasis-core/go/common"
78
"github.com/oasisprotocol/oasis-core/go/p2p/protocol"
89
"github.com/oasisprotocol/oasis-core/go/p2p/rpc"
@@ -23,6 +24,9 @@ type Client interface {
2324
// GetDiff requests a write log of entries that must be applied to get from the first given root
2425
// to the second one.
2526
GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error)
27+
28+
// IsReady is true when protocol client is aware of at least one remote peer.
29+
IsReady() bool
2630
}
2731

2832
type client struct {
@@ -33,8 +37,7 @@ type client struct {
3337

3438
func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiffResponse, rpc.PeerFeedback, error) {
3539
var rsp GetDiffResponse
36-
peers := append(c.mgr.GetBestPeers(), c.fallbackMgr.GetBestPeers()...)
37-
pf, err := c.rc.CallOne(ctx, peers, MethodGetDiff, request, &rsp,
40+
pf, err := c.rc.CallOne(ctx, c.getBestPeers(), MethodGetDiff, request, &rsp,
3841
rpc.WithMaxPeerResponseTime(MaxGetDiffResponseTime),
3942
)
4043
if err != nil {
@@ -43,6 +46,14 @@ func (c *client) GetDiff(ctx context.Context, request *GetDiffRequest) (*GetDiff
4346
return &rsp, pf, nil
4447
}
4548

49+
func (c *client) getBestPeers(opts ...rpc.BestPeersOption) []core.PeerID {
50+
return append(c.mgr.GetBestPeers(opts...), c.fallbackMgr.GetBestPeers(opts...)...)
51+
}
52+
53+
func (c *client) IsReady() bool {
54+
return len(c.getBestPeers()) > 0
55+
}
56+
4657
// NewClient creates a new diff sync protocol client.
4758
//
4859
// Previously, it was part of the storage sync protocol. To enable seamless rolling

go/worker/storage/p2p/sync_test/interoperable_test.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ func testLegacyHostClient(ctx context.Context, t *testing.T, host p2pApi.Service
117117
require := require.New(t)
118118

119119
client := synclegacy.NewClient(host, chainContext, runtimeID)
120-
time.Sleep(2 * time.Second)
121-
120+
err := waitReady(ctx, client.IsReady)
121+
require.NoError(err, "Wait for p2p client readiness")
122122
// Test diff part of the storagesync protocol.
123123
rsp, _, err := client.GetDiff(ctx, &synclegacy.GetDiffRequest{})
124124
require.NoError(err, "Fetch storage diff from p2p")
@@ -146,15 +146,19 @@ func testNewClients(ctx context.Context, t *testing.T, host p2pApi.Service, back
146146

147147
// Test diff sync protocol.
148148
diffClient := diffsync.NewClient(host, chainContext, runtimeID)
149-
time.Sleep(2 * time.Second)
149+
err := waitReady(ctx, diffClient.IsReady)
150+
require.NoError(err, "Wait for p2p client readiness")
151+
150152
rsp2, _, err := diffClient.GetDiff(ctx, &diffsync.GetDiffRequest{})
151153
require.NoError(err, "Fetch storage diff from p2p")
152154
err = assertEqualGetDiffResponse(ctx, backend, rsp2.WriteLog)
153155
require.NoError(err, "Assert expected storage diff response")
154156

155157
// Test checkpoint sync protocol.
156158
cpsClient := checkpointsync.NewClient(host, chainContext, runtimeID)
157-
time.Sleep(2 * time.Second)
159+
err = waitReady(ctx, cpsClient.IsReady)
160+
require.NoError(err, "Wait for p2p client readiness")
161+
158162
cps, err := cpsClient.GetCheckpoints(ctx, &checkpointsync.GetCheckpointsRequest{
159163
Version: 1,
160164
})
@@ -307,6 +311,27 @@ func getAvailablePort() (int, error) {
307311
return addr.Port, nil
308312
}
309313

314+
func waitReady(ctx context.Context, isReady func() bool) error {
315+
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
316+
defer cancel()
317+
318+
backoff := 20 * time.Millisecond
319+
for {
320+
select {
321+
case <-ctx.Done():
322+
return ctx.Err()
323+
default:
324+
}
325+
326+
if isReady() {
327+
return nil
328+
}
329+
330+
time.Sleep(backoff)
331+
backoff *= 2
332+
}
333+
}
334+
310335
type backendMock struct{}
311336

312337
func (bm *backendMock) SyncGet(context.Context, *syncer.GetRequest) (*syncer.ProofResponse, error) {

go/worker/storage/p2p/synclegacy/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type Client interface {
3636
request *GetCheckpointChunkRequest,
3737
cp *Checkpoint,
3838
) (*GetCheckpointChunkResponse, rpc.PeerFeedback, error)
39+
40+
// IsReady is true when protocol client is aware of at least one remote peer.
41+
IsReady() bool
3942
}
4043

4144
// Checkpoint contains checkpoint metadata together with peer information.
@@ -129,6 +132,10 @@ func GetStorageSyncProtocolID(chainContext string, runtimeID common.Namespace) c
129132
return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion)
130133
}
131134

135+
func (c *client) IsReady() bool {
136+
return len(c.mgrC.GetBestPeers()) > 0
137+
}
138+
132139
// NewClient creates a new storage sync protocol client.
133140
func NewClient(p2p rpc.P2P, chainContext string, runtimeID common.Namespace) Client {
134141
// Use two separate clients and managers for the same protocol. This is to make sure that peers

0 commit comments

Comments
 (0)