33/// and appending them to the configured Delta table
44///
55use aws_lambda_events:: event:: sqs:: { SqsEvent , SqsMessage } ;
6+ use aws_sdk_sqs:: types:: DeleteMessageBatchRequestEntry ;
67use lambda_runtime:: { run, service_fn, tracing, Error , LambdaEvent } ;
78use serde:: Deserialize ;
89use tracing:: log:: * ;
@@ -28,6 +29,11 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
2829 // records should contain the raw deserialized JSON payload that was sent through SQS. It
2930 // should be "fit" for writing to Delta
3031 let mut records: Vec < String > = vec ! [ ] ;
32+ // Messages to delete from SQS if manually fetched
33+ //
34+ // This needs to be stored until after the successful write to Delta to ensure message
35+ // persistence
36+ let mut received = vec ! [ ] ;
3137
3238 if let Ok ( how_many_more) = std:: env:: var ( "BUFFER_MORE_MESSAGES" ) {
3339 more_count = how_many_more
@@ -49,6 +55,16 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
4955 . queue_url ( queue_url. clone ( ) )
5056 . send ( )
5157 . await ?;
58+
59+ for message in receive. messages . clone ( ) . unwrap_or_default ( ) {
60+ received. push (
61+ DeleteMessageBatchRequestEntry :: builder ( )
62+ . set_id ( message. message_id )
63+ . set_receipt_handle ( message. receipt_handle )
64+ . build ( ) ?,
65+ ) ;
66+ }
67+
5268 records. append ( & mut extract_json_from_sqs_direct (
5369 receive. messages . unwrap_or_default ( ) ,
5470 ) ) ;
@@ -81,6 +97,23 @@ async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
8197 error ! ( "An empty payload was extracted which doesn't seem right!" ) ;
8298 }
8399
100+ if let Ok ( queue_url) = std:: env:: var ( "BUFFER_MORE_QUEUE_URL" ) {
101+ if !received. is_empty ( ) {
102+ info ! (
103+ "Marking {} messages from SQS which were buffered as deleted" ,
104+ received. len( )
105+ ) ;
106+ for batch in received. chunks ( 10 ) {
107+ let _ = sqs_client
108+ . delete_message_batch ( )
109+ . queue_url ( queue_url. clone ( ) )
110+ . set_entries ( Some ( batch. to_vec ( ) ) )
111+ . send ( )
112+ . await ?;
113+ }
114+ }
115+ }
116+
84117 debug ! (
85118 "sqs-ingest completed its work in {}ms" ,
86119 fn_start. elapsed( ) . as_millis( )
0 commit comments