1
- // SPDX-License-Identifier: BSD-3-Clause
2
1
// SPDX-FileCopyrightText: Copyright 2021 The Go Language Server Authors
2
+ // SPDX-License-Identifier: BSD-3-Clause
3
3
4
4
package jsonrpc2
5
5
6
6
import (
7
- "bytes"
8
7
"context"
9
- "encoding/json"
10
8
"fmt"
11
9
"sync"
12
10
"sync/atomic"
13
11
12
+ json "github.com/goccy/go-json"
13
+
14
14
"go.lsp.dev/pkg/event"
15
15
"go.lsp.dev/pkg/event/label"
16
16
"go.lsp.dev/pkg/event/tag"
17
17
)
18
18
19
19
// Conn is the common interface to jsonrpc clients and servers.
20
+ //
20
21
// Conn is bidirectional; it does not have a designated server or client end.
21
22
// It manages the jsonrpc2 protocol, connecting responses back to their calls.
22
23
type Conn interface {
23
24
// Call invokes the target method and waits for a response.
25
+ //
24
26
// The params will be marshaled to JSON before sending over the wire, and will
25
27
// be handed to the method invoked.
28
+ //
26
29
// The response will be unmarshaled from JSON into the result.
30
+ //
27
31
// The id returned will be unique from this connection, and can be used for
28
32
// logging or tracking.
29
33
Call (ctx context.Context , method string , params , result interface {}) (ID , error )
30
34
31
35
// Notify invokes the target method but does not wait for a response.
36
+ //
32
37
// The params will be marshaled to JSON before sending over the wire, and will
33
38
// be handed to the method invoked.
34
39
Notify (ctx context.Context , method string , params interface {}) error
35
40
36
41
// Go starts a goroutine to handle the connection.
37
- // It must be called exactly once for each Conn.
38
- // It returns immediately.
39
- // You must block on Done() to wait for the connection to shut down.
42
+ //
43
+ // It must be called exactly once for each Conn. It returns immediately.
44
+ // Must block on Done() to wait for the connection to shut down.
45
+ //
40
46
// This is a temporary measure, this should be started automatically in the
41
47
// future.
42
48
Go (ctx context.Context , handler Handler )
43
49
44
50
// Close closes the connection and it's underlying stream.
51
+ //
45
52
// It does not wait for the close to complete, use the Done() channel for
46
53
// that.
47
54
Close () error
@@ -52,19 +59,20 @@ type Conn interface {
52
59
Done () <- chan struct {}
53
60
54
61
// Err returns an error if there was one from within the processing goroutine.
62
+ //
55
63
// If err returns non nil, the connection will be already closed or closing.
56
64
Err () error
57
65
}
58
66
59
67
type conn struct {
60
- seq int64 // access atomically
61
- writeMu sync.Mutex // protects writes to the stream
62
- stream Stream
63
- pendingMu sync.Mutex // protects the pending map
64
- pending map [ID ]chan * Response
65
-
66
- done chan struct {}
67
- err atomic.Value
68
+ seq int64 // access atomically
69
+ writeMu sync.Mutex // protects writes to the stream
70
+ stream Stream // supplied stream
71
+ pendingMu sync.Mutex // protects the pending map
72
+ pending map [ID ]chan * Response // holds the pending response channel with the ID as the key.
73
+
74
+ done chan struct {} // closed when done
75
+ err atomic.Value // holds run error
68
76
}
69
77
70
78
// NewConn creates a new connection object around the supplied stream.
@@ -78,9 +86,9 @@ func NewConn(s Stream) Conn {
78
86
}
79
87
80
88
// Call implements Conn.
81
- func (c * conn ) Call (ctx context.Context , method string , params , result interface {}) (_ ID , err error ) {
89
+ func (c * conn ) Call (ctx context.Context , method string , params , result interface {}) (id ID , err error ) {
82
90
// generate a new request identifier
83
- id := ID { number : atomic .AddInt64 (& c .seq , 1 )}
91
+ id = NewNumberID ( atomic .AddInt64 (& c .seq , 1 ))
84
92
call , err := NewCall (id , method , params )
85
93
if err != nil {
86
94
return id , fmt .Errorf ("marshaling call parameters: %w" , err )
@@ -123,22 +131,22 @@ func (c *conn) Call(ctx context.Context, method string, params, result interface
123
131
124
132
// now wait for the response
125
133
select {
126
- case response := <- rchan :
127
- switch {
128
- case response .err != nil : // is it an error response?
129
- return id , response .err
134
+ case resp := <- rchan :
135
+ // is it an error response?
136
+ if resp .err != nil {
137
+ return id , resp .err
138
+ }
130
139
131
- case result == nil || len (response .result ) == 0 :
140
+ if result == nil || len (resp .result ) == 0 {
132
141
return id , nil
142
+ }
133
143
134
- default :
135
- dec := json .NewDecoder (bytes .NewReader (response .result ))
136
- if err := dec .Decode (result ); err != nil {
137
- return id , fmt .Errorf ("unmarshaling result: %w" , err )
138
- }
139
- return id , nil
144
+ if err := json .Unmarshal (resp .result , result ); err != nil {
145
+ return id , fmt .Errorf ("unmarshaling result: %w" , err )
140
146
}
141
147
148
+ return id , nil
149
+
142
150
case <- ctx .Done ():
143
151
return id , ctx .Err ()
144
152
}
@@ -150,6 +158,7 @@ func (c *conn) Notify(ctx context.Context, method string, params interface{}) (e
150
158
if err != nil {
151
159
return fmt .Errorf ("marshaling notify parameters: %w" , err )
152
160
}
161
+
153
162
ctx , done := event .Start (ctx , method ,
154
163
tag .Method .Of (method ),
155
164
tag .RPCDirection .Of (tag .Outbound ),
@@ -162,6 +171,7 @@ func (c *conn) Notify(ctx context.Context, method string, params interface{}) (e
162
171
event .Metric (ctx , tag .Started .Of (1 ))
163
172
n , err := c .write (ctx , notify )
164
173
event .Metric (ctx , tag .SentBytes .Of (n ))
174
+
165
175
return err
166
176
}
167
177
@@ -176,25 +186,31 @@ func (c *conn) replier(req Message, spanDone func()) Replier {
176
186
// request was a notify, no need to respond
177
187
return nil
178
188
}
189
+
179
190
response , err := NewResponse (call .id , result , err )
180
191
if err != nil {
181
192
return err
182
193
}
194
+
183
195
n , err := c .write (ctx , response )
184
196
event .Metric (ctx , tag .SentBytes .Of (n ))
185
197
if err != nil {
186
- // TODO(iancottrell): if a stream write fails, we really need to shut down
187
- // the whole stream
198
+ // TODO(iancottrell): if a stream write fails, we really need to shut down the whole stream
188
199
return err
189
200
}
190
201
return nil
191
202
}
192
203
}
193
204
194
- func (c * conn ) write (ctx context.Context , msg Message ) (int64 , error ) {
205
+ func (c * conn ) write (ctx context.Context , msg Message ) (n int64 , err error ) {
195
206
c .writeMu .Lock ()
196
- defer c .writeMu .Unlock ()
197
- return c .stream .Write (ctx , msg )
207
+ n , err = c .stream .Write (ctx , msg )
208
+ c .writeMu .Unlock ()
209
+ if err != nil {
210
+ return 0 , fmt .Errorf ("write to stream: %w" , err )
211
+ }
212
+
213
+ return n , nil
198
214
}
199
215
200
216
// Go implements Conn.
@@ -204,6 +220,7 @@ func (c *conn) Go(ctx context.Context, handler Handler) {
204
220
205
221
func (c * conn ) run (ctx context.Context , handler Handler ) {
206
222
defer close (c .done )
223
+
207
224
for {
208
225
// get the next message
209
226
msg , n , err := c .stream .Read (ctx )
@@ -212,6 +229,7 @@ func (c *conn) run(ctx context.Context, handler Handler) {
212
229
c .fail (err )
213
230
return
214
231
}
232
+
215
233
switch msg := msg .(type ) {
216
234
case Request :
217
235
labels := []label.Label {
@@ -224,14 +242,18 @@ func (c *conn) run(ctx context.Context, handler Handler) {
224
242
} else {
225
243
labels = labels [:len (labels )- 1 ]
226
244
}
245
+
227
246
reqCtx , spanDone := event .Start (ctx , msg .Method (), labels ... )
228
247
event .Metric (reqCtx ,
229
248
tag .Started .Of (1 ),
230
- tag .ReceivedBytes .Of (n ))
249
+ tag .ReceivedBytes .Of (n ),
250
+ )
251
+
231
252
if err := handler (reqCtx , c .replier (msg , spanDone ), msg ); err != nil {
232
253
// delivery failed, not much we can do
233
254
event .Error (reqCtx , "jsonrpc2 message delivery failed" , err )
234
255
}
256
+
235
257
case * Response :
236
258
// If method is not set, this should be a response, in which case we must
237
259
// have an id to send the response back to the caller.
@@ -269,6 +291,7 @@ func (c *conn) fail(err error) {
269
291
c .stream .Close ()
270
292
}
271
293
294
+ // recordStatus records the status code based on the error.
272
295
func recordStatus (ctx context.Context , err error ) {
273
296
if err != nil {
274
297
event .Label (ctx , tag .StatusCode .Of ("ERROR" ))
0 commit comments