From f0b4ed9eb738154cab0160f8f6bc6ac4f8dfefee Mon Sep 17 00:00:00 2001 From: LJ Date: Tue, 13 May 2025 12:54:46 -0700 Subject: [PATCH] fix(s3): ignore unrelated messages in events --- src/ops/sources/amazon_s3.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index da37f193e..ffdb5a11d 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -178,7 +178,7 @@ impl SourceExecutor for Executor { #[derive(Debug, Deserialize)] pub struct S3EventNotification { - #[serde(rename = "Records")] + #[serde(default, rename = "Records")] pub records: Vec, } @@ -186,8 +186,7 @@ pub struct S3EventNotification { pub struct S3EventRecord { #[serde(rename = "eventName")] pub event_name: String, - // pub eventTime: String, - pub s3: S3Entity, + pub s3: Option, } #[derive(Debug, Deserialize)] @@ -225,13 +224,18 @@ impl Executor { for message in messages.into_iter().filter_map(|m| m.body) { let notification: S3EventNotification = serde_json::from_str(&message)?; for record in notification.records { - if record.s3.bucket.name != self.bucket_name { + let s3 = if let Some(s3) = record.s3 { + s3 + } else { + continue; + }; + if s3.bucket.name != self.bucket_name { continue; } if !self .prefix .as_ref() - .map_or(true, |prefix| record.s3.object.key.starts_with(prefix)) + .map_or(true, |prefix| s3.object.key.starts_with(prefix)) { continue; } @@ -239,7 +243,7 @@ impl Executor { || record.event_name.starts_with("ObjectDeleted:") { changes.push(SourceChange { - key: KeyValue::Str(record.s3.object.key.into()), + key: KeyValue::Str(s3.object.key.into()), data: None, }); }