Skip to content

Commit e5be7e7

Browse files
committed
jsonrpc2: fix all logic
1 parent c227f42 commit e5be7e7

File tree

1 file changed

+96
-101
lines changed

1 file changed

+96
-101
lines changed

jsonrpc2.go

Lines changed: 96 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,10 @@ func NewConn(ctx context.Context, s Stream, options ...Options) *Conn {
147147
// the default canceller does nothing
148148
conn.Canceler = defaultCanceler
149149
}
150-
151-
go func() {
152-
conn.err = conn.Run(ctx)
153-
close(conn.done)
154-
}()
150+
if conn.logger == nil {
151+
// the default logger does nothing
152+
conn.logger = zap.NewNop()
153+
}
155154

156155
return conn
157156
}
@@ -170,20 +169,21 @@ func (c *Conn) Write(p []byte) (n int, err error) {
170169
func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error {
171170
jsonParams, err := marshalToEmbedded(params)
172171
if err != nil {
173-
return xerrors.Errorf("failed to marshalling call parameters: %v", err)
172+
return xerrors.Errorf("failed to marshaling call parameters: %v", err)
174173
}
175-
id := ID{Number: c.seq.Add(1)}
176174

175+
id := ID{Number: c.seq.Add(1)}
177176
req := &Request{
178-
ID: &id,
179-
Method: method,
180-
Params: jsonParams,
177+
JSONRPC: Version,
178+
ID: &id,
179+
Method: method,
180+
Params: jsonParams,
181181
}
182182

183183
// marshal the request now it is complete
184-
data, err := gojay.Marshal(req)
184+
data, err := gojay.MarshalJSONObject(req)
185185
if err != nil {
186-
return xerrors.Errorf("failed to marshalling call request: %v", err)
186+
return xerrors.Errorf("failed to marshaling call request: %v", err)
187187
}
188188

189189
rchan := make(chan *Response)
@@ -203,8 +203,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
203203
}()
204204

205205
start := time.Now()
206-
c.logger.Info(Send.String(),
207-
zap.String("id", id.String()),
206+
c.logger.Debug(Send,
207+
zap.String("req.JSONRPC", req.JSONRPC),
208+
zap.Any("id", id),
208209
zap.String("req.method", req.Method),
209210
zap.Any("req.params", req.Params),
210211
)
@@ -214,8 +215,9 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
214215

215216
select {
216217
case resp := <-rchan:
217-
c.logger.Info(Receive.String(),
218-
zap.String("id", id.String()),
218+
c.logger.Debug(Receive,
219+
zap.String("req.JSONRPC", req.JSONRPC),
220+
zap.Any("id", id),
219221
zap.Duration("elapsed", time.Since(start)),
220222
zap.String("req.method", req.Method),
221223
zap.Any("resp.Result", resp.Result),
@@ -228,7 +230,7 @@ func (c *Conn) Call(ctx context.Context, method string, params, result interface
228230
if result == nil || resp.Result == nil {
229231
return nil
230232
}
231-
if err := gojay.Unsafe.Unmarshal(*resp.Result.EmbeddedJSON, result); err != nil {
233+
if err := gojay.Unsafe.Unmarshal(*resp.Result, result); err != nil {
232234
return xerrors.Errorf("failed to unmarshalling result: %v", err)
233235
}
234236
return nil
@@ -255,27 +257,33 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
255257
}
256258

257259
elapsed := time.Since(handling.start)
258-
var raw *RawMessage
259-
if err == nil {
260-
raw, err = marshalToEmbedded(result)
261-
}
262260

263261
resp := &Response{
264-
ID: req.ID,
265-
Result: raw,
262+
JSONRPC: Version,
263+
ID: req.ID,
266264
}
267-
268-
if err != nil {
269-
resp.Error = Errorf(0, "%s", err)
265+
if err == nil {
266+
if resp.Result, err = marshalToEmbedded(result); err != nil {
267+
return err
268+
}
269+
} else {
270+
resp.Error = New(CodeParseError, err)
270271
}
271272

272273
data, err := gojay.Marshal(resp)
273274
if err != nil {
275+
c.logger.Error(Send,
276+
zap.Any("resp.ID", resp.ID.Number),
277+
zap.Duration("elapsed", elapsed),
278+
zap.String("req.Method", req.Method),
279+
zap.Any("resp.Result", resp.Result),
280+
zap.Error(err),
281+
)
274282
return err
275283
}
276284

277-
c.logger.Info(Send.String(),
278-
zap.String("resp.ID", resp.ID.String()),
285+
c.logger.Debug(Send,
286+
zap.Any("resp.ID", resp.ID),
279287
zap.Duration("elapsed", elapsed),
280288
zap.String("req.Method", req.Method),
281289
zap.Any("resp.Result", resp.Result),
@@ -291,21 +299,22 @@ func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err
291299

292300
// Notify is called to send a notification request over the connection.
293301
func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error {
294-
jsonParams, err := marshalToEmbedded(params)
302+
prms, err := marshalToEmbedded(params)
295303
if err != nil {
296-
return xerrors.Errorf("failed to marshalling notify parameters: %v", err)
304+
return Errorf(CodeParseError, "failed to marshaling notify parameters: %v", err)
297305
}
298306

299307
req := &NotificationMessage{
300-
Method: method,
301-
Params: jsonParams,
308+
JSONRPC: Version,
309+
Method: method,
310+
Params: prms,
302311
}
303312
data, err := gojay.MarshalJSONObject(req)
304313
if err != nil {
305-
return xerrors.Errorf("failed to marshalling notify request: %v", err)
314+
return Errorf(CodeParseError, "failed to marshaling notify request: %v", err)
306315
}
307316

308-
c.logger.Info(Send.String(),
317+
c.logger.Debug(Send,
309318
zap.String("req.Method", req.Method),
310319
zap.Any("req.Params", req.Params),
311320
)
@@ -347,17 +356,6 @@ func (c *Conn) deliver(ctx context.Context, q chan queue, request *Request) bool
347356
}
348357
}
349358

350-
// combined has all the fields of both Request and Response.
351-
// We can decode this and then work out which it is.
352-
type combined struct {
353-
VersionTag Message `json:"jsonrpc"`
354-
ID *ID `json:"id,omitempty"`
355-
Method string `json:"method"`
356-
Params *RawMessage `json:"params,omitempty"`
357-
Result *RawMessage `json:"result,omitempty"`
358-
Error *Error `json:"error,omitempty"`
359-
}
360-
361359
// Run run the jsonrpc2 server.
362360
func (c *Conn) Run(ctx context.Context) (err error) {
363361
q := make(chan queue, c.capacity)
@@ -374,22 +372,21 @@ func (c *Conn) Run(ctx context.Context) (err error) {
374372
}()
375373

376374
for {
377-
var data []byte
378-
// get the data for a message
379-
_, err = c.stream.Read(ctx, data)
375+
data := make([]byte, 0, 512)
376+
_, err = c.stream.Read(ctx, data) // get the data for a message
380377
if err != nil {
381-
// the stream failed, we cannot continue
382-
return err
378+
return err // the stream failed, we cannot continue
383379
}
384380

385381
// read a combined message
386-
msg := &combined{}
387-
if err := gojay.Unsafe.Unmarshal(data, msg); err != nil {
382+
msg := new(Combined)
383+
if err := gojay.Unsafe.UnmarshalJSONObject(data, msg); err != nil {
388384
// a badly formed message arrived, log it and continue
389385
// we trust the stream to have isolated the error to just this message
390-
c.logger.Info(Receive.String(),
391-
zap.Error(Errorf(0, "unmarshal failed: %v", err)),
386+
c.logger.Debug(Receive,
387+
zap.Error(Errorf(CodeParseError, "unmarshal failed: %v", err)),
392388
)
389+
393390
continue
394391
}
395392

@@ -398,32 +395,43 @@ func (c *Conn) Run(ctx context.Context) (err error) {
398395
case msg.Method != "":
399396
// if method is set it must be a request
400397
req := &Request{
401-
Method: msg.Method,
402-
Params: msg.Params,
403-
ID: msg.ID,
398+
JSONRPC: Version,
399+
Method: msg.Method,
400+
Params: msg.Params,
401+
ID: msg.ID,
404402
}
405403
if req.IsNotify() {
406-
c.logger.Info(Receive.String(), zap.String("req.ID", req.ID.String()), zap.String("req.Method", req.Method), zap.Any("req.Params", req.Params))
404+
c.logger.Debug(Receive,
405+
zap.Any("req.ID", req.ID),
406+
zap.String("req.Method", req.Method),
407+
zap.Any("req.Params", req.Params),
408+
)
407409
c.deliver(ctx, q, req)
408-
} else {
409-
// we have a Call, add to the processor queue
410-
reqCtx, reqCancel := context.WithCancel(ctx)
411-
defer reqCancel()
412-
m, ok := c.handling.Load().(handlingMap)
413-
if !ok {
414-
return errLoadhandlingMap
415-
}
416-
m[*req.ID] = handling{
417-
request: req,
418-
cancel: reqCancel,
419-
start: time.Now(),
420-
}
421-
c.handling.Store(m)
422-
c.logger.Info(Receive.String(), zap.String("req.ID", req.ID.String()), zap.String("req.Method", req.Method), zap.Any("req.Params", req.Params))
423-
if !c.deliver(reqCtx, q, req) {
424-
// queue is full, reject the message by directly replying
425-
c.Reply(ctx, req, nil, Errorf(CodeServerOverloaded, "no room in queue"))
426-
}
410+
411+
return
412+
}
413+
414+
// we have a Call, add to the processor queue
415+
reqCtx, reqCancel := context.WithCancel(ctx)
416+
m, ok := c.handling.Load().(handlingMap)
417+
if !ok {
418+
return errLoadhandlingMap
419+
}
420+
m[*req.ID] = handling{
421+
request: req,
422+
cancel: reqCancel,
423+
start: time.Now(),
424+
}
425+
c.handling.Store(m)
426+
427+
c.logger.Debug(Receive,
428+
zap.Any("req.ID", req.ID),
429+
zap.String("req.Method", req.Method),
430+
zap.Any("req.Params", req.Params),
431+
)
432+
if !c.deliver(reqCtx, q, req) {
433+
// queue is full, reject the message by directly replying
434+
c.Reply(ctx, req, nil, Errorf(CodeServerOverloaded, "no room in queue"))
427435
}
428436

429437
case msg.ID != nil:
@@ -437,49 +445,36 @@ func (c *Conn) Run(ctx context.Context) (err error) {
437445
delete(m, *msg.ID)
438446
}
439447
c.pending.Store(m)
448+
440449
// and send the reply to the channel
441450
resp := &Response{
442-
Result: msg.Result,
443-
Error: msg.Error,
444-
ID: msg.ID,
451+
JSONRPC: Version,
452+
Result: msg.Result,
453+
Error: msg.Error,
454+
ID: msg.ID,
445455
}
446456
rchan <- resp
447457
close(rchan)
448458

449459
default:
450-
c.logger.Info(Receive.String(), zap.Error(Errorf(0, "message not a call, notify or response, ignoring")))
460+
c.logger.Error(Receive, zap.Error(Errorf(CodeInvalidParams, "message not a call, notify or response, ignoring")))
451461
}
452462
}
453463
}
454464

455-
// Direction is used to indicate to a logger whether the logged message was being
456-
// sent or received.
457-
type Direction bool
458-
459465
const (
460466
// Send indicates the message is outgoing.
461-
Send = Direction(true)
467+
Send = "send"
462468
// Receive indicates the message is incoming.
463-
Receive = Direction(false)
469+
Receive = "receive"
464470
)
465471

466-
func (d Direction) String() string {
467-
switch d {
468-
case Send:
469-
return "send"
470-
case Receive:
471-
return "receive"
472-
default:
473-
panic("unreachable")
474-
}
475-
}
476-
477472
func marshalToEmbedded(obj interface{}) (*RawMessage, error) {
478-
data, err := gojay.Marshal(obj)
473+
data, err := gojay.MarshalAny(obj)
479474
if err != nil {
480475
return nil, err
481476
}
482-
raw := gojay.EmbeddedJSON(data)
477+
raw := RawMessage(gojay.EmbeddedJSON(data))
483478

484-
return &RawMessage{EmbeddedJSON: &raw}, nil
479+
return &raw, nil
485480
}

0 commit comments

Comments
 (0)