Skip to content

Commit d47ded2

Browse files
vgonkivsrenaynay
andauthored
enable getSamples and getRow in shrex (#4288)
Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
1 parent e5f1761 commit d47ded2

File tree

17 files changed

+846
-200
lines changed

17 files changed

+846
-200
lines changed

share/shwap/accessor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"io"
66

77
libshare "github.com/celestiaorg/go-square/v3/share"
8+
"github.com/celestiaorg/rsmt2d"
89

910
"github.com/celestiaorg/celestia-node/share"
1011
)
@@ -13,5 +14,11 @@ import (
1314
type Accessor interface {
1415
AxisRoots(context.Context) (*share.AxisRoots, error)
1516
RowNamespaceData(context.Context, libshare.Namespace, int) (RowNamespaceData, error)
17+
Sample(ctx context.Context, idx SampleCoords) (Sample, error)
18+
AxisHalf(ctx context.Context, axisType rsmt2d.Axis, axisIdx int) (AxisHalf, error)
1619
Reader() (io.Reader, error)
20+
RangeNamespaceData(
21+
ctx context.Context,
22+
from, to int,
23+
) (RangeNamespaceData, error)
1724
}

share/shwap/namespace_data.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,7 @@ func (nd NamespaceData) WriteTo(writer io.Writer) (int64, error) {
9292
}
9393
return n, nil
9494
}
95+
96+
func (nd NamespaceData) IsEmpty() bool {
97+
return len(nd) == 0
98+
}

share/shwap/p2p/shrex/client.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,15 @@ func (c *Client) Get(
5858
"peer", peer.String(),
5959
)
6060
requestTime := time.Now()
61-
status, err := c.doRequest(ctx, logger, req, resp, peer)
61+
n, status, err := c.doRequest(ctx, logger, req, resp, peer)
6262
if err != nil {
6363
logger.Warnw("requesting data from peer failed", "error", err)
6464
}
6565
c.metrics.observeRequest(ctx, req.Name(), status, time.Since(requestTime))
66-
logger.Debugw("requested data", "status", status, "duration", time.Since(requestTime))
66+
logger.Debugw("requested data",
67+
"status", status, "duration",
68+
time.Since(requestTime), "total bytes received", n,
69+
)
6770
return err
6871
}
6972

