Skip to content

Commit 672fc95

Browse files
authored
fix(p2p)!: use request timeout (#194)
## Overview We were missing request timeout for 1 handler. Fixing it + adding timeout tests. Also, renaming `RangeRequestTimeout` to `RequestTimeout` for consistency.
1 parent b81f0d7 commit 672fc95

File tree

6 files changed

+167
-38
lines changed

6 files changed

+167
-38
lines changed

p2p/exchange.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (ex *Exchange[H]) GetRangeByHeight(
293293
))
294294
defer span.End()
295295
session := newSession[H](
296-
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
296+
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RequestTimeout, ex.metrics, withValidation(from),
297297
)
298298
defer session.close()
299299
// we request the next header height that we don't have: `fromHead`+1

p2p/exchange_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,15 +482,15 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
482482

483483
// create client + server(it does not have needed headers)
484484
exchg, store := createP2PExAndServer(t, host0, host1)
485-
exchg.Params.RangeRequestTimeout = time.Millisecond * 100
485+
exchg.Params.RequestTimeout = time.Millisecond * 100
486486
// create one more server(with more headers in the store)
487487
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
488488
host2, headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
489489
WithNetworkID[ServerParameters](networkID),
490490
)
491491
require.NoError(t, err)
492492
// change store implementation
493-
serverSideEx.store = &timedOutStore{timeout: exchg.Params.RangeRequestTimeout}
493+
serverSideEx.store = &timedOutStore{timeout: exchg.Params.RequestTimeout}
494494
require.NoError(t, serverSideEx.Start(context.Background()))
495495
t.Cleanup(func() {
496496
serverSideEx.Stop(context.Background()) //nolint:errcheck

p2p/options.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ type ServerParameters struct {
2222
WriteDeadline time.Duration
2323
// ReadDeadline sets the timeout for reading messages from the stream
2424
ReadDeadline time.Duration
25-
// RangeRequestTimeout defines a timeout after which the session will try to re-request headers
25+
// RequestTimeout defines a timeout after which the session will try to re-request headers
2626
// from another peer.
27-
RangeRequestTimeout time.Duration
27+
RequestTimeout time.Duration
2828
// networkID is a network that will be used to create a protocol.ID
2929
// Is empty by default
3030
networkID string
@@ -35,9 +35,9 @@ type ServerParameters struct {
3535
// DefaultServerParameters returns the default params to configure the store.
3636
func DefaultServerParameters() ServerParameters {
3737
return ServerParameters{
38-
WriteDeadline: time.Second * 8,
39-
ReadDeadline: time.Minute,
40-
RangeRequestTimeout: time.Second * 10,
38+
WriteDeadline: time.Second * 8,
39+
ReadDeadline: time.Minute,
40+
RequestTimeout: time.Second * 10,
4141
}
4242
}
4343

@@ -48,9 +48,9 @@ func (p *ServerParameters) Validate() error {
4848
if p.ReadDeadline == 0 {
4949
return fmt.Errorf("invalid read time duration: %v", p.ReadDeadline)
5050
}
51-
if p.RangeRequestTimeout == 0 {
51+
if p.RequestTimeout == 0 {
5252
return fmt.Errorf("invalid request timeout for session: "+
53-
"%s. %s: %v", greaterThenZero, providedSuffix, p.RangeRequestTimeout)
53+
"%s. %s: %v", greaterThenZero, providedSuffix, p.RequestTimeout)
5454
}
5555
return nil
5656
}
@@ -88,15 +88,15 @@ func WithReadDeadline[T ServerParameters](deadline time.Duration) Option[T] {
8888
}
8989
}
9090

91-
// WithRangeRequestTimeout is a functional option that configures the
92-
// `RangeRequestTimeout` parameter.
93-
func WithRangeRequestTimeout[T parameters](duration time.Duration) Option[T] {
91+
// WithRequestTimeout is a functional option that configures the
92+
// `RequestTimeout` parameter.
93+
func WithRequestTimeout[T parameters](duration time.Duration) Option[T] {
9494
return func(p *T) {
9595
switch t := any(p).(type) {
9696
case *ClientParameters:
97-
t.RangeRequestTimeout = duration
97+
t.RequestTimeout = duration
9898
case *ServerParameters:
99-
t.RangeRequestTimeout = duration
99+
t.RequestTimeout = duration
100100
}
101101
}
102102
}
@@ -125,9 +125,9 @@ func WithParams[T parameters](params T) Option[T] {
125125
type ClientParameters struct {
126126
// MaxHeadersPerRangeRequest defines the max amount of headers that can be requested per 1 request.
127127
MaxHeadersPerRangeRequest uint64
128-
// RangeRequestTimeout defines a timeout after which the session will try to re-request headers
128+
// RequestTimeout defines a timeout after which the session will try to re-request headers
129129
// from another peer.
130-
RangeRequestTimeout time.Duration
130+
RequestTimeout time.Duration
131131
// networkID is a network that will be used to create a protocol.ID
132132
networkID string
133133
// chainID is an identifier of the chain.
@@ -142,7 +142,7 @@ type ClientParameters struct {
142142
func DefaultClientParameters() ClientParameters {
143143
return ClientParameters{
144144
MaxHeadersPerRangeRequest: 64,
145-
RangeRequestTimeout: time.Second * 8,
145+
RequestTimeout: time.Second * 8,
146146
}
147147
}
148148

@@ -156,9 +156,9 @@ func (p *ClientParameters) Validate() error {
156156
return fmt.Errorf("invalid MaxHeadersPerRangeRequest:%s. %s: %v",
157157
greaterThenZero, providedSuffix, p.MaxHeadersPerRangeRequest)
158158
}
159-
if p.RangeRequestTimeout == 0 {
159+
if p.RequestTimeout == 0 {
160160
return fmt.Errorf("invalid request timeout for session: "+
161-
"%s. %s: %v", greaterThenZero, providedSuffix, p.RangeRequestTimeout)
161+
"%s. %s: %v", greaterThenZero, providedSuffix, p.RequestTimeout)
162162
}
163163
return nil
164164
}

p2p/options_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,21 @@ func TestOptionsClientWithParams(t *testing.T) {
1212

1313
timeout := time.Second
1414
opt := WithParams(ClientParameters{
15-
RangeRequestTimeout: timeout,
15+
RequestTimeout: timeout,
1616
})
1717

1818
opt(&params)
19-
assert.Equal(t, timeout, params.RangeRequestTimeout)
19+
assert.Equal(t, timeout, params.RequestTimeout)
2020
}
2121

2222
func TestOptionsServerWithParams(t *testing.T) {
2323
params := DefaultServerParameters()
2424

2525
timeout := time.Second
2626
opt := WithParams(ServerParameters{
27-
RangeRequestTimeout: timeout,
27+
RequestTimeout: timeout,
2828
})
2929

3030
opt(&params)
31-
assert.Equal(t, timeout, params.RangeRequestTimeout)
31+
assert.Equal(t, timeout, params.RequestTimeout)
3232
}

p2p/server.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,16 @@ func (serv *ExchangeServer[H]) requestHandler(stream network.Stream) {
105105
log.Error(err)
106106
}
107107

108+
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RequestTimeout)
109+
defer cancel()
110+
108111
var headers []H
109112
// retrieve and write Headers
110113
switch pbreq.Data.(type) {
111114
case *p2p_pb.HeaderRequest_Hash:
112-
headers, err = serv.handleRequestByHash(pbreq.GetHash())
115+
headers, err = serv.handleRequestByHash(ctx, pbreq.GetHash())
113116
case *p2p_pb.HeaderRequest_Origin:
114-
headers, err = serv.handleRequest(pbreq.GetOrigin(), pbreq.GetOrigin()+pbreq.Amount)
117+
headers, err = serv.handleRangeRequest(ctx, pbreq.GetOrigin(), pbreq.GetOrigin()+pbreq.Amount)
115118
default:
116119
log.Warn("server: invalid data type received")
117120
stream.Reset() //nolint:errcheck
@@ -166,11 +169,9 @@ func (serv *ExchangeServer[H]) requestHandler(stream network.Stream) {
166169

167170
// handleRequestByHash returns the Header at the given hash
168171
// if it exists.
169-
func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
172+
func (serv *ExchangeServer[H]) handleRequestByHash(ctx context.Context, hash []byte) ([]H, error) {
170173
startTime := time.Now()
171174
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
172-
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
173-
defer cancel()
174175
ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes(
175176
attribute.String("hash", header.Hash(hash).String()),
176177
))
@@ -194,15 +195,16 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
194195
return []H{h}, nil
195196
}
196197

197-
// handleRequest fetches the Header at the given origin and
198+
// handleRangeRequest fetches the Header at the given origin and
198199
// writes it to the stream.
199-
func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
200+
func (serv *ExchangeServer[H]) handleRangeRequest(ctx context.Context, from, to uint64) ([]H, error) {
200201
if from == uint64(0) {
201-
return serv.handleHeadRequest()
202+
return serv.handleHeadRequest(ctx)
202203
}
203204

204205
startTime := time.Now()
205-
ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes(
206+
log.Debugw("server: handling range request", "from", from, "to", to)
207+
ctx, span := tracerServ.Start(ctx, "request-range", trace.WithAttributes(
206208
attribute.Int64("from", int64(from)),
207209
attribute.Int64("to", int64(to))))
208210
defer span.End()
@@ -266,11 +268,9 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
266268
}
267269

268270
// handleHeadRequest returns the latest stored head.
269-
func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
271+
func (serv *ExchangeServer[H]) handleHeadRequest(ctx context.Context) ([]H, error) {
270272
startTime := time.Now()
271273
log.Debug("server: handling head request")
272-
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
273-
defer cancel()
274274
ctx, span := tracerServ.Start(ctx, "request-head")
275275
defer span.End()
276276

p2p/server_test.go

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package p2p
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/ipfs/go-datastore"
89
"github.com/stretchr/testify/require"
@@ -28,7 +29,7 @@ func TestExchangeServer_handleRequestTimeout(t *testing.T) {
2829
server.Stop(context.Background()) //nolint:errcheck
2930
})
3031

31-
_, err = server.handleRequest(1, 200)
32+
_, err = server.handleRangeRequest(context.Background(), 1, 200)
3233
require.Error(t, err)
3334
}
3435

@@ -48,6 +49,134 @@ func TestExchangeServer_errorsOnLargeRequest(t *testing.T) {
4849
server.Stop(context.Background()) //nolint:errcheck
4950
})
5051

51-
_, err = server.handleRequest(1, header.MaxRangeRequestSize*2)
52+
_, err = server.handleRangeRequest(context.Background(), 1, header.MaxRangeRequestSize*2)
5253
require.Error(t, err)
5354
}
55+
56+
func TestExchangeServer_Timeout(t *testing.T) {
57+
const testRequestTimeout = 150 * time.Millisecond
58+
59+
peer := createMocknet(t, 1)
60+
61+
server, err := NewExchangeServer(
62+
peer[0],
63+
timeoutStore[*headertest.DummyHeader]{},
64+
WithNetworkID[ServerParameters](networkID),
65+
WithRequestTimeout[ServerParameters](testRequestTimeout),
66+
)
67+
require.NoError(t, err)
68+
69+
err = server.Start(context.Background())
70+
require.NoError(t, err)
71+
72+
t.Cleanup(func() {
73+
_ = server.Stop(context.Background())
74+
})
75+
76+
testCases := []struct {
77+
name string
78+
fn func() error
79+
}{
80+
{
81+
name: "handleHeadRequest",
82+
fn: func() error {
83+
ctx, cancel := context.WithTimeout(context.Background(), testRequestTimeout)
84+
defer cancel()
85+
86+
_, err := server.handleHeadRequest(ctx)
87+
return err
88+
},
89+
},
90+
{
91+
name: "handleRequest",
92+
fn: func() error {
93+
ctx, cancel := context.WithTimeout(context.Background(), testRequestTimeout)
94+
defer cancel()
95+
96+
_, err := server.handleRangeRequest(ctx, 1, 100)
97+
return err
98+
},
99+
},
100+
{
101+
name: "handleHeadRequest",
102+
fn: func() error {
103+
ctx, cancel := context.WithTimeout(context.Background(), testRequestTimeout)
104+
defer cancel()
105+
106+
hash := headertest.RandDummyHeader(t).Hash()
107+
_, err := server.handleRequestByHash(ctx, hash)
108+
return err
109+
},
110+
},
111+
}
112+
113+
for _, tc := range testCases {
114+
t.Run(tc.name, func(t *testing.T) {
115+
t.Parallel()
116+
117+
start := time.Now()
118+
err := tc.fn()
119+
took := time.Since(start)
120+
121+
require.Error(t, err)
122+
require.GreaterOrEqual(t, took, testRequestTimeout)
123+
})
124+
}
125+
}
126+
127+
var _ header.Store[*headertest.DummyHeader] = timeoutStore[*headertest.DummyHeader]{}
128+
129+
// timeoutStore does nothing but waits till context cancellation for every method.
130+
type timeoutStore[H header.Header[H]] struct{}
131+
132+
func (timeoutStore[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
133+
<-ctx.Done()
134+
var zero H
135+
return zero, ctx.Err()
136+
}
137+
138+
func (timeoutStore[H]) Get(ctx context.Context, _ header.Hash) (H, error) {
139+
<-ctx.Done()
140+
var zero H
141+
return zero, ctx.Err()
142+
}
143+
144+
func (timeoutStore[H]) GetByHeight(ctx context.Context, _ uint64) (H, error) {
145+
<-ctx.Done()
146+
var zero H
147+
return zero, ctx.Err()
148+
}
149+
150+
func (timeoutStore[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) {
151+
<-ctx.Done()
152+
return nil, ctx.Err()
153+
}
154+
155+
func (timeoutStore[H]) Init(ctx context.Context, _ H) error {
156+
<-ctx.Done()
157+
return ctx.Err()
158+
}
159+
160+
func (timeoutStore[H]) Height() uint64 {
161+
return 0
162+
}
163+
164+
func (timeoutStore[H]) Has(ctx context.Context, _ header.Hash) (bool, error) {
165+
<-ctx.Done()
166+
return false, ctx.Err()
167+
}
168+
169+
func (timeoutStore[H]) HasAt(ctx context.Context, _ uint64) bool {
170+
<-ctx.Done()
171+
return false
172+
}
173+
174+
func (timeoutStore[H]) Append(ctx context.Context, _ ...H) error {
175+
<-ctx.Done()
176+
return ctx.Err()
177+
}
178+
179+
func (timeoutStore[H]) GetRange(ctx context.Context, _ uint64, _ uint64) ([]H, error) {
180+
<-ctx.Done()
181+
return nil, ctx.Err()
182+
}

0 commit comments

Comments
 (0)