Skip to content

Commit d0a62f5

Browse files
committed
Support request batching for HTTP
1 parent 15cb10d commit d0a62f5

File tree

11 files changed

+357
-49
lines changed

11 files changed

+357
-49
lines changed

internal/endpoint.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ type Endpoint struct {
2424

2525
// AddXForwardedHeaders adds X-Forwarded-For header to requests sent to upstream providers
2626
AddXForwardedHeaders bool `yaml:"add_xfwd_headers,omitempty"`
27+
28+
Headers map[string]string `yaml:"headers,omitempty"`
2729
}
2830

2931
// GetActiveProviders returns a list of providers that are currently considered online

internal/http.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ var defaultClient = &http.Client{
2525
},
2626
}
2727

28-
func ProxyHTTP(ctx context.Context, endpoint *Endpoint, req *rpc.Request, timing *servertiming.Header) (*rpc.Response, *Provider, error) {
28+
func ProxyHTTP(ctx context.Context, endpoint *Endpoint, req *rpc.BatchRequest, timing *servertiming.Header) (*rpc.BatchResponse, *Provider, error) {
2929
providers := endpoint.GetActiveProviders()
3030

3131
for _, provider := range providers {
@@ -93,15 +93,15 @@ func SendHTTPRequest(ctx context.Context, provider *Provider, body []byte) ([]by
9393
return b, nil
9494
}
9595

96-
func SendHTTPRPCRequest(ctx context.Context, p *Provider, rpcreq *rpc.Request) (*rpc.Response, error) {
97-
req := rpc.SerializeRequest(rpcreq)
96+
func SendHTTPRPCRequest(ctx context.Context, p *Provider, req *rpc.BatchRequest) (*rpc.BatchResponse, error) {
97+
body := rpc.SerializeBatchRequest(req)
9898

99-
b, err := SendHTTPRequest(ctx, p, req)
99+
b, err := SendHTTPRequest(ctx, p, body)
100100
if err != nil {
101101
return nil, err
102102
}
103103

104-
response, err := rpc.DecodeResponse(b)
104+
response, err := rpc.DecodeBatchResponse(b)
105105
if err != nil {
106106
return nil, fmt.Errorf("bad response: %w, raw: %s", err, string(b))
107107
}
@@ -127,19 +127,26 @@ func IncomingHttpHandler(ctx context.Context, endpoint *Endpoint, w http.Respons
127127
return
128128
}
129129

130-
rpcReq, err := rpc.DecodeRequest(body)
130+
req, err := rpc.DecodeBatchRequest(body)
131131
if err != nil {
132-
log.Error("http: bad request", "error", err, "msg", rpc.FormatRawBody(string(body)))
132+
log.Error("http: bad request", "error", err, "body", rpc.FormatRawBody(string(body)))
133133
http.Error(w, "bad request", http.StatusBadRequest)
134134
return
135135
}
136136

137-
log = log.With("rpc_id", rpc.GetRequestIDString(rpcReq.ID), "method", rpcReq.Method)
137+
if req.IsBatch {
138+
rpc.BatchIDCounter++
139+
log = log.With("batch_id", rpc.BatchIDCounter, "size", len(req.Requests))
140+
} else {
141+
log = log.With("rpc_id", req.Requests[0].GetID(), "method", req.Requests[0].Method)
142+
}
138143

139-
res, provider, err := ProxyHTTP(ctx, endpoint, rpcReq, timing)
144+
res, provider, err := ProxyHTTP(ctx, endpoint, req, timing)
140145

141146
if err != nil {
142-
metrics.RecordRequest(endpoint.Name, provider.Name, "http", rpcReq.Method, time.Since(start).Seconds(), true)
147+
for _, req := range req.Requests {
148+
metrics.RecordFailedRequest(endpoint.Name, provider.Name, "http", req.Method)
149+
}
143150

144151
if err == ErrNoProvidersAvailable {
145152
log.Error("no providers available")
@@ -154,9 +161,41 @@ func IncomingHttpHandler(ctx context.Context, endpoint *Endpoint, w http.Respons
154161

155162
log = log.With("provider", provider.Name, "request_time", time.Since(start))
156163

157-
log.Debug("request")
164+
if req.IsBatch {
165+
if len(res.Responses) != len(req.Requests) {
166+
log.Error("batch response size mismatch", "request size", len(req.Requests), "response size", len(res.Responses))
167+
http.Error(w, "batch response size mismatch", http.StatusInternalServerError)
168+
return
169+
}
170+
171+
// batch_id and size is already set
172+
log.Debug("batch request")
173+
}
158174

159-
metrics.RecordRequest(endpoint.Name, provider.Name, "http", rpcReq.Method, time.Since(start).Seconds(), res.IsError())
175+
for _, res := range req.Requests {
176+
if req.IsBatch {
177+
log.Debug("request", "rpc_id", res.GetID(), "method", res.Method)
178+
} else {
179+
// id and method is already set for this single request
180+
log.Debug("request")
181+
}
182+
}
183+
184+
for i, res := range res.Responses {
185+
method := req.Requests[i].Method
186+
187+
if res.IsError() {
188+
metrics.RecordFailedRequest(endpoint.Name, provider.Name, "http", method)
189+
} else {
190+
metrics.RecordRequest(endpoint.Name, provider.Name, "http", method, time.Since(start).Seconds())
191+
}
192+
}
193+
194+
if endpoint.Headers != nil {
195+
for k, v := range endpoint.Headers {
196+
w.Header().Set(k, v)
197+
}
198+
}
160199

161200
if !endpoint.Public {
162201
w.Header().Set("X-Provider", provider.Name)
@@ -168,7 +207,7 @@ func IncomingHttpHandler(ctx context.Context, endpoint *Endpoint, w http.Respons
168207

169208
w.Header().Set("Content-Type", "application/json; charset=utf-8")
170209

171-
_, err = w.Write(rpc.SerializeResponse(res))
210+
_, err = w.Write(rpc.SerializeBatchResponse(res))
172211
if err != nil {
173212
log.Error("error writing body", "error", err)
174213
return

internal/metrics/metrics.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,15 @@ func MetricsHandler() http.Handler {
7474
}
7575

7676
// RecordRequest records metrics for a request
77-
func RecordRequest(endpoint, provider, transport, method string, duration float64, failed bool) {
77+
func RecordRequest(endpoint, provider, transport, method string, duration float64) {
7878
requestsTotal.WithLabelValues(endpoint, provider, transport, method).Inc()
79-
if failed {
80-
failedRequestsTotal.WithLabelValues(endpoint, provider, transport, method).Inc()
81-
}
8279
requestDuration.WithLabelValues(endpoint, provider, transport, method).Observe(duration)
8380
}
8481

82+
func RecordFailedRequest(endpoint, provider, transport, method string) {
83+
failedRequestsTotal.WithLabelValues(endpoint, provider, transport, method).Inc()
84+
}
85+
8586
func RecordOpenConnection(endpoint, provider string) {
8687
openConnections.WithLabelValues(endpoint, provider, "ws").Inc()
8788
totalConnections.WithLabelValues(endpoint, provider, "ws").Inc()

internal/provider.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,17 @@ func (e *Provider) Healthcheck(p *Endpoint) error {
130130
ctx := context.Background()
131131

132132
fn := func(ctx context.Context, req *rpc.Request, errRpcError bool) (*rpc.Response, error) {
133-
res, err := SendHTTPRPCRequest(ctx, e, req)
133+
res, err := SendHTTPRPCRequest(ctx, e, rpc.NewBatchRequest(req))
134134
if err != nil {
135135
return nil, err
136136
}
137137

138-
if errRpcError && res.IsError() {
139-
_, err := res.GetError()
138+
if errRpcError && res.Responses[0].IsError() {
139+
_, err := res.Responses[0].GetError()
140140
return nil, err
141141
}
142142

143-
return res, nil
143+
return res.Responses[0], nil
144144
}
145145

146146
switch p.Kind {

internal/provider_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ func TestRateLimitWithRetryAfter(t *testing.T) {
7070
}
7171

7272
// Send a request that will trigger rate limiting
73-
_, _, err := ProxyHTTP(context.Background(), provider, rpc.NewRequest("1", "eth_blockNumber", []interface{}{}), &servertiming.Header{})
73+
req := rpc.NewBatchRequest(
74+
rpc.NewRequest("1", "eth_blockNumber", []interface{}{}),
75+
)
76+
_, _, err := ProxyHTTP(context.Background(), provider, req, &servertiming.Header{})
7477

7578
// Verify the error and retry time was set correctly
7679
assert.Error(t, err)
@@ -126,7 +129,10 @@ func TestSlowProvider(t *testing.T) {
126129
timing := &servertiming.Header{}
127130

128131
// Attempt to proxy a request - it should use the fast provider because the first one is too slow
129-
resp, endpoint, err := ProxyHTTP(context.Background(), provider, rpc.NewRequest("1", "eth_blockNumber", []interface{}{}), timing)
132+
req := rpc.NewBatchRequest(
133+
rpc.NewRequest("1", "eth_blockNumber", []interface{}{}),
134+
)
135+
resp, endpoint, err := ProxyHTTP(context.Background(), provider, req, timing)
130136

131137
// Verify we got a response
132138
assert.NoError(t, err)
@@ -166,7 +172,10 @@ func TestNonRespondingProvider(t *testing.T) {
166172

167173
// Attempt to use the non-responding provider first
168174
timing := &servertiming.Header{}
169-
resp, endpoint, err := ProxyHTTP(context.Background(), provider, rpc.NewRequest("1", "eth_blockNumber", []interface{}{}), timing)
175+
req := rpc.NewBatchRequest(
176+
rpc.NewRequest("1", "eth_blockNumber", []interface{}{}),
177+
)
178+
resp, endpoint, err := ProxyHTTP(context.Background(), provider, req, timing)
170179

171180
// Verify we got a response from the working provider
172181
assert.NoError(t, err)

internal/rpc/batch.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package rpc
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
type BatchResponse struct {
9+
Responses []*Response
10+
IsBatch bool
11+
}
12+
13+
var _ json.Unmarshaler = &BatchResponse{}
14+
var _ json.Marshaler = &BatchResponse{}
15+
16+
func NewBatchResponse(res ...*Response) *BatchResponse {
17+
return &BatchResponse{
18+
Responses: res,
19+
IsBatch: len(res) > 1,
20+
}
21+
}
22+
23+
// UnmarshalJSON for a batch request supports decoding a single request as well
24+
func (r *BatchResponse) UnmarshalJSON(b []byte) error {
25+
switch b[0] {
26+
case '[':
27+
var res []*Response
28+
err := json.Unmarshal(b, &res)
29+
if err != nil {
30+
return err
31+
}
32+
r.Responses = res
33+
r.IsBatch = true
34+
return nil
35+
case '{':
36+
var res Response
37+
err := json.Unmarshal(b, &res)
38+
if err != nil {
39+
return err
40+
}
41+
r.Responses = []*Response{&res}
42+
r.IsBatch = false
43+
return nil
44+
}
45+
46+
return fmt.Errorf("invalid request: %s", FormatRawBody(string(b)))
47+
}
48+
49+
// MarshalJSON will encode a single (non batched) request to a single object or multiple requests into an array
50+
func (r BatchResponse) MarshalJSON() ([]byte, error) {
51+
if len(r.Responses) == 0 {
52+
return nil, fmt.Errorf("empty batch response")
53+
}
54+
55+
// If the batch is just one single request then unbatch it
56+
if !r.IsBatch {
57+
return json.Marshal(r.Responses[0])
58+
}
59+
60+
return json.Marshal(r.Responses)
61+
}
62+
63+
type BatchRequest struct {
64+
Requests []*Request
65+
IsBatch bool
66+
}
67+
68+
var _ json.Unmarshaler = &BatchRequest{}
69+
var _ json.Marshaler = &BatchRequest{}
70+
71+
func NewBatchRequest(req ...*Request) *BatchRequest {
72+
return &BatchRequest{
73+
Requests: req,
74+
IsBatch: len(req) > 1,
75+
}
76+
}
77+
78+
// UnmarshalJSON for a batch request supports decoding a single request as well
79+
func (r *BatchRequest) UnmarshalJSON(b []byte) error {
80+
switch b[0] {
81+
case '[':
82+
var req []*Request
83+
err := json.Unmarshal(b, &req)
84+
if err != nil {
85+
return err
86+
}
87+
r.Requests = req
88+
r.IsBatch = true
89+
return nil
90+
case '{':
91+
var req Request
92+
err := json.Unmarshal(b, &req)
93+
if err != nil {
94+
return err
95+
}
96+
r.Requests = []*Request{&req}
97+
r.IsBatch = false
98+
return nil
99+
}
100+
101+
return fmt.Errorf("invalid request: %s", FormatRawBody(string(b)))
102+
}
103+
104+
// MarshalJSON will encode a single (non batched) request to a single object or multiple requests into an array
105+
func (r BatchRequest) MarshalJSON() ([]byte, error) {
106+
if len(r.Requests) == 0 {
107+
return nil, fmt.Errorf("empty batch request")
108+
}
109+
110+
// If the batch is just one single request then unbatch it
111+
if !r.IsBatch {
112+
return json.Marshal(r.Requests[0])
113+
}
114+
115+
return json.Marshal(r.Requests)
116+
}
117+
118+
func DecodeBatchRequest(b []byte) (*BatchRequest, error) {
119+
var batch BatchRequest
120+
err := json.Unmarshal(b, &batch)
121+
if err != nil {
122+
return nil, err
123+
}
124+
return &batch, nil
125+
}
126+
127+
func DecodeBatchResponse(b []byte) (*BatchResponse, error) {
128+
var batch BatchResponse
129+
err := json.Unmarshal(b, &batch)
130+
if err != nil {
131+
return nil, err
132+
}
133+
return &batch, nil
134+
}
135+
136+
func SerializeBatchRequest(req *BatchRequest) []byte {
137+
b, err := json.Marshal(req)
138+
if err != nil {
139+
panic(err)
140+
}
141+
return b
142+
}
143+
144+
func SerializeBatchResponse(res *BatchResponse) []byte {
145+
b, err := json.Marshal(res)
146+
if err != nil {
147+
panic(err)
148+
}
149+
return b
150+
}

0 commit comments

Comments
 (0)