@@ -75,13 +78,13 @@ func (c *Client) doRequest(
7578
req request,
7679
resp response,
7780
peer peer.ID,
78-
) (status, error) {
81+
) (int64, status, error) {
7982
streamOpenCtx, cancel := context.WithTimeout(ctx, c.params.ReadTimeout)
8083
defer cancel()
8184

8285
stream, err := c.host.NewStream(streamOpenCtx, peer, ProtocolID(c.params.NetworkID(), req.Name()))
8386
if err != nil {
84-
return statusOpenStreamErr, fmt.Errorf("open stream: %w", err)
87+
return 0, statusOpenStreamErr, fmt.Errorf("open stream: %w", err)
8588
}
8689
defer func() {
8790
utils.CloseAndLog(log, "shrex/client stream", stream)
@@ -91,7 +94,7 @@ func (c *Client) doRequest(
9194

9295
_, err = req.WriteTo(stream)
9396
if err != nil {
94-
return statusSendReqErr, fmt.Errorf("writing request: %w", err)
97+
return 0, statusSendReqErr, fmt.Errorf("writing request: %w", err)
9598
}
9699

97100
err = stream.CloseWrite()
@@ -100,26 +103,30 @@ func (c *Client) doRequest(
100103
}
101104

102105
var statusResp shrexpb.Response
103-
_, err = serde.Read(stream, &statusResp)
106+
statusLength, err := serde.Read(stream, &statusResp)
104107
if err != nil {
105-
return statusReadStatusErr, fmt.Errorf("unexpected error during reading the status from stream: %w", err)
108+
return int64(statusLength),
109+
statusReadStatusErr,
110+
fmt.Errorf("unexpected error during reading the status from stream: %w", err)
106111
}
107112

108113
switch statusResp.Status {
109114
case shrexpb.Status_OK:
110115
case shrexpb.Status_NOT_FOUND:
111-
return statusNotFound, ErrNotFound
116+
return int64(statusLength), statusNotFound, ErrNotFound
112117
case shrexpb.Status_INTERNAL:
113-
return statusInternalErr, ErrInternalServer
118+
return int64(statusLength), statusInternalErr, ErrInternalServer
114119
default:
115-
return statusReadRespErr, ErrInvalidResponse
120+
return int64(statusLength), statusReadRespErr, ErrInvalidResponse
116121
}
117122

118-
_, err = resp.ReadFrom(stream)
123+
dataLength, err := resp.ReadFrom(stream)
124+
st := statusSuccess
119125
if err != nil {
120-
return statusReadRespErr, fmt.Errorf("%w: %w", ErrInvalidResponse, err)
126+
err = fmt.Errorf("%w: %w", ErrInvalidResponse, err)
127+
st = statusReadRespErr
121128
}
122-
return statusSuccess, nil
129+
return int64(statusLength) + dataLength, st, err
123130
}
124131

125132
func (c *Client) setStreamDeadlines(ctx context.Context, logger *zap.SugaredLogger, stream network.Stream) {

share/shwap/p2p/shrex/metrics.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ func InitClientMetrics() (*Metrics, error) {
103103
if err != nil {
104104
return nil, err
105105
}
106-
107106
return &Metrics{
108107
requestDuration: requestDuration,
109108
totalRequestCounter: totalRequestCounter,

share/shwap/p2p/shrex/request.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ var registry = []newRequestID{
2121
func() request {
2222
return &shwap.EdsID{}
2323
},
24+
func() request {
25+
return &shwap.SampleID{}
26+
},
27+
func() request {
28+
return &shwap.RowID{}
29+
},
30+
func() request {
31+
return &shwap.RangeNamespaceDataID{}
32+
},
2433
}
2534

2635
// request represents compatible generalised interface for requests.

share/shwap/p2p/shrex/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Server struct {
2727

2828
host host.Host
2929

30-
store *store.Store
30+
store store.AccessorGetter
3131

3232
params *ServerParams
3333
metrics *Metrics
@@ -39,7 +39,7 @@ type Server struct {
3939
func NewServer(
4040
params *ServerParams,
4141
host host.Host,
42-
store *store.Store,
42+
store store.AccessorGetter,
4343
) (*Server, error) {
4444
if err := params.Validate(); err != nil {
4545
return nil, fmt.Errorf("shrex/server: parameters are not valid: %w", err)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package shrex_getter
2+
3+
import (
4+
"context"
5+
6+
"go.opentelemetry.io/otel/attribute"
7+
"go.opentelemetry.io/otel/metric"
8+
9+
"github.com/celestiaorg/celestia-node/libs/utils"
10+
)
11+
12+
type metrics struct {
13+
requestAttempt metric.Int64Histogram
14+
}
15+
16+
func (sg *Getter) WithMetrics() error {
17+
requestAttempt, err := meter.Int64Histogram(
18+
"getters_shrex_attempts_per_request",
19+
metric.WithDescription("Number of attempts per shrex request"),
20+
)
21+
if err != nil {
22+
return err
23+
}
24+
25+
sg.metrics = &metrics{
26+
requestAttempt: requestAttempt,
27+
}
28+
return nil
29+
}
30+
31+
func (m *metrics) recordAttempts(ctx context.Context, requestName string, attempt int, success bool) {
32+
if m == nil {
33+
return
34+
}
35+
ctx = utils.ResetContextOnError(ctx)
36+
37+
m.requestAttempt.Record(ctx, int64(attempt),
38+
metric.WithAttributes(
39+
attribute.String("request_type", requestName),
40+
attribute.Bool("success", success)),
41+
)
42+
}

0 commit comments

Comments
 (0)