Skip to content

Commit 6d92fdc

Browse files
author
Bas van Kervel
committed
added support for batch requests
1 parent 22c7ce0 commit 6d92fdc

File tree

3 files changed

+78
-21
lines changed

3 files changed

+78
-21
lines changed

rpc/codec/codec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type Codec int
1212
// (de)serialization support for rpc interface
1313
type ApiCoder interface {
1414
// Parse message to request from underlying stream
15-
ReadRequest() (*shared.Request, error)
15+
ReadRequest() ([]*shared.Request, bool, error)
1616
// Parse response message from underlying stream
1717
ReadResponse() (interface{}, error)
1818
// Encode response to encoded form in underlying stream

rpc/codec/json.go

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,33 +8,53 @@ import (
88
)
99

1010
const (
11-
MAX_RESPONSE_SIZE = 64 * 1024
11+
MAX_REQUEST_SIZE = 1024 * 1024
12+
MAX_RESPONSE_SIZE = 1024 * 1024
1213
)
1314

1415
// Json serialization support
1516
type JsonCodec struct {
1617
c net.Conn
17-
d *json.Decoder
18-
e *json.Encoder
18+
buffer []byte
19+
bytesInBuffer int
1920
}
2021

2122
// Create new JSON coder instance
2223
func NewJsonCoder(conn net.Conn) ApiCoder {
2324
return &JsonCodec{
2425
c: conn,
25-
d: json.NewDecoder(conn),
26-
e: json.NewEncoder(conn),
26+
buffer: make([]byte, MAX_REQUEST_SIZE),
27+
bytesInBuffer: 0,
2728
}
2829
}
2930

3031
// Serialize obj to JSON and write it to conn
31-
func (self *JsonCodec) ReadRequest() (*shared.Request, error) {
32-
req := shared.Request{}
33-
err := self.d.Decode(&req)
32+
func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool, err error) {
33+
n, err := self.c.Read(self.buffer[self.bytesInBuffer:])
34+
if err != nil {
35+
self.bytesInBuffer = 0
36+
return nil, false, err
37+
}
38+
39+
self.bytesInBuffer += n
40+
41+
singleRequest := shared.Request{}
42+
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &singleRequest)
3443
if err == nil {
35-
return &req, nil
44+
self.bytesInBuffer = 0
45+
requests := make([]*shared.Request, 1)
46+
requests[0] = &singleRequest
47+
return requests, false, nil
3648
}
37-
return nil, err
49+
50+
requests = make([]*shared.Request, 0)
51+
err = json.Unmarshal(self.buffer[:self.bytesInBuffer], &requests)
52+
if err == nil {
53+
self.bytesInBuffer = 0
54+
return requests, true, nil
55+
}
56+
57+
return nil, false, err
3858
}
3959

4060
func (self *JsonCodec) ReadResponse() (interface{}, error) {
@@ -66,7 +86,24 @@ func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
6686

6787
// Parse JSON data from conn to obj
6888
func (self *JsonCodec) WriteResponse(res interface{}) error {
69-
return self.e.Encode(&res)
89+
data, err := json.Marshal(res)
90+
if err != nil {
91+
self.c.Close()
92+
return err
93+
}
94+
95+
bytesWritten := 0
96+
97+
for bytesWritten < len(data) {
98+
n, err := self.c.Write(data[bytesWritten:])
99+
if err != nil {
100+
self.c.Close()
101+
return err
102+
}
103+
bytesWritten += n
104+
}
105+
106+
return nil
70107
}
71108

72109
// Close decoder and encoder

rpc/comms/comms.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) {
4747
codec := c.New(conn)
4848

4949
for {
50-
req, err := codec.ReadRequest()
50+
requests, isBatch, err := codec.ReadRequest()
5151
if err == io.EOF {
5252
codec.Close()
5353
return
@@ -57,15 +57,35 @@ func handle(conn net.Conn, api shared.EthereumApi, c codec.Codec) {
5757
return
5858
}
5959

60-
var rpcResponse interface{}
61-
res, err := api.Execute(req)
60+
if isBatch {
61+
responses := make([]*interface{}, len(requests))
62+
responseCount := 0
63+
for _, req := range requests {
64+
res, err := api.Execute(req)
65+
if req.Id != nil {
66+
rpcResponse := shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err)
67+
responses[responseCount] = rpcResponse
68+
responseCount += 1
69+
}
70+
}
6271

63-
rpcResponse = shared.NewRpcResponse(req.Id, req.Jsonrpc, res, err)
64-
err = codec.WriteResponse(rpcResponse)
65-
if err != nil {
66-
glog.V(logger.Error).Infof("comms send err - %v\n", err)
67-
codec.Close()
68-
return
72+
err = codec.WriteResponse(responses[:responseCount])
73+
if err != nil {
74+
glog.V(logger.Error).Infof("comms send err - %v\n", err)
75+
codec.Close()
76+
return
77+
}
78+
} else {
79+
var rpcResponse interface{}
80+
res, err := api.Execute(requests[0])
81+
82+
rpcResponse = shared.NewRpcResponse(requests[0].Id, requests[0].Jsonrpc, res, err)
83+
err = codec.WriteResponse(rpcResponse)
84+
if err != nil {
85+
glog.V(logger.Error).Infof("comms send err - %v\n", err)
86+
codec.Close()
87+
return
88+
}
6989
}
7090
}
7191
}

0 commit comments

Comments
 (0)