Skip to content

Commit 11165de

Browse files
committed
cloud: have resumable reader manage span lifetime
This change adds a span to the cloud.ResumableReader that is tracked for the lifetime of the reader. This ensures that if the reader implementation needs to issue a new HTTP request, the span is still valid and can be used to annotate logs. Fixes: #153347 Release note: none
1 parent 8b550fa commit 11165de

File tree

4 files changed

+43
-12
lines changed

4 files changed

+43
-12
lines changed

pkg/cloud/azure/azure_storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ func (s *azureStorage) ReadFile(
366366
}
367367
}
368368
}
369+
// BUG: we should follow the azure retry setting here.
369370
reader := resp.NewRetryReader(ctx, &azblob.RetryReaderOptions{MaxRetries: 3})
370371
return ioctx.ReadCloserAdapter(reader), fileSize, nil
371372
}

pkg/cloud/cloud_io.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ type ReaderOpenerAt func(ctx context.Context, pos int64) (io.ReadCloser, int64,
242242
type ResumingReader struct {
243243
Opener ReaderOpenerAt // Get additional content
244244
Reader io.ReadCloser // Currently opened reader
245+
ReaderSpan *tracing.Span // Span for the current reader, if Reader is non-nil
245246
Filename string // Used for logging
246247
Pos int64 // How much data was received so far
247248
Size int64 // Total size of the file
@@ -284,6 +285,10 @@ func NewResumingReader(
284285

285286
// Open opens the reader at its current offset.
286287
func (r *ResumingReader) Open(ctx context.Context) error {
288+
if r.Reader != nil {
289+
return errors.AssertionFailedf("reader already open")
290+
}
291+
287292
if r.Size > 0 && r.Pos >= r.Size {
288293
// Don't try to open a file if the size has been set and the position is
289294
// at size. This generally results in an invalid range error for the
@@ -294,10 +299,18 @@ func (r *ResumingReader) Open(ctx context.Context) error {
294299

295300
return DelayedRetry(ctx, "Open", r.ErrFn, func() error {
296301
var readErr error
302+
303+
ctx, span := tracing.ForkSpan(ctx, "resuming-reader")
297304
r.Reader, r.Size, readErr = r.Opener(ctx, r.Pos)
298305
if readErr != nil {
306+
span.Finish()
299307
return errors.Wrapf(readErr, "open %s", r.Filename)
300308
}
309+
310+
// We hold onto the span for the lifetime of the reader because the reader
311+
// may issue new HTTP requests after Open returns.
312+
r.ReaderSpan = span
313+
301314
return nil
302315
})
303316
}
@@ -340,10 +353,9 @@ func (r *ResumingReader) Read(ctx context.Context, p []byte) (int, error) {
340353
}
341354
log.Dev.Errorf(ctx, "Retry IO error: %s", lastErr)
342355
lastErr = nil
343-
if r.Reader != nil {
344-
r.Reader.Close()
345-
}
346-
r.Reader = nil
356+
// Ignore the error from Close(). We are already handling a read error
357+
// so we know the handle is in a bad state.
358+
_ = r.Close(ctx)
347359
}
348360
}
349361

@@ -356,10 +368,14 @@ func (r *ResumingReader) Read(ctx context.Context, p []byte) (int, error) {
356368

357369
// Close implements io.Closer.
358370
func (r *ResumingReader) Close(ctx context.Context) error {
359-
if r.Reader != nil {
360-
return r.Reader.Close()
371+
if r.Reader == nil {
372+
return nil
361373
}
362-
return nil
374+
375+
err := r.Reader.Close()
376+
r.ReaderSpan.Finish()
377+
r.Reader = nil
378+
return err
363379
}
364380

365381
// CheckHTTPContentRangeHeader parses Content-Range header and ensures that

pkg/cloud/cloudtestutils/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ go_library(
2323
"//pkg/util/syncutil",
2424
"//pkg/util/sysutil",
2525
"//pkg/util/timeutil",
26+
"//pkg/util/tracing",
27+
"//pkg/util/tracing/tracingpb",
2628
"@com_github_cockroachdb_errors//:errors",
2729
"@com_github_stretchr_testify//require",
2830
"@org_golang_x_sync//errgroup",

pkg/cloud/cloudtestutils/cloud_nemesis.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/cloud"
2020
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
2121
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
22+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
23+
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
2224
"github.com/cockroachdb/errors"
2325
"github.com/stretchr/testify/require"
2426
"golang.org/x/sync/errgroup"
@@ -39,9 +41,15 @@ func RunCloudNemesisTest(t *testing.T, storage cloud.ExternalStorage) {
3941
listConcurrency: 1,
4042
}
4143

42-
// We create a context here because we don't want to support a caller supplied
43-
// cancelation signal.
44-
ctx := context.Background()
44+
// Create a root tracing span for the test to ensure ops create spans
45+
// which will help flush out span lifetime bugs.
46+
tracer := tracing.NewTracerWithOpt(
47+
context.Background(),
48+
tracing.WithUseAfterFinishOpt(true, true),
49+
tracing.WithSpanReusePercent(0))
50+
ctx, rootSpan := tracer.StartSpanCtx(context.Background(), "cloud-nemesis-test", tracing.WithRecording(tracingpb.RecordingStructured))
51+
defer rootSpan.Finish()
52+
4553
if err := nemesis.run(ctx, 2*time.Minute); err != nil {
4654
t.Fatalf("%+v", err)
4755
}
@@ -85,9 +93,13 @@ func (c *cloudNemesis) run(ctx context.Context, duration time.Duration) error {
8593
g, ctx := errgroup.WithContext(ctx)
8694

8795
g.Go(func() error {
88-
time.Sleep(duration)
96+
wait := time.NewTimer(duration)
97+
select {
98+
case <-ctx.Done():
99+
case <-wait.C:
100+
}
89101
close(done)
90-
return nil
102+
return ctx.Err()
91103
})
92104

93105
for i := 0; i < c.writeConcurrency; i++ {

0 commit comments

Comments
 (0)