Skip to content

Commit f04627e

Browse files
committed
fix: ensure cache is synced to disk immediately after successful uploads
- Add proper error handling for all cache.save() calls after successful uploads - Log warnings if cache save fails instead of silently ignoring errors - Ensures cache data is persisted immediately after each upload, not deferred - Prevents loss of cache metadata if process crashes or is killed - Applies to both regular partitions and partition slices
1 parent 8513f9e commit f04627e

File tree

1 file changed

+33
-15
lines changed

1 file changed

+33
-15
lines changed

cmd/archiver.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,10 +1300,13 @@ func (a *Archiver) processSinglePartition(partition PartitionInfo, program *tea.
13001300
}
13011301
}
13021302

1303-
// Save metadata to cache after successful upload
1303+
// Save metadata to cache immediately after successful upload
13041304
cache.setFileMetadataWithETagAndStartTime(partition.TableName, objectKey, fileSize, uncompressedSize, md5Hash, multipartETag, true, startTime)
1305-
_ = cache.save(a.config.CacheScope)
1306-
a.logger.Debug(fmt.Sprintf(" 💾 Saved file metadata to cache: compressed=%d, uncompressed=%d, md5=%s, multipartETag=%s", fileSize, uncompressedSize, md5Hash, multipartETag))
1305+
if err := cache.save(a.config.CacheScope); err != nil {
1306+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1307+
} else {
1308+
a.logger.Debug(fmt.Sprintf(" 💾 Saved file metadata to cache: compressed=%d, uncompressed=%d, md5=%s, multipartETag=%s", fileSize, uncompressedSize, md5Hash, multipartETag))
1309+
}
13071310
}
13081311

13091312
result.Stage = "Complete"
@@ -1427,9 +1430,11 @@ func (a *Archiver) processSinglePartitionSlice(partition PartitionInfo, _ *tea.P
14271430
result.Stage = StageSkipped
14281431
// Clean up temp file
14291432
cleanupTempFile(tempFilePath)
1430-
// Save to cache
1433+
// Save to cache immediately
14311434
cache.setFileMetadataWithETagAndStartTime(partition.TableName, objectKey, fileSize, uncompressedSize, md5Hash, "", true, sliceStartTime)
1432-
_ = cache.save(a.config.CacheScope)
1435+
if err := cache.save(a.config.CacheScope); err != nil {
1436+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1437+
}
14331438
result.Duration = time.Since(sliceStartTime)
14341439
return result
14351440
}
@@ -1443,9 +1448,11 @@ func (a *Archiver) processSinglePartitionSlice(partition PartitionInfo, _ *tea.P
14431448
result.Stage = StageSkipped
14441449
// Clean up temp file
14451450
cleanupTempFile(tempFilePath)
1446-
// Save to cache with multipart ETag
1451+
// Save to cache immediately with multipart ETag
14471452
cache.setFileMetadataWithETagAndStartTime(partition.TableName, objectKey, fileSize, uncompressedSize, md5Hash, multipartETag, true, sliceStartTime)
1448-
_ = cache.save(a.config.CacheScope)
1453+
if err := cache.save(a.config.CacheScope); err != nil {
1454+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1455+
}
14491456
result.Duration = time.Since(sliceStartTime)
14501457
return result
14511458
}
@@ -1486,9 +1493,11 @@ func (a *Archiver) processSinglePartitionSlice(partition PartitionInfo, _ *tea.P
14861493
}
14871494
}
14881495

1489-
// Save metadata to cache with multipart ETag
1496+
// Save metadata to cache immediately after successful upload
14901497
cache.setFileMetadataWithETagAndStartTime(partition.TableName, objectKey, fileSize, uncompressedSize, md5Hash, multipartETag, true, sliceStartTime)
1491-
_ = cache.save(a.config.CacheScope)
1498+
if err := cache.save(a.config.CacheScope); err != nil {
1499+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1500+
}
14921501

14931502
// Only log in debug mode when TUI is disabled
14941503
if a.config.Debug {
@@ -1641,9 +1650,11 @@ func (a *Archiver) checkExistingFile(partition PartitionInfo, objectKey string,
16411650
result.SkipReason = fmt.Sprintf("Already exists with matching size (%d bytes) and MD5 (%s)", s3Size, s3ETag)
16421651
result.Stage = StageSkipped
16431652
a.logger.Debug(" ✅ Skipping: Size and MD5 match")
1644-
// Save to cache for future runs
1653+
// Save to cache immediately for future runs
16451654
cache.setFileMetadata(partition.TableName, objectKey, localSize, uncompressedSize, localMD5, true)
1646-
_ = cache.save(a.config.CacheScope)
1655+
if err := cache.save(a.config.CacheScope); err != nil {
1656+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1657+
}
16471658
return true, result
16481659
}
16491660

@@ -1656,9 +1667,11 @@ func (a *Archiver) checkExistingFile(partition PartitionInfo, objectKey string,
16561667
result.SkipReason = fmt.Sprintf("Already exists with matching size (%d bytes) and multipart ETag (%s)", s3Size, s3ETag)
16571668
result.Stage = StageSkipped
16581669
a.logger.Debug(" ✅ Skipping: Size and multipart ETag match")
1659-
// Save to cache for future runs with multipart ETag
1670+
// Save to cache immediately for future runs with multipart ETag
16601671
cache.setFileMetadataWithETagAndStartTime(partition.TableName, objectKey, localSize, uncompressedSize, localMD5, localMultipartETag, true, time.Time{})
1661-
_ = cache.save(a.config.CacheScope)
1672+
if err := cache.save(a.config.CacheScope); err != nil {
1673+
a.logger.Warn(fmt.Sprintf(" ⚠️ Failed to save cache metadata: %v", err))
1674+
}
16621675
return true, result
16631676
}
16641677
a.logger.Debug(fmt.Sprintf(" ❌ Multipart ETag mismatch: S3=%s, Local=%s", s3ETag, localMultipartETag))
@@ -2500,10 +2513,15 @@ func (a *Archiver) extractPartitionDataStreaming(partition PartitionInfo, progra
25002513
}
25012514
}
25022515

2503-
// Save row count to cache if it was unknown
2516+
// Save row count to cache immediately if it was unknown
25042517
if partition.RowCount <= 0 && rowCount > 0 {
25052518
cache.setRowCount(partition.TableName, rowCount)
2506-
_ = cache.save(a.config.CacheScope)
2519+
if err := cache.save(a.config.CacheScope); err != nil {
2520+
// Log warning but don't fail - row count caching is not critical
2521+
if a.config.Debug {
2522+
a.logger.Debug(fmt.Sprintf(" ⚠️ Failed to save row count to cache: %v", err))
2523+
}
2524+
}
25072525
}
25082526

25092527
return tempFilePath, fileSize, md5Hash, uncompressedSize, nil

0 commit comments

Comments
 (0)