Skip to content

Commit c0602e9

Browse files
committed
add optimized path for listing using reader at and range requests
1 parent 1aba4a7 commit c0602e9

File tree

8 files changed

+287
-32
lines changed

8 files changed

+287
-32
lines changed

main.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,10 @@ var (
6060
deleteTarget = deleteCmd.Flag("target", "Target storage name").Required().String()
6161

6262
// List command
63-
listCmd = app.Command("list", "List files in a zip archive")
64-
listKey = listCmd.Flag("key", "Storage key of the zip file").String()
65-
listURL = listCmd.Flag("url", "URL of the zip file").String()
63+
listCmd = app.Command("list", "List files in a zip archive")
64+
listKey = listCmd.Flag("key", "Storage key of the zip file").String()
65+
listURL = listCmd.Flag("url", "URL of the zip file").String()
66+
listFile = listCmd.Flag("file", "Local path to zip file").String()
6667

6768
// Slurp command
6869
slurpCmd = app.Command("slurp", "Download URL and store in storage")
@@ -260,18 +261,29 @@ func runDelete(config *zipserver.Config) {
260261
}
261262

262263
func runList(config *zipserver.Config) {
263-
if *listKey == "" && *listURL == "" {
264-
log.Fatal("Either --key or --url must be specified")
264+
count := 0
265+
if *listKey != "" {
266+
count++
265267
}
266-
if *listKey != "" && *listURL != "" {
267-
log.Fatal("Only one of --key or --url can be specified")
268+
if *listURL != "" {
269+
count++
270+
}
271+
if *listFile != "" {
272+
count++
273+
}
274+
if count == 0 {
275+
log.Fatal("One of --key, --url, or --file must be specified")
276+
}
277+
if count > 1 {
278+
log.Fatal("Only one of --key, --url, or --file can be specified")
268279
}
269280

270281
ops := zipserver.NewOperations(config)
271282

272283
params := zipserver.ListParams{
273-
Key: *listKey,
274-
URL: *listURL,
284+
Key: *listKey,
285+
URL: *listURL,
286+
File: *listFile,
275287
}
276288

277289
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.FileGetTimeout))

zipserver/archive_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,10 @@ func (m *mockFailingStorage) DeleteFile(_ context.Context, _, _ string) error {
690690
return nil
691691
}
692692

693+
func (m *mockFailingStorage) GetReaderAt(_ context.Context, _, _ string, _ uint64) (ReaderAtCloser, int64, error) {
694+
return nil, 0, errors.New("not implemented")
695+
}
696+
693697
type mockFailingReadCloser struct {
694698
t *testing.T
695699
path string

zipserver/gcs_storage.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,98 @@ func (c *GcsStorage) PutFile(ctx context.Context, bucket, key string, contents i
161161
return result, nil
162162
}
163163

164+
// gcsReaderAt implements ReaderAtCloser using HTTP Range requests
165+
type gcsReaderAt struct {
166+
client *http.Client
167+
url string
168+
size int64
169+
maxBytes uint64 // maximum total bytes to read (0 = unlimited)
170+
bytesRead uint64 // total bytes read so far
171+
ctx context.Context
172+
}
173+
174+
func (r *gcsReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
175+
if off >= r.size {
176+
return 0, io.EOF
177+
}
178+
179+
end := off + int64(len(p)) - 1
180+
if end >= r.size {
181+
end = r.size - 1
182+
}
183+
184+
toRead := uint64(end - off + 1)
185+
if r.maxBytes > 0 && r.bytesRead+toRead > r.maxBytes {
186+
return 0, fmt.Errorf("max read limit exceeded (%d bytes)", r.maxBytes)
187+
}
188+
189+
req, err := http.NewRequestWithContext(r.ctx, http.MethodGet, r.url, nil)
190+
if err != nil {
191+
return 0, err
192+
}
193+
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, end))
194+
195+
resp, err := r.client.Do(req)
196+
if err != nil {
197+
return 0, err
198+
}
199+
defer resp.Body.Close()
200+
201+
if resp.StatusCode != http.StatusPartialContent {
202+
return 0, fmt.Errorf("range request failed: %s", resp.Status)
203+
}
204+
205+
n, err = io.ReadFull(resp.Body, p[:end-off+1])
206+
r.bytesRead += uint64(n)
207+
return n, err
208+
}
209+
210+
func (r *gcsReaderAt) Close() error {
211+
return nil // No resources to release
212+
}
213+
214+
// GetReaderAt returns a ReaderAt for the file, suitable for random access reads.
215+
// This is more efficient than GetFile for operations that only need partial file access.
216+
// maxBytes limits the total bytes that can be read (0 = unlimited).
217+
func (c *GcsStorage) GetReaderAt(ctx context.Context, bucket, key string, maxBytes uint64) (ReaderAtCloser, int64, error) {
218+
httpClient, err := c.httpClient()
219+
if err != nil {
220+
return nil, 0, err
221+
}
222+
223+
url := c.url(bucket, key, "HEAD")
224+
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
225+
if err != nil {
226+
return nil, 0, err
227+
}
228+
229+
res, err := httpClient.Do(req)
230+
if err != nil {
231+
return nil, 0, err
232+
}
233+
res.Body.Close()
234+
235+
if res.StatusCode != http.StatusOK {
236+
return nil, 0, errors.New(res.Status + " " + url)
237+
}
238+
239+
size := res.ContentLength
240+
if size < 0 {
241+
return nil, 0, errors.New("server did not return Content-Length")
242+
}
243+
244+
// Use GET URL for the reader
245+
getURL := c.url(bucket, key, "GET(range)")
246+
247+
return &gcsReaderAt{
248+
client: httpClient,
249+
url: getURL,
250+
size: size,
251+
maxBytes: maxBytes,
252+
ctx: ctx,
253+
}, size, nil
254+
}
255+
164256
// DeleteFile removes a file from a GCS bucket
165257
func (c *GcsStorage) DeleteFile(ctx context.Context, bucket, key string) error {
166258
httpClient, err := c.httpClient()

zipserver/list_handler.go

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ func (o *Operations) List(ctx context.Context, params ListParams) ListResult {
2525
if params.URL != "" {
2626
return o.listFromURL(ctx, params.URL)
2727
}
28-
return ListResult{Err: fmt.Errorf("either Key or URL must be specified")}
28+
if params.File != "" {
29+
return o.listFromFile(params.File)
30+
}
31+
return ListResult{Err: fmt.Errorf("either Key, URL, or File must be specified")}
2932
}
3033

3134
func (o *Operations) listFromBucket(ctx context.Context, key string) ListResult {
@@ -34,36 +37,36 @@ func (o *Operations) listFromBucket(ctx context.Context, key string) ListResult
3437
return ListResult{Err: err}
3538
}
3639

37-
reader, headers, err := storage.GetFile(ctx, o.config.Bucket, key)
40+
readerAt, size, err := storage.GetReaderAt(ctx, o.config.Bucket, key, o.config.MaxInputZipSize)
3841
if err != nil {
3942
return ListResult{Err: err}
4043
}
41-
defer reader.Close()
42-
43-
if headers != nil && o.config.MaxInputZipSize > 0 {
44-
if contentLength := headers.Get("Content-Length"); contentLength != "" {
45-
size, err := strconv.ParseInt(contentLength, 10, 64)
46-
if err != nil {
47-
return ListResult{Err: fmt.Errorf("invalid Content-Length: %w", err)}
48-
}
49-
if err := checkContentLength(o.config.MaxInputZipSize, size); err != nil {
50-
return ListResult{Err: err}
51-
}
52-
}
53-
}
44+
defer readerAt.Close()
5445

55-
var body []byte
5646
if o.config.MaxInputZipSize > 0 {
57-
var bytesRead uint64
58-
body, err = io.ReadAll(limitedReader(reader, o.config.MaxInputZipSize, &bytesRead))
59-
} else {
60-
body, err = io.ReadAll(reader)
47+
if err := checkContentLength(o.config.MaxInputZipSize, size); err != nil {
48+
return ListResult{Err: err}
49+
}
6150
}
51+
52+
zipFile, err := zip.NewReader(readerAt, size)
6253
if err != nil {
6354
return ListResult{Err: err}
6455
}
6556

66-
return o.listZipBytes(body)
57+
if o.config.MaxListFiles > 0 && len(zipFile.File) > o.config.MaxListFiles {
58+
return ListResult{Err: fmt.Errorf("zip too many files (max %d)", o.config.MaxListFiles)}
59+
}
60+
61+
var files []fileTuple
62+
for _, file := range zipFile.File {
63+
files = append(files, fileTuple{
64+
Filename: file.Name,
65+
Size: file.UncompressedSize64,
66+
})
67+
}
68+
69+
return ListResult{Files: files}
6770
}
6871

6972
func (o *Operations) listFromURL(ctx context.Context, url string) ListResult {
@@ -102,6 +105,28 @@ func (o *Operations) listFromURL(ctx context.Context, url string) ListResult {
102105
return o.listZipBytes(body)
103106
}
104107

108+
func (o *Operations) listFromFile(path string) ListResult {
109+
zipFile, err := zip.OpenReader(path)
110+
if err != nil {
111+
return ListResult{Err: err}
112+
}
113+
defer zipFile.Close()
114+
115+
if o.config.MaxListFiles > 0 && len(zipFile.File) > o.config.MaxListFiles {
116+
return ListResult{Err: fmt.Errorf("zip too many files (max %d)", o.config.MaxListFiles)}
117+
}
118+
119+
var files []fileTuple
120+
for _, file := range zipFile.File {
121+
files = append(files, fileTuple{
122+
Filename: file.Name,
123+
Size: file.UncompressedSize64,
124+
})
125+
}
126+
127+
return ListResult{Files: files}
128+
}
129+
105130
func (o *Operations) listZipBytes(body []byte) ListResult {
106131
zipFile, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
107132
if err != nil {

zipserver/mem_storage.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,40 @@ func (fs *MemStorage) GetFile(ctx context.Context, bucket, key string) (io.ReadC
9696
return nil, nil, fmt.Errorf("%s: object not found", objectPath)
9797
}
9898

99+
// memReaderAt wraps bytes.Reader to implement ReaderAtCloser with read limits
100+
type memReaderAt struct {
101+
*bytes.Reader
102+
maxBytes uint64
103+
bytesRead uint64
104+
}
105+
106+
func (r *memReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
107+
toRead := uint64(len(p))
108+
if r.maxBytes > 0 && r.bytesRead+toRead > r.maxBytes {
109+
return 0, fmt.Errorf("max read limit exceeded (%d bytes)", r.maxBytes)
110+
}
111+
n, err = r.Reader.ReadAt(p, off)
112+
r.bytesRead += uint64(n)
113+
return n, err
114+
}
115+
116+
func (r *memReaderAt) Close() error {
117+
return nil
118+
}
119+
120+
func (fs *MemStorage) GetReaderAt(ctx context.Context, bucket, key string, maxBytes uint64) (ReaderAtCloser, int64, error) {
121+
fs.mutex.Lock()
122+
defer fs.mutex.Unlock()
123+
124+
objectPath := fs.objectPath(bucket, key)
125+
126+
if obj, ok := fs.objects[objectPath]; ok {
127+
return &memReaderAt{Reader: bytes.NewReader(obj.data), maxBytes: maxBytes}, int64(len(obj.data)), nil
128+
}
129+
130+
return nil, 0, fmt.Errorf("%s: object not found", objectPath)
131+
}
132+
99133
func (fs *MemStorage) getHeaders(bucket, key string) (http.Header, error) {
100134
fs.mutex.Lock()
101135
defer fs.mutex.Unlock()

zipserver/operations.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ type DeleteOperationResult struct {
5858

5959
// ListParams contains parameters for the list operation. Exactly one of the
// three source fields must be set; they are mutually exclusive.
type ListParams struct {
	Key  string // Storage key of the zip file (mutually exclusive with URL/File)
	URL  string // URL of the zip file (mutually exclusive with Key/File)
	File string // Local path to zip file (mutually exclusive with Key/URL)
}
6465

6566
// ListResult contains the result of a list operation

zipserver/s3_storage.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,85 @@ func (c *S3Storage) DeleteFile(ctx context.Context, bucket, key string) error {
139139
return nil
140140
}
141141

142+
// s3ReaderAt implements ReaderAtCloser using S3 range requests
143+
type s3ReaderAt struct {
144+
svc *s3.S3
145+
bucket string
146+
key string
147+
size int64
148+
maxBytes uint64 // maximum total bytes to read (0 = unlimited)
149+
bytesRead uint64 // total bytes read so far
150+
ctx context.Context
151+
}
152+
153+
func (r *s3ReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
154+
if off >= r.size {
155+
return 0, io.EOF
156+
}
157+
158+
end := off + int64(len(p)) - 1
159+
if end >= r.size {
160+
end = r.size - 1
161+
}
162+
163+
toRead := uint64(end - off + 1)
164+
if r.maxBytes > 0 && r.bytesRead+toRead > r.maxBytes {
165+
return 0, fmt.Errorf("max read limit exceeded (%d bytes)", r.maxBytes)
166+
}
167+
168+
rangeStr := fmt.Sprintf("bytes=%d-%d", off, end)
169+
result, err := r.svc.GetObjectWithContext(r.ctx, &s3.GetObjectInput{
170+
Bucket: aws.String(r.bucket),
171+
Key: aws.String(r.key),
172+
Range: aws.String(rangeStr),
173+
})
174+
if err != nil {
175+
return 0, err
176+
}
177+
defer result.Body.Close()
178+
179+
n, err = io.ReadFull(result.Body, p[:end-off+1])
180+
r.bytesRead += uint64(n)
181+
return n, err
182+
}
183+
184+
func (r *s3ReaderAt) Close() error {
185+
return nil // No resources to release
186+
}
187+
188+
// GetReaderAt returns a ReaderAt for the file, suitable for random access reads.
189+
// This is more efficient than GetFile for operations that only need partial file access.
190+
// maxBytes limits the total bytes that can be read (0 = unlimited).
191+
func (c *S3Storage) GetReaderAt(ctx context.Context, bucket, key string, maxBytes uint64) (ReaderAtCloser, int64, error) {
192+
svc := s3.New(c.Session)
193+
194+
// Get file size via HeadObject
195+
head, err := svc.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
196+
Bucket: aws.String(bucket),
197+
Key: aws.String(key),
198+
})
199+
if err != nil {
200+
return nil, 0, err
201+
}
202+
203+
size := int64(0)
204+
if head.ContentLength != nil {
205+
size = *head.ContentLength
206+
}
207+
if size == 0 {
208+
return nil, 0, fmt.Errorf("server did not return Content-Length")
209+
}
210+
211+
return &s3ReaderAt{
212+
svc: svc,
213+
bucket: bucket,
214+
key: key,
215+
size: size,
216+
maxBytes: maxBytes,
217+
ctx: ctx,
218+
}, size, nil
219+
}
220+
142221
// GetFile implements Storage interface - downloads a file from S3
143222
func (c *S3Storage) GetFile(ctx context.Context, bucket, key string) (io.ReadCloser, http.Header, error) {
144223
svc := s3.New(c.Session)

0 commit comments

Comments
 (0)