Skip to content

Commit 21947c6

Browse files
authored
feat(flow-control): basic flow control for source #rows (#694)
1 parent 22cab1d commit 21947c6

File tree

8 files changed

+61
-2
lines changed

8 files changed

+61
-2
lines changed

python/cocoindex/flow.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,11 @@ class _SourceRefreshOptions:
416416
refresh_interval: datetime.timedelta | None = None
417417

418418

419+
@dataclass
420+
class _ExecutionOptions:
421+
max_inflight_count: int | None = None
422+
423+
419424
class FlowBuilder:
420425
"""
421426
A flow builder is used to build a flow.
@@ -439,6 +444,7 @@ def add_source(
439444
*,
440445
name: str | None = None,
441446
refresh_interval: datetime.timedelta | None = None,
447+
max_inflight_count: int | None = None,
442448
) -> DataSlice[T]:
443449
"""
444450
Import a source to the flow.
@@ -454,9 +460,12 @@ def add_source(
454460
self._state.field_name_builder.build_name(
455461
name, prefix=_to_snake_case(_spec_kind(spec)) + "_"
456462
),
457-
dump_engine_object(
463+
refresh_options=dump_engine_object(
458464
_SourceRefreshOptions(refresh_interval=refresh_interval)
459465
),
466+
execution_options=dump_engine_object(
467+
_ExecutionOptions(max_inflight_count=max_inflight_count)
468+
),
460469
),
461470
name,
462471
)

src/base/spec.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ impl SpecFormatter for OpSpec {
253253
}
254254
}
255255

256+
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
257+
pub struct ExecutionOptions {
258+
pub max_inflight_count: Option<u32>,
259+
}
260+
256261
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
257262
pub struct SourceRefreshOptions {
258263
pub refresh_interval: Option<std::time::Duration>,
@@ -274,6 +279,9 @@ pub struct ImportOpSpec {
274279

275280
#[serde(default)]
276281
pub refresh_options: SourceRefreshOptions,
282+
283+
#[serde(default)]
284+
pub execution_options: ExecutionOptions,
277285
}
278286

279287
impl SpecFormatter for ImportOpSpec {

src/builder/analyzer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,9 @@ impl AnalyzerContext {
697697
primary_key_type,
698698
name: op_name,
699699
refresh_options: import_op.spec.refresh_options,
700+
concurrency_controller: utils::ConcurrencyController::new(
701+
import_op.spec.execution_options.max_inflight_count,
702+
),
700703
})
701704
};
702705
Ok(result_fut)

src/builder/flow_builder.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ impl FlowBuilder {
288288
OpScopeRef(self.root_op_scope.clone())
289289
}
290290

291-
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None))]
291+
#[pyo3(signature = (kind, op_spec, target_scope, name, refresh_options=None, execution_options=None))]
292292
pub fn add_source(
293293
&mut self,
294294
py: Python<'_>,
@@ -297,6 +297,7 @@ impl FlowBuilder {
297297
target_scope: Option<OpScopeRef>,
298298
name: String,
299299
refresh_options: Option<py::Pythonized<spec::SourceRefreshOptions>>,
300+
execution_options: Option<py::Pythonized<spec::ExecutionOptions>>,
300301
) -> PyResult<DataSlice> {
301302
if let Some(target_scope) = target_scope {
302303
if *target_scope != self.root_op_scope {
@@ -313,6 +314,9 @@ impl FlowBuilder {
313314
spec: op_spec.into_inner(),
314315
},
315316
refresh_options: refresh_options.map(|o| o.into_inner()).unwrap_or_default(),
317+
execution_options: execution_options
318+
.map(|o| o.into_inner())
319+
.unwrap_or_default(),
316320
},
317321
};
318322
let analyzer_ctx = AnalyzerContext {

src/builder/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ pub struct AnalyzedImportOp {
5656
pub output: AnalyzedOpOutput,
5757
pub primary_key_type: schema::ValueType,
5858
pub refresh_options: spec::SourceRefreshOptions,
59+
pub concurrency_controller: utils::ConcurrencyController,
5960
}
6061

6162
pub struct AnalyzedFunctionExecInfo {

src/execution/source_indexer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl SourceIndexingContext {
282282
state.scan_generation
283283
};
284284
while let Some(row) = rows_stream.next().await {
285+
let _ = import_op.concurrency_controller.acquire().await?;
285286
for row in row? {
286287
self.process_source_key_if_newer(
287288
row.key,

src/utils/concur_control.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use crate::prelude::*;
2+
3+
use tokio::sync::{Semaphore, SemaphorePermit};
4+
5+
pub struct ConcurrencyController {
6+
inflight_count_sem: Option<Semaphore>,
7+
}
8+
9+
pub struct ConcurrencyControllerPermit<'a> {
10+
_inflight_count_permit: Option<SemaphorePermit<'a>>,
11+
}
12+
13+
impl ConcurrencyController {
14+
pub fn new(max_inflight_count: Option<u32>) -> Self {
15+
Self {
16+
inflight_count_sem: max_inflight_count.map(|max| Semaphore::new(max as usize)),
17+
}
18+
}
19+
20+
pub async fn acquire<'a>(&'a self) -> Result<ConcurrencyControllerPermit<'a>> {
21+
let inflight_count_permit = if let Some(sem) = &self.inflight_count_sem {
22+
Some(sem.acquire().await?)
23+
} else {
24+
None
25+
};
26+
Ok(ConcurrencyControllerPermit {
27+
_inflight_count_permit: inflight_count_permit,
28+
})
29+
}
30+
}

src/utils/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@ pub mod fingerprint;
33
pub mod immutable;
44
pub mod retryable;
55
pub mod yaml_ser;
6+
7+
mod concur_control;
8+
pub use concur_control::ConcurrencyController;

0 commit comments

Comments
 (0)