Skip to content

Commit 020667c

Browse files
committed
jsonrpc2: implements Conn.Run method
1 parent ec0ba56 commit 020667c

File tree

1 file changed

+132
-18
lines changed

1 file changed

+132
-18
lines changed

jsonrpc2.go

Lines changed: 132 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,17 @@ type Canceler func(context.Context, *Conn, *Request)
4848
// Conn is a JSON RPC 2 client server connection.
4949
// Conn is bidirectional; it does not have a designated server or client end.
5050
type Conn struct {
51-
handle Handler
52-
canceler Canceler
53-
logger *zap.Logger
54-
capacity int
55-
reject bool
56-
stream Stream
57-
done chan struct{}
58-
err error
59-
seq atomic.Int64 // must only be accessed using atomic operations
60-
pending atomic.Value // map[ID]chan *Response
61-
handling atomic.Value // map[ID]handling
51+
handler Handler
52+
canceler Canceler
53+
logger *zap.Logger
54+
capacity int
55+
overloaded bool
56+
stream Stream
57+
done chan struct{}
58+
err error
59+
seq atomic.Int64 // must only be accessed using atomic operations
60+
pending atomic.Value // map[ID]chan *Response
61+
handling atomic.Value // map[ID]handling
6262
}
6363

6464
var _ Interface = (*Conn)(nil)
@@ -69,7 +69,7 @@ type Options func(*Conn)
6969
// WithHandler apply custom hander to Conn.
7070
func WithHandler(h Handler) Options {
7171
return func(c *Conn) {
72-
c.handle = h
72+
c.handler = h
7373
}
7474
}
7575

@@ -94,10 +94,10 @@ func WithCapacity(capacity int) Options {
9494
}
9595
}
9696

97-
// WithReject apply reject boolean to Conn.
98-
func WithReject(reject bool) Options {
97+
// WithOverloaded apply overloaded boolean to Conn.
98+
func WithOverloaded(overloaded bool) Options {
9999
return func(c *Conn) {
100-
c.reject = reject
100+
c.overloaded = overloaded
101101
}
102102
}
103103

@@ -134,9 +134,9 @@ func NewConn(ctx context.Context, s Stream, options ...Options) *Conn {
134134
opt(conn)
135135
}
136136

137-
if conn.handle == nil {
137+
if conn.handler == nil {
138138
// the default handler reports a method error
139-
conn.handle = defaultHandler
139+
conn.handler = defaultHandler
140140
}
141141
if conn.canceler == nil {
142142
// the default canceller does nothing
@@ -296,8 +296,122 @@ func (c *Conn) Cancel(id ID) {
296296
}
297297
}
298298

299+
type queue struct {
300+
ctx context.Context
301+
c *Conn
302+
r *Request
303+
}
304+
305+
func (c *Conn) deliver(ctx context.Context, q chan queue, request *Request) bool {
306+
e := queue{ctx: ctx, c: c, r: request}
307+
if !c.overloaded {
308+
q <- e
309+
return true
310+
}
311+
select {
312+
case q <- e:
313+
return true
314+
default:
315+
return false
316+
}
317+
}
318+
319+
// combined has all the fields of both Request and Response.
320+
// We can decode this and then work out which it is.
321+
type combined struct {
322+
VersionTag Message `json:"jsonrpc"`
323+
ID *ID `json:"id,omitempty"`
324+
Method string `json:"method"`
325+
Params *gojay.EmbeddedJSON `json:"params,omitempty"`
326+
Result *gojay.EmbeddedJSON `json:"result,omitempty"`
327+
Error *Error `json:"error,omitempty"`
328+
}
329+
299330
// Run run the jsonrpc2 server.
300-
func (c *Conn) Run(ctx context.Context) error { return nil }
331+
func (c *Conn) Run(ctx context.Context) error {
332+
q := make(chan queue, c.capacity)
333+
defer close(q)
334+
335+
// start the queue processor
336+
go func() {
337+
for e := range q {
338+
if e.ctx.Err() != nil {
339+
continue
340+
}
341+
c.handler(e.ctx, e.c, e.r)
342+
}
343+
}()
344+
345+
for {
346+
// get the data for a message
347+
data, err := c.stream.Read(ctx)
348+
if err != nil {
349+
// the stream failed, we cannot continue
350+
return err
351+
}
352+
353+
// read a combined message
354+
msg := &combined{}
355+
if err := gojay.Unsafe.Unmarshal(data, msg); err != nil {
356+
// a badly formed message arrived, log it and continue
357+
// we trust the stream to have isolated the error to just this message
358+
c.logger.Info(Receive.String(),
359+
zap.Error(Errorf(0, "unmarshal failed: %v", err)),
360+
)
361+
continue
362+
}
363+
364+
// work out which kind of message we have
365+
switch {
366+
case msg.Method != "":
367+
// if method is set it must be a request
368+
req := &Request{
369+
Method: msg.Method,
370+
Params: msg.Params,
371+
ID: msg.ID,
372+
}
373+
if req.IsNotify() {
374+
c.logger.Info(Receive.String(), zap.String("req.ID", req.ID.String()), zap.String("req.Method", req.Method), zap.Any("req.Params", req.Params))
375+
c.deliver(ctx, q, req)
376+
} else {
377+
// we have a Call, add to the processor queue
378+
reqCtx, reqCancel := context.WithCancel(ctx)
379+
m := c.handling.Load().(handlingMap)
380+
m[*req.ID] = handling{
381+
request: req,
382+
cancel: reqCancel,
383+
start: time.Now(),
384+
}
385+
c.handling.Store(m)
386+
c.logger.Info(Receive.String(), zap.String("req.ID", req.ID.String()), zap.String("req.Method", req.Method), zap.Any("req.Params", req.Params))
387+
if !c.deliver(reqCtx, q, req) {
388+
// queue is full, reject the message by directly replying
389+
c.Reply(ctx, req, nil, Errorf(CodeServerOverloaded, "no room in queue"))
390+
}
391+
}
392+
393+
case msg.ID != nil:
394+
// we have a response, get the pending entry from the map
395+
m := c.pending.Load().(pendingMap)
396+
rchan := m[*msg.ID]
397+
if rchan != nil {
398+
delete(m, *msg.ID)
399+
}
400+
c.pending.Store(m)
401+
// and send the reply to the channel
402+
resp := &Response{
403+
Result: msg.Result,
404+
Error: msg.Error,
405+
ID: msg.ID,
406+
}
407+
rchan <- resp
408+
close(rchan)
409+
410+
default:
411+
c.logger.Info(Receive.String(), zap.Error(Errorf(0, "message not a call, notify or response, ignoring")))
412+
}
413+
}
414+
}
301415

302416
// Wait blocks until the connection is terminated, and returns any error that cause the termination.
303417
func (c *Conn) Wait(ctx context.Context) error { return nil }

0 commit comments

Comments
 (0)