Skip to content

Commit 1da0a33

Browse files
committed
feat(global-concur-control): provide a global controller
1 parent 192dcdb commit 1da0a33

File tree

10 files changed

+135
-78
lines changed

10 files changed

+135
-78
lines changed

docs/docs/core/flow_def.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ These options can be passed in to the following APIs:
370370

371371
* [`FlowBuilder.add_source()`](#import-from-source): The options above control the processing concurrency of multiple rows from a source. New rows will not be loaded in memory if it'll be over the limit.
372372

373-
The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variables](/docs/core/settings#list-of-environment-variables).
373+
The default value can be specified by [`GlobalExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variables](/docs/core/settings#list-of-environment-variables).
374374

375375
* [`DataSlice.row()`](#for-each-row): The options above provides a finer-grained control, to limit the processing concurrency of multiple rows within a table at any level.
376376

docs/docs/core/settings.mdx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ You have two ways to launch CocoIndex:
6060

6161
* `app_namespace` (type: `str`, required): The namespace of the application.
6262
* `database` (type: `DatabaseConnectionSpec`, required): The connection to the Postgres database.
63-
* `default_execution_options` (type: `DefaultExecutionOptions`, optional): The default execution options for the flow.
63+
* `global_execution_options` (type: `GlobalExecutionOptions`, optional): The default execution options for the flow.
6464

6565
### App Namespace
6666

@@ -105,9 +105,9 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl
105105

106106
:::
107107

108-
### DefaultExecutionOptions
108+
### GlobalExecutionOptions
109109

110-
`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:
110+
`GlobalExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:
111111

112112
* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations.
113113
* `source_max_inflight_bytes` (type: `int`, optional): The maximum number of concurrent inflight bytes for source operations.
@@ -124,5 +124,5 @@ This is the list of environment variables, each of which has a corresponding fie
124124
| `COCOINDEX_DATABASE_URL` | `database.url` | Yes |
125125
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
126126
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
127-
| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `default_execution_options.source_max_inflight_rows` | No |
128-
| `COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES` | `default_execution_options.source_max_inflight_bytes` | No |
127+
| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `global_execution_options.source_max_inflight_rows` | No |
128+
| `COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES` | `global_execution_options.source_max_inflight_bytes` | No |

python/cocoindex/setting.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class DatabaseConnectionSpec:
4444

4545

4646
@dataclass
47-
class DefaultExecutionOptions:
47+
class GlobalExecutionOptions:
4848
"""Default execution options."""
4949

5050
# The maximum number of concurrent inflight requests.
@@ -81,7 +81,7 @@ class Settings:
8181

8282
database: DatabaseConnectionSpec | None = None
8383
app_namespace: str = ""
84-
default_execution_options: DefaultExecutionOptions | None = None
84+
global_execution_options: GlobalExecutionOptions | None = None
8585

8686
@classmethod
8787
def from_env(cls) -> Self:
@@ -110,14 +110,14 @@ def from_env(cls) -> Self:
110110
"COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES",
111111
parse=int,
112112
)
113-
default_execution_options = DefaultExecutionOptions(**exec_kwargs)
113+
global_execution_options = GlobalExecutionOptions(**exec_kwargs)
114114

115115
app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", "")
116116

117117
return cls(
118118
database=database,
119119
app_namespace=app_namespace,
120-
default_execution_options=default_execution_options,
120+
global_execution_options=global_execution_options,
121121
)
122122

123123

src/base/spec.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,15 @@ pub struct ExecutionOptions {
262262
pub max_inflight_bytes: Option<usize>,
263263
}
264264

265+
impl ExecutionOptions {
266+
pub fn get_concur_control_options(&self) -> concur_control::Options {
267+
concur_control::Options {
268+
max_inflight_rows: self.max_inflight_rows,
269+
max_inflight_bytes: self.max_inflight_bytes,
270+
}
271+
}
272+
}
273+
265274
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
266275
pub struct SourceRefreshOptions {
267276
pub refresh_interval: Option<std::time::Duration>,

src/builder/analyzer.rs

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -689,18 +689,11 @@ impl AnalyzerContext {
689689
.clone();
690690
let output = op_scope.add_op_output(import_op.name, output_type)?;
691691

692-
let max_inflight_rows =
693-
(import_op.spec.execution_options.max_inflight_rows).or_else(|| {
694-
self.lib_ctx
695-
.default_execution_options
696-
.source_max_inflight_rows
697-
});
698-
let max_inflight_bytes =
699-
(import_op.spec.execution_options.max_inflight_bytes).or_else(|| {
700-
self.lib_ctx
701-
.default_execution_options
702-
.source_max_inflight_bytes
703-
});
692+
let concur_control_options = import_op
693+
.spec
694+
.execution_options
695+
.get_concur_control_options();
696+
let global_concurrency_controller = self.lib_ctx.global_concurrency_controller.clone();
704697
let result_fut = async move {
705698
trace!("Start building executor for source op `{}`", op_name);
706699
let executor = executor.await?;
@@ -711,9 +704,9 @@ impl AnalyzerContext {
711704
primary_key_type,
712705
name: op_name,
713706
refresh_options: import_op.spec.refresh_options,
714-
concurrency_controller: concur_control::ConcurrencyController::new(
715-
max_inflight_rows,
716-
max_inflight_bytes,
707+
concurrency_controller: concur_control::CombinedConcurrencyController::new(
708+
&concur_control_options,
709+
global_concurrency_controller,
717710
),
718711
})
719712
};
@@ -808,7 +801,8 @@ impl AnalyzerContext {
808801
};
809802
let op_name = reactive_op.name.clone();
810803

811-
let exec_options = foreach_op.execution_options.clone();
804+
let concur_control_options =
805+
foreach_op.execution_options.get_concur_control_options();
812806
async move {
813807
Ok(AnalyzedReactiveOp::ForEach(AnalyzedForEachOp {
814808
local_field_ref,
@@ -817,8 +811,7 @@ impl AnalyzerContext {
817811
.with_context(|| format!("Analyzing foreach op: {op_name}"))?,
818812
name: op_name,
819813
concurrency_controller: concur_control::ConcurrencyController::new(
820-
exec_options.max_inflight_rows,
821-
exec_options.max_inflight_bytes,
814+
&concur_control_options,
822815
),
823816
}))
824817
}

src/builder/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub struct AnalyzedImportOp {
5757
pub primary_key_type: schema::ValueType,
5858
pub refresh_options: spec::SourceRefreshOptions,
5959

60-
pub concurrency_controller: concur_control::ConcurrencyController,
60+
pub concurrency_controller: concur_control::CombinedConcurrencyController,
6161
}
6262

6363
pub struct AnalyzedFunctionExecInfo {

src/execution/source_indexer.rs

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ impl SourceIndexingContext {
108108
key: value::KeyValue,
109109
source_data: Option<interface::SourceData>,
110110
update_stats: Arc<stats::UpdateStats>,
111-
_concur_permit: concur_control::ConcurrencyControllerPermit,
111+
_concur_permit: concur_control::CombinedConcurrencyControllerPermit,
112112
ack_fn: Option<AckFn>,
113113
pool: PgPool,
114114
) {
@@ -247,36 +247,6 @@ impl SourceIndexingContext {
247247
}
248248
}
249249

250-
// Expected to be called during scan, which has no value.
251-
fn process_source_key_if_newer(
252-
self: &Arc<Self>,
253-
key: value::KeyValue,
254-
source_version: SourceVersion,
255-
update_stats: &Arc<stats::UpdateStats>,
256-
concur_permit: concur_control::ConcurrencyControllerPermit,
257-
pool: &PgPool,
258-
) -> Option<impl Future<Output = ()> + Send + 'static> {
259-
{
260-
let mut state = self.state.lock().unwrap();
261-
let scan_generation = state.scan_generation;
262-
let row_state = state.rows.entry(key.clone()).or_default();
263-
row_state.touched_generation = scan_generation;
264-
if row_state
265-
.source_version
266-
.should_skip(&source_version, Some(update_stats.as_ref()))
267-
{
268-
return None;
269-
}
270-
}
271-
Some(self.clone().process_source_key(
272-
key,
273-
None,
274-
update_stats.clone(),
275-
concur_permit,
276-
NO_ACK,
277-
pool.clone(),
278-
))
279-
}
280250
pub async fn update(
281251
self: &Arc<Self>,
282252
pool: &PgPool,
@@ -336,21 +306,34 @@ impl SourceIndexingContext {
336306
};
337307
while let Some(row) = rows_stream.next().await {
338308
for row in row? {
309+
let source_version = SourceVersion::from_current_with_ordinal(
310+
row.ordinal
311+
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
312+
);
313+
{
314+
let mut state = self.state.lock().unwrap();
315+
let scan_generation = state.scan_generation;
316+
let row_state = state.rows.entry(row.key.clone()).or_default();
317+
row_state.touched_generation = scan_generation;
318+
if row_state
319+
.source_version
320+
.should_skip(&source_version, Some(update_stats.as_ref()))
321+
{
322+
continue;
323+
}
324+
}
339325
let concur_permit = import_op
340326
.concurrency_controller
341327
.acquire(concur_control::BYTES_UNKNOWN_YET)
342328
.await?;
343-
self.process_source_key_if_newer(
329+
join_set.spawn(self.clone().process_source_key(
344330
row.key,
345-
SourceVersion::from_current_with_ordinal(
346-
row.ordinal
347-
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
348-
),
349-
update_stats,
331+
None,
332+
update_stats.clone(),
350333
concur_permit,
351-
pool,
352-
)
353-
.map(|fut| join_set.spawn(fut));
334+
NO_ACK,
335+
pool.clone(),
336+
));
354337
}
355338
}
356339
while let Some(result) = join_set.join_next().await {

src/lib_context.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ pub struct LibContext {
196196
pub persistence_ctx: Option<PersistenceContext>,
197197
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
198198

199-
pub default_execution_options: settings::DefaultExecutionOptions,
199+
pub global_concurrency_controller: Arc<concur_control::ConcurrencyController>,
200200
}
201201

202202
impl LibContext {
@@ -269,7 +269,12 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
269269
db_pools,
270270
persistence_ctx,
271271
flows: Mutex::new(BTreeMap::new()),
272-
default_execution_options: settings.default_execution_options,
272+
global_concurrency_controller: Arc::new(concur_control::ConcurrencyController::new(
273+
&concur_control::Options {
274+
max_inflight_rows: settings.global_execution_options.source_max_inflight_rows,
275+
max_inflight_bytes: settings.global_execution_options.source_max_inflight_bytes,
276+
},
277+
)),
273278
})
274279
}
275280

src/settings.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ pub struct DatabaseConnectionSpec {
88
}
99

1010
#[derive(Deserialize, Debug, Default)]
11-
pub struct DefaultExecutionOptions {
11+
pub struct GlobalExecutionOptions {
1212
pub source_max_inflight_rows: Option<usize>,
1313
pub source_max_inflight_bytes: Option<usize>,
1414
}
@@ -21,7 +21,7 @@ pub struct Settings {
2121
#[allow(dead_code)] // Used via serialization/deserialization to Python
2222
pub app_namespace: String,
2323
#[serde(default)]
24-
pub default_execution_options: DefaultExecutionOptions,
24+
pub global_execution_options: GlobalExecutionOptions,
2525
}
2626

2727
#[cfg(test)]

src/utils/concur_control.rs

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ impl WeightedSemaphore {
4848
}
4949
}
5050

51+
pub struct Options {
52+
pub max_inflight_rows: Option<usize>,
53+
pub max_inflight_bytes: Option<usize>,
54+
}
55+
5156
pub struct ConcurrencyControllerPermit {
5257
_inflight_count_permit: Option<OwnedSemaphorePermit>,
5358
_inflight_bytes_permit: Option<OwnedSemaphorePermit>,
@@ -61,10 +66,14 @@ pub struct ConcurrencyController {
6166
pub static BYTES_UNKNOWN_YET: Option<fn() -> usize> = None;
6267

6368
impl ConcurrencyController {
64-
pub fn new(max_inflight_count: Option<usize>, max_inflight_bytes: Option<usize>) -> Self {
69+
pub fn new(exec_options: &Options) -> Self {
6570
Self {
66-
inflight_count_sem: max_inflight_count.map(|max| Arc::new(Semaphore::new(max))),
67-
inflight_bytes_sem: max_inflight_bytes.map(|max| WeightedSemaphore::new(max)),
71+
inflight_count_sem: exec_options
72+
.max_inflight_rows
73+
.map(|max| Arc::new(Semaphore::new(max))),
74+
inflight_bytes_sem: exec_options
75+
.max_inflight_bytes
76+
.map(|max| WeightedSemaphore::new(max)),
6877
}
6978
}
7079

@@ -82,8 +91,7 @@ impl ConcurrencyController {
8291
};
8392
let inflight_bytes_permit = if let Some(sem) = &self.inflight_bytes_sem {
8493
if let Some(bytes_fn) = bytes_fn {
85-
let n = bytes_fn();
86-
sem.acquire(n, false).await?
94+
sem.acquire(bytes_fn(), false).await?
8795
} else {
8896
Some(sem.acquire_reservation().await?)
8997
}
@@ -107,3 +115,62 @@ impl ConcurrencyController {
107115
}
108116
}
109117
}
118+
119+
pub struct CombinedConcurrencyControllerPermit {
120+
_permit: ConcurrencyControllerPermit,
121+
_global_permit: ConcurrencyControllerPermit,
122+
}
123+
124+
pub struct CombinedConcurrencyController {
125+
controller: ConcurrencyController,
126+
global_controller: Arc<ConcurrencyController>,
127+
needs_num_bytes: bool,
128+
}
129+
130+
impl CombinedConcurrencyController {
131+
pub fn new(exec_options: &Options, global_controller: Arc<ConcurrencyController>) -> Self {
132+
Self {
133+
controller: ConcurrencyController::new(exec_options),
134+
needs_num_bytes: exec_options.max_inflight_bytes.is_some()
135+
|| global_controller.inflight_bytes_sem.is_some(),
136+
global_controller,
137+
}
138+
}
139+
140+
pub async fn acquire(
141+
&self,
142+
bytes_fn: Option<impl FnOnce() -> usize>,
143+
) -> Result<CombinedConcurrencyControllerPermit, AcquireError> {
144+
let num_bytes_fn = if let Some(bytes_fn) = bytes_fn
145+
&& self.needs_num_bytes
146+
{
147+
let num_bytes = bytes_fn();
148+
Some(move || num_bytes)
149+
} else {
150+
None
151+
};
152+
153+
let permit = self.controller.acquire(num_bytes_fn).await?;
154+
let global_permit = self.global_controller.acquire(num_bytes_fn).await?;
155+
Ok(CombinedConcurrencyControllerPermit {
156+
_permit: permit,
157+
_global_permit: global_permit,
158+
})
159+
}
160+
161+
pub async fn acquire_bytes_with_reservation<'a>(
162+
&'a self,
163+
bytes_fn: impl FnOnce() -> usize,
164+
) -> Result<(Option<OwnedSemaphorePermit>, Option<OwnedSemaphorePermit>), AcquireError> {
165+
let num_bytes = bytes_fn();
166+
let permit = self
167+
.controller
168+
.acquire_bytes_with_reservation(move || num_bytes)
169+
.await?;
170+
let global_permit = self
171+
.global_controller
172+
.acquire_bytes_with_reservation(move || num_bytes)
173+
.await?;
174+
Ok((permit, global_permit))
175+
}
176+
}

0 commit comments

Comments
 (0)