@@ -6,31 +6,17 @@ package jsonrpc2
6
6
7
7
import (
8
8
"context"
9
- )
10
-
11
- // Handler is the interface used to hook into the message handling of an rpc
12
- // connection.
13
- type Handler interface {
14
- // Deliver is invoked to handle incoming requests.
15
- // If the request returns false from IsNotify then the Handler must eventually
16
- // call Reply on the Conn with the supplied request.
17
- // Handlers are called synchronously, they should pass the work off to a go
18
- // routine if they are going to take a long time.
19
- // If Deliver returns true all subsequent handlers will be invoked with
20
- // delivered set to true, and should not attempt to deliver the message.
21
- Deliver (ctx context.Context , r * Request , delivered bool ) bool
9
+ "fmt"
10
+ "sync"
22
11
23
- // Cancel is invoked for cancelled outgoing requests.
24
- // It is okay to use the connection to send notifications, but the context will
25
- // be in the cancelled state, so you must do it with the background context
26
- // instead.
27
- // If Cancel returns true all subsequent handlers will be invoked with
28
- // cancelled set to true, and should not attempt to cancel the message.
29
- Cancel (ctx context.Context , conn * Conn , id ID , cancelled bool ) bool
12
+ "golang.org/x/tools/internal/telemetry/event"
13
+ )
30
14
31
- // Request is called near the start of processing any request.
32
- Request (ctx context.Context , conn * Conn , direction Direction , r * WireRequest ) context.Context
33
- }
15
+ // Handler is invoked to handle incoming requests.
16
+ // If the request returns false from IsNotify then the Handler must eventually
17
+ // call Reply on the Conn with the supplied request.
18
+ // The handler should return ErrNotHandled if it could not handle the request.
19
+ type Handler func (context.Context , * Request ) error
34
20
35
21
// Direction is used to indicate to a logger whether the logged message was being
36
22
// sent or received.
@@ -54,32 +40,77 @@ func (d Direction) String() string {
54
40
}
55
41
}
56
42
57
- type EmptyHandler struct {}
58
-
59
- func (EmptyHandler ) Deliver (ctx context.Context , r * Request , delivered bool ) bool {
60
- return false
61
- }
62
-
63
- func (EmptyHandler ) Cancel (ctx context.Context , conn * Conn , id ID , cancelled bool ) bool {
64
- return false
43
+ // MethodNotFound is a Handler that replies to all call requests with the
44
+ // standard method not found response.
45
+ // This should normally be the final handler in a chain.
46
+ func MethodNotFound (ctx context.Context , r * Request ) error {
47
+ return r .Reply (ctx , nil , NewErrorf (CodeMethodNotFound , "method %q not found" , r .Method ))
65
48
}
66
49
67
- func (EmptyHandler ) Request (ctx context.Context , conn * Conn , direction Direction , r * WireRequest ) context.Context {
68
- return ctx
50
+ // MustReply creates a Handler that panics if the wrapped handler does
51
+ // not call Reply for every request that it is passed.
52
+ func MustReply (handler Handler ) Handler {
53
+ return func (ctx context.Context , req * Request ) error {
54
+ err := handler (ctx , req )
55
+ if req .done != nil {
56
+ panic (fmt .Errorf ("request %q was never replied to" , req .Method ))
57
+ }
58
+ return err
59
+ }
69
60
}
70
61
71
- func (EmptyHandler ) Response (ctx context.Context , conn * Conn , direction Direction , r * WireResponse ) context.Context {
72
- return ctx
62
+ // CancelHandler returns a handler that supports cancellation, and a canceller
63
+ // that can be used to trigger canceling in progress requests.
64
+ func CancelHandler (handler Handler ) (Handler , Canceller ) {
65
+ var mu sync.Mutex
66
+ handling := make (map [ID ]context.CancelFunc )
67
+ wrapped := func (ctx context.Context , req * Request ) error {
68
+ if req .ID != nil {
69
+ cancelCtx , cancel := context .WithCancel (ctx )
70
+ ctx = cancelCtx
71
+ mu .Lock ()
72
+ handling [* req .ID ] = cancel
73
+ mu .Unlock ()
74
+ req .OnReply (func () {
75
+ mu .Lock ()
76
+ delete (handling , * req .ID )
77
+ mu .Unlock ()
78
+ })
79
+ }
80
+ return handler (ctx , req )
81
+ }
82
+ return wrapped , func (id ID ) {
83
+ mu .Lock ()
84
+ cancel , found := handling [id ]
85
+ mu .Unlock ()
86
+ if found {
87
+ cancel ()
88
+ }
89
+ }
73
90
}
74
91
75
- type defaultHandler struct { EmptyHandler }
76
-
77
- func (defaultHandler ) Deliver (ctx context.Context , r * Request , delivered bool ) bool {
78
- if delivered {
79
- return false
80
- }
81
- if ! r .IsNotify () {
82
- r .Reply (ctx , nil , NewErrorf (CodeMethodNotFound , "method %q not found" , r .Method ))
92
+ // AsyncHandler returns a handler that processes each request goes in its own
93
+ // goroutine.
94
+ // The handler returns immediately, without the request being processed.
95
+ // Each request then waits for the previous request to finish before it starts.
96
+ // This allows the stream to unblock at the cost of unbounded goroutines
97
+ // all stalled on the previous one.
98
+ func AsyncHandler (handler Handler ) Handler {
99
+ nextRequest := make (chan struct {})
100
+ close (nextRequest )
101
+ return func (ctx context.Context , req * Request ) error {
102
+ waitForPrevious := nextRequest
103
+ nextRequest = make (chan struct {})
104
+ unlockNext := nextRequest
105
+ req .OnReply (func () { close (unlockNext ) })
106
+ _ , queueDone := event .StartSpan (ctx , "queued" )
107
+ go func () {
108
+ <- waitForPrevious
109
+ queueDone ()
110
+ if err := handler (ctx , req ); err != nil {
111
+ event .Error (ctx , "jsonrpc2 async message delivery failed" , err )
112
+ }
113
+ }()
114
+ return nil
83
115
}
84
- return true
85
116
}
0 commit comments