Fix: Implement backup_to_bucket_ and delete_after_backup for aws-s3 polling mode #49734
MichaelKatsoulis wants to merge 1 commit into elastic:main
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)
📝 Walkthrough
The AWS S3 input's worker loop now integrates S3 object finalization into the existing ACK callback mechanism. Previously, the ACK callback only checkpointed state and updated metrics. With these changes, when events from an S3 object are successfully processed and acknowledged, the finalization step (backup to destination bucket and conditional deletion) executes after all events are ACKed. Finalization errors are logged and reported as degraded status, but state checkpointing proceeds regardless of finalization outcome. Tests validate the finalization flow with mocked S3 operations.
🚥 Pre-merge checks: ✅ 2 passed
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
x-pack/filebeat/input/awss3/s3_input.go (1)
238-256: ⚠️ Potential issue | 🔴 Critical
Unconditional state checkpoint after finalize failure causes permanent backup/delete skip on shutdown.
The ACK callback at lines 241–259 runs asynchronously (acks.go:97) after the context is canceled during shutdown. When `finalize()` fails with "context canceled" at line 245, the error is logged but `AddState()` at line 253 still executes unconditionally, marking the object as processed. This prevents re-processing on restart, so the backup/delete is permanently skipped. Only call `AddState()` if finalization succeeds or is skipped (when `state.Stored` is false).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/filebeat/input/awss3/s3_input.go` around lines 238 - 256, The ACK callback registered with acks.Add currently calls in.registry.AddState unconditionally even when finalize() failed (e.g., due to context canceled), causing objects to be marked done and skipped on restart; update the callback in the acks.Add closure to only call in.registry.AddState(state) when finalization either was not required (state.Stored == false) or when finalize() returned nil (success). Locate the closure referencing finalize, state.Stored and in.registry.AddState and wrap the AddState call in a conditional that skips checkpointing when finalize() returned an error (but keep the existing error logs and status.UpdateStatus calls).
ℹ️ Review info
📒 Files selected for processing (2)
- x-pack/filebeat/input/awss3/s3_input.go
- x-pack/filebeat/input/awss3/s3_test.go
When the Filebeat `aws-s3` input runs in polling mode (e.g. `non_aws_bucket_name` for MinIO/S3-compatible storage), it can fully process an object (events published and registry state stored) but never perform `backup_to_bucket_*` and `delete_after_backup`.
User-visible symptom
- Registry state for the object shows `stored: true`.
- No `CopyObject`/`DeleteObject` calls for the processed object.
Root cause (bug)
Object finalization (backup/delete) is implemented in `(*s3ObjectProcessor).FinalizeS3Object()` in `s3_objects.go`.
- In SQS mode (`sqs_s3_event.go`), `FinalizeS3Object()` is collected and executed after successful processing / ACK.
- In polling mode (`s3_input.go`), the ACK callback used to only checkpoint state (`in.registry.AddState(state)`) and increment metrics. It did not call `FinalizeS3Object()`.
As a result, polling mode could mark objects as completed in the registry without ever copying/deleting them.
Fix
In `s3_input.go` (poller worker loop), call `FinalizeS3Object()` inside the ACK callback after all events for the object are ACKed, and only when processing succeeded (`state.Stored == true`).
Checklist
- `stresstest.sh` script to run them under stress conditions and race detector to verify their stability.
- `./changelog/fragments` using the changelog tool.
Disruptive User Impact
None. It fixes the problem.