@@ -374,6 +374,46 @@ func (c *Connection) Call(ctx context.Context, method string, params any) *Async
374374 return ac
375375}
376376
377+ // Async, signals that the current jsonrpc2 request may be handled
378+ // asynchronously to subsequent requests, when ctx is the request context.
379+ //
380+ // Async must be called at most once on each request's context (and its
381+ // descendants).
382+ func Async (ctx context.Context ) {
383+ if r , ok := ctx .Value (asyncKey ).(* releaser ); ok {
384+ r .release (false )
385+ }
386+ }
387+
388+ type asyncKeyType struct {}
389+
390+ var asyncKey = asyncKeyType {}
391+
392+ // A releaser implements concurrency safe 'releasing' of async requests. (A
393+ // request is released when it is allowed to run concurrent with other
394+ // requests, via a call to [Async].)
395+ type releaser struct {
396+ mu sync.Mutex
397+ ch chan struct {}
398+ released bool
399+ }
400+
401+ // release closes the associated channel. If soft is set, multiple calls to
402+ // release are allowed.
403+ func (r * releaser ) release (soft bool ) {
404+ r .mu .Lock ()
405+ defer r .mu .Unlock ()
406+
407+ if r .released {
408+ if ! soft {
409+ panic ("jsonrpc2.Async called multiple times" )
410+ }
411+ } else {
412+ close (r .ch )
413+ r .released = true
414+ }
415+ }
416+
377417type AsyncCall struct {
378418 id ID
379419 ready chan struct {} // closed after response has been set
@@ -425,28 +465,6 @@ func (ac *AsyncCall) Await(ctx context.Context, result any) error {
425465 return json .Unmarshal (ac .response .Result , result )
426466}
427467
428- // Respond delivers a response to an incoming Call.
429- //
430- // Respond must be called exactly once for any message for which a handler
431- // returns ErrAsyncResponse. It must not be called for any other message.
432- func (c * Connection ) Respond (id ID , result any , err error ) error {
433- var req * incomingRequest
434- c .updateInFlight (func (s * inFlightState ) {
435- req = s .incomingByID [id ]
436- })
437- if req == nil {
438- return c .internalErrorf ("Request not found for ID %v" , id )
439- }
440-
441- if err == ErrAsyncResponse {
442- // Respond is supposed to supply the asynchronous response, so it would be
443- // confusing to call Respond with an error that promises to call Respond
444- // again.
445- err = c .internalErrorf ("Respond called with ErrAsyncResponse for %q" , req .Method )
446- }
447- return c .processResult ("Respond" , req , result , err )
448- }
449-
450468// Cancel cancels the Context passed to the Handle call for the inbound message
451469// with the given ID.
452470//
@@ -576,11 +594,6 @@ func (c *Connection) acceptRequest(ctx context.Context, msg *Request, preempter
576594 if preempter != nil {
577595 result , err := preempter .Preempt (req .ctx , req .Request )
578596
579- if req .IsCall () && errors .Is (err , ErrAsyncResponse ) {
580- // This request will remain in flight until Respond is called for it.
581- return
582- }
583-
584597 if ! errors .Is (err , ErrNotHandled ) {
585598 c .processResult ("Preempt" , req , result , err )
586599 return
@@ -655,19 +668,20 @@ func (c *Connection) handleAsync() {
655668 continue
656669 }
657670
658- result , err := c .handler .Handle (req .ctx , req .Request )
659- c .processResult (c .handler , req , result , err )
671+ releaser := & releaser {ch : make (chan struct {})}
672+ ctx := context .WithValue (req .ctx , asyncKey , releaser )
673+ go func () {
674+ defer releaser .release (true )
675+ result , err := c .handler .Handle (ctx , req .Request )
676+ c .processResult (c .handler , req , result , err )
677+ }()
678+ <- releaser .ch
660679 }
661680}
662681
663682// processResult processes the result of a request and, if appropriate, sends a response.
664683func (c * Connection ) processResult (from any , req * incomingRequest , result any , err error ) error {
665684 switch err {
666- case ErrAsyncResponse :
667- if ! req .IsCall () {
668- return c .internalErrorf ("%#v returned ErrAsyncResponse for a %q Request without an ID" , from , req .Method )
669- }
670- return nil // This request is still in flight, so don't record the result yet.
671685 case ErrNotHandled , ErrMethodNotFound :
672686 // Add detail describing the unhandled method.
673687 err = fmt .Errorf ("%w: %q" , ErrMethodNotFound , req .Method )
@@ -705,10 +719,10 @@ func (c *Connection) processResult(from any, req *incomingRequest, result any, e
705719 } else if err != nil {
706720 err = fmt .Errorf ("%w: %q notification failed: %v" , ErrInternal , req .Method , err )
707721 }
708- if err != nil {
709- // TODO: can/should we do anything with this error beyond writing it to the event log?
710- // (Is this the right label to attach to the log?)
711- }
722+ }
723+ if err != nil {
724+ // TODO: can/should we do anything with this error beyond writing it to the event log?
725+ // (Is this the right label to attach to the log?)
712726 }
713727
714728 // Cancel the request to free any associated resources.
@@ -725,7 +739,23 @@ func (c *Connection) processResult(from any, req *incomingRequest, result any, e
725739// write is used by all things that write outgoing messages, including replies.
726740// it makes sure that writes are atomic
727741func (c * Connection ) write (ctx context.Context , msg Message ) error {
728- err := c .writer .Write (ctx , msg )
742+ var err error
743+ // Fail writes immediately if the connection is shutting down.
744+ //
745+ // TODO(rfindley): should we allow cancellation notifications through? It
746+ // could be the case that writes can still succeed.
747+ c .updateInFlight (func (s * inFlightState ) {
748+ err = s .shuttingDown (ErrServerClosing )
749+ })
750+ if err == nil {
751+ err = c .writer .Write (ctx , msg )
752+ }
753+
754+ // For rejected requests, we don't set the writeErr (which would break the
755+ // connection). They can just be returned to the caller.
756+ if errors .Is (err , ErrRejected ) {
757+ return err
758+ }
729759
730760 if err != nil && ctx .Err () == nil {
731761 // The call to Write failed, and since ctx.Err() is nil we can't attribute
0 commit comments