Skip to content

Commit ce77406

Browse files
committed
1 parent 77f8379 commit ce77406

31 files changed

+2597
-1381
lines changed

codes.go

Lines changed: 0 additions & 78 deletions
This file was deleted.

conn.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2020 The Go Language Server Authors.
2+
// SPDX-License-Identifier: BSD-3-Clause
3+
4+
package jsonrpc2
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"sync"
10+
"sync/atomic"
11+
)
12+
13+
// Conn is the common interface to jsonrpc clients and servers.
14+
//
15+
// Conn is bidirectional; it does not have a designated server or client end.
16+
// It manages the jsonrpc2 protocol, connecting responses back to their calls.
17+
type Conn interface {
18+
// Call invokes the target method and waits for a response.
19+
// The params will be marshaled to JSON before sending over the wire, and will
20+
// be handed to the method invoked.
21+
// The response will be unmarshaled from JSON into the result.
22+
// The id returned will be unique from this connection, and can be used for
23+
// logging or tracking.
24+
Call(ctx context.Context, method string, params, result interface{}) (ID, error)
25+
26+
// Notify invokes the target method but does not wait for a response.
27+
// The params will be marshaled to JSON before sending over the wire, and will
28+
// be handed to the method invoked.
29+
Notify(ctx context.Context, method string, params interface{}) error
30+
31+
// Go starts a goroutine to handle the connection.
32+
// It must be called exactly once for each Conn.
33+
// It returns immediately.
34+
// You must block on Done() to wait for the connection to shut down.
35+
// This is a temporary measure, this should be started automatically in the
36+
// future.
37+
Go(ctx context.Context, handler Handler)
38+
39+
// Close closes the connection and it's underlying stream.
40+
// It does not wait for the close to complete, use the Done() channel for
41+
// that.
42+
Close() error
43+
44+
// Done returns a channel that will be closed when the processing goroutine
45+
// has terminated, which will happen if Close() is called or an underlying
46+
// stream is closed.
47+
Done() <-chan struct{}
48+
49+
// Err returns an error if there was one from within the processing goroutine.
50+
// If err returns non nil, the connection will be already closed or closing.
51+
Err() error
52+
}
53+
54+
type conn struct {
55+
seq int64 // access atomic
56+
writeMu sync.Mutex // protects writes to the stream
57+
stream Stream
58+
pendingMu sync.Mutex // protects the pending map
59+
pending map[ID]chan *Response
60+
61+
done chan struct{}
62+
err atomic.Value
63+
}
64+
65+
// NewConn creates a new connection object around the supplied stream.
66+
func NewConn(s Stream) Conn {
67+
conn := &conn{
68+
stream: s,
69+
pending: make(map[ID]chan *Response),
70+
done: make(chan struct{}),
71+
}
72+
73+
return conn
74+
}
75+
76+
// Notify implemens Conn.
77+
func (c *conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
78+
notify, err := NewNotification(method, params)
79+
if err != nil {
80+
return fmt.Errorf("marshaling notify parameters: %v", err)
81+
}
82+
83+
_, err = c.write(ctx, notify)
84+
return err
85+
}
86+
87+
func (c *conn) replier(req Message) Replier {
88+
reply := func(ctx context.Context, result interface{}, err error) error {
89+
call, ok := req.(*Request)
90+
if !ok {
91+
// request was a notify, no need to respond
92+
return nil
93+
}
94+
95+
response, err := NewResponse(call.id, result, err)
96+
if err != nil {
97+
return err
98+
}
99+
100+
_, err = c.write(ctx, response)
101+
if err != nil {
102+
// TODO(iancottrell): if a stream write fails, we really need to shut down
103+
// the whole stream
104+
return err
105+
}
106+
return nil
107+
}
108+
109+
return reply
110+
}
111+
112+
func (c *conn) write(ctx context.Context, msg Message) (n int64, err error) {
113+
c.writeMu.Lock()
114+
n, err = c.stream.Write(ctx, msg)
115+
c.writeMu.Unlock()
116+
return n, err
117+
}
118+
119+
// Go implemens Conn.
120+
func (c *conn) Go(ctx context.Context, handler Handler) {
121+
go c.run(ctx, handler)
122+
}
123+
124+
func (c *conn) run(ctx context.Context, handler Handler) {
125+
defer close(c.done)
126+
127+
for {
128+
// get the next message
129+
msg, _, err := c.stream.Read(ctx)
130+
if err != nil {
131+
// The stream failed, we cannot continue.
132+
c.fail(err)
133+
return
134+
}
135+
136+
switch msg := msg.(type) {
137+
case Requester:
138+
if err := handler(ctx, c.replier(msg), msg); err != nil {
139+
// delivery failed, not much we can do
140+
c.fail(err)
141+
return
142+
}
143+
144+
case *Response:
145+
// If method is not set, this should be a response, in which case we must
146+
// have an id to send the response back to the caller.
147+
c.pendingMu.Lock()
148+
rchan, ok := c.pending[msg.id]
149+
c.pendingMu.Unlock()
150+
if ok {
151+
rchan <- msg
152+
}
153+
}
154+
}
155+
}
156+
157+
// Close implemens Conn.
158+
func (c *conn) Close() error {
159+
return c.stream.Close()
160+
}
161+
162+
// Done implemens Conn.
163+
func (c *conn) Done() <-chan struct{} {
164+
return c.done
165+
}
166+
167+
// Err implemens Conn.
168+
func (c *conn) Err() error {
169+
if err := c.err.Load(); err != nil {
170+
return err.(error)
171+
}
172+
return nil
173+
}
174+
175+
// fail sets a failure condition on the stream and closes it.
176+
func (c *conn) fail(err error) {
177+
c.err.Store(err)
178+
c.stream.Close()
179+
}

conn_gojay.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2020 The Go Language Server Authors.
2+
// SPDX-License-Identifier: BSD-3-Clause
3+
4+
// +build gojay
5+
6+
package jsonrpc2
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"fmt"
12+
"sync/atomic"
13+
14+
"github.com/francoispqt/gojay"
15+
)
16+
17+
// Call implements Conn.
18+
func (c *conn) Call(ctx context.Context, method string, params, result interface{}) (ID, error) {
19+
// generate a new request identifier
20+
id := ID{number: atomic.AddInt64(&c.seq, 1)}
21+
call, err := NewRequest(id, method, params)
22+
if err != nil {
23+
return id, fmt.Errorf("marshaling call parameters: %v", err)
24+
}
25+
26+
// We have to add ourselves to the pending map before we send, otherwise we
27+
// are racing the response. Also add a buffer to rchan, so that if we get a
28+
// wire response between the time this call is cancelled and id is deleted
29+
// from c.pending, the send to rchan will not block.
30+
rchan := make(chan *Response, 1)
31+
32+
c.pendingMu.Lock()
33+
c.pending[id] = rchan
34+
c.pendingMu.Unlock()
35+
36+
defer func() {
37+
c.pendingMu.Lock()
38+
delete(c.pending, id)
39+
c.pendingMu.Unlock()
40+
}()
41+
42+
// now we are ready to send
43+
_, err = c.write(ctx, call)
44+
if err != nil {
45+
// sending failed, we will never get a response, so don't leave it pending
46+
return id, err
47+
}
48+
49+
// now wait for the response
50+
select {
51+
case response := <-rchan:
52+
switch {
53+
case response.err != nil:
54+
return id, response.err
55+
56+
case result == nil || len(response.result) == 0:
57+
return id, nil
58+
59+
default:
60+
dec := gojay.BorrowDecoder(bytes.NewReader(response.result))
61+
defer dec.Release()
62+
if err := dec.Decode(&result); err != nil {
63+
return id, fmt.Errorf("unmarshaling result: %v", err)
64+
}
65+
return id, nil
66+
}
67+
68+
case <-ctx.Done():
69+
return id, ctx.Err()
70+
}
71+
}

0 commit comments

Comments
 (0)