Skip to content

Commit aecd7cf

Browse files
authored
feat(p2p): cover Exchange with traces (#150)
1 parent 30ce8bc commit aecd7cf

File tree

3 files changed

+101
-16
lines changed

3 files changed

+101
-16
lines changed

p2p/exchange.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,20 @@ import (
1414
"github.com/libp2p/go-libp2p/core/peer"
1515
"github.com/libp2p/go-libp2p/core/protocol"
1616
"github.com/libp2p/go-libp2p/p2p/net/conngater"
17+
"go.opentelemetry.io/otel"
18+
"go.opentelemetry.io/otel/attribute"
19+
"go.opentelemetry.io/otel/codes"
20+
"go.opentelemetry.io/otel/trace"
1721

1822
"github.com/celestiaorg/go-header"
1923
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
2024
)
2125

22-
var log = logging.Logger("header/p2p")
26+
var (
27+
log = logging.Logger("header/p2p")
28+
29+
tracerClient = otel.Tracer("header/p2p-client")
30+
)
2331

2432
// minHeadResponses is the minimum number of headers of the same height
2533
// received from peers to determine the network head. If all trusted peers
@@ -113,6 +121,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
113121
// and return the highest one.
114122
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
115123
log.Debug("requesting head")
124+
ctx, span := tracerClient.Start(ctx, "head")
125+
defer span.End()
116126

117127
reqCtx := ctx
118128
startTime := time.Now()
@@ -157,8 +167,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
157167
)
158168
for _, from := range peers {
159169
go func(from peer.ID) {
170+
_, newSpan := tracerClient.Start(
171+
ctx, "requesting peer",
172+
trace.WithAttributes(attribute.String("peerID", from.String())),
173+
)
174+
defer newSpan.End()
175+
160176
headers, err := ex.request(reqCtx, from, headerReq)
161177
if err != nil {
178+
newSpan.SetStatus(codes.Error, err.Error())
162179
log.Errorw("head request to peer failed", "peer", from, "err", err)
163180
headerRespCh <- zero
164181
return
@@ -171,6 +188,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
171188
if errors.As(err, &verErr) && verErr.SoftFailure {
172189
log.Debugw("received head from tracked peer that soft-failed verification",
173190
"tracked peer", from, "err", err)
191+
newSpan.SetStatus(codes.Error, err.Error())
174192
headerRespCh <- headers[0]
175193
return
176194
}
@@ -180,10 +198,12 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
180198
}
181199
logF("verifying head received from tracked peer", "tracked peer", from,
182200
"height", headers[0].Height(), "err", err)
201+
newSpan.SetStatus(codes.Error, err.Error())
183202
headerRespCh <- zero
184203
return
185204
}
186205
}
206+
newSpan.SetStatus(codes.Ok, "")
187207
// request ensures that the result slice will have at least one Header
188208
headerRespCh <- headers[0]
189209
}(from)
@@ -206,22 +226,25 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
206226
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
207227
status = headStatusTimeout
208228
}
209-
229+
span.SetStatus(codes.Error, fmt.Sprintf("head request %s", status))
210230
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
211231
return zero, ctx.Err()
212232
case <-ex.ctx.Done():
213233
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
234+
span.SetStatus(codes.Error, "exchange client stopped")
214235
return zero, ex.ctx.Err()
215236
}
216237
}
217238

218239
head, err := bestHead[H](headers)
219240
if err != nil {
220241
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
242+
span.SetStatus(codes.Error, headStatusNoHeaders)
221243
return zero, err
222244
}
223245

224246
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
247+
span.SetStatus(codes.Ok, "")
225248
return head, nil
226249
}
227250

@@ -230,10 +253,17 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
230253
// thereafter.
231254
func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
232255
log.Debugw("requesting header", "height", height)
256+
ctx, span := tracerClient.Start(ctx, "get-by-height",
257+
trace.WithAttributes(
258+
attribute.Int64("height", int64(height)),
259+
))
260+
defer span.End()
233261
var zero H
234262
// sanity check height
235263
if height == 0 {
236-
return zero, fmt.Errorf("specified request height must be greater than 0")
264+
err := fmt.Errorf("specified request height must be greater than 0")
265+
span.SetStatus(codes.Error, err.Error())
266+
return zero, err
237267
}
238268
// create request
239269
req := &p2p_pb.HeaderRequest{
@@ -242,8 +272,10 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
242272
}
243273
headers, err := ex.performRequest(ctx, req)
244274
if err != nil {
275+
span.SetStatus(codes.Error, err.Error())
245276
return zero, err
246277
}
278+
span.SetStatus(codes.Ok, "")
247279
return headers[0], nil
248280
}
249281

@@ -254,19 +286,36 @@ func (ex *Exchange[H]) GetRangeByHeight(
254286
from H,
255287
to uint64,
256288
) ([]H, error) {
289+
ctx, span := tracerClient.Start(ctx, "get-range-by-height",
290+
trace.WithAttributes(
291+
attribute.Int64("from", int64(from.Height())),
292+
attribute.Int64("to", int64(to)),
293+
))
294+
defer span.End()
257295
session := newSession[H](
258296
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
259297
)
260298
defer session.close()
261299
// we request the next header height that we don't have: `fromHead`+1
262300
amount := to - (from.Height() + 1)
263-
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
301+
result, err := session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
302+
if err != nil {
303+
span.SetStatus(codes.Error, err.Error())
304+
return nil, err
305+
}
306+
span.SetStatus(codes.Ok, "")
307+
return result, nil
264308
}
265309

266310
// Get performs a request for the Header by the given hash corresponding
267311
// to the RawHeader. Note that the Header must be verified thereafter.
268312
func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
269313
log.Debugw("requesting header", "hash", hash.String())
314+
ctx, span := tracerClient.Start(ctx, "get-by-hash",
315+
trace.WithAttributes(
316+
attribute.String("hash", hash.String()),
317+
))
318+
defer span.End()
270319
var zero H
271320
// create request
272321
req := &p2p_pb.HeaderRequest{
@@ -275,12 +324,16 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
275324
}
276325
headers, err := ex.performRequest(ctx, req)
277326
if err != nil {
327+
span.SetStatus(codes.Error, err.Error())
278328
return zero, err
279329
}
280330

281331
if !bytes.Equal(headers[0].Hash(), hash) {
282-
return zero, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
332+
err = fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
333+
span.SetStatus(codes.Error, err.Error())
334+
return zero, err
283335
}
336+
span.SetStatus(codes.Ok, "")
284337
return headers[0], nil
285338
}
286339

p2p/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
)
2121

2222
var (
23-
tracer = otel.Tracer("header/server")
23+
tracerServ = otel.Tracer("header/server")
2424
)
2525

2626
// ExchangeServer represents the server-side component for
@@ -173,7 +173,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
173173
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
174174
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
175175
defer cancel()
176-
ctx, span := tracer.Start(ctx, "request-by-hash", trace.WithAttributes(
176+
ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes(
177177
attribute.String("hash", header.Hash(hash).String()),
178178
))
179179
defer span.End()
@@ -204,7 +204,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
204204
}
205205

206206
startTime := time.Now()
207-
ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
207+
ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes(
208208
attribute.Int64("from", int64(from)),
209209
attribute.Int64("to", int64(to))))
210210
defer span.End()
@@ -273,7 +273,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
273273
log.Debug("server: handling head request")
274274
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
275275
defer cancel()
276-
ctx, span := tracer.Start(ctx, "request-head")
276+
ctx, span := tracerServ.Start(ctx, "request-head")
277277
defer span.End()
278278

279279
head, err := serv.store.Head(ctx)

p2p/session.go

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,19 @@ import (
99

1010
"github.com/libp2p/go-libp2p/core/host"
1111
"github.com/libp2p/go-libp2p/core/protocol"
12+
"go.opentelemetry.io/otel"
13+
"go.opentelemetry.io/otel/attribute"
14+
"go.opentelemetry.io/otel/codes"
15+
"go.opentelemetry.io/otel/trace"
1216

1317
"github.com/celestiaorg/go-header"
1418
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
1519
)
1620

21+
var (
22+
tracerSession = otel.Tracer("header/p2p-session")
23+
)
24+
1725
// errEmptyResponse means that server side closes the connection without sending at least 1
1826
// response.
1927
var errEmptyResponse = errors.New("empty response")
@@ -77,9 +85,15 @@ func newSession[H header.Header[H]](
7785
func (s *session[H]) getRangeByHeight(
7886
ctx context.Context,
7987
from, amount, headersPerPeer uint64,
80-
) ([]H, error) {
88+
) (_ []H, err error) {
8189
log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height
8290

91+
ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes(
92+
attribute.Int64("from", int64(from)),
93+
attribute.Int64("to", int64(from+amount-1)),
94+
))
95+
defer span.End()
96+
8397
requests := prepareRequests(from, amount, headersPerPeer)
8498
result := make(chan []H, len(requests))
8599
s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests))
@@ -94,8 +108,11 @@ LOOP:
94108
for {
95109
select {
96110
case <-s.ctx.Done():
97-
return nil, errors.New("header/p2p: exchange is closed")
111+
err = errors.New("header/p2p: exchange is closed")
112+
span.SetStatus(codes.Error, err.Error())
113+
return nil, err
98114
case <-ctx.Done():
115+
span.SetStatus(codes.Error, ctx.Err().Error())
99116
return nil, ctx.Err()
100117
case res := <-result:
101118
headers = append(headers, res...)
@@ -113,6 +130,7 @@ LOOP:
113130
"from", headers[0].Height(),
114131
"to", headers[len(headers)-1].Height(),
115132
)
133+
span.SetStatus(codes.Ok, "")
116134
return headers, nil
117135
}
118136

@@ -152,19 +170,28 @@ func (s *session[H]) doRequest(
152170
req *p2p_pb.HeaderRequest,
153171
headers chan []H,
154172
) {
173+
ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes(
174+
attribute.String("peerID", stat.peerID.String()),
175+
attribute.Int64("from", int64(req.GetOrigin())),
176+
attribute.Int64("amount", int64(req.Amount)),
177+
))
178+
defer span.End()
179+
155180
ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
156181
defer cancel()
157182

158183
r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
159184
s.metrics.response(ctx, size, duration, err)
160185
if err != nil {
186+
span.SetStatus(codes.Error, err.Error())
161187
// we should not punish peer at this point and should try to parse responses, despite that error
162188
// was received.
163189
log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err)
164190
}
165191

166192
h, err := s.processResponses(r)
167193
if err != nil {
194+
span.SetStatus(codes.Error, err.Error())
168195
logFn := log.Errorw
169196

170197
switch err {
@@ -195,21 +222,26 @@ func (s *session[H]) doRequest(
195222
"requestedAmount", req.Amount,
196223
)
197224

225+
remainingHeaders := req.Amount - uint64(len(h))
226+
227+
span.SetStatus(codes.Ok, "")
228+
198229
// update peer stats
199230
stat.updateStats(size, duration)
200231

201-
responseLn := uint64(len(h))
202232
// ensure that we received the correct amount of headers.
203-
if responseLn < req.Amount {
204-
from := h[responseLn-1].Height()
205-
amount := req.Amount - responseLn
233+
if remainingHeaders > 0 {
234+
span.AddEvent("remaining headers", trace.WithAttributes(
235+
attribute.Int64("amount", int64(remainingHeaders))),
236+
)
206237

238+
from := h[uint64(len(h))-1].Height()
207239
select {
208240
case <-s.ctx.Done():
209241
return
210242
// create a new request with the remaining headers.
211243
// prepareRequests will return a slice with 1 element at this point
212-
case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]:
244+
case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]:
213245
log.Debugw("sending additional request to get remaining headers")
214246
}
215247
}

0 commit comments

Comments
 (0)