diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index c8bbecf45..00b0257fc 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -151,6 +151,13 @@ If nothing changed during the last refresh cycle, only list operations will be p ::: +#### Concurrency control + +You can pass the following arguments to `add_source()` to control the concurrency of the source operation: + +* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation. + +The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variable](/docs/core/settings#list-of-environment-variables). ### Transform diff --git a/docs/docs/core/settings.mdx b/docs/docs/core/settings.mdx index fb1c7a924..80fc6a1d9 100644 --- a/docs/docs/core/settings.mdx +++ b/docs/docs/core/settings.mdx @@ -60,6 +60,7 @@ You have two ways to launch CocoIndex: * `app_namespace` (type: `str`, required): The namespace of the application. * `database` (type: `DatabaseConnectionSpec`, required): The connection to the Postgres database. +* `default_execution_options` (type: `DefaultExecutionOptions`, optional): The default execution options for the flow. ### App Namespace @@ -104,13 +105,20 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl ::: +### DefaultExecutionOptions + +`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields: + +* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides a default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on a per-source basis. 
+ ## List of Environment Variables This is the list of environment variables, each of which has a corresponding field in `Settings`: | environment variable | corresponding field in `Settings` | required? | |---------------------|-------------------|----------| +| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No | | `COCOINDEX_DATABASE_URL` | `database.url` | Yes | | `COCOINDEX_DATABASE_USER` | `database.user` | No | | `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No | -| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No | +| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No | diff --git a/python/cocoindex/setting.py b/python/cocoindex/setting.py index 4404a5c13..177de50fe 100644 --- a/python/cocoindex/setting.py +++ b/python/cocoindex/setting.py @@ -43,6 +43,14 @@ class DatabaseConnectionSpec: password: str | None = None +@dataclass +class DefaultExecutionOptions: + """Default execution options.""" + + # The maximum number of concurrent inflight requests. 
+ source_max_inflight_count: int | None = 256 + + def _load_field( target: dict[str, Any], name: str, @@ -55,7 +63,15 @@ def _load_field( if required: raise ValueError(f"{env_name} is not set") else: - target[name] = value if parse is None else parse(value) + if parse is None: + target[name] = value + else: + try: + target[name] = parse(value) + except Exception as e: + raise ValueError( + f"failed to parse environment variable {env_name}: {value}" + ) from e @dataclass @@ -64,6 +80,7 @@ class Settings: database: DatabaseConnectionSpec | None = None app_namespace: str = "" + default_execution_options: DefaultExecutionOptions | None = None @classmethod def from_env(cls) -> Self: @@ -79,9 +96,22 @@ def from_env(cls) -> Self: else: database = None + exec_kwargs: dict[str, Any] = dict() + _load_field( + exec_kwargs, + "source_max_inflight_count", + "COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT", + parse=int, + ) + default_execution_options = DefaultExecutionOptions(**exec_kwargs) + app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", "") - return cls(database=database, app_namespace=app_namespace) + return cls( + database=database, + app_namespace=app_namespace, + default_execution_options=default_execution_options, + ) @dataclass diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index e7d2cf09e..2b7cc237c 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -361,6 +361,7 @@ impl DataScopeBuilder { } pub(super) struct AnalyzerContext { + pub lib_ctx: Arc, pub flow_ctx: Arc, } @@ -687,6 +688,13 @@ impl AnalyzerContext { .typ .clone(); let output = op_scope.add_op_output(import_op.name, output_type)?; + + let max_inflight_count = + (import_op.spec.execution_options.max_inflight_count).or_else(|| { + self.lib_ctx + .default_execution_options + .source_max_inflight_count + }); let result_fut = async move { trace!("Start building executor for source op `{}`", op_name); let executor = executor.await?; @@ -697,9 +705,7 @@ impl AnalyzerContext { 
primary_key_type, name: op_name, refresh_options: import_op.spec.refresh_options, - concurrency_controller: utils::ConcurrencyController::new( - import_op.spec.execution_options.max_inflight_count, - ), + concurrency_controller: utils::ConcurrencyController::new(max_inflight_count), }) }; Ok(result_fut) @@ -1013,7 +1019,10 @@ pub async fn analyze_flow( AnalyzedSetupState, impl Future> + Send + use<>, )> { - let analyzer_ctx = AnalyzerContext { flow_ctx }; + let analyzer_ctx = AnalyzerContext { + lib_ctx: get_lib_context()?, + flow_ctx, + }; let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new())); let root_op_scope = OpScope::new(ROOT_SCOPE_NAME.to_string(), None, root_data_scope); let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len()); @@ -1125,7 +1134,10 @@ pub async fn analyze_transient_flow<'a>( impl Future> + Send + 'a, )> { let mut root_data_scope = DataScopeBuilder::new(); - let analyzer_ctx = AnalyzerContext { flow_ctx }; + let analyzer_ctx = AnalyzerContext { + lib_ctx: get_lib_context()?, + flow_ctx, + }; let mut input_fields = vec![]; for field in flow_inst.input_fields.iter() { let analyzed_field = root_data_scope.add_field(field.name.clone(), &field.value_type)?; diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 9ff13ce64..db5bf8a2e 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -320,6 +320,7 @@ impl FlowBuilder { }, }; let analyzer_ctx = AnalyzerContext { + lib_ctx: self.lib_context.clone(), flow_ctx: self.flow_inst_context.clone(), }; let analyzed = py @@ -417,6 +418,7 @@ impl FlowBuilder { }; let analyzer_ctx = AnalyzerContext { + lib_ctx: self.lib_context.clone(), flow_ctx: self.flow_inst_context.clone(), }; let analyzed = py @@ -464,6 +466,7 @@ impl FlowBuilder { }; let analyzer_ctx = AnalyzerContext { + lib_ctx: self.lib_context.clone(), flow_ctx: self.flow_inst_context.clone(), }; let analyzed = py diff --git a/src/lib_context.rs b/src/lib_context.rs index 
2f22bd81c..3f5d92a00 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -195,6 +195,8 @@ pub struct LibContext { pub db_pools: DbPools, pub persistence_ctx: Option, pub flows: Mutex>>, + + pub default_execution_options: settings::DefaultExecutionOptions, } impl LibContext { @@ -262,6 +264,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result { db_pools, persistence_ctx, flows: Mutex::new(BTreeMap::new()), + default_execution_options: settings.default_execution_options, }) } @@ -296,26 +299,9 @@ mod tests { assert!(db_pools.pools.lock().unwrap().is_empty()); } - #[test] - fn test_settings_structure_without_database() { - let settings = settings::Settings { - database: None, - app_namespace: "test".to_string(), - }; - - // Test that we can create the basic structure - assert!(settings.database.is_none()); - assert_eq!(settings.app_namespace, "test"); - } - #[test] fn test_lib_context_without_database() { - let settings = settings::Settings { - database: None, - app_namespace: "test".to_string(), - }; - - let lib_context = create_lib_context(settings).unwrap(); + let lib_context = create_lib_context(settings::Settings::default()).unwrap(); assert!(lib_context.persistence_ctx.is_none()); assert!(lib_context.require_builtin_db_pool().is_err()); } @@ -329,7 +315,7 @@ mod tests { user: None, password: None, }), - app_namespace: "test".to_string(), + ..Default::default() }; // This would fail at runtime due to invalid connection, but we're testing the structure diff --git a/src/settings.rs b/src/settings.rs index c6368407c..0e5160ae9 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -7,13 +7,20 @@ pub struct DatabaseConnectionSpec { pub password: Option, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Default)] +pub struct DefaultExecutionOptions { + pub source_max_inflight_count: Option, +} + +#[derive(Deserialize, Debug, Default)] pub struct Settings { #[serde(default)] pub database: Option, #[serde(default)] 
#[allow(dead_code)] // Used via serialization/deserialization to Python pub app_namespace: String, + #[serde(default)] + pub default_execution_options: DefaultExecutionOptions, } #[cfg(test)]