Skip to content

Commit c68e35a

Browse files
committed
ioctx: add ReaderAtSeekerAdapter for random access support
This commit adds support for adapting context-aware readers that implement ReaderAtCtx and SeekerCtx into the standard io.ReaderAt and io.Seeker interfaces. This is needed for integrating cloud storage readers with libraries that require random access (e.g., Apache Parquet). The adapter: - Captures a context at construction time and uses it for all operations - Validates that the underlying reader actually supports seeking - Provides io.Reader, io.ReaderAt, io.Seeker, and io.Closer interfaces - Enables efficient random access to cloud-stored files without buffering Release note: None Epic: CRDB-23802
1 parent 1781fc1 commit c68e35a

File tree

4 files changed

+366
-1
lines changed

4 files changed

+366
-1
lines changed

pkg/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ ALL_TESTS = [
775775
"//pkg/util/interval/generic:generic_test",
776776
"//pkg/util/interval:interval_test",
777777
"//pkg/util/intsets:intsets_test",
778+
"//pkg/util/ioctx:ioctx_test",
778779
"//pkg/util/ipaddr:ipaddr_test",
779780
"//pkg/util/iterutil:iterutil_test",
780781
"//pkg/util/json/tokenizer:tokenizer_test",
@@ -2700,6 +2701,7 @@ GO_TARGETS = [
27002701
"//pkg/util/intsets:intsets",
27012702
"//pkg/util/intsets:intsets_test",
27022703
"//pkg/util/ioctx:ioctx",
2704+
"//pkg/util/ioctx:ioctx_test",
27032705
"//pkg/util/ipaddr:ipaddr",
27042706
"//pkg/util/ipaddr:ipaddr_test",
27052707
"//pkg/util/iterutil:iterutil",

pkg/util/ioctx/BUILD.bazel

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
load("@io_bazel_rules_go//go:def.bzl", "go_library")
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
22

33
go_library(
44
name = "ioctx",
@@ -7,3 +7,14 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = ["@com_github_cockroachdb_errors//:errors"],
99
)
10+
11+
go_test(
12+
name = "ioctx_test",
13+
size = "small",
14+
srcs = ["reader_test.go"],
15+
embed = [":ioctx"],
16+
deps = [
17+
"@com_github_cockroachdb_errors//:errors",
18+
"@com_github_stretchr_testify//require",
19+
],
20+
)

pkg/util/ioctx/reader.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,93 @@ type nopCloser struct {
138138

139139
// Close is part of the ReadClosedCtx interface.
140140
func (nopCloser) Close(ctx context.Context) error { return nil }
141+
142+
// ReaderAtCtx is like io.ReaderAt, but the ReadAt() method takes a context.
143+
type ReaderAtCtx interface {
144+
ReadAt(ctx context.Context, p []byte, off int64) (n int, err error)
145+
}
146+
147+
// SeekerCtx is like io.Seeker, but the Seek() method takes a context.
148+
type SeekerCtx interface {
149+
Seek(ctx context.Context, offset int64, whence int) (n int64, err error)
150+
}
151+
152+
// ReaderAtSeekerCloser combines io.Reader, io.ReaderAt, io.Seeker, and io.Closer.
153+
// This is useful for formats like Parquet that require random access to files.
154+
type ReaderAtSeekerCloser interface {
155+
io.Reader
156+
io.ReaderAt
157+
io.Seeker
158+
io.Closer
159+
}
160+
161+
// ReaderAtSeekerAdapter adapts a ReadCloserCtx that implements ReaderAtCtx
162+
// and SeekerCtx into a ReaderAtSeekerCloser. The context is captured at construction
163+
// time and used for all operations (Read, ReadAt, Seek, and Close).
164+
//
165+
// Returns an error if the underlying reader does not implement both ReaderAtCtx and
166+
// SeekerCtx, or if a test seek operation fails (indicating the reader claims to support
167+
// seeking but doesn't actually work).
168+
//
169+
// This is useful for integrating context-aware cloud storage readers with libraries
170+
// that require the standard io.ReaderAt and io.Seeker interfaces (e.g., Parquet).
171+
//
172+
// IMPORTANT: The returned adapter's lifetime is tied to the provided context. If the
173+
// context is canceled, all subsequent Read/ReadAt/Seek operations will fail. The caller
174+
// must ensure the context remains valid for the entire duration the adapter is in use.
175+
func ReaderAtSeekerAdapter(
176+
ctx context.Context, readCloser ReadCloserCtx,
177+
) (ReaderAtSeekerCloser, error) {
178+
readerAt, okReaderAt := readCloser.(ReaderAtCtx)
179+
if !okReaderAt {
180+
return nil, errors.Newf("reader does not implement ioctx.ReaderAtCtx (type %T)", readCloser)
181+
}
182+
seeker, okSeeker := readCloser.(SeekerCtx)
183+
if !okSeeker {
184+
return nil, errors.Newf("reader does not implement ioctx.SeekerCtx (type %T)", readCloser)
185+
}
186+
187+
// Test if seeking actually works by attempting a no-op seek
188+
// This catches cases where the reader implements the interface but fails at runtime
189+
if _, err := seeker.Seek(ctx, 0, io.SeekCurrent); err != nil {
190+
return nil, errors.Wrapf(err, "reader type %T claims to support seeking but test seek failed", readCloser)
191+
}
192+
193+
return &readerAtSeekerAdapter{
194+
ctx: ctx,
195+
readCloser: readCloser,
196+
readerAt: readerAt,
197+
seeker: seeker,
198+
}, nil
199+
}
200+
201+
type readerAtSeekerAdapter struct {
202+
ctx context.Context
203+
readCloser ReadCloserCtx
204+
readerAt ReaderAtCtx
205+
seeker SeekerCtx
206+
}
207+
208+
var _ ReaderAtSeekerCloser = &readerAtSeekerAdapter{}
209+
210+
// Read implements io.Reader using the captured context.
211+
func (r *readerAtSeekerAdapter) Read(p []byte) (int, error) {
212+
return r.readCloser.Read(r.ctx, p)
213+
}
214+
215+
// ReadAt implements io.ReaderAt by delegating to the underlying ReaderAtCtx
216+
// using the captured context.
217+
func (r *readerAtSeekerAdapter) ReadAt(p []byte, off int64) (int, error) {
218+
return r.readerAt.ReadAt(r.ctx, p, off)
219+
}
220+
221+
// Seek implements io.Seeker by delegating to the underlying SeekerCtx
222+
// using the captured context.
223+
func (r *readerAtSeekerAdapter) Seek(offset int64, whence int) (int64, error) {
224+
return r.seeker.Seek(r.ctx, offset, whence)
225+
}
226+
227+
// Close implements io.Closer using the captured context.
228+
func (r *readerAtSeekerAdapter) Close() error {
229+
return r.readCloser.Close(r.ctx)
230+
}

pkg/util/ioctx/reader_test.go

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package ioctx
7+
8+
import (
9+
"context"
10+
"io"
11+
"testing"
12+
13+
"github.com/cockroachdb/errors"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
// mockReadCloserCtx is a mock that implements ReadCloserCtx, ReaderAtCtx, and SeekerCtx
18+
type mockReadCloserCtx struct {
19+
data []byte
20+
pos int64
21+
closed bool
22+
seekCalls int
23+
readCalls int
24+
}
25+
26+
func newMockReadCloserCtx(data []byte) *mockReadCloserCtx {
27+
return &mockReadCloserCtx{
28+
data: data,
29+
}
30+
}
31+
32+
func (m *mockReadCloserCtx) Read(ctx context.Context, p []byte) (int, error) {
33+
m.readCalls++
34+
if m.pos >= int64(len(m.data)) {
35+
return 0, io.EOF
36+
}
37+
n := copy(p, m.data[m.pos:])
38+
m.pos += int64(n)
39+
return n, nil
40+
}
41+
42+
func (m *mockReadCloserCtx) Close(ctx context.Context) error {
43+
m.closed = true
44+
return nil
45+
}
46+
47+
func (m *mockReadCloserCtx) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
48+
if off >= int64(len(m.data)) {
49+
return 0, io.EOF
50+
}
51+
n := copy(p, m.data[off:])
52+
if n < len(p) {
53+
return n, io.EOF
54+
}
55+
return n, nil
56+
}
57+
58+
func (m *mockReadCloserCtx) Seek(ctx context.Context, offset int64, whence int) (int64, error) {
59+
m.seekCalls++
60+
var newPos int64
61+
switch whence {
62+
case io.SeekStart:
63+
newPos = offset
64+
case io.SeekCurrent:
65+
newPos = m.pos + offset
66+
case io.SeekEnd:
67+
newPos = int64(len(m.data)) + offset
68+
default:
69+
return 0, errors.Newf("invalid whence: %d", whence)
70+
}
71+
72+
if newPos < 0 {
73+
return 0, errors.New("negative position")
74+
}
75+
76+
m.pos = newPos
77+
return newPos, nil
78+
}
79+
80+
// mockReadCloserCtxNoSeek implements ReadCloserCtx but NOT SeekerCtx
81+
type mockReadCloserCtxNoSeek struct {
82+
data []byte
83+
pos int64
84+
}
85+
86+
func (m *mockReadCloserCtxNoSeek) Read(ctx context.Context, p []byte) (int, error) {
87+
if m.pos >= int64(len(m.data)) {
88+
return 0, io.EOF
89+
}
90+
n := copy(p, m.data[m.pos:])
91+
m.pos += int64(n)
92+
return n, nil
93+
}
94+
95+
func (m *mockReadCloserCtxNoSeek) Close(ctx context.Context) error {
96+
return nil
97+
}
98+
99+
func (m *mockReadCloserCtxNoSeek) ReadAt(ctx context.Context, p []byte, off int64) (int, error) {
100+
if off >= int64(len(m.data)) {
101+
return 0, io.EOF
102+
}
103+
n := copy(p, m.data[off:])
104+
if n < len(p) {
105+
return n, io.EOF
106+
}
107+
return n, nil
108+
}
109+
110+
// mockReadCloserCtxBrokenSeek implements SeekerCtx but Seek always fails
111+
type mockReadCloserCtxBrokenSeek struct {
112+
data []byte
113+
pos int64
114+
}
115+
116+
func (m *mockReadCloserCtxBrokenSeek) Read(ctx context.Context, p []byte) (int, error) {
117+
if m.pos >= int64(len(m.data)) {
118+
return 0, io.EOF
119+
}
120+
n := copy(p, m.data[m.pos:])
121+
m.pos += int64(n)
122+
return n, nil
123+
}
124+
125+
func (m *mockReadCloserCtxBrokenSeek) Close(ctx context.Context) error {
126+
return nil
127+
}
128+
129+
func (m *mockReadCloserCtxBrokenSeek) ReadAt(
130+
ctx context.Context, p []byte, off int64,
131+
) (int, error) {
132+
if off >= int64(len(m.data)) {
133+
return 0, io.EOF
134+
}
135+
n := copy(p, m.data[off:])
136+
if n < len(p) {
137+
return n, io.EOF
138+
}
139+
return n, nil
140+
}
141+
142+
func (m *mockReadCloserCtxBrokenSeek) Seek(
143+
ctx context.Context, offset int64, whence int,
144+
) (int64, error) {
145+
return 0, errors.New("seek operation failed")
146+
}
147+
148+
func TestReaderAtSeekerAdapterBasicOperations(t *testing.T) {
149+
ctx := context.Background()
150+
testData := []byte("Hello, World! This is test data for seeking.")
151+
mock := newMockReadCloserCtx(testData)
152+
153+
adapter, err := ReaderAtSeekerAdapter(ctx, mock)
154+
require.NoError(t, err)
155+
require.NotNil(t, adapter)
156+
157+
// Test that the initial seek test was called
158+
require.Equal(t, 1, mock.seekCalls, "adapter should test seek during construction")
159+
160+
// Test Read
161+
buf := make([]byte, 5)
162+
n, err := adapter.Read(buf)
163+
require.NoError(t, err)
164+
require.Equal(t, 5, n)
165+
require.Equal(t, []byte("Hello"), buf)
166+
167+
// Test ReadAt (should read at specific offset)
168+
buf = make([]byte, 5)
169+
n, err = adapter.ReadAt(buf, 7)
170+
require.NoError(t, err)
171+
require.Equal(t, 5, n)
172+
require.Equal(t, []byte("World"), buf)
173+
174+
// Test Seek to start
175+
pos, err := adapter.Seek(0, io.SeekStart)
176+
require.NoError(t, err)
177+
require.Equal(t, int64(0), pos)
178+
179+
// Test Seek current
180+
pos, err = adapter.Seek(5, io.SeekCurrent)
181+
require.NoError(t, err)
182+
require.Equal(t, int64(5), pos)
183+
184+
// Test Seek from end
185+
pos, err = adapter.Seek(-5, io.SeekEnd)
186+
require.NoError(t, err)
187+
require.Equal(t, int64(len(testData)-5), pos)
188+
189+
// Test Close
190+
err = adapter.Close()
191+
require.NoError(t, err)
192+
require.True(t, mock.closed)
193+
}
194+
195+
func TestReaderAtSeekerAdapterMissingReaderAtCtx(t *testing.T) {
196+
ctx := context.Background()
197+
198+
// Create a reader that doesn't implement ReaderAtCtx
199+
reader := ReadCloserAdapter(io.NopCloser(nil))
200+
201+
adapter, err := ReaderAtSeekerAdapter(ctx, reader)
202+
require.Error(t, err)
203+
require.Nil(t, adapter)
204+
require.Contains(t, err.Error(), "does not implement ioctx.ReaderAtCtx")
205+
}
206+
207+
func TestReaderAtSeekerAdapterMissingSeekerCtx(t *testing.T) {
208+
ctx := context.Background()
209+
210+
// Create a reader that implements ReaderAtCtx but not SeekerCtx
211+
reader := &mockReadCloserCtxNoSeek{
212+
data: []byte("test data"),
213+
}
214+
215+
adapter, err := ReaderAtSeekerAdapter(ctx, reader)
216+
require.Error(t, err)
217+
require.Nil(t, adapter)
218+
require.Contains(t, err.Error(), "does not implement ioctx.SeekerCtx")
219+
}
220+
221+
func TestReaderAtSeekerAdapterBrokenSeek(t *testing.T) {
222+
ctx := context.Background()
223+
224+
// Create a reader that implements SeekerCtx but seek fails
225+
reader := &mockReadCloserCtxBrokenSeek{
226+
data: []byte("test data"),
227+
}
228+
229+
adapter, err := ReaderAtSeekerAdapter(ctx, reader)
230+
require.Error(t, err)
231+
require.Nil(t, adapter)
232+
require.Contains(t, err.Error(), "test seek failed")
233+
}
234+
235+
func TestReaderAtSeekerAdapterReadAtEOF(t *testing.T) {
236+
ctx := context.Background()
237+
testData := []byte("short")
238+
mock := newMockReadCloserCtx(testData)
239+
240+
adapter, err := ReaderAtSeekerAdapter(ctx, mock)
241+
require.NoError(t, err)
242+
243+
// Try to read past EOF
244+
buf := make([]byte, 10)
245+
n, err := adapter.ReadAt(buf, int64(len(testData)))
246+
require.Equal(t, io.EOF, err)
247+
require.Equal(t, 0, n)
248+
}
249+
250+
func TestReaderAtSeekerAdapterInvalidSeek(t *testing.T) {
251+
ctx := context.Background()
252+
testData := []byte("test data")
253+
mock := newMockReadCloserCtx(testData)
254+
255+
adapter, err := ReaderAtSeekerAdapter(ctx, mock)
256+
require.NoError(t, err)
257+
258+
// Negative position
259+
_, err = adapter.Seek(-10, io.SeekStart)
260+
require.Error(t, err)
261+
require.Contains(t, err.Error(), "negative position")
262+
}

0 commit comments

Comments
 (0)