
Commit 66b0c34

Merge pull request #2 from kuntalkumarbasu/main
feat!: enabling sqs event processing
2 parents: defb177 + c7177b2

File tree: Cargo.toml, src/main.rs

2 files changed: 69 additions, 18 deletions

Cargo.toml

Lines changed: 4 additions & 4 deletions
@@ -11,15 +11,15 @@ authors = [
 anyhow = "1.0.75"
 aws-config = "0.56.0"
 aws-sdk-s3 = "0.29.0"
-aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3"] }
-chrono = "0.4.26"
+aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
+chrono = "0.4.31"
 liquid = "0.26"
-
+serde = { version = "=1", features = ["rc"] }
 lambda_runtime = "0.8.1"
 routefinder = "0.5.3"
 serde_json = "1.0.105"
 tokio = { version = "1", features = ["macros"] }
 tracing = { version = "0.1", features = ["log"] }
 tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
 urlencoding = "2.1.3"
-
+url = { version = "2.3", features = ["serde"] }
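
Not part of the diff: a minimal compile-time sketch of what the widened aws_lambda_events feature set exposes, assuming the crate's root re-exports in 0.12. The "sns" feature is enabled here even though this change only consumes SQS-wrapped and bare S3 events.

use aws_lambda_events::s3::S3Event;   // "s3": already used by the existing handler
use aws_lambda_events::sns::SnsEvent; // "sns": enabled but not yet consumed in main.rs
use aws_lambda_events::sqs::SqsEvent; // "sqs": used by the new SQS unwrapping path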

src/main.rs

Lines changed: 65 additions & 14 deletions
@@ -1,12 +1,64 @@
 use aws_lambda_events::event::s3::{S3Entity, S3Event};
+use aws_lambda_events::sqs::SqsEvent;
 use aws_sdk_s3::Client as S3Client;
 use lambda_runtime::{run, service_fn, Error, LambdaEvent};
 use routefinder::Router;
 use tracing::log::*;
 
 use std::collections::HashMap;
 
-async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Result<(), Error> {
+/// A simple structure to make deserializing test events for identification easier
+///
+/// See <https://github.com/buoyant-data/oxbow/issues/8>
+#[derive(serde::Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct TestEvent {
+    event: String,
+}
+
+/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
+/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
+/// Notifications which have been passed into SQS
+///
+/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent`, which is
+/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
+/// errors in the processing pipeline
+fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
+    let mut records = vec![];
+    for record in event.records.iter() {
+        /* each record is an SqsMessage */
+        if let Some(body) = &record.body {
+            match serde_json::from_str::<S3Event>(body) {
+                Ok(s3event) => {
+                    for s3record in s3event.records {
+                        records.push(s3record.clone());
+                    }
+                }
+                Err(err) => {
+                    // if we cannot deserialize and the event is an s3::TestEvent, then we should
+                    // just return empty records.
+                    let test_event = serde_json::from_str::<TestEvent>(body);
+                    // Early exit with the original error if we cannot parse the JSON at all
+                    if test_event.is_err() {
+                        return Err(err.into());
+                    }
+
+                    // Ignore the error on deserialization if the event ends up being an S3
+                    // TestEvent which is fired when bucket notifications are originally configured
+                    if "s3:TestEvent" != test_event.unwrap().event {
+                        return Err(err.into());
+                    }
+                }
+            };
+        }
+    }
+    Ok(aws_lambda_events::s3::S3Event { records })
+}
+
+async fn function_handler(
+    event: LambdaEvent<serde_json::Value>,
+    client: &S3Client,
+) -> Result<(), Error> {
     let input_pattern =
         std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
     let output_template = std::env::var("OUTPUT_TEMPLATE")
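
Not part of the diff: a rough test sketch for the new s3_from_sqs conversion, which could sit at the bottom of src/main.rs. It assumes SqsMessage derives Default in aws_lambda_events 0.12; the sample bodies are illustrative.

#[cfg(test)]
mod s3_from_sqs_tests {
    use super::*;
    use aws_lambda_events::sqs::SqsMessage;

    // Helper to wrap a raw message body in a single-record SqsEvent.
    fn sqs_event_with_body(body: &str) -> SqsEvent {
        SqsEvent {
            records: vec![SqsMessage {
                body: Some(body.to_string()),
                ..Default::default()
            }],
        }
    }

    #[test]
    fn ignores_s3_test_event() {
        // Shape of the message S3 emits when bucket notifications are first enabled.
        let event = sqs_event_with_body(r#"{"Service":"Amazon S3","Event":"s3:TestEvent"}"#);
        let converted = s3_from_sqs(event).expect("an s3:TestEvent should be skipped, not an error");
        assert!(converted.records.is_empty());
    }

    #[test]
    fn surfaces_unparseable_bodies() {
        // A body that is neither an S3Event nor a TestEvent propagates the original error.
        let result = s3_from_sqs(sqs_event_with_body("not json at all"));
        assert!(result.is_err());
    }
}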
@@ -18,9 +70,13 @@ async fn function_handler(event: LambdaEvent<S3Event>, client: &S3Client) -> Res
         .parse(&output_template)?;
 
     router.add(input_pattern, 1)?;
-    info!("Processing records: {event:?}");
 
-    for entity in entities_from(event.payload)? {
+    let records = match serde_json::from_value::<SqsEvent>(event.payload.clone()) {
+        Ok(sqs_event) => s3_from_sqs(sqs_event)?,
+        Err(_) => serde_json::from_value(event.payload)?,
+    };
+
+    for entity in entities_from(records)? {
         debug!("Processing {entity:?}");
 
         if let Some(source_key) = entity.object.key {
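
Also not part of the diff: a small sketch, in the same vein as the test module above, of the handler's new "try SqsEvent first, fall back to a bare S3Event" dispatch applied to a hand-built payload. The partial record assumes the omitted SqsMessage fields fall back to their serde defaults.

#[test]
fn dispatches_sqs_wrapped_payloads() -> Result<(), anyhow::Error> {
    // Hypothetical payload: an SQS envelope whose single record carries the
    // s3:TestEvent body described above.
    let payload = serde_json::json!({
        "Records": [{"body": "{\"Service\":\"Amazon S3\",\"Event\":\"s3:TestEvent\"}"}]
    });

    // Mirrors the dispatch added to function_handler: SQS-wrapped notifications are
    // unwrapped via s3_from_sqs, anything else is deserialized as a bare S3Event.
    let records: S3Event = match serde_json::from_value::<SqsEvent>(payload.clone()) {
        Ok(sqs_event) => s3_from_sqs(sqs_event)?,
        Err(_) => serde_json::from_value(payload)?,
    };

    // The TestEvent is dropped rather than surfaced as an error.
    assert!(records.records.is_empty());
    Ok(())
}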
@@ -62,13 +118,10 @@ async fn main() -> Result<(), Error> {
     run(func).await
 }
 
-/**
- * Return the deserialized and useful objects from the event payload
- *
- * This function will apply a filter to make sure that it only returns objects which have been
- * put in this invocation
- */
-
+/// Return the deserialized and useful objects from the event payload
+///
+/// This function will apply a filter to make sure that it only returns objects which have been
+/// put in this invocation
 fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
     Ok(event
         .records
@@ -79,10 +132,8 @@ fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
         .collect())
 }
 
-/**
- * Take the source key and the already configured router in order to access a collection of
- * captured parameters in a HashMap format
- */
+/// Take the source key and the already configured router in order to access a collection of
+/// captured parameters in a HashMap format
 fn captured_parameters<Handler>(
     router: &Router<Handler>,
     source_key: &str,
