Skip to content

Commit a65a32e

Browse files
committed
feat(flow-control): support source max inflight count in global setting
1 parent 03b472c commit a65a32e

File tree

7 files changed

+81
-28
lines changed

7 files changed

+81
-28
lines changed

docs/docs/core/flow_def.mdx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ If nothing changed during the last refresh cycle, only list operations will be p
151151

152152
:::
153153

154+
#### Concurrency control
155+
156+
You can pass the following arguments to `add_source()` to control the concurrency of the source operation:
157+
158+
* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation.
159+
160+
The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variable](/docs/core/settings#list-of-environment-variables).
154161

155162
### Transform
156163

docs/docs/core/settings.mdx

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ You have two ways to launch CocoIndex:
6060

6161
* `app_namespace` (type: `str`, required): The namespace of the application.
6262
* `database` (type: `DatabaseConnectionSpec`, required): The connection to the Postgres database.
63+
* `default_execution_options` (type: `DefaultExecutionOptions`, optional): The default execution options for the flow.
6364

6465
### App Namespace
6566

@@ -104,13 +105,20 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl
104105

105106
:::
106107

108+
### DefaultExecutionOptions
109+
110+
`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:
111+
112+
* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides a default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on a per-source basis.
113+
107114
## List of Environment Variables
108115

109116
This is the list of environment variables, each of which has a corresponding field in `Settings`:
110117

111118
| environment variable | corresponding field in `Settings` | required? |
112119
|---------------------|-------------------|----------|
120+
| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No |
113121
| `COCOINDEX_DATABASE_URL` | `database.url` | Yes |
114122
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
115123
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
116-
| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No |
124+
| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No |

python/cocoindex/setting.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ class DatabaseConnectionSpec:
4343
password: str | None = None
4444

4545

46+
@dataclass
47+
class DefaultExecutionOptions:
48+
"""Default execution options."""
49+
50+
# The maximum number of concurrent inflight requests.
51+
source_max_inflight_count: int | None = 256
52+
53+
4654
def _load_field(
4755
target: dict[str, Any],
4856
name: str,
@@ -55,7 +63,15 @@ def _load_field(
5563
if required:
5664
raise ValueError(f"{env_name} is not set")
5765
else:
58-
target[name] = value if parse is None else parse(value)
66+
if parse is None:
67+
target[name] = value
68+
else:
69+
try:
70+
target[name] = parse(value)
71+
except Exception as e:
72+
raise ValueError(
73+
f"failed to parse environment variable {env_name}: {value}"
74+
) from e
5975

6076

6177
@dataclass
@@ -64,6 +80,7 @@ class Settings:
6480

6581
database: DatabaseConnectionSpec | None = None
6682
app_namespace: str = ""
83+
default_execution_options: DefaultExecutionOptions | None = None
6784

6885
@classmethod
6986
def from_env(cls) -> Self:
@@ -79,9 +96,22 @@ def from_env(cls) -> Self:
7996
else:
8097
database = None
8198

99+
exec_kwargs: dict[str, Any] = dict()
100+
_load_field(
101+
exec_kwargs,
102+
"source_max_inflight_count",
103+
"COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT",
104+
parse=int,
105+
)
106+
default_execution_options = DefaultExecutionOptions(**exec_kwargs)
107+
82108
app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", "")
83109

84-
return cls(database=database, app_namespace=app_namespace)
110+
return cls(
111+
database=database,
112+
app_namespace=app_namespace,
113+
default_execution_options=default_execution_options,
114+
)
85115

86116

87117
@dataclass

src/builder/analyzer.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ impl DataScopeBuilder {
361361
}
362362

363363
pub(super) struct AnalyzerContext {
364+
pub lib_ctx: Arc<LibContext>,
364365
pub flow_ctx: Arc<FlowInstanceContext>,
365366
}
366367

@@ -687,6 +688,13 @@ impl AnalyzerContext {
687688
.typ
688689
.clone();
689690
let output = op_scope.add_op_output(import_op.name, output_type)?;
691+
692+
let max_inflight_count =
693+
(import_op.spec.execution_options.max_inflight_count).or_else(|| {
694+
self.lib_ctx
695+
.default_execution_options
696+
.source_max_inflight_count
697+
});
690698
let result_fut = async move {
691699
trace!("Start building executor for source op `{}`", op_name);
692700
let executor = executor.await?;
@@ -697,9 +705,7 @@ impl AnalyzerContext {
697705
primary_key_type,
698706
name: op_name,
699707
refresh_options: import_op.spec.refresh_options,
700-
concurrency_controller: utils::ConcurrencyController::new(
701-
import_op.spec.execution_options.max_inflight_count,
702-
),
708+
concurrency_controller: utils::ConcurrencyController::new(max_inflight_count),
703709
})
704710
};
705711
Ok(result_fut)
@@ -1013,7 +1019,10 @@ pub async fn analyze_flow(
10131019
AnalyzedSetupState,
10141020
impl Future<Output = Result<ExecutionPlan>> + Send + use<>,
10151021
)> {
1016-
let analyzer_ctx = AnalyzerContext { flow_ctx };
1022+
let analyzer_ctx = AnalyzerContext {
1023+
lib_ctx: get_lib_context()?,
1024+
flow_ctx,
1025+
};
10171026
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
10181027
let root_op_scope = OpScope::new(ROOT_SCOPE_NAME.to_string(), None, root_data_scope);
10191028
let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len());
@@ -1125,7 +1134,10 @@ pub async fn analyze_transient_flow<'a>(
11251134
impl Future<Output = Result<TransientExecutionPlan>> + Send + 'a,
11261135
)> {
11271136
let mut root_data_scope = DataScopeBuilder::new();
1128-
let analyzer_ctx = AnalyzerContext { flow_ctx };
1137+
let analyzer_ctx = AnalyzerContext {
1138+
lib_ctx: get_lib_context()?,
1139+
flow_ctx,
1140+
};
11291141
let mut input_fields = vec![];
11301142
for field in flow_inst.input_fields.iter() {
11311143
let analyzed_field = root_data_scope.add_field(field.name.clone(), &field.value_type)?;

src/builder/flow_builder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ impl FlowBuilder {
320320
},
321321
};
322322
let analyzer_ctx = AnalyzerContext {
323+
lib_ctx: self.lib_context.clone(),
323324
flow_ctx: self.flow_inst_context.clone(),
324325
};
325326
let analyzed = py
@@ -417,6 +418,7 @@ impl FlowBuilder {
417418
};
418419

419420
let analyzer_ctx = AnalyzerContext {
421+
lib_ctx: self.lib_context.clone(),
420422
flow_ctx: self.flow_inst_context.clone(),
421423
};
422424
let analyzed = py
@@ -464,6 +466,7 @@ impl FlowBuilder {
464466
};
465467

466468
let analyzer_ctx = AnalyzerContext {
469+
lib_ctx: self.lib_context.clone(),
467470
flow_ctx: self.flow_inst_context.clone(),
468471
};
469472
let analyzed = py

src/lib_context.rs

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ pub struct LibContext {
195195
pub db_pools: DbPools,
196196
pub persistence_ctx: Option<PersistenceContext>,
197197
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
198+
199+
pub default_execution_options: settings::DefaultExecutionOptions,
198200
}
199201

200202
impl LibContext {
@@ -262,6 +264,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
262264
db_pools,
263265
persistence_ctx,
264266
flows: Mutex::new(BTreeMap::new()),
267+
default_execution_options: settings.default_execution_options,
265268
})
266269
}
267270

@@ -296,26 +299,9 @@ mod tests {
296299
assert!(db_pools.pools.lock().unwrap().is_empty());
297300
}
298301

299-
#[test]
300-
fn test_settings_structure_without_database() {
301-
let settings = settings::Settings {
302-
database: None,
303-
app_namespace: "test".to_string(),
304-
};
305-
306-
// Test that we can create the basic structure
307-
assert!(settings.database.is_none());
308-
assert_eq!(settings.app_namespace, "test");
309-
}
310-
311302
#[test]
312303
fn test_lib_context_without_database() {
313-
let settings = settings::Settings {
314-
database: None,
315-
app_namespace: "test".to_string(),
316-
};
317-
318-
let lib_context = create_lib_context(settings).unwrap();
304+
let lib_context = create_lib_context(settings::Settings::default()).unwrap();
319305
assert!(lib_context.persistence_ctx.is_none());
320306
assert!(lib_context.require_builtin_db_pool().is_err());
321307
}
@@ -329,7 +315,7 @@ mod tests {
329315
user: None,
330316
password: None,
331317
}),
332-
app_namespace: "test".to_string(),
318+
..Default::default()
333319
};
334320

335321
// This would fail at runtime due to invalid connection, but we're testing the structure

src/settings.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,20 @@ pub struct DatabaseConnectionSpec {
77
pub password: Option<String>,
88
}
99

10-
#[derive(Deserialize, Debug)]
10+
#[derive(Deserialize, Debug, Default)]
11+
pub struct DefaultExecutionOptions {
12+
pub source_max_inflight_count: Option<u32>,
13+
}
14+
15+
#[derive(Deserialize, Debug, Default)]
1116
pub struct Settings {
1217
#[serde(default)]
1318
pub database: Option<DatabaseConnectionSpec>,
1419
#[serde(default)]
1520
#[allow(dead_code)] // Used via serialization/deserialization to Python
1621
pub app_namespace: String,
22+
#[serde(default)]
23+
pub default_execution_options: DefaultExecutionOptions,
1724
}
1825

1926
#[cfg(test)]

0 commit comments

Comments
 (0)