Skip to content

Commit 3177287

Browse files
feat!: enabling sqs event processing
1 parent defb177 commit 3177287

File tree

2 files changed

+63
-8
lines changed

2 files changed

+63
-8
lines changed

Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@ authors = [
1111
anyhow = "1.0.75"
1212
aws-config = "0.56.0"
1313
aws-sdk-s3 = "0.29.0"
14-
aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3"] }
15-
chrono = "0.4.26"
14+
aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
15+
deltalake = { version = "0.16.5", features = ["s3", "json"]}
16+
chrono = "0.4.31"
1617
liquid = "0.26"
17-
18+
serde = { version = "=1", features = ["rc"] }
1819
lambda_runtime = "0.8.1"
1920
routefinder = "0.5.3"
2021
serde_json = "1.0.105"
2122
tokio = { version = "1", features = ["macros"] }
2223
tracing = { version = "0.1", features = ["log"] }
2324
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
2425
urlencoding = "2.1.3"
25-
26+
url = { version = "2.3", features = ["serde"] }

src/main.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,64 @@ use aws_sdk_s3::Client as S3Client;
33
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
44
use routefinder::Router;
55
use tracing::log::*;
6+
use aws_lambda_events::sqs::SqsEvent;
7+
// use deltalake::{DeltaResult};
68

79
use std::collections::HashMap;
810

9-
async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Result<(), Error> {
11+
12+
/// A simple structure to make deserializing test events for identification easier
13+
///
14+
/// See <fhttps://github.com/buoyant-data/oxbow/issues/8>
15+
#[derive(serde::Deserialize)]
16+
#[serde(rename_all = "PascalCase")]
17+
struct TestEvent {
18+
event: String,
19+
}
20+
21+
22+
/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
23+
/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
24+
/// Notifications which have been passed into SQS
25+
///
26+
/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent` which is
27+
/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
28+
/// errorsin the processing pipeline
29+
async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event,anyhow::Error> {
30+
let mut records = vec![];
31+
for record in event.records.iter() {
32+
/* each record is an SqsMessage */
33+
if let Some(body) = &record.body {
34+
match serde_json::from_str::<S3Event>(body) {
35+
Ok(s3event) => {
36+
for s3record in s3event.records {
37+
records.push(s3record.clone());
38+
}
39+
}
40+
Err(err) => {
41+
// if we cannot deserialize and the event is an s3::TestEvent, then we should
42+
// just return empty records.
43+
let test_event = serde_json::from_str::<TestEvent>(body);
44+
// Early exit with the original error if we cannot parse the JSON at all
45+
if test_event.is_err() {
46+
return Err(err.into());
47+
}
48+
49+
// Ignore the error on deserialization if the event ends up being an S3
50+
// TestEvent which is fired when bucket notifications are originally configured
51+
if "s3:TestEvent" != test_event.unwrap().event {
52+
return Err(err.into());
53+
}
54+
}
55+
};
56+
}
57+
}
58+
Ok(aws_lambda_events::s3::S3Event { records: records })
59+
}
60+
61+
62+
63+
async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Result<(), Error> {
1064
let input_pattern =
1165
std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
1266
let output_template = std::env::var("OUTPUT_TEMPLATE")
@@ -18,9 +72,9 @@ async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Res
1872
.parse(&output_template)?;
1973

2074
router.add(input_pattern, 1)?;
21-
info!("Processing records: {event:?}");
75+
let records = s3_from_sqs(event.payload);
2276

23-
for entity in entities_from(event.payload)? {
77+
for entity in entities_from(records.await?)? {
2478
debug!("Processing {entity:?}");
2579

2680
if let Some(source_key) = entity.object.key {
@@ -221,4 +275,4 @@ mod tests {
221275
"databases/oltp/a_table/ds=2023-09-05/some.parquet"
222276
);
223277
}
224-
}
278+
}

0 commit comments

Comments
 (0)