Skip to content

Commit 15fb0dc

Browse files
authored
rpc: add timeout to rpc client Unsubscribe (#30318)
Fixes #30156 This adds a repro of the linked issue. I fixed it by adding a timeout when issuing the call to unsubscribe.
1 parent d0fd133 commit 15fb0dc

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed

rpc/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ var (
4545
const (
4646
defaultDialTimeout = 10 * time.Second // used if context has no deadline
4747
subscribeTimeout = 10 * time.Second // overall timeout eth_subscribe, rpc_modules calls
48+
unsubscribeTimeout = 10 * time.Second // timeout for *_unsubscribe calls
4849
)
4950

5051
const (

rpc/client_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,70 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
518518
}
519519
}
520520

521+
// unsubscribeBlocker will wait for the quit channel to process an unsubscribe
522+
// request.
523+
type unsubscribeBlocker struct {
524+
ServerCodec
525+
quit chan struct{}
526+
}
527+
528+
func (b *unsubscribeBlocker) readBatch() ([]*jsonrpcMessage, bool, error) {
529+
msgs, batch, err := b.ServerCodec.readBatch()
530+
for _, msg := range msgs {
531+
if msg.isUnsubscribe() {
532+
<-b.quit
533+
}
534+
}
535+
return msgs, batch, err
536+
}
537+
538+
// TestUnsubscribeTimeout verifies that calling the client's Unsubscribe
539+
// function will eventually timeout and not block forever in case the serve does
540+
// not respond.
541+
// It reproducers the issue https://github.com/ethereum/go-ethereum/issues/30156
542+
func TestUnsubscribeTimeout(t *testing.T) {
543+
srv := NewServer()
544+
srv.RegisterName("nftest", new(notificationTestService))
545+
546+
// Setup middleware to block on unsubscribe.
547+
p1, p2 := net.Pipe()
548+
blocker := &unsubscribeBlocker{ServerCodec: NewCodec(p1), quit: make(chan struct{})}
549+
defer close(blocker.quit)
550+
551+
// Serve the middleware.
552+
go srv.ServeCodec(blocker, OptionMethodInvocation|OptionSubscriptions)
553+
defer srv.Stop()
554+
555+
// Create the client on the other end of the pipe.
556+
cfg := new(clientConfig)
557+
client, _ := newClient(context.Background(), cfg, func(context.Context) (ServerCodec, error) {
558+
return NewCodec(p2), nil
559+
})
560+
defer client.Close()
561+
562+
// Start subscription.
563+
sub, err := client.Subscribe(context.Background(), "nftest", make(chan int), "someSubscription", 1, 1)
564+
if err != nil {
565+
t.Fatalf("failed to subscribe: %v", err)
566+
}
567+
568+
// Now on a separate thread, attempt to unsubscribe. Since the middleware
569+
// won't return, the function will only return if it times out on the request.
570+
done := make(chan struct{})
571+
go func() {
572+
sub.Unsubscribe()
573+
done <- struct{}{}
574+
}()
575+
576+
// Wait for the timeout. If the expected time for the timeout elapses, the
577+
// test is considered failed.
578+
select {
579+
case <-done:
580+
case <-time.After(unsubscribeTimeout + 3*time.Second):
581+
t.Fatalf("Unsubscribe did not return within %s", unsubscribeTimeout)
582+
}
583+
}
584+
521585
// unsubscribeRecorder collects the subscription IDs of *_unsubscribe calls.
522586
type unsubscribeRecorder struct {
523587
ServerCodec

rpc/subscription.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,5 +371,8 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e
371371

372372
func (sub *ClientSubscription) requestUnsubscribe() error {
373373
var result interface{}
374-
return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
374+
ctx, cancel := context.WithTimeout(context.Background(), unsubscribeTimeout)
375+
defer cancel()
376+
err := sub.client.CallContext(ctx, &result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
377+
return err
375378
}

0 commit comments

Comments
 (0)