Skip to content

Commit 8ea2fe9

Browse files
authored
fix(s3): ignore unrelated messages in events (#480)
1 parent 1e5e684 commit 8ea2fe9

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

src/ops/sources/amazon_s3.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,15 @@ impl SourceExecutor for Executor {
178178

179179
#[derive(Debug, Deserialize)]
180180
pub struct S3EventNotification {
181-
#[serde(rename = "Records")]
181+
#[serde(default, rename = "Records")]
182182
pub records: Vec<S3EventRecord>,
183183
}
184184

185185
#[derive(Debug, Deserialize)]
186186
pub struct S3EventRecord {
187187
#[serde(rename = "eventName")]
188188
pub event_name: String,
189-
// pub eventTime: String,
190-
pub s3: S3Entity,
189+
pub s3: Option<S3Entity>,
191190
}
192191

193192
#[derive(Debug, Deserialize)]
@@ -225,21 +224,26 @@ impl Executor {
225224
for message in messages.into_iter().filter_map(|m| m.body) {
226225
let notification: S3EventNotification = serde_json::from_str(&message)?;
227226
for record in notification.records {
228-
if record.s3.bucket.name != self.bucket_name {
227+
let s3 = if let Some(s3) = record.s3 {
228+
s3
229+
} else {
230+
continue;
231+
};
232+
if s3.bucket.name != self.bucket_name {
229233
continue;
230234
}
231235
if !self
232236
.prefix
233237
.as_ref()
234-
.map_or(true, |prefix| record.s3.object.key.starts_with(prefix))
238+
.map_or(true, |prefix| s3.object.key.starts_with(prefix))
235239
{
236240
continue;
237241
}
238242
if record.event_name.starts_with("ObjectCreated:")
239243
|| record.event_name.starts_with("ObjectDeleted:")
240244
{
241245
changes.push(SourceChange {
242-
key: KeyValue::Str(record.s3.object.key.into()),
246+
key: KeyValue::Str(s3.object.key.into()),
243247
data: None,
244248
});
245249
}

0 commit comments

Comments
 (0)