|
1 | | -use crate::prelude::*; |
| 1 | +use crate::{ |
| 2 | + prelude::*, |
| 3 | + service::error::{SharedError, SharedResult, SharedResultExt}, |
| 4 | +}; |
2 | 5 |
|
3 | 6 | use futures::future::Ready; |
4 | 7 | use sqlx::PgPool; |
@@ -33,9 +36,12 @@ struct SourceIndexingState { |
33 | 36 | rows: HashMap<value::KeyValue, SourceRowIndexingState>, |
34 | 37 | scan_generation: usize, |
35 | 38 | } |
| 39 | + |
36 | 40 | pub struct SourceIndexingContext { |
37 | 41 | flow: Arc<builder::AnalyzedFlow>, |
38 | 42 | source_idx: usize, |
| 43 | + pending_update: Mutex<Option<Shared<BoxFuture<'static, SharedResult<()>>>>>, |
| 44 | + update_sem: Semaphore, |
39 | 45 | state: Mutex<SourceIndexingState>, |
40 | 46 | setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>, |
41 | 47 | } |
@@ -88,6 +94,8 @@ impl SourceIndexingContext { |
88 | 94 | rows, |
89 | 95 | scan_generation, |
90 | 96 | }), |
| 97 | + pending_update: Mutex::new(None), |
| 98 | + update_sem: Semaphore::new(1), |
91 | 99 | setup_execution_ctx, |
92 | 100 | }) |
93 | 101 | } |
@@ -269,11 +277,49 @@ impl SourceIndexingContext { |
269 | 277 | pool.clone(), |
270 | 278 | )) |
271 | 279 | } |
272 | | - |
273 | 280 | pub async fn update( |
274 | 281 | self: &Arc<Self>, |
275 | 282 | pool: &PgPool, |
276 | 283 | update_stats: &Arc<stats::UpdateStats>, |
| 284 | + ) -> Result<()> { |
| 285 | + let pending_update_fut = { |
| 286 | + let mut pending_update = self.pending_update.lock().unwrap(); |
| 287 | + if let Some(pending_update_fut) = &*pending_update { |
| 288 | + pending_update_fut.clone() |
| 289 | + } else { |
| 290 | + let slf = self.clone(); |
| 291 | + let pool = pool.clone(); |
| 292 | + let update_stats = update_stats.clone(); |
| 293 | + let task = tokio::spawn(async move { |
| 294 | + { |
| 295 | + let _permit = slf.update_sem.acquire().await?; |
| 296 | + { |
| 297 | + let mut pending_update = slf.pending_update.lock().unwrap(); |
| 298 | + *pending_update = None; |
| 299 | + } |
| 300 | + slf.update_once(&pool, &update_stats).await?; |
| 301 | + } |
| 302 | + anyhow::Ok(()) |
| 303 | + }); |
| 304 | + let pending_update_fut = async move { |
| 305 | + task.await |
| 306 | + .map_err(SharedError::from)? |
| 307 | + .map_err(SharedError::new) |
| 308 | + } |
| 309 | + .boxed() |
| 310 | + .shared(); |
| 311 | + *pending_update = Some(pending_update_fut.clone()); |
| 312 | + pending_update_fut |
| 313 | + } |
| 314 | + }; |
| 315 | + pending_update_fut.await.std_result()?; |
| 316 | + Ok(()) |
| 317 | + } |
| 318 | + |
| 319 | + async fn update_once( |
| 320 | + self: &Arc<Self>, |
| 321 | + pool: &PgPool, |
| 322 | + update_stats: &Arc<stats::UpdateStats>, |
277 | 323 | ) -> Result<()> { |
278 | 324 | let plan = self.flow.get_execution_plan().await?; |
279 | 325 | let import_op = &plan.import_ops[self.source_idx]; |
|
0 commit comments