Commit 291dfa6

mustansir14 authored and MuneebUllahKhan222 committed
[INS-232] Fix S3 Source "panic: runtime error: index out of range" bug (#4610)
* Use a separate checkpointer instance for each scanBucket call, and add an index-out-of-range check in the checkpointer (see the sketch below).
1 parent 27f3629 commit 291dfa6
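
For context on the fix: before this change, a single Checkpointer lived on the Source struct and was shared by every scanBucket call, so completion state recorded against one page could later be resolved against a different, smaller page, indexing past the end of its Contents slice. The Go sketch below is a minimal reproduction of that failure mode under assumed mechanics; the names and the exact interleaving are illustrative, not TruffleHog code.

package main

import "fmt"

// Toy stand-in for the real Checkpointer: it records which object
// indexes finished, then resolves the newest one against a page.
type checkpointer struct {
	completionOrder []int // indexes of completed objects, oldest first
}

func (c *checkpointer) markCompleted(idx int) {
	c.completionOrder = append(c.completionOrder, idx)
}

// lastKey resolves the most recently completed index against the current page.
func (c *checkpointer) lastKey(page []string) (string, error) {
	if len(c.completionOrder) == 0 {
		return "", nil // no completed objects yet
	}
	idx := c.completionOrder[len(c.completionOrder)-1]
	if idx >= len(page) { // the guard this commit adds to UpdateObjectCompletion
		return "", fmt.Errorf("checkpoint index %d exceeds page contents size %d", idx, len(page))
	}
	return page[idx], nil
}

func main() {
	shared := &checkpointer{} // one instance shared across scans: the bug
	shared.markCompleted(500) // scan A completes object 500 of a 1000-object page

	pageB := []string{"key-0", "key-1"} // scan B's page holds only 2 objects
	if _, err := shared.lastKey(pageB); err != nil {
		fmt.Println(err) // without the guard: panic: runtime error: index out of range
	}
}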

3 files changed: +27 -30 lines

pkg/sources/s3/checkpointer.go
Lines changed: 6 additions & 9 deletions

@@ -70,14 +70,15 @@ const defaultMaxObjectsPerPage = 1000
 
 // NewCheckpointer creates a new checkpointer for S3 scanning operations.
 // The progress provides the underlying mechanism for persisting scan state.
-func NewCheckpointer(ctx context.Context, progress *sources.Progress) *Checkpointer {
+func NewCheckpointer(ctx context.Context, progress *sources.Progress, isUnitScan bool) *Checkpointer {
 	ctx.Logger().Info("Creating checkpointer")
 
 	return &Checkpointer{
 		// We are resuming if we have completed objects from a previous scan.
 		completedObjects: make([]bool, defaultMaxObjectsPerPage),
 		completionOrder:  make([]int, 0, defaultMaxObjectsPerPage),
 		progress:         progress,
+		isUnitScan:       isUnitScan,
 	}
 }

@@ -192,6 +193,10 @@ func (p *Checkpointer) UpdateObjectCompletion(
 	if checkpointIdx < 0 {
 		return nil // No completed objects yet
 	}
+	if checkpointIdx >= len(pageContents) {
+		// this should never happen
+		return fmt.Errorf("checkpoint index %d exceeds page contents size %d", checkpointIdx, len(pageContents))
+	}
 	obj := pageContents[checkpointIdx]
 
 	return p.updateCheckpoint(bucket, role, *obj.Key)

@@ -229,11 +234,3 @@ func (p *Checkpointer) updateCheckpoint(bucket string, role string, lastKey string
 	)
 	return nil
 }
-
-// SetIsUnitScan sets whether the checkpointer is operating in unit scan mode.
-func (p *Checkpointer) SetIsUnitScan(isUnitScan bool) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-
-	p.isUnitScan = isUnitScan
-}
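
A note on the SetIsUnitScan removal above: with a fresh checkpointer per scan, the mode can be fixed at construction instead of being toggled later through a mutex-guarded setter. A minimal sketch of that constructor-injection pattern, with hypothetical names:

package main

import "fmt"

type checkpointer struct {
	isUnitScan bool // set once at construction, never mutated afterwards
}

// newCheckpointer fixes the scan mode up front, so no locked setter
// (and no mid-scan mode change) is possible.
func newCheckpointer(isUnitScan bool) *checkpointer {
	return &checkpointer{isUnitScan: isUnitScan}
}

func main() {
	enumScan := newCheckpointer(false) // enumeration scan (scanBuckets path)
	unitScan := newCheckpointer(true)  // unit scan (ChunkUnit path)
	fmt.Println(enumScan.isUnitScan, unitScan.isUnitScan)
}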

pkg/sources/s3/checkpointer_test.go
Lines changed: 7 additions & 8 deletions

@@ -19,7 +19,7 @@ func TestCheckpointerResumption(t *testing.T) {
 
 	// First scan - process 6 objects then interrupt.
 	initialProgress := &sources.Progress{}
-	tracker := NewCheckpointer(ctx, initialProgress)
+	tracker := NewCheckpointer(ctx, initialProgress, false)
 
 	firstPage := &s3.ListObjectsV2Output{
 		Contents: make([]s3types.Object, 12), // Total of 12 objects

@@ -42,7 +42,7 @@ func TestCheckpointerResumption(t *testing.T) {
 	assert.Equal(t, "key-5", resumeInfo.StartAfter)
 
 	// Resume scan with existing progress.
-	resumeTracker := NewCheckpointer(ctx, initialProgress)
+	resumeTracker := NewCheckpointer(ctx, initialProgress, false)
 
 	resumePage := &s3.ListObjectsV2Output{
 		Contents: firstPage.Contents[6:], // Remaining 6 objects

@@ -66,7 +66,7 @@ func TestCheckpointerResumptionWithRole(t *testing.T) {
 
 	// First scan - process 6 objects then interrupt.
 	initialProgress := &sources.Progress{}
-	tracker := NewCheckpointer(ctx, initialProgress)
+	tracker := NewCheckpointer(ctx, initialProgress, false)
 	role := "test-role"
 
 	firstPage := &s3.ListObjectsV2Output{

@@ -91,7 +91,7 @@ func TestCheckpointerResumptionWithRole(t *testing.T) {
 	assert.Equal(t, role, resumeInfo.Role)
 
 	// Resume scan with existing progress.
-	resumeTracker := NewCheckpointer(ctx, initialProgress)
+	resumeTracker := NewCheckpointer(ctx, initialProgress, false)
 
 	resumePage := &s3.ListObjectsV2Output{
 		Contents: firstPage.Contents[6:], // Remaining 6 objects

@@ -124,7 +124,7 @@ func TestCheckpointerReset(t *testing.T) {
 
 	ctx := context.Background()
 	progress := new(sources.Progress)
-	tracker := NewCheckpointer(ctx, progress)
+	tracker := NewCheckpointer(ctx, progress, false)
 
 	tracker.completedObjects[1] = true
 	tracker.completedObjects[2] = true

@@ -441,8 +441,7 @@ func TestCheckpointerUpdateWithRole(t *testing.T) {
 func TestCheckpointerUpdateUnitScan(t *testing.T) {
 	ctx := context.Background()
 	progress := new(sources.Progress)
-	tracker := NewCheckpointer(ctx, progress)
-	tracker.SetIsUnitScan(true)
+	tracker := NewCheckpointer(ctx, progress, true)
 
 	page := &s3.ListObjectsV2Output{
 		Contents: make([]s3types.Object, 3),

@@ -528,7 +527,7 @@ func TestComplete(t *testing.T) {
 			EncodedResumeInfo: tt.initialState.resumeInfo,
 			Message:           tt.initialState.message,
 		}
-		tracker := NewCheckpointer(ctx, progress)
+		tracker := NewCheckpointer(ctx, progress, false)
 
 		err := tracker.Complete(ctx, tt.completeMessage)
 		assert.NoError(t, err)

pkg/sources/s3/s3.go
Lines changed: 14 additions & 13 deletions

@@ -48,7 +48,6 @@ type Source struct {
 	concurrency int
 	conn        *sourcespb.S3
 
-	checkpointer *Checkpointer
 	sources.Progress
 	metricsCollector metricsCollector
 
@@ -95,7 +94,6 @@ func (s *Source) Init(
 	}
 	s.conn = &conn
 
-	s.checkpointer = NewCheckpointer(ctx, &s.Progress)
 	s.metricsCollector = metricsInstance
 
 	s.setMaxObjectSize(conn.GetMaxObjectSize())

@@ -299,7 +297,8 @@ func (s *Source) scanBuckets(
 	}
 	var totalObjectCount uint64
 
-	pos := determineResumePosition(ctx, s.checkpointer, bucketsToScan)
+	checkpointer := NewCheckpointer(ctx, &s.Progress, false)
+	pos := determineResumePosition(ctx, checkpointer, bucketsToScan)
 	switch {
 	case pos.isNewScan:
 		ctx.Logger().Info("Starting new scan from beginning")

@@ -340,7 +339,7 @@ func (s *Source) scanBuckets(
 			)
 		}
 
-		objectCount := s.scanBucket(ctx, client, role, bucket, sources.ChanReporter{Ch: chunksChan}, startAfter)
+		objectCount := s.scanBucket(ctx, client, role, bucket, sources.ChanReporter{Ch: chunksChan}, startAfter, checkpointer)
 		totalObjectCount += objectCount
 	}

@@ -359,6 +358,7 @@ func (s *Source) scanBucket(
 	bucket string,
 	reporter sources.ChunkReporter,
 	startAfter *string,
+	checkpointer *Checkpointer,
 ) uint64 {
 	s.metricsCollector.RecordBucketForRole(role)

@@ -412,7 +412,7 @@ func (s *Source) scanBucket(
 			errorCount:  &errorCount,
 			objectCount: &objectCount,
 		}
-		s.pageChunker(ctx, pageMetadata, processingState, reporter)
+		s.pageChunker(ctx, pageMetadata, processingState, reporter, checkpointer)
 
 		pageNumber++
 	}

@@ -458,8 +458,9 @@ func (s *Source) pageChunker(
 	metadata pageMetadata,
 	state processingState,
 	reporter sources.ChunkReporter,
+	checkpointer *Checkpointer,
 ) {
-	s.checkpointer.Reset() // Reset the checkpointer for each PAGE
+	checkpointer.Reset() // Reset the checkpointer for each PAGE
 	ctx = context.WithValues(ctx, "bucket", metadata.bucket, "page_number", metadata.pageNumber)
 	for objIdx, obj := range metadata.page.Contents {
 		ctx = context.WithValues(ctx, "key", *obj.Key, "size", *obj.Size)

@@ -471,7 +472,7 @@ func (s *Source) pageChunker(
 		if obj.StorageClass == s3types.ObjectStorageClassGlacier || obj.StorageClass == s3types.ObjectStorageClassGlacierIr {
 			ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", obj.StorageClass)
 			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class", float64(*obj.Size))
-			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
+			if err := checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
				ctx.Logger().Error(err, "could not update progress for glacier object")
			}
			continue

@@ -481,7 +482,7 @@ func (s *Source) pageChunker(
 		if *obj.Size > s.maxObjectSize {
 			ctx.Logger().V(5).Info("Skipping large file", "max_object_size", s.maxObjectSize)
 			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit", float64(*obj.Size))
-			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
+			if err := checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
				ctx.Logger().Error(err, "could not update progress for large file")
			}
			continue

@@ -491,7 +492,7 @@ func (s *Source) pageChunker(
 		if *obj.Size == 0 {
 			ctx.Logger().V(5).Info("Skipping empty file")
 			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file", 0)
-			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
+			if err := checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
				ctx.Logger().Error(err, "could not update progress for empty file")
			}
			continue

@@ -501,7 +502,7 @@ func (s *Source) pageChunker(
 		if common.SkipFile(*obj.Key) {
 			ctx.Logger().V(5).Info("Skipping file with incompatible extension")
 			s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension", float64(*obj.Size))
-			if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
+			if err := checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
				ctx.Logger().Error(err, "could not update progress for incompatible file")
			}
			continue

@@ -613,7 +614,7 @@ func (s *Source) pageChunker(
 			state.errorCount.Store(prefix, 0)
 		}
 		// Update progress after successful processing.
-		if err := s.checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
+		if err := checkpointer.UpdateObjectCompletion(ctx, objIdx, metadata.bucket, metadata.role, metadata.page.Contents); err != nil {
			ctx.Logger().Error(err, "could not update progress for scanned object")
		}
		s.metricsCollector.RecordObjectScanned(metadata.bucket, float64(*obj.Size))

@@ -744,7 +745,7 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporter
		return fmt.Errorf("could not create s3 client for bucket %s and role %s: %w", s3unit.Bucket, s3unit.Role, err)
	}
 
-	s.checkpointer.SetIsUnitScan(true)
+	checkpointer := NewCheckpointer(ctx, &s.Progress, true)
 
 	var startAfterPtr *string
 	startAfter := s.Progress.GetEncodedResumeInfoFor(unitID)

@@ -757,7 +758,7 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporter
 		startAfterPtr = &startAfter
 	}
 	defer s.Progress.ClearEncodedResumeInfoFor(unitID)
-	s.scanBucket(ctx, defaultClient, s3unit.Role, s3unit.Bucket, reporter, startAfterPtr)
+	s.scanBucket(ctx, defaultClient, s3unit.Role, s3unit.Bucket, reporter, startAfterPtr, checkpointer)
 	return nil
 }
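
Taken together, the s3.go changes move the checkpointer from a long-lived field on Source to a value created per scan and threaded explicitly through scanBucket and pageChunker. A minimal sketch of that ownership pattern follows; the names and signatures are illustrative, not the real ones.

package main

import "fmt"

type checkpointer struct{ lastKey string }

// source is long-lived and now carries no per-scan checkpoint state.
type source struct{}

// scanBucket owns the checkpointer it is handed, so concurrent or
// sequential scans can no longer clobber each other's page state.
func (s *source) scanBucket(bucket string, cp *checkpointer) {
	cp.lastKey = bucket + "/last-object"
	fmt.Printf("scanned %s, checkpoint at %s\n", bucket, cp.lastKey)
}

func main() {
	s := &source{}
	for _, b := range []string{"bucket-a", "bucket-b"} {
		cp := &checkpointer{} // fresh instance per scanBucket call
		s.scanBucket(b, cp)
	}
}

With each caller creating its own instance, the defensive bounds check added to UpdateObjectCompletion becomes a should-never-happen error path rather than a routine occurrence.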