Skip to content

Commit c043d3a

Browse files
committed
Clean up visibility timeout and SQS settings for more aggressive SQS consumption
1 parent 05edd20 commit c043d3a

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
resolver = "2"
88

99
[workspace.package]
10-
version = "1.3.2"
10+
version = "1.3.3"
1111
edition = "2021"
1212
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
1313
homepage = "https://github.com/buoyant-data/oxbow"

lambdas/sqs-ingest/src/main.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,30 @@ use tracing::log::*;
1111
use oxbow::write::*;
1212

1313
use std::env;
14+
use std::time::{Instant, SystemTime, UNIX_EPOCH};
1415

1516
/// This is the primary invocation point for the lambda and should do the heavy lifting
1617
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
1718
let table_uri = std::env::var("DELTA_TABLE_URI").expect("Failed to get `DELTA_TABLE_URI`");
19+
let fn_start_since_epoch_ms: u128 = SystemTime::now()
20+
.duration_since(UNIX_EPOCH)
21+
.expect("Failed to get a system time after unix epoch")
22+
.as_millis();
1823

1924
// Hold onto the instant that the function started in order to attempt to exit on time.
20-
let fn_start = std::time::Instant::now();
25+
let fn_start = Instant::now();
2126
trace!("payload received: {:?}", event.payload.records);
2227

2328
let config = aws_config::from_env().load().await;
2429
let sqs_client = aws_sdk_sqs::Client::new(&config);
2530
// How many more messages should sqs-ingest try to consume?
2631
let mut more_count = 0;
32+
debug!(
33+
"Context deadline in milliseconds since epoch is: {}",
34+
event.context.deadline
35+
);
2736
// Millis to allow for consuming more messages
28-
let more_deadline_ms: u128 = (event.context.deadline / 2).into();
37+
let more_deadline_ms: u128 = ((event.context.deadline as u128) - fn_start_since_epoch_ms) / 2;
2938
// records should contain the raw deserialized JSON payload that was sent through SQS. It
3039
// should be "fit" for writing to Delta
3140
let mut records: Vec<String> = vec![];
@@ -49,12 +58,26 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
4958
let mut fetched = 0;
5059

5160
while !completed {
61+
let visibility_timeout: i32 = (more_deadline_ms / 1000).try_into()?;
62+
debug!("Fetching things from SQS with a {visibility_timeout}s visibility timeout");
63+
5264
let receive = sqs_client
5365
.receive_message()
5466
.max_number_of_messages(10)
67+
// Set the visibility timeout to the timeout of the function to ensure that
68+
// messages are not made visible befoore the function exits
69+
.visibility_timeout(visibility_timeout)
70+
// Enable a smol long poll to receive messages
71+
.wait_time_seconds(1)
5572
.queue_url(queue_url.clone())
5673
.send()
57-
.await?;
74+
.await;
75+
76+
if receive.is_err() {
77+
error!("Received {receive:?} from SQS, not buffering more messages");
78+
break;
79+
}
80+
let receive = receive.unwrap();
5881

5982
for message in receive.messages.clone().unwrap_or_default() {
6083
received.push(

0 commit comments

Comments
 (0)