Skip to content

Commit ee66ac2

Browse files
committed
Support client call handlers
1 parent dd97b81 commit ee66ac2

File tree

7 files changed

+84
-30
lines changed

7 files changed

+84
-30
lines changed

client.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,13 +226,22 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
226226
exiting := make(chan struct{})
227227
c.exiting = exiting
228228

229+
var hnd reqestHandler
230+
if len(config.reverseHandlers) > 0 {
231+
h := makeHandler(defaultServerConfig())
232+
for _, reverseHandler := range config.reverseHandlers {
233+
h.register(reverseHandler.ns, reverseHandler.hnd)
234+
}
235+
hnd = h
236+
}
237+
229238
go (&wsConn{
230239
conn: conn,
231240
connFactory: connFactory,
232241
reconnectBackoff: config.reconnectBackoff,
233242
pingInterval: config.pingInterval,
234243
timeout: config.timeout,
235-
handler: nil,
244+
handler: hnd,
236245
requests: requests,
237246
stop: stop,
238247
exiting: exiting,

handler.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import (
2020
"github.com/filecoin-project/go-jsonrpc/metrics"
2121
)
2222

23-
type rpcHandler struct {
23+
// methodHandler is a handler for a single method
24+
type methodHandler struct {
2425
paramReceivers []reflect.Type
2526
nParams int
2627

@@ -94,9 +95,34 @@ type response struct {
9495
Error *respError `json:"error,omitempty"`
9596
}
9697

98+
type handler struct {
99+
methods map[string]methodHandler
100+
errors *Errors
101+
102+
maxRequestSize int64
103+
104+
// aliasedMethods contains a map of alias:original method names.
105+
// These are used as fallbacks if a method is not found by the given method name.
106+
aliasedMethods map[string]string
107+
108+
paramDecoders map[reflect.Type]ParamDecoder
109+
}
110+
111+
func makeHandler(sc ServerConfig) *handler {
112+
return &handler{
113+
methods: make(map[string]methodHandler),
114+
errors: sc.errors,
115+
116+
aliasedMethods: map[string]string{},
117+
paramDecoders: sc.paramDecoders,
118+
119+
maxRequestSize: sc.maxRequestSize,
120+
}
121+
}
122+
97123
// Register
98124

99-
func (s *RPCServer) register(namespace string, r interface{}) {
125+
func (s *handler) register(namespace string, r interface{}) {
100126
val := reflect.ValueOf(r)
101127
// TODO: expect ptr
102128

@@ -117,7 +143,7 @@ func (s *RPCServer) register(namespace string, r interface{}) {
117143

118144
valOut, errOut, _ := processFuncOut(funcType)
119145

120-
s.methods[namespace+"."+method.Name] = rpcHandler{
146+
s.methods[namespace+"."+method.Name] = methodHandler{
121147
paramReceivers: recvs,
122148
nParams: ins,
123149

@@ -137,7 +163,7 @@ func (s *RPCServer) register(namespace string, r interface{}) {
137163
type rpcErrFunc func(w func(func(io.Writer)), req *request, code ErrorCode, err error)
138164
type chanOut func(reflect.Value, interface{}) error
139165

140-
func (s *RPCServer) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
166+
func (s *handler) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
141167
wf := func(cb func(io.Writer)) {
142168
cb(w)
143169
}
@@ -194,7 +220,7 @@ func doCall(methodName string, f reflect.Value, params []reflect.Value) (out []r
194220
return out, nil
195221
}
196222

197-
func (s *RPCServer) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
223+
func (s *handler) getSpan(ctx context.Context, req request) (context.Context, *trace.Span) {
198224
if req.Meta == nil {
199225
return ctx, nil
200226
}
@@ -221,7 +247,7 @@ func (s *RPCServer) getSpan(ctx context.Context, req request) (context.Context,
221247
return ctx, span
222248
}
223249

224-
func (s *RPCServer) createError(err error) *respError {
250+
func (s *handler) createError(err error) *respError {
225251
var code ErrorCode = 1
226252
if s.errors != nil {
227253
c, ok := s.errors.byType[reflect.TypeOf(err)]
@@ -245,7 +271,7 @@ func (s *RPCServer) createError(err error) *respError {
245271
return out
246272
}
247273

248-
func (s *RPCServer) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
274+
func (s *handler) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
249275
// Not sure if we need to sanitize the incoming req.Method or not.
250276
ctx, span := s.getSpan(ctx, req)
251277
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))

options.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ import (
1010

1111
type ParamEncoder func(reflect.Value) (reflect.Value, error)
1212

13+
type clientHandler struct {
14+
ns string
15+
hnd interface{}
16+
}
17+
1318
type Config struct {
1419
reconnectBackoff backoff
1520
pingInterval time.Duration
@@ -18,6 +23,8 @@ type Config struct {
1823
paramEncoders map[reflect.Type]ParamEncoder
1924
errors *Errors
2025

26+
reverseHandlers []clientHandler
27+
2128
httpClient *http.Client
2229

2330
noReconnect bool
@@ -81,6 +88,12 @@ func WithErrors(es Errors) func(c *Config) {
8188
}
8289
}
8390

91+
func WithClientHandler(ns string, hnd interface{}) func(c *Config) {
92+
return func(c *Config) {
93+
c.reverseHandlers = append(c.reverseHandlers, clientHandler{ns, hnd})
94+
}
95+
}
96+
8497
func WithHTTPClient(h *http.Client) func(c *Config) {
8598
return func(c *Config) {
8699
c.httpClient = h

options_server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"golang.org/x/xerrors"
99
)
1010

11+
// note: we embed reflect.Type because proxy-structs are not comparable
1112
type jsonrpcReverseClient struct{ reflect.Type }
1213

1314
type ParamDecoder func(ctx context.Context, json []byte) (reflect.Value, error)
@@ -57,6 +58,8 @@ func WithServerPingInterval(d time.Duration) ServerOption {
5758
}
5859
}
5960

61+
// WithReverseClient will allow extracting reverse client on **WEBSOCKET** calls.
62+
// RP is a proxy-struct type, much like the one passed to NewClient.
6063
func WithReverseClient[RP any](namespace string) ServerOption {
6164
return func(c *ServerConfig) {
6265
c.reverseClientBuilder = func(ctx context.Context, conn *wsConn) (context.Context, error) {
@@ -86,6 +89,11 @@ func WithReverseClient[RP any](namespace string) ServerOption {
8689
}
8790
}
8891

92+
// ExtractReverseClient will extract reverse client from context. Reverse client for the type
93+
// will only be present if the server was constructed with a matching WithReverseClient option
94+
// and the connection was a websocket connection.
95+
// If there is no reverse client, the call will return a zero value and `false`. Otherwise a reverse
96+
// client and `true` will be returned.
8997
func ExtractReverseClient[C any](ctx context.Context) (C, bool) {
9098
c, ok := ctx.Value(jsonrpcReverseClient{reflect.TypeOf(new(C)).Elem()}).(*C)
9199
if !ok {

rpc_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,8 +1142,8 @@ func TestIDHandling(t *testing.T) {
11421142
}
11431143
}
11441144

1145-
// 1. make server call on client
1146-
// 2. make client handle
1145+
// 1. make server call on client **
1146+
// 2. make client handle **
11471147
// 3. alias on client
11481148
// 4. alias call on server
11491149
// 6. custom/object param type
@@ -1174,11 +1174,17 @@ type RevCallTestClientProxy struct {
11741174
CallOnClient func(int) (int, error)
11751175
}
11761176

1177-
func TestServerCallUser(t *testing.T) {
1177+
type RevCallTestClientHandler struct {
1178+
}
1179+
1180+
func (h *RevCallTestClientHandler) CallOnClient(a int) (int, error) {
1181+
return a * 2, nil
1182+
}
1183+
1184+
func TestReverseCall(t *testing.T) {
11781185
// setup server
11791186

11801187
rpcServer := NewServer(WithReverseClient[RevCallTestClientProxy]("Client"))
1181-
11821188
rpcServer.Register("Server", &RevCallTestServerHandler{})
11831189

11841190
// httptest stuff
@@ -1192,9 +1198,11 @@ func TestServerCallUser(t *testing.T) {
11921198
}
11931199
closer, err := NewMergeClient(context.Background(), "ws://"+testServ.Listener.Addr().String(), "Server", []interface{}{
11941200
&client,
1195-
}, nil)
1201+
}, nil, WithClientHandler("Client", &RevCallTestClientHandler{}))
11961202
require.NoError(t, err)
11971203

1204+
// do the call!
1205+
11981206
e := client.Call()
11991207
require.NoError(t, e)
12001208

server.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"io"
77
"net/http"
8-
"reflect"
98
"strings"
109
"time"
1110

@@ -20,19 +19,10 @@ const (
2019

2120
// RPCServer provides a jsonrpc 2.0 http server handler
2221
type RPCServer struct {
23-
methods map[string]rpcHandler
24-
errors *Errors
25-
26-
// aliasedMethods contains a map of alias:original method names.
27-
// These are used as fallbacks if a method is not found by the given method name.
28-
aliasedMethods map[string]string
29-
30-
paramDecoders map[reflect.Type]ParamDecoder
31-
22+
*handler
3223
reverseClientBuilder func(context.Context, *wsConn) (context.Context, error)
3324

3425
pingInterval time.Duration
35-
maxRequestSize int64
3626
}
3727

3828
// NewServer creates new RPCServer instance
@@ -43,12 +33,8 @@ func NewServer(opts ...ServerOption) *RPCServer {
4333
}
4434

4535
return &RPCServer{
46-
methods: map[string]rpcHandler{},
47-
aliasedMethods: map[string]string{},
48-
paramDecoders: config.paramDecoders,
36+
handler: makeHandler(config),
4937
reverseClientBuilder: config.reverseClientBuilder,
50-
maxRequestSize: config.maxRequestSize,
51-
errors: config.errors,
5238

5339
pingInterval: config.pingInterval,
5440
}

websocket.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,18 @@ type outChanReg struct {
4545
ch reflect.Value
4646
}
4747

48+
type reqestHandler interface {
49+
handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut)
50+
}
51+
4852
type wsConn struct {
4953
// outside params
5054
conn *websocket.Conn
5155
connFactory func() (*websocket.Conn, error)
5256
reconnectBackoff backoff
5357
pingInterval time.Duration
5458
timeout time.Duration
55-
handler *RPCServer
59+
handler reqestHandler
5660
requests <-chan clientRequest
5761
pongs chan struct{}
5862
stopPings func()

0 commit comments

Comments
 (0)