Skip to content

Commit f1fa42a

Browse files
authored
feat(s3-push): delete(ack) event messages after processed (#483)
1 parent 5b371e9 commit f1fa42a

File tree

7 files changed

+204
-69
lines changed

7 files changed

+204
-69
lines changed

src/execution/live_updater.rs

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,39 @@ struct StatsReportState {
3131
const MIN_REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
3232
const REPORT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
3333

34+
struct SharedAckFn {
35+
count: usize,
36+
ack_fn: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>>,
37+
}
38+
39+
impl SharedAckFn {
40+
fn new(
41+
count: usize,
42+
ack_fn: Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>,
43+
) -> Self {
44+
Self {
45+
count,
46+
ack_fn: Some(ack_fn),
47+
}
48+
}
49+
50+
async fn ack(v: &Mutex<Self>) -> Result<()> {
51+
let ack_fn = {
52+
let mut v = v.lock().unwrap();
53+
v.count -= 1;
54+
if v.count > 0 {
55+
None
56+
} else {
57+
v.ack_fn.take()
58+
}
59+
};
60+
if let Some(ack_fn) = ack_fn {
61+
ack_fn().await?;
62+
}
63+
Ok(())
64+
}
65+
}
66+
3467
async fn update_source(
3568
flow_ctx: Arc<FlowContext>,
3669
plan: Arc<plan::ExecutionPlan>,
@@ -92,13 +125,50 @@ async fn update_source(
92125
futs.push(
93126
async move {
94127
let mut change_stream = change_stream;
95-
while let Some(change) = change_stream.next().await {
96-
tokio::spawn(source_context.clone().process_source_key(
97-
change.key,
98-
change.data,
99-
source_update_stats.clone(),
100-
pool.clone(),
101-
));
128+
let retry_options = retriable::RetryOptions {
129+
max_retries: None,
130+
initial_backoff: std::time::Duration::from_secs(5),
131+
max_backoff: std::time::Duration::from_secs(60),
132+
};
133+
loop {
134+
// Workaround as AsyncFnMut isn't mature yet.
135+
// Should be changed to use AsyncFnMut once it is.
136+
let change_stream = tokio::sync::Mutex::new(&mut change_stream);
137+
let change_msg = retriable::run(
138+
|| async {
139+
let mut change_stream = change_stream.lock().await;
140+
change_stream
141+
.next()
142+
.await
143+
.transpose()
144+
.map_err(retriable::Error::always_retryable)
145+
},
146+
&retry_options,
147+
)
148+
.await?;
149+
let change_msg = if let Some(change_msg) = change_msg {
150+
change_msg
151+
} else {
152+
break;
153+
};
154+
let ack_fn = change_msg.ack_fn.map(|ack_fn| {
155+
Arc::new(Mutex::new(SharedAckFn::new(
156+
change_msg.changes.iter().len(),
157+
ack_fn,
158+
)))
159+
});
160+
for change in change_msg.changes {
161+
let ack_fn = ack_fn.clone();
162+
tokio::spawn(source_context.clone().process_source_key(
163+
change.key,
164+
change.data,
165+
source_update_stats.clone(),
166+
ack_fn.map(|ack_fn| {
167+
move || async move { SharedAckFn::ack(&ack_fn).await }
168+
}),
169+
pool.clone(),
170+
));
171+
}
102172
}
103173
Ok(())
104174
}

src/execution/source_indexer.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::prelude::*;
22

3+
use futures::future::Ready;
34
use sqlx::PgPool;
45
use std::collections::{hash_map, HashMap};
56
use tokio::{sync::Semaphore, task::JoinSet};
@@ -36,6 +37,8 @@ pub struct SourceIndexingContext {
3637
state: Mutex<SourceIndexingState>,
3738
}
3839

40+
pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
41+
3942
impl SourceIndexingContext {
4043
pub async fn load(
4144
flow: Arc<builder::AnalyzedFlow>,
@@ -79,11 +82,15 @@ impl SourceIndexingContext {
7982
})
8083
}
8184

82-
pub async fn process_source_key(
85+
pub async fn process_source_key<
86+
AckFut: Future<Output = Result<()>> + Send + 'static,
87+
AckFn: FnOnce() -> AckFut,
88+
>(
8389
self: Arc<Self>,
8490
key: value::KeyValue,
8591
source_data: Option<interface::SourceData>,
8692
update_stats: Arc<stats::UpdateStats>,
93+
ack_fn: Option<AckFn>,
8794
pool: PgPool,
8895
) {
8996
let process = async {
@@ -173,11 +180,20 @@ impl SourceIndexingContext {
173180
}
174181
}
175182
drop(permit);
183+
if let Some(ack_fn) = ack_fn {
184+
ack_fn().await?;
185+
}
176186
anyhow::Ok(())
177187
};
178188
if let Err(e) = process.await {
179189
update_stats.num_errors.inc(1);
180-
error!("{:?}", e.context("Error in processing a source row"));
190+
error!(
191+
"{:?}",
192+
e.context(format!(
193+
"Error in processing row from source `{source}` with key: {key}",
194+
source = self.flow.flow_instance.import_ops[self.source_idx].name
195+
))
196+
);
181197
}
182198
}
183199

@@ -203,7 +219,7 @@ impl SourceIndexingContext {
203219
}
204220
Some(
205221
self.clone()
206-
.process_source_key(key, None, update_stats.clone(), pool.clone()),
222+
.process_source_key(key, None, update_stats.clone(), NO_ACK, pool.clone()),
207223
)
208224
}
209225

@@ -269,6 +285,7 @@ impl SourceIndexingContext {
269285
key,
270286
source_data,
271287
update_stats.clone(),
288+
NO_ACK,
272289
pool.clone(),
273290
));
274291
}

src/ops/interface.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ pub struct SourceChange {
9595
pub data: Option<SourceData>,
9696
}
9797

98+
pub struct SourceChangeMessage {
99+
pub changes: Vec<SourceChange>,
100+
pub ack_fn: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>>,
101+
}
102+
98103
#[derive(Debug, Default)]
99104
pub struct SourceExecutorListOptions {
100105
pub include_ordinal: bool,
@@ -141,7 +146,9 @@ pub trait SourceExecutor: Send + Sync {
141146
options: &SourceExecutorGetOptions,
142147
) -> Result<PartialSourceRowData>;
143148

144-
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
149+
async fn change_stream(
150+
&self,
151+
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
145152
Ok(None)
146153
}
147154
}

src/ops/sources/amazon_s3.rs

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use async_stream::try_stream;
33
use aws_config::BehaviorVersion;
44
use aws_sdk_s3::Client;
55
use globset::{Glob, GlobSet, GlobSetBuilder};
6-
use log::warn;
76
use std::sync::Arc;
87

98
use crate::base::field_attrs;
@@ -23,6 +22,20 @@ struct SqsContext {
2322
client: aws_sdk_sqs::Client,
2423
queue_url: String,
2524
}
25+
26+
impl SqsContext {
27+
async fn delete_message(&self, receipt_handle: String) -> Result<()> {
28+
error!("Deleting message: {}", receipt_handle);
29+
self.client
30+
.delete_message()
31+
.queue_url(&self.queue_url)
32+
.receipt_handle(receipt_handle)
33+
.send()
34+
.await?;
35+
Ok(())
36+
}
37+
}
38+
2639
struct Executor {
2740
client: Client,
2841
bucket_name: String,
@@ -152,24 +165,26 @@ impl SourceExecutor for Executor {
152165
Ok(PartialSourceRowData { value, ordinal })
153166
}
154167

155-
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
168+
async fn change_stream(
169+
&self,
170+
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
156171
let sqs_context = if let Some(sqs_context) = &self.sqs_context {
157172
sqs_context
158173
} else {
159174
return Ok(None);
160175
};
161176
let stream = stream! {
162177
loop {
163-
let changes = match self.poll_sqs(&sqs_context).await {
164-
Ok(changes) => changes,
178+
match self.poll_sqs(&sqs_context).await {
179+
Ok(messages) => {
180+
for message in messages {
181+
yield Ok(message);
182+
}
183+
}
165184
Err(e) => {
166-
warn!("Failed to poll SQS: {}", e);
167-
continue;
185+
yield Err(e);
168186
}
169187
};
170-
for change in changes {
171-
yield change;
172-
}
173188
}
174189
};
175190
Ok(Some(stream.boxed()))
@@ -206,7 +221,7 @@ pub struct S3Object {
206221
}
207222

208223
impl Executor {
209-
async fn poll_sqs(&self, sqs_context: &Arc<SqsContext>) -> Result<Vec<SourceChange>> {
224+
async fn poll_sqs(&self, sqs_context: &Arc<SqsContext>) -> Result<Vec<SourceChangeMessage>> {
210225
let resp = sqs_context
211226
.client
212227
.receive_message()
@@ -220,36 +235,53 @@ impl Executor {
220235
} else {
221236
return Ok(Vec::new());
222237
};
223-
let mut changes = vec![];
224-
for message in messages.into_iter().filter_map(|m| m.body) {
225-
let notification: S3EventNotification = serde_json::from_str(&message)?;
226-
for record in notification.records {
227-
let s3 = if let Some(s3) = record.s3 {
228-
s3
229-
} else {
230-
continue;
231-
};
232-
if s3.bucket.name != self.bucket_name {
233-
continue;
234-
}
235-
if !self
236-
.prefix
237-
.as_ref()
238-
.map_or(true, |prefix| s3.object.key.starts_with(prefix))
239-
{
240-
continue;
238+
let mut change_messages = vec![];
239+
for message in messages.into_iter() {
240+
if let Some(body) = message.body {
241+
let notification: S3EventNotification = serde_json::from_str(&body)?;
242+
let mut changes = vec![];
243+
for record in notification.records {
244+
let s3 = if let Some(s3) = record.s3 {
245+
s3
246+
} else {
247+
continue;
248+
};
249+
if s3.bucket.name != self.bucket_name {
250+
continue;
251+
}
252+
if !self
253+
.prefix
254+
.as_ref()
255+
.map_or(true, |prefix| s3.object.key.starts_with(prefix))
256+
{
257+
continue;
258+
}
259+
if record.event_name.starts_with("ObjectCreated:")
260+
|| record.event_name.starts_with("ObjectDeleted:")
261+
{
262+
changes.push(SourceChange {
263+
key: KeyValue::Str(s3.object.key.into()),
264+
data: None,
265+
});
266+
}
241267
}
242-
if record.event_name.starts_with("ObjectCreated:")
243-
|| record.event_name.starts_with("ObjectDeleted:")
244-
{
245-
changes.push(SourceChange {
246-
key: KeyValue::Str(s3.object.key.into()),
247-
data: None,
248-
});
268+
if let Some(receipt_handle) = message.receipt_handle {
269+
if !changes.is_empty() {
270+
let sqs_context = sqs_context.clone();
271+
change_messages.push(SourceChangeMessage {
272+
changes,
273+
ack_fn: Some(Box::new(move || {
274+
async move { sqs_context.delete_message(receipt_handle).await }
275+
.boxed()
276+
})),
277+
});
278+
} else {
279+
sqs_context.delete_message(receipt_handle).await?;
280+
}
249281
}
250282
}
251283
}
252-
Ok(changes)
284+
Ok(change_messages)
253285
}
254286
}
255287

src/ops/sources/google_drive.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl Executor {
190190
async fn get_recent_updates(
191191
&self,
192192
cutoff_time: &mut DateTime<Utc>,
193-
) -> Result<Vec<SourceChange>> {
193+
) -> Result<SourceChangeMessage> {
194194
let mut page_size: i32 = 10;
195195
let mut next_page_token: Option<String> = None;
196196
let mut changes = Vec::new();
@@ -234,7 +234,10 @@ impl Executor {
234234
page_size = 100;
235235
}
236236
*cutoff_time = Self::make_cutoff_time(most_recent_modified_time, start_time);
237-
Ok(changes)
237+
Ok(SourceChangeMessage {
238+
changes,
239+
ack_fn: None,
240+
})
238241
}
239242

240243
async fn is_file_covered(&self, file_id: &str) -> Result<bool> {
@@ -416,7 +419,9 @@ impl SourceExecutor for Executor {
416419
Ok(PartialSourceRowData { value, ordinal })
417420
}
418421

419-
async fn change_stream(&self) -> Result<Option<BoxStream<'async_trait, SourceChange>>> {
422+
async fn change_stream(
423+
&self,
424+
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
420425
let poll_interval = if let Some(poll_interval) = self.recent_updates_poll_interval {
421426
poll_interval
422427
} else {
@@ -428,17 +433,7 @@ impl SourceExecutor for Executor {
428433
let stream = stream! {
429434
loop {
430435
interval.tick().await;
431-
let changes = self.get_recent_updates(&mut cutoff_time).await;
432-
match changes {
433-
Ok(changes) => {
434-
for change in changes {
435-
yield change;
436-
}
437-
}
438-
Err(e) => {
439-
error!("Error getting recent updates: {e}");
440-
}
441-
}
436+
yield self.get_recent_updates(&mut cutoff_time).await;
442437
}
443438
};
444439
Ok(Some(stream.boxed()))

0 commit comments

Comments
 (0)