Skip to content

Commit 750beca

Browse files
committed
fix: graceful connection drop
1 parent fe15701 commit 750beca

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

acp_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package acp
22

33
import (
44
"context"
5+
"encoding/json"
56
"io"
67
"slices"
78
"sync"
@@ -467,6 +468,59 @@ func TestConnectionHandlesNotifications(t *testing.T) {
467468
}
468469
}
469470

471+
// TestConnection_DoesNotCancelInboundContextBeforeDrainingNotificationsOnDisconnect
// writes n notifications into a pipe-backed Connection, closes the writer to
// simulate a peer disconnect, and then checks three things: the connection
// reports Done within 2s, all n handler invocations finish within 3s, and no
// handler saw a non-nil ctx.Err() after its artificial 10ms delay.
func TestConnection_DoesNotCancelInboundContextBeforeDrainingNotificationsOnDisconnect(t *testing.T) {
472+
const n = 25
473+
474+
incomingR, incomingW := io.Pipe()
475+
476+
var (
477+
wg sync.WaitGroup
478+
canceledCount atomic.Int64
479+
)
480+
// Every handler invocation calls wg.Done, so the group is sized to the
// number of notifications written below.
wg.Add(n)
481+
482+
c := NewConnection(func(ctx context.Context, method string, _ json.RawMessage) (any, *RequestError) {
483+
defer wg.Done()
484+
// Slow down processing so some notifications are handled after the receive
485+
// loop observes EOF and signals disconnect.
486+
time.Sleep(10 * time.Millisecond)
487+
// Count handlers whose context was already canceled once the delay elapsed.
if ctx.Err() != nil {
488+
canceledCount.Add(1)
489+
}
490+
return nil, nil
491+
}, io.Discard, incomingR)
492+
493+
// Write notifications quickly and then close the stream to simulate a peer disconnect.
494+
for i := 0; i < n; i++ {
495+
if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","method":"test/notify","params":{}}`+"\n"); err != nil {
496+
t.Fatalf("write notification: %v", err)
497+
}
498+
}
499+
_ = incomingW.Close()
500+
501+
select {
502+
case <-c.Done():
503+
// Expected: peer disconnect observed promptly.
504+
case <-time.After(2 * time.Second):
505+
t.Fatalf("timeout waiting for connection Done()")
506+
}
507+
508+
// Bridge wg.Wait into a channel so the wait for handlers can be bounded by
// a timeout in the select below.
done := make(chan struct{})
509+
go func() {
510+
wg.Wait()
511+
close(done)
512+
}()
513+
select {
514+
case <-done:
515+
case <-time.After(3 * time.Second):
516+
t.Fatalf("timeout waiting for notification handlers")
517+
}
518+
519+
// Any non-zero count means at least one handler ran with a canceled context.
if got := canceledCount.Load(); got != 0 {
520+
t.Fatalf("inbound handler context was canceled for %d/%d notifications", got, n)
521+
}
522+
}
523+
470524
// Test initialize method behavior
471525
func TestConnectionHandlesInitialize(t *testing.T) {
472526
c2aR, c2aW := io.Pipe()

connection.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"log/slog"
1111
"sync"
1212
"sync/atomic"
13+
"time"
1314
)
1415

1516
type anyMessage struct {
@@ -152,11 +153,26 @@ func (c *Connection) receive() {
152153
// First, signal disconnect to callers waiting on responses.
153154
c.cancel(cause)
154155

155-
// Then close and drain notifications so already-received messages are delivered
156-
// to handlers with a still-valid inbound context.
156+
// Then close the notification queue so already-received messages can drain.
157+
// IMPORTANT: Do not block this receive goroutine waiting for the drain to complete;
158+
// notification handlers may legitimately block until their context is canceled.
157159
close(c.notificationQueue)
158-
c.notificationWg.Wait()
159-
c.inboundCancel(cause)
160+
161+
// Cancel inboundCtx after notifications finish, but ensure we don't leak forever if a
162+
// handler blocks waiting for cancellation.
163+
const drainTimeout = 5 * time.Second
164+
go func() {
165+
done := make(chan struct{})
166+
go func() {
167+
c.notificationWg.Wait()
168+
close(done)
169+
}()
170+
select {
171+
case <-done:
172+
case <-time.After(drainTimeout):
173+
}
174+
c.inboundCancel(cause)
175+
}()
160176

161177
c.loggerOrDefault().Info("peer connection closed")
162178
}

0 commit comments

Comments
 (0)