Skip to content

Commit a246558

Browse files
committed
fix: gracefull conection
1 parent 750beca commit a246558

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

acp_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,54 @@ func TestConnection_DoesNotCancelInboundContextBeforeDrainingNotificationsOnDisc
521521
}
522522
}
523523

524+
func TestConnection_CancelsRequestHandlersOnDisconnectEvenWithNotificationBacklog(t *testing.T) {
525+
const numNotifications = 200
526+
527+
incomingR, incomingW := io.Pipe()
528+
529+
reqDone := make(chan struct{})
530+
531+
c := NewConnection(func(ctx context.Context, method string, _ json.RawMessage) (any, *RequestError) {
532+
switch method {
533+
case "test/notify":
534+
// Slow down to create a backlog of queued notifications.
535+
time.Sleep(5 * time.Millisecond)
536+
return nil, nil
537+
case "test/request":
538+
// Requests should be canceled promptly on disconnect (uses c.ctx).
539+
<-ctx.Done()
540+
close(reqDone)
541+
return nil, NewInternalError(map[string]any{"error": "canceled"})
542+
default:
543+
return nil, nil
544+
}
545+
}, io.Discard, incomingR)
546+
547+
for i := 0; i < numNotifications; i++ {
548+
if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","method":"test/notify","params":{}}`+"\n"); err != nil {
549+
t.Fatalf("write notification: %v", err)
550+
}
551+
}
552+
if _, err := io.WriteString(incomingW, `{"jsonrpc":"2.0","id":1,"method":"test/request","params":{}}`+"\n"); err != nil {
553+
t.Fatalf("write request: %v", err)
554+
}
555+
_ = incomingW.Close()
556+
557+
// Disconnect should be observed quickly.
558+
select {
559+
case <-c.Done():
560+
case <-time.After(2 * time.Second):
561+
t.Fatalf("timeout waiting for connection Done()")
562+
}
563+
564+
// Even with a big notification backlog, the request handler should be canceled promptly.
565+
select {
566+
case <-reqDone:
567+
case <-time.After(1 * time.Second):
568+
t.Fatalf("timeout waiting for request handler cancellation")
569+
}
570+
}
571+
524572
// Test initialize method behavior
525573
func TestConnectionHandlesInitialize(t *testing.T) {
526574
c2aR, c2aW := io.Pipe()

connection.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,15 @@ func (c *Connection) handleResponse(msg *anyMessage) {
202202

203203
func (c *Connection) handleInbound(req *anyMessage) {
204204
res := anyMessage{JSONRPC: "2.0"}
205+
206+
// Notifications are allowed a slightly longer-lived context during disconnect so we can
207+
// process already-received end-of-connection messages. Requests, however, should be
208+
// canceled promptly when the peer disconnects to avoid doing unnecessary work after
209+
// the caller is gone.
210+
ctx := c.ctx
211+
if req.ID == nil {
212+
ctx = c.inboundCtx
213+
}
205214
// copy ID if present
206215
if req.ID != nil {
207216
res.ID = req.ID
@@ -214,7 +223,7 @@ func (c *Connection) handleInbound(req *anyMessage) {
214223
return
215224
}
216225

217-
result, err := c.handler(c.inboundCtx, req.Method, req.Params)
226+
result, err := c.handler(ctx, req.Method, req.Params)
218227
if req.ID == nil {
219228
// Notification: no response is sent; log handler errors to surface decode failures.
220229
if err != nil {

0 commit comments

Comments
 (0)