@@ -50,6 +50,12 @@ type Handler func(context.Context, *Conn, *Request)
50
50
// instead.
51
51
type Canceler func (context.Context , * Conn , * Request )
52
52
53
+ type handling struct {
54
+ request * Request
55
+ cancel context.CancelFunc
56
+ start time.Time
57
+ }
58
+
53
59
// Conn is a JSON RPC 2 client server connection.
54
60
// Conn is bidirectional; it does not have a designated server or client end.
55
61
type Conn struct {
@@ -60,20 +66,14 @@ type Conn struct {
60
66
Capacity int
61
67
RejectIfOverloaded bool
62
68
stream Stream
63
- pendingMu sync.Mutex // protects the pending map
64
69
pending map [ID ]chan * Response
65
- handlingMu sync.Mutex // protects the handling map
70
+ pendingMu sync.Mutex // protects the pending map
66
71
handling map [ID ]handling
72
+ handlingMu sync.Mutex // protects the handling map
67
73
}
68
74
69
75
var _ Interface = (* Conn )(nil )
70
76
71
- type queueEntry struct {
72
- ctx context.Context
73
- conn * Conn
74
- request * Request
75
- }
76
-
77
77
// Options represents a functional options.
78
78
type Options func (* Conn )
79
79
@@ -127,9 +127,6 @@ var defaultLogger = zap.NewNop()
127
127
func NewConn (s Stream , options ... Options ) * Conn {
128
128
conn := & Conn {
129
129
seq : new (atomic.Int64 ),
130
- Handler : defaultHandler , // the default handler reports a method error
131
- Canceler : defaultCanceler , // the default canceller does nothing
132
- Logger : defaultLogger , // the default Logger does nothing
133
130
stream : s ,
134
131
pending : make (map [ID ]chan * Response ),
135
132
handling : make (map [ID ]handling ),
@@ -139,6 +136,19 @@ func NewConn(s Stream, options ...Options) *Conn {
139
136
opt (conn )
140
137
}
141
138
139
+ // the default handler reports a method error
140
+ if conn .Handler == nil {
141
+ conn .Handler = defaultHandler
142
+ }
143
+ // the default canceller does nothing
144
+ if conn .Canceler == nil {
145
+ conn .Canceler = defaultCanceler
146
+ }
147
+ // the default Logger does nothing
148
+ if conn .Logger == nil {
149
+ conn .Logger = defaultLogger
150
+ }
151
+
142
152
return conn
143
153
}
144
154
@@ -156,7 +166,7 @@ func (c *Conn) Cancel(id ID) {
156
166
157
167
// Notify is called to send a notification request over the connection.
158
168
func (c * Conn ) Notify (ctx context.Context , method string , params interface {}) error {
159
- c .Logger .Debug ("Notify" )
169
+ c .Logger .Debug ("Notify" , zap . String ( "method" , method ), zap . Any ( "params" , params ) )
160
170
p , err := c .marshalInterface (params )
161
171
if err != nil {
162
172
return Errorf (CodeParseError , "failed to marshaling notify parameters: %v" , err )
@@ -187,7 +197,7 @@ func (c *Conn) Notify(ctx context.Context, method string, params interface{}) er
187
197
188
198
// Call sends a request over the connection and then waits for a response.
189
199
func (c * Conn ) Call (ctx context.Context , method string , params , result interface {}) error {
190
- c .Logger .Debug ("Call" )
200
+ c .Logger .Debug ("Call" , zap . String ( "method" , method ), zap . Any ( "params" , params ) )
191
201
p , err := c .marshalInterface (params )
192
202
if err != nil {
193
203
return Errorf (CodeParseError , "failed to marshaling call parameters: %v" , err )
@@ -218,7 +228,6 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
218
228
c .pendingMu .Unlock ()
219
229
}()
220
230
221
- start := time .Now ()
222
231
c .Logger .Debug (Send ,
223
232
zap .String ("req.JSONRPC" , req .JSONRPC ),
224
233
zap .String ("id" , id .String ()),
@@ -233,12 +242,8 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
233
242
// wait for the response
234
243
select {
235
244
case resp := <- rchan :
236
- elapsed := time .Since (start )
237
245
c .Logger .Debug (Receive ,
238
- zap .String ("resp.ID" , resp .ID .String ()),
239
- zap .Duration ("elapsed" , elapsed ),
240
- zap .String ("req.method" , req .Method ),
241
- zap .Any ("resp.Result" , resp .Result ),
246
+ zap .Any ("resp" , resp ),
242
247
)
243
248
244
249
// is it an error response?
@@ -331,24 +336,24 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
331
336
return nil
332
337
}
333
338
334
- type handling struct {
339
+ type queueEntry struct {
340
+ ctx context.Context
341
+ conn * Conn
335
342
request * Request
336
- cancel context.CancelFunc
337
- start time.Time
338
343
}
339
344
340
- func (c * Conn ) deliver (ctx context.Context , q chan queueEntry , request * Request ) bool {
345
+ func (c * Conn ) deliver (ctx context.Context , queuec chan queueEntry , request * Request ) bool {
341
346
c .Logger .Debug ("deliver" )
342
347
343
348
e := queueEntry {ctx : ctx , conn : c , request : request }
344
349
345
350
if ! c .RejectIfOverloaded {
346
- q <- e
351
+ queuec <- e
347
352
return true
348
353
}
349
354
350
355
select {
351
- case q <- e :
356
+ case queuec <- e :
352
357
return true
353
358
default :
354
359
return false
@@ -357,16 +362,15 @@ func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request)
357
362
358
363
// Run run the jsonrpc2 server.
359
364
func (c * Conn ) Run (ctx context.Context ) (err error ) {
360
- q := make (chan queueEntry , c .Capacity )
361
- defer close (q )
365
+ queuec := make (chan queueEntry , c .Capacity )
366
+ defer close (queuec )
362
367
363
368
// start the queue processor
364
369
go func () {
365
- for e := range q {
370
+ for e := range queuec {
366
371
if e .ctx .Err () != nil {
367
372
continue
368
373
}
369
- c .Logger .Debug ("c.Handler" , zap .Reflect ("e" , e .conn ), zap .Reflect ("e.request" , e .request ))
370
374
c .Handler (e .ctx , e .conn , e .request )
371
375
}
372
376
}()
@@ -377,8 +381,6 @@ func (c *Conn) Run(ctx context.Context) (err error) {
377
381
return err // read the stream failed, cannot continue
378
382
}
379
383
380
- c .Logger .Debug (Receive , zap .ByteString ("data" , data ), zap .Int ("len(data)" , len (data )))
381
-
382
384
msg := & Combined {}
383
385
if err := json .Unmarshal (data , msg ); err != nil { // TODO(zchee): use gojay
384
386
// a badly formed message arrived, log it and continue
@@ -407,7 +409,7 @@ func (c *Conn) Run(ctx context.Context) (err error) {
407
409
zap .Any ("req.Params" , req .Params ),
408
410
)
409
411
// add to the processor queue
410
- c .deliver (ctx , q , req )
412
+ c .deliver (ctx , queuec , req )
411
413
// TODO: log when we drop a message?
412
414
} else {
413
415
// handle the Call, add to the processor queue.
@@ -425,7 +427,7 @@ func (c *Conn) Run(ctx context.Context) (err error) {
425
427
zap .Any ("req.Params" , req .Params ),
426
428
)
427
429
428
- if ! c .deliver (ctxReq , q , req ) {
430
+ if ! c .deliver (ctxReq , queuec , req ) {
429
431
// queue is full, reject the message by directly replying
430
432
c .Reply (ctx , req , nil , Errorf (CodeServerOverloaded , "no room in queue" ))
431
433
}
@@ -464,6 +466,6 @@ func (c *Conn) marshalInterface(obj interface{}) (*json.RawMessage, error) {
464
466
return nil , err
465
467
}
466
468
raw := json .RawMessage (data )
467
- c . Logger . Debug ( "marshalInterface" , zap . ByteString ( "raw" , raw ))
469
+
468
470
return & raw , nil
469
471
}
0 commit comments