-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrpc_client.go
More file actions
223 lines (188 loc) · 7.38 KB
/
rpc_client.go
File metadata and controls
223 lines (188 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package dasmon
import (
"context"
"errors"
"fmt"
"io"
"reflect"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/types"
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// enc is the package-level SSZ network encoder shared by every RPC helper in
// this file for writing requests to and decoding responses from a stream.
var enc = new(encoder.SszNetworkEncoder)
// Result codes carried in the single byte that starts every response chunk.
const (
	responseCodeSuccess        = 0x00
	responseCodeInvalidRequest = 0x01
	responseCodeServerError    = 0x02
)

// forkDigestLength is the number of context bytes (the fork digest) read ahead
// of a successful chunk payload.
const forkDigestLength = 4

// RPC protocol identifiers following the Ethereum consensus layer
// specification.
// Format: /eth2/beacon_chain/req/<method>/<version>/<encoding>
const (
	protoDataColumnsByRange = "/eth2/beacon_chain/req/data_column_sidecars_by_range/1/ssz_snappy"
	protoDataColumnsByRoot  = "/eth2/beacon_chain/req/data_column_sidecars_by_root/1/ssz_snappy"
	protoStatus             = "/eth2/beacon_chain/req/status/2/ssz_snappy"
	protoMetadata           = "/eth2/beacon_chain/req/metadata/2/ssz_snappy"
)
// RpcClient issues req/resp RPC calls to peers. It embeds the libp2p host that
// its methods use to open streams, so every host.Host method is also promoted
// onto the client.
type RpcClient struct {
host.Host
}
// NewClient returns an RpcClient that opens its RPC streams through h.
func NewClient(h host.Host) *RpcClient {
	client := new(RpcClient)
	client.Host = h
	return client
}
// newInstance produces a value of type T that is ready to be unmarshaled into.
//
// When T is a pointer type, the pointed-to type is freshly allocated and a
// non-nil pointer to its zero value is returned. For every other T (including
// interface types, whose zero value carries no dynamic type) the zero value of
// T is returned unchanged.
func newInstance[T any]() T {
	var empty T

	t := reflect.TypeOf(empty)
	if t == nil || t.Kind() != reflect.Pointer {
		// Not a pointer: there is nothing to allocate.
		return empty
	}

	// reflect.New yields a *Elem, which is exactly T when T is a pointer type.
	allocated := reflect.New(t.Elem())
	return allocated.Interface().(T)
}
// readStatusCode consumes the one-byte result code that starts a response
// chunk.
//
// For responseCodeSuccess it returns the code with an empty message. For any
// other code it additionally decodes the error message that follows and
// returns it as a string. If the code byte cannot be read, the returned code
// is 0; if the error message cannot be decoded, the code that was read is
// returned together with the error.
func readStatusCode(stream io.Reader) (uint8, string, error) {
	var header [1]byte
	if _, err := io.ReadFull(stream, header[:]); err != nil {
		return 0, "", fmt.Errorf("failed to read status code: %w", err)
	}

	status := header[0]
	if status == responseCodeSuccess {
		return status, "", nil
	}

	// Non-success: the peer follows the code with a length-prefixed message.
	reason := &types.ErrorMessage{}
	if err := enc.DecodeWithMaxLength(stream, reason); err != nil {
		return status, "", fmt.Errorf("failed to read error message: %w", err)
	}
	return status, string(*reason), nil
}
// readContextBytes reads the 4-byte fork digest context
func readContextBytes(stream io.Reader) ([]byte, error) {
b := make([]byte, forkDigestLength)
if _, err := io.ReadFull(stream, b); err != nil {
return nil, fmt.Errorf("failed to read context bytes: %w", err)
}
return b, nil
}
// request performs a single-response req/resp exchange with peer p over the
// protocol proto and returns the decoded response.
//
// It opens a new stream (the libp2p swarm dials the peer if there is no
// connection yet; this fails when no addresses are known), copies the context
// deadline onto the stream, writes req unless it is nil, half-closes the
// stream and then reads one response chunk: result byte, context bytes where
// the protocol has them, and the payload.
//
// A nil req means the method has no request body and nothing is written. This
// includes a typed nil pointer such as (*T)(nil): converted to an interface it
// compares non-nil, so a plain `any(req) != nil` test would hand a nil pointer
// to the encoder.
//
// A non-success result code is returned as an error carrying the code and the
// peer-supplied message. On every error the zero value of Response is
// returned.
func request[Request ssz.Marshaler, Response ssz.Unmarshaler](ctx context.Context, host host.Host, p peer.ID, proto string, req Request) (Response, error) {
	var zero Response

	s, err := host.NewStream(ctx, p, protocol.ID(proto))
	if err != nil {
		return zero, err
	}
	defer s.Close()

	// Transfer the deadline to the stream. Best effort: a stream that cannot
	// take a deadline is still usable.
	if t, has := ctx.Deadline(); has {
		_ = s.SetDeadline(t)
	}

	// Decide whether there is a request body. An invalid reflect.Value means
	// req is a nil interface; a nil pointer is treated the same way.
	hasBody := false
	if rv := reflect.ValueOf(req); rv.IsValid() {
		hasBody = rv.Kind() != reflect.Pointer || !rv.IsNil()
	}
	if hasBody {
		// Encode and write the request. This does snappy compression behind
		// the scenes.
		if _, err = enc.EncodeWithMaxLength(s, req); err != nil {
			return zero, fmt.Errorf("failed to encode and write ssz: %w", err)
		}
	}

	// Close our write side; some clients are very picky about this.
	_ = s.CloseWrite()

	code, errMsg, err := readStatusCode(s)
	if err != nil {
		return zero, fmt.Errorf("failed to read status code: %w", err)
	}
	if code != responseCodeSuccess {
		return zero, fmt.Errorf("received error response (code %d): %s", code, errMsg)
	}

	// Per the consensus p2p specification, Status and MetaData responses have
	// an empty context; only fork-dependent payloads are prefixed with the
	// 4-byte fork digest. Reading context bytes for those two methods would
	// swallow the start of the payload and make decoding fail.
	if proto != protoStatus && proto != protoMetadata {
		if _, err := readContextBytes(s); err != nil {
			return zero, err
		}
	}

	// Allocate a fresh instance to unmarshal into.
	resp := newInstance[Response]()
	if err := enc.DecodeWithMaxLength(s, resp); err != nil {
		return zero, fmt.Errorf("failed to decode response: %w", err)
	}
	return resp, nil
}
// requestChunked performs a multi-chunk req/resp exchange with peer p over the
// protocol proto and returns the decoded chunks in arrival order.
//
// It opens a new stream (the libp2p swarm dials the peer if there is no
// connection yet; this fails when no addresses are known), copies the context
// deadline onto the stream, writes req, half-closes the stream and then reads
// chunks until the stream ends. Each chunk is read as a result byte, the
// fork-digest context bytes and the payload.
//
// The response is complete only when the stream ends cleanly before the result
// byte of a next chunk. An EOF anywhere else (inside a peer error message,
// the context bytes or a payload) is a truncated response and is reported as
// an error.
//
// When the exchange fails after some chunks were already decoded, those chunks
// are returned alongside the error.
func requestChunked[Request ssz.Marshaler, Response ssz.Unmarshaler](ctx context.Context, host host.Host, p peer.ID, proto string, req Request) ([]Response, error) {
	s, err := host.NewStream(ctx, p, protocol.ID(proto))
	if err != nil {
		return nil, err
	}
	defer s.Close()

	// Transfer the deadline to the stream. Best effort: a stream that cannot
	// take a deadline is still usable.
	if t, has := ctx.Deadline(); has {
		_ = s.SetDeadline(t)
	}

	// Encode and write the request. This does snappy compression behind the
	// scenes.
	if _, err = enc.EncodeWithMaxLength(s, req); err != nil {
		return nil, fmt.Errorf("failed to encode and write ssz: %w", err)
	}

	// Close our write side; some clients are very picky about this.
	_ = s.CloseWrite()

	var responses []Response
	for {
		code, errMsg, err := readStatusCode(s)
		if err != nil {
			// readStatusCode reports code 0 when the result byte itself could
			// not be read, and the received code when only the error message
			// that follows it could not be decoded.
			if code != responseCodeSuccess {
				// The peer did signal an error; do not mistake a truncated
				// message for the end of the response.
				return responses, fmt.Errorf("received error response for chunk (code %d) but failed to read its message: %w", code, err)
			}
			if errors.Is(err, io.EOF) {
				// Clean end of stream between chunks: all chunks were read.
				return responses, nil
			}
			return responses, fmt.Errorf("failed to read status code for chunk: %w", err)
		}
		if code != responseCodeSuccess {
			return responses, fmt.Errorf("received error response for chunk (code %d): %s", code, errMsg)
		}

		// Read context bytes (fork digest) for this chunk.
		if _, err := readContextBytes(s); err != nil {
			return responses, err
		}

		// Allocate a fresh instance to unmarshal this chunk into.
		resp := newInstance[Response]()
		if err := enc.DecodeWithMaxLength(s, resp); err != nil {
			if errors.Is(err, io.EOF) {
				return responses, fmt.Errorf("unexpected EOF while decoding chunk: %w", err)
			}
			return responses, fmt.Errorf("failed to decode chunk: %w", err)
		}
		responses = append(responses, resp)
	}
}
// snappify appends "/" and the SSZ-snappy encoding suffix to a protocol
// identifier that does not yet carry an encoding segment.
func snappify(protocol string) string {
	withSlash := protocol + "/"
	return withSlash + encoder.ProtocolSuffixSSZSnappy
}
// DataColumnByRoot sends req to peerID over the data_column_sidecars_by_root
// protocol and returns the data column sidecars decoded from the chunked
// response, together with any error reported by requestChunked.
func (c *RpcClient) DataColumnByRoot(ctx context.Context, peerID peer.ID, req *types.DataColumnsByRootIdentifiers) ([]*eth.DataColumnSidecar, error) {
	sidecars, err := requestChunked[*types.DataColumnsByRootIdentifiers, *eth.DataColumnSidecar](
		ctx, c.Host, peerID, protoDataColumnsByRoot, req,
	)
	return sidecars, err
}
// DataColumnByRange sends req to peerID over the data_column_sidecars_by_range
// protocol and returns the data column sidecars decoded from the chunked
// response, together with any error reported by requestChunked.
func (c *RpcClient) DataColumnByRange(ctx context.Context, peerID peer.ID, req *eth.DataColumnSidecarsByRangeRequest) ([]*eth.DataColumnSidecar, error) {
	sidecars, err := requestChunked[*eth.DataColumnSidecarsByRangeRequest, *eth.DataColumnSidecar](
		ctx, c.Host, peerID, protoDataColumnsByRange, req,
	)
	return sidecars, err
}
// Status sends our status message req to peerID over the status (V2) protocol
// and returns the status message the peer answers with.
func (c *RpcClient) Status(ctx context.Context, peerID peer.ID, req *eth.StatusV2) (*eth.StatusV2, error) {
	remote, err := request[*eth.StatusV2, *eth.StatusV2](
		ctx, c.Host, peerID, protoStatus, req,
	)
	return remote, err
}
// Metadata requests the metadata (V2) of peerID. The metadata method has no
// request body, so nothing is written to the stream before the response is
// read.
func (c *RpcClient) Metadata(ctx context.Context, peerID peer.ID) (*eth.MetaDataV2, error) {
	// Request is instantiated as the ssz.Marshaler interface so that the nil
	// argument is a genuinely nil interface value. With a pointer type such as
	// *eth.MetaDataV2 the argument would be a typed nil pointer, which compares
	// non-nil once converted to an interface, passes request's nil check and
	// is then marshalled through a nil receiver.
	return request[ssz.Marshaler, *eth.MetaDataV2](ctx, c.Host, peerID, protoMetadata, nil)
}