Skip to content

Commit eec054d

Browse files
authored
rpc: RPC recording functions (#562)
* rpc: RPC recording functions * fork.yaml: update fork.yaml with note about RPC recording functionality * rpc: improve MsgError to return JsonError to avoid interface around typed nil
1 parent 37be9e0 commit eec054d

File tree

9 files changed

+318
-5
lines changed

9 files changed

+318
-5
lines changed

fork.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,18 @@ def:
327327
- title: "Geth extras"
328328
description: Extend the tools available in geth to improve external testing and tooling.
329329
sub:
330+
- title: JSON-RPC recording
331+
description: |
332+
Extend server and client with configurable JSON-RPC message recording.
333+
This enables loggers and advanced metrics to be attached to server and client sides.
334+
globs:
335+
- "rpc/client.go"
336+
- "rpc/client_opt.go"
337+
- "rpc/handler.go"
338+
- "rpc/inproc.go"
339+
- "rpc/recording.go"
340+
- "rpc/server.go"
341+
- "rpc/subscription.go"
330342
- title: Simulated Backend
331343
globs:
332344
- "accounts/abi/bind/backends/simulated.go"

rpc/client.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ type Client struct {
105105
reqInit chan *requestOp // register response IDs, takes write lock
106106
reqSent chan error // signals write completion, releases write lock
107107
reqTimeout chan *requestOp // removes response IDs when call timeout expires
108+
109+
recorder Recorder // optional, may be nil
108110
}
109111

110112
type reconnectFunc func(context.Context) (ServerCodec, error)
@@ -121,6 +123,7 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn {
121123
ctx = context.WithValue(ctx, clientContextKey{}, c)
122124
ctx = context.WithValue(ctx, peerInfoContextKey{}, conn.peerInfo())
123125
handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit, c.batchResponseMaxSize)
126+
handler.recorder = c.recorder
124127
return &clientConn{conn, handler}
125128
}
126129

@@ -258,6 +261,7 @@ func initClient(conn ServerCodec, services *serviceRegistry, cfg *clientConfig)
258261
reqInit: make(chan *requestOp),
259262
reqSent: make(chan error, 1),
260263
reqTimeout: make(chan *requestOp),
264+
recorder: cfg.recorder,
261265
}
262266

263267
// Set defaults.
@@ -343,6 +347,10 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
343347
if err != nil {
344348
return err
345349
}
350+
var recordDone RecordDone
351+
if c.recorder != nil {
352+
recordDone = c.recorder.RecordOutgoing(ctx, msg)
353+
}
346354
op := &requestOp{
347355
ids: []json.RawMessage{msg.ID},
348356
resp: make(chan []*jsonrpcMessage, 1),
@@ -363,6 +371,9 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
363371
return err
364372
}
365373
resp := batchresp[0]
374+
if recordDone != nil {
375+
recordDone(ctx, msg, resp)
376+
}
366377
switch {
367378
case resp.Error != nil:
368379
return resp.Error
@@ -415,7 +426,13 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
415426
op.ids[i] = msg.ID
416427
byID[string(msg.ID)] = i
417428
}
418-
429+
var recordDone []RecordDone
430+
if c.recorder != nil {
431+
recordDone = make([]RecordDone, len(b))
432+
for i, msg := range msgs {
433+
recordDone[i] = c.recorder.RecordOutgoing(ctx, msg)
434+
}
435+
}
419436
var err error
420437
if c.isHTTP {
421438
err = c.sendBatchHTTP(ctx, op, msgs)
@@ -446,6 +463,10 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
446463
}
447464
delete(byID, string(resp.ID))
448465

466+
if recordDone != nil {
467+
recordDone[index](ctx, msgs[index], resp)
468+
}
469+
449470
// Assign result and error.
450471
elem := &b[index]
451472
switch {
@@ -462,6 +483,9 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
462483
for _, index := range byID {
463484
elem := &b[index]
464485
elem.Error = ErrMissingBatchResponse
486+
if recordDone != nil {
487+
recordDone[index](ctx, msgs[index], nil)
488+
}
465489
}
466490

467491
return err

rpc/client_opt.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type clientConfig struct {
4141
idgen func() ID
4242
batchItemLimit int
4343
batchResponseLimit int
44+
45+
recorder Recorder
4446
}
4547

4648
func (cfg *clientConfig) initHeaders() {

rpc/handler.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ type handler struct {
6868

6969
subLock sync.Mutex
7070
serverSubs map[ID]*Subscription
71+
72+
// optional, may be nil
73+
recorder Recorder
7174
}
7275

7376
type callProc struct {
@@ -450,6 +453,12 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json
450453

451454
// handleSubscriptionResult processes subscription notifications.
452455
func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
456+
if h.recorder != nil {
457+
recordDone := h.recorder.RecordIncoming(h.rootCtx, msg)
458+
if recordDone != nil {
459+
defer recordDone(h.rootCtx, msg, nil)
460+
}
461+
}
453462
var result subscriptionResult
454463
if err := json.Unmarshal(msg.Params, &result); err != nil {
455464
h.log.Debug("Dropping invalid subscription message")
@@ -461,7 +470,19 @@ func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
461470
}
462471

463472
// handleCallMsg executes a call message and returns the answer.
464-
func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
473+
func (h *handler) handleCallMsg(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
474+
var recordDone RecordDone
475+
if h.recorder != nil {
476+
recordDone = h.recorder.RecordIncoming(cp.ctx, msg)
477+
}
478+
out := h.handleCallMsgInner(cp, msg)
479+
if recordDone != nil {
480+
recordDone(cp.ctx, msg, out)
481+
}
482+
return out
483+
}
484+
485+
func (h *handler) handleCallMsgInner(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
465486
start := time.Now()
466487
switch {
467488
case msg.isNotification():

rpc/inproc.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ import (
2222
)
2323

2424
// DialInProc attaches an in-process connection to the given RPC server.
25-
func DialInProc(handler *Server) *Client {
25+
func DialInProc(handler *Server, options ...ClientOption) *Client {
2626
initctx := context.Background()
2727
cfg := new(clientConfig)
28+
for _, opt := range options {
29+
opt.applyOption(cfg)
30+
}
2831
c, _ := newClient(initctx, cfg, func(context.Context) (ServerCodec, error) {
2932
p1, p2 := net.Pipe()
3033
go handler.ServeCodec(NewCodec(p1), 0)

rpc/recording.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
)
7+
8+
// RecordDone is called after an incoming input (request or notification) has successfully been processed,
9+
// and captures the output (nil or response).
10+
type RecordDone func(ctx context.Context, input, output RecordedMsg)
11+
12+
// Recorder captures RPC traffic
13+
type Recorder interface {
14+
// RecordIncoming records an incoming message (request or notification), before it has been processed.
15+
// It may optionally return a function to capture the result of the processing (response or nil).
16+
RecordIncoming(ctx context.Context, msg RecordedMsg) RecordDone
17+
18+
// RecordOutgoing records an outgoing message (request or notification), before it has been sent.
19+
// It may optionally return a function to capture the result of the request (response or nil).
20+
RecordOutgoing(ctx context.Context, msg RecordedMsg) RecordDone
21+
}
22+
23+
// RecordedMsg wraps around the internal jsonrpcMessage type,
24+
// to provide a public read-only interface for recording of RPC activity.
25+
// Every method name is prefixed with "Msg", to avoid conflict with internal methods and future geth changes.
26+
type RecordedMsg interface {
27+
MsgIsNotification() bool
28+
MsgIsResponse() bool
29+
MsgID() json.RawMessage
30+
MsgMethod() string
31+
MsgParams() json.RawMessage
32+
MsgError() *JsonError
33+
MsgResult() json.RawMessage
34+
}
35+
36+
var _ RecordedMsg = (*jsonrpcMessage)(nil)
37+
38+
func (msg *jsonrpcMessage) MsgIsNotification() bool {
39+
return msg.isNotification()
40+
}
41+
42+
func (msg *jsonrpcMessage) MsgIsResponse() bool {
43+
return msg.isResponse()
44+
}
45+
46+
func (msg *jsonrpcMessage) MsgID() json.RawMessage {
47+
return msg.ID
48+
}
49+
func (msg *jsonrpcMessage) MsgMethod() string {
50+
return msg.Method
51+
}
52+
func (msg *jsonrpcMessage) MsgParams() json.RawMessage {
53+
return msg.Params
54+
}
55+
func (msg *jsonrpcMessage) MsgError() *JsonError {
56+
return msg.Error
57+
}
58+
func (msg *jsonrpcMessage) MsgResult() json.RawMessage {
59+
return msg.Result
60+
}
61+
62+
var _ RecordedMsg = (*jsonrpcSubscriptionNotification)(nil)
63+
64+
func (notif *jsonrpcSubscriptionNotification) MsgIsNotification() bool {
65+
return true
66+
}
67+
68+
func (notif *jsonrpcSubscriptionNotification) MsgIsResponse() bool {
69+
return false
70+
}
71+
72+
func (notif *jsonrpcSubscriptionNotification) MsgID() json.RawMessage {
73+
return json.RawMessage{} // notifications do not have an ID
74+
}
75+
76+
func (notif *jsonrpcSubscriptionNotification) MsgMethod() string {
77+
return notif.Method
78+
}
79+
80+
func (notif *jsonrpcSubscriptionNotification) MsgParams() json.RawMessage {
81+
data, _ := json.Marshal(notif.Params)
82+
return data
83+
}
84+
85+
func (notif *jsonrpcSubscriptionNotification) MsgError() *JsonError {
86+
return nil
87+
}
88+
89+
func (notif *jsonrpcSubscriptionNotification) MsgResult() json.RawMessage {
90+
return nil
91+
}
92+
93+
// WithRecorder attaches a recorder to an RPC client, useful when it is serving bidirectional RPC
94+
func WithRecorder(r Recorder) ClientOption {
95+
return optionFunc(func(cfg *clientConfig) {
96+
cfg.recorder = r
97+
})
98+
}

0 commit comments

Comments
 (0)