Skip to content

Commit c7177b2

Browse files
committed
Allow the handling of S3 Event Notifications and SQS events directly in the lambda
1 parent f46eaf1 commit c7177b2

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

src/main.rs

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
use aws_lambda_events::event::s3::{S3Entity, S3Event};
2+
use aws_lambda_events::sqs::SqsEvent;
23
use aws_sdk_s3::Client as S3Client;
34
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
45
use routefinder::Router;
56
use tracing::log::*;
6-
use aws_lambda_events::sqs::SqsEvent;
7-
// use deltalake::{DeltaResult};
87

98
use std::collections::HashMap;
109

11-
1210
/// A simple structure to make deserializing test events for identification easier
1311
///
1412
/// See <fhttps://github.com/buoyant-data/oxbow/issues/8>
@@ -18,15 +16,14 @@ struct TestEvent {
1816
event: String,
1917
}
2018

21-
2219
/// Convert the given [aws_lambda_events::sqs::SqsEvent] to a collection of
2320
/// [aws_lambda_events::s3::S3EventRecord] entities. This is mostly useful for handling S3 Bucket
2421
/// Notifications which have been passed into SQS
2522
///
2623
/// In the case where the [aws_lambda_events::sqs::SqsEvent] contains an `s3:TestEvent` which is
2724
/// fired when S3 Bucket Notifications are first enabled, the event will be ignored to avoid
2825
/// errorsin the processing pipeline
29-
async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event,anyhow::Error> {
26+
fn s3_from_sqs(event: SqsEvent) -> Result<S3Event, anyhow::Error> {
3027
let mut records = vec![];
3128
for record in event.records.iter() {
3229
/* each record is an SqsMessage */
@@ -55,12 +52,13 @@ async fn s3_from_sqs(event: SqsEvent) -> Result<S3Event,anyhow::Error> {
5552
};
5653
}
5754
}
58-
Ok(aws_lambda_events::s3::S3Event { records: records })
55+
Ok(aws_lambda_events::s3::S3Event { records })
5956
}
6057

61-
62-
63-
async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Result<(), Error> {
58+
async fn function_handler(
59+
event: LambdaEvent<serde_json::Value>,
60+
client: &S3Client,
61+
) -> Result<(), Error> {
6462
let input_pattern =
6563
std::env::var("INPUT_PATTERN").expect("You must define INPUT_PATTERN in the environment");
6664
let output_template = std::env::var("OUTPUT_TEMPLATE")
@@ -72,9 +70,13 @@ async fn function_handler(event: LambdaEvent<SqsEvent>, client: &S3Client) -> Re
7270
.parse(&output_template)?;
7371

7472
router.add(input_pattern, 1)?;
75-
let records = s3_from_sqs(event.payload);
7673

77-
for entity in entities_from(records.await?)? {
74+
let records = match serde_json::from_value::<SqsEvent>(event.payload.clone()) {
75+
Ok(sqs_event) => s3_from_sqs(sqs_event)?,
76+
Err(_) => serde_json::from_value(event.payload)?,
77+
};
78+
79+
for entity in entities_from(records)? {
7880
debug!("Processing {entity:?}");
7981

8082
if let Some(source_key) = entity.object.key {
@@ -116,13 +118,10 @@ async fn main() -> Result<(), Error> {
116118
run(func).await
117119
}
118120

119-
/**
120-
* Return the deserialized and useful objects from the event payload
121-
*
122-
* This function will apply a filter to make sure that it is only return objects which have been
123-
* put in this invocation
124-
*/
125-
121+
/// Return the deserialized and useful objects from the event payload
122+
///
123+
/// This function will apply a filter to make sure that it is only return objects which have been
124+
/// put in this invocation
126125
fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
127126
Ok(event
128127
.records
@@ -133,10 +132,8 @@ fn entities_from(event: S3Event) -> Result<Vec<S3Entity>, anyhow::Error> {
133132
.collect())
134133
}
135134

136-
/**
137-
* Take the source key and the already configured router in order to access a collection of
138-
* captured parameters in a HashMap format
139-
*/
135+
/// Take the source key and the already configured router in order to access a collection of
136+
/// captured parameters in a HashMap format
140137
fn captured_parameters<Handler>(
141138
router: &Router<Handler>,
142139
source_key: &str,
@@ -275,4 +272,4 @@ mod tests {
275272
"databases/oltp/a_table/ds=2023-09-05/some.parquet"
276273
);
277274
}
278-
}
275+
}

0 commit comments

Comments
 (0)