Skip to content

Commit b373d1d

Browse files
authored
Merge pull request #33 from probe-lab/raulk/fixes
implement Status and Metadata handlers + close write side of streams on request + misc fixes
2 parents 1293b40 + 7c81251 commit b373d1d

File tree

13 files changed

+330
-227
lines changed

13 files changed

+330
-227
lines changed

api/client.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type ClientConfig struct {
1717
Endpoint string
1818
StateTimeout time.Duration
1919
QueryTimeout time.Duration
20+
Logger log.FieldLogger
2021
}
2122

2223
type Client struct {
@@ -85,21 +86,36 @@ func (c *Client) get(
8586
// we will only handle JSONs
8687
req.Header.Set("Accept", "application/json")
8788

89+
l := c.cfg.Logger.WithFields(log.Fields{
90+
"url": callURL,
91+
"method": req.Method,
92+
})
93+
l.Info("requesting beacon API")
8894
resp, err := c.client.Do(req)
95+
8996
if err != nil {
97+
l.WithError(err).Warn("error requesting beacon API")
9098
return respBody, errors.Wrap(err, fmt.Sprintf("unable to request URL %s", callURL.String()))
9199
}
92100
if resp == nil {
93-
return respBody, errors.New("got empty response from the API")
101+
err := errors.New("got empty response from the API")
102+
l.WithError(err).Warn("error requesting beacon API")
103+
return respBody, err
94104
}
95105
defer resp.Body.Close()
96106

97107
respBody, err = io.ReadAll(resp.Body)
98108
if err != nil {
109+
l.WithError(err).Warn("failed to read response body")
99110
return respBody, errors.Wrap(err, "reading response body")
100111
}
101112

102-
// return copy of the request
113+
if len(respBody) > 1024 {
114+
l.Infof("successful beacon API response: [omitted due to length %d]", len(respBody))
115+
} else {
116+
l.Infof("successful beacon API response: %s", respBody)
117+
}
118+
103119
return respBody, nil
104120
}
105121

api/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func Test_ApiGetPeerDASstate(t *testing.T) {
3636
httpCli, testMainCtx, cancel := genTestAPICli(t)
3737
defer cancel()
3838

39-
_, err := httpCli.GetPeerDASstate(testMainCtx)
39+
_, err := httpCli.GetBeaconStateHead(testMainCtx)
4040
require.NoError(t, err)
4141
}
4242

api/fork_choice.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ type ForkChoiceNode struct {
3131
ExecutionBlockHash string `json:"execution_block_hash"`
3232
}
3333

34-
func (c *Client) GetForkChoice(ctx context.Context) (ForkChoiceNode, error) {
35-
var forkChoice ForkChoiceNode
34+
func (c *Client) GetForkChoice(ctx context.Context) (ForkChoice, error) {
35+
var forkChoice ForkChoice
3636

3737
resp, err := c.get(ctx, c.cfg.QueryTimeout, ForkChoiceBase, "")
3838
if err != nil {

api/node_identity.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"strconv"
77

8+
"github.com/ethereum/go-ethereum/common/hexutil"
89
"github.com/pkg/errors"
910
)
1011

@@ -17,23 +18,33 @@ type NodeIdentity struct {
1718
Maddrs []string `json:"p2p_addresses"`
1819
DiscvAddrs []string `json:"discovery_addresses"`
1920
Metadata struct {
20-
SeqNum string `json:"seq_number"`
21-
Attnets string `json:"attnets"`
22-
Syncnets string `json:"syncnets"`
23-
Cgc string `json:"custody_group_count"`
21+
SeqNum string `json:"seq_number"`
22+
Attnets hexutil.Bytes `json:"attnets"`
23+
Syncnets hexutil.Bytes `json:"syncnets"`
24+
Cgc string `json:"custody_group_count"`
2425
} `json:"metadata"`
2526
} `json:"data"`
2627
}
2728

2829
func (i *NodeIdentity) CustodyInt() (int, error) {
30+
// TODO remove patch for Prysm, adding dummy data in missing fields.
31+
// https://github.com/OffchainLabs/prysm/pull/15506
32+
if i.Data.Metadata.Cgc == "" {
33+
return 128, nil
34+
}
2935
return strconv.Atoi(i.Data.Metadata.Cgc)
3036
}
3137

32-
func (i *NodeIdentity) Attnets() string {
38+
func (i *NodeIdentity) Attnets() []byte {
3339
return i.Data.Metadata.Attnets
3440
}
3541

36-
func (i *NodeIdentity) Syncnets() string {
42+
func (i *NodeIdentity) Syncnets() []byte {
43+
// TODO remove patch for Prysm, adding dummy data in missing fields.
44+
// https://github.com/OffchainLabs/prysm/pull/15506
45+
if i.Data.Metadata.Syncnets == nil {
46+
return []byte{0x00}
47+
}
3748
return i.Data.Metadata.Syncnets
3849
}
3950

api/state.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ import (
1010

1111
var BeaconStateBase = "eth/v2/debug/beacon/states/head"
1212

13-
type PeerDASstate struct {
13+
type BeaconState struct {
1414
Version string `json:"version"`
1515
ExecutionOptimistic bool `json:"execution_optimistic"`
1616
Finalized bool `json:"finalized"`
1717
Data electra.BeaconState `json:"data"`
1818
}
1919

20-
func (c *Client) GetPeerDASstate(ctx context.Context) (PeerDASstate, error) {
21-
var state PeerDASstate
20+
func (c *Client) GetBeaconStateHead(ctx context.Context) (BeaconState, error) {
21+
var state BeaconState
2222

2323
resp, err := c.get(ctx, c.cfg.QueryTimeout, BeaconStateBase, "")
2424
if err != nil {

beacon_api.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ type BeaconAPIConfig struct {
4141
}
4242

4343
type BeaconAPIImpl struct {
44-
cfg BeaconAPIConfig
45-
apiCli *api.Client
44+
cfg BeaconAPIConfig
45+
apiClient *api.Client
4646

4747
specs map[string]any
48-
headState *api.PeerDASstate
48+
headState *api.BeaconState
4949
forkSchedules api.ForkSchedule
5050
fuluForkEpoch uint64
5151
}
@@ -55,27 +55,28 @@ func NewBeaconAPI(cfg BeaconAPIConfig) (BeaconAPI, error) {
5555
Endpoint: cfg.Endpoint,
5656
StateTimeout: ApiStateTimeout,
5757
QueryTimeout: ApiQueryTimeout,
58+
Logger: cfg.Logger,
5859
}
5960
apiCli, err := api.NewClient(ethApiCfg)
6061
if err != nil {
6162
return nil, err
6263
}
6364

6465
return &BeaconAPIImpl{
65-
cfg: cfg,
66-
apiCli: apiCli,
66+
cfg: cfg,
67+
apiClient: apiCli,
6768
}, nil
6869
}
6970

7071
func (b *BeaconAPIImpl) Init(ctx context.Context) error {
7172
// check api connection
72-
if err := b.apiCli.CheckConnection(ctx); err != nil {
73+
if err := b.apiClient.CheckConnection(ctx); err != nil {
7374
return fmt.Errorf("connection to %s was stablished, but not active - %s", b.cfg.Endpoint, err.Error())
7475
}
7576
b.cfg.Logger.Info("connected to the beacon API...")
7677

7778
// Get node identity and ENR
78-
nodeIdentity, err := b.apiCli.GetNodeIdentity(ctx)
79+
nodeIdentity, err := b.apiClient.GetNodeIdentity(ctx)
7980
if err != nil {
8081
b.cfg.Logger.WithError(err).Warn("failed to get node identity")
8182
} else {
@@ -92,14 +93,14 @@ func (b *BeaconAPIImpl) Init(ctx context.Context) error {
9293
}
9394

9495
// get the config specs from the apiCli
95-
specs, err := b.apiCli.GetConfigSpecs(ctx)
96+
specs, err := b.apiClient.GetConfigSpecs(ctx)
9697
if err != nil {
9798
return err
9899
}
99100
b.specs = specs
100101

101102
// Get genesis data from the proper endpoint
102-
genesisData, err := b.apiCli.GetGenesis(ctx)
103+
genesisData, err := b.apiClient.GetGenesis(ctx)
103104
if err != nil {
104105
return errors.Wrap(err, "failed to get genesis data")
105106
}
@@ -133,7 +134,7 @@ func (b *BeaconAPIImpl) Init(ctx context.Context) error {
133134
}
134135

135136
// get the network configuration from the apiCli
136-
forkSchedules, err := b.apiCli.GetNetworkConfig(ctx)
137+
forkSchedules, err := b.apiClient.GetNetworkConfig(ctx)
137138
if err != nil {
138139
return err
139140
}
@@ -143,7 +144,7 @@ func (b *BeaconAPIImpl) Init(ctx context.Context) error {
143144
}
144145

145146
// compose and get the local Metadata
146-
currentState, err := b.apiCli.GetPeerDASstate(ctx)
147+
currentState, err := b.apiClient.GetBeaconStateHead(ctx)
147148
if err != nil {
148149
return err
149150
}
@@ -190,7 +191,7 @@ func (b *BeaconAPIImpl) Init(ctx context.Context) error {
190191
return fmt.Errorf("tooled closed without reaching fulu upgrade")
191192

192193
case <-time.After(secondsToFulu):
193-
currentState, err = b.apiCli.GetPeerDASstate(ctx)
194+
currentState, err = b.apiClient.GetBeaconStateHead(ctx)
194195
if err != nil {
195196
return err
196197
}
@@ -412,9 +413,9 @@ func (b *BeaconAPIImpl) GetFuluForkEpoch() uint64 {
412413
}
413414

414415
func (b *BeaconAPIImpl) GetNodeIdentity(ctx context.Context) (*api.NodeIdentity, error) {
415-
return b.apiCli.GetNodeIdentity(ctx)
416+
return b.apiClient.GetNodeIdentity(ctx)
416417
}
417418

418419
func (b *BeaconAPIImpl) GetBeaconBlock(ctx context.Context, slot uint64) (*spec.VersionedSignedBeaconBlock, error) {
419-
return b.apiCli.GetBeaconBlock(ctx, slot)
420+
return b.apiClient.GetBeaconBlock(ctx, slot)
420421
}

custody_utils.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package dasguardian
22

33
import (
44
"encoding/binary"
5-
"encoding/hex"
65
"math"
76
"sort"
87

98
"github.com/ethereum/go-ethereum/p2p/enode"
9+
"github.com/ethereum/go-ethereum/p2p/enr"
1010
"github.com/holiman/uint256"
11-
errors "github.com/pkg/errors"
11+
"github.com/pkg/errors"
1212
)
1313

1414
var (
@@ -39,31 +39,28 @@ type AttnetsEntry []byte
3939

4040
func (c AttnetsEntry) ENRKey() string { return "attnets" }
4141

42-
func GetAttnetsFromEnr(ethNode *enode.Node) string {
42+
func GetAttnetsFromEnr(ethNode *enode.Node) (AttnetsEntry, error) {
4343
enr := ethNode.Record()
4444

4545
var attnetsEntry AttnetsEntry
4646
err := enr.Load(&attnetsEntry)
47-
if err != nil {
48-
return "no-attnets"
49-
}
50-
return "0x" + hex.EncodeToString(attnetsEntry)
47+
return attnetsEntry, err
5148
}
5249

5350
// syncnets
5451
type SyncnetsEntry []byte
5552

5653
func (c SyncnetsEntry) ENRKey() string { return "syncnets" }
5754

58-
func GetSyncnetsFromEnr(ethNode *enode.Node) string {
59-
enr := ethNode.Record()
55+
func GetSyncnetsFromEnr(ethNode *enode.Node) (SyncnetsEntry, error) {
56+
record := ethNode.Record()
6057

6158
var syncnetsEntry SyncnetsEntry
62-
err := enr.Load(&syncnetsEntry)
63-
if err != nil {
64-
return "no-syncnets"
59+
err := record.Load(&syncnetsEntry)
60+
if enr.IsNotFound(err) {
61+
return nil, nil
6562
}
66-
return "0x" + hex.EncodeToString(syncnetsEntry)
63+
return syncnetsEntry, err
6764
}
6865

6966
// Mainly from: prysm/beacon-chain/core/peerdas/helpers.go

encoding.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88
"sync"
99
"time"
1010

11+
"github.com/ethereum/go-ethereum/common/hexutil"
1112
"github.com/golang/snappy"
1213
"github.com/libp2p/go-libp2p/core/network"
1314
dynssz "github.com/pk910/dynamic-ssz"
14-
errors "github.com/pkg/errors"
15+
"github.com/pkg/errors"
1516
log "github.com/sirupsen/logrus"
1617
)
1718

@@ -127,12 +128,19 @@ func readVarint(r io.Reader) (uint64, error) {
127128

128129
// writeRequest writes a request to the stream with SSZ+Snappy encoding
129130
func (r *ReqResp) writeRequest(stream network.Stream, req any) error {
131+
defer stream.CloseWrite()
132+
130133
// Set write deadline
131134
if err := stream.SetWriteDeadline(time.Now().Add(r.cfg.WriteTimeout)); err != nil {
132135
return fmt.Errorf("failed to set write deadline: %w", err)
133136
}
134137

135-
// Marshal to SSZ
138+
if req == nil {
139+
// we close the write side of the stream immediately, communicating we have no data to send
140+
return nil
141+
}
142+
143+
// Marshal to SSZ if the request is not nil
136144
data, err := sszCodec.MarshalSSZ(req)
137145
if err != nil {
138146
return fmt.Errorf("failed to marshal SSZ: %w", err)
@@ -157,6 +165,14 @@ func (r *ReqResp) writeRequest(stream network.Stream, req any) error {
157165
return fmt.Errorf("failed to compress data: %w", err)
158166
}
159167

168+
log.WithFields(log.Fields{
169+
"protocol": stream.Protocol(),
170+
"data": hexutil.Encode(data),
171+
"data_len": len(data),
172+
"wire_data": hexutil.Encode(buf.Bytes()),
173+
"wire_len": buf.Len(),
174+
}).Debug("writing request")
175+
160176
// Write buffer to the stream
161177
if _, err := io.Copy(stream, &buf); err != nil {
162178
return fmt.Errorf("failed to write final payload to stream: %w", err)
@@ -271,7 +287,8 @@ func (r *ReqResp) readResponse(stream network.Stream, resp any) error {
271287
}).Debug("Raw response code received")
272288
}
273289

274-
if code[0] != ResponseCodeSuccess {
290+
success := code[0] == ResponseCodeSuccess
291+
if !success {
275292
if log.GetLevel() >= log.DebugLevel {
276293
errorType := getResponseCodeName(code[0])
277294
log.WithFields(log.Fields{
@@ -280,7 +297,6 @@ func (r *ReqResp) readResponse(stream network.Stream, resp any) error {
280297
"error_type": errorType,
281298
}).Debug("Non-success response code received")
282299
}
283-
return fmt.Errorf("RPC error code: %d", code[0])
284300
}
285301

286302
// Read uncompressed length prefix
@@ -330,6 +346,21 @@ func (r *ReqResp) readResponse(stream network.Stream, resp any) error {
330346
}).Debug("Raw response data received")
331347
}
332348

349+
if !success {
350+
var errorMessage ErrorMessage
351+
l := log.WithFields(log.Fields{
352+
"response_type": responseType,
353+
"raw_data_hex": fmt.Sprintf("0x%x", data),
354+
})
355+
if err := sszCodec.UnmarshalSSZ(&errorMessage, data); err != nil {
356+
l.WithError(err).Error("failed to unmarshal SSZ error message")
357+
return fmt.Errorf("failed to unmarshal SSZ error message: %w", err)
358+
}
359+
msg := string(errorMessage)
360+
l.Warnf("RPC failed; error message: %s", msg)
361+
return fmt.Errorf("RPC failed: %s", msg)
362+
}
363+
333364
// Unmarshal from SSZ
334365
if err := sszCodec.UnmarshalSSZ(resp, data); err != nil {
335366
if log.GetLevel() >= log.DebugLevel {

0 commit comments

Comments
 (0)