From c5083706fb7f49da4d9a619c74a7ae25c2bcd2ea Mon Sep 17 00:00:00 2001 From: LJ Date: Mon, 7 Jul 2025 08:31:02 -0700 Subject: [PATCH] refactor(naming): `max_inflight_rows` instead of `max_inflight_count` --- docs/docs/core/flow_def.mdx | 2 +- docs/docs/core/settings.mdx | 4 ++-- python/cocoindex/flow.py | 6 +++--- python/cocoindex/setting.py | 6 +++--- src/base/spec.rs | 2 +- src/builder/analyzer.rs | 8 ++++---- src/settings.rs | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index 00b0257fc..9b2d37074 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -155,7 +155,7 @@ If nothing changed during the last refresh cycle, only list operations will be p You can pass the following arguments to `add_source()` to control the concurrency of the source operation: -* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation. +* `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation. The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables). diff --git a/docs/docs/core/settings.mdx b/docs/docs/core/settings.mdx index 80fc6a1d9..40f774888 100644 --- a/docs/docs/core/settings.mdx +++ b/docs/docs/core/settings.mdx @@ -109,7 +109,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl `DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields: -* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on per-source basis. +* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_rows` argument passed to `FlowBuilder.add_source()` on per-source basis. ## List of Environment Variables @@ -121,4 +121,4 @@ This is the list of environment variables, each of which has a corresponding fie | `COCOINDEX_DATABASE_URL` | `database.url` | Yes | | `COCOINDEX_DATABASE_USER` | `database.user` | No | | `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No | -| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No | +| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `default_execution_options.source_max_inflight_rows` | No | diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index c12d88721..e05e7478b 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -418,7 +418,7 @@ class _SourceRefreshOptions: @dataclass class _ExecutionOptions: - max_inflight_count: int | None = None + max_inflight_rows: int | None = None class FlowBuilder: @@ -444,7 +444,7 @@ def add_source( *, name: str | None = None, refresh_interval: datetime.timedelta | None = None, - max_inflight_count: int | None = None, + max_inflight_rows: int | None = None, ) -> DataSlice[T]: """ Import a source to the flow. @@ -464,7 +464,7 @@ def add_source( _SourceRefreshOptions(refresh_interval=refresh_interval) ), execution_options=dump_engine_object( - _ExecutionOptions(max_inflight_count=max_inflight_count) + _ExecutionOptions(max_inflight_rows=max_inflight_rows) ), ), name, diff --git a/python/cocoindex/setting.py b/python/cocoindex/setting.py index 177de50fe..cab71ad3b 100644 --- a/python/cocoindex/setting.py +++ b/python/cocoindex/setting.py @@ -48,7 +48,7 @@ class DefaultExecutionOptions: """Default execution options.""" # The maximum number of concurrent inflight requests. - source_max_inflight_count: int | None = 256 + source_max_inflight_rows: int | None = 256 def _load_field( @@ -99,8 +99,8 @@ def from_env(cls) -> Self: exec_kwargs: dict[str, Any] = dict() _load_field( exec_kwargs, - "source_max_inflight_count", - "COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT", + "source_max_inflight_rows", + "COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS", parse=int, ) default_execution_options = DefaultExecutionOptions(**exec_kwargs) diff --git a/src/base/spec.rs b/src/base/spec.rs index 9ec43346d..da3684888 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -255,7 +255,7 @@ impl SpecFormatter for OpSpec { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ExecutionOptions { - pub max_inflight_count: Option, + pub max_inflight_rows: Option, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 2b7cc237c..6366a5482 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -689,11 +689,11 @@ impl AnalyzerContext { .clone(); let output = op_scope.add_op_output(import_op.name, output_type)?; - let max_inflight_count = - (import_op.spec.execution_options.max_inflight_count).or_else(|| { + let max_inflight_rows = + (import_op.spec.execution_options.max_inflight_rows).or_else(|| { self.lib_ctx .default_execution_options - .source_max_inflight_count + .source_max_inflight_rows }); let result_fut = async move { trace!("Start building executor for source op `{}`", op_name); @@ -705,7 +705,7 @@ impl AnalyzerContext { primary_key_type, name: op_name, refresh_options: import_op.spec.refresh_options, - concurrency_controller: utils::ConcurrencyController::new(max_inflight_count), + concurrency_controller: utils::ConcurrencyController::new(max_inflight_rows), }) }; Ok(result_fut) diff --git a/src/settings.rs b/src/settings.rs index 0e5160ae9..d3acd4b1f 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -9,7 +9,7 @@ pub struct DatabaseConnectionSpec { #[derive(Deserialize, Debug, Default)] pub struct DefaultExecutionOptions { - pub source_max_inflight_count: Option, + pub source_max_inflight_rows: Option, } #[derive(Deserialize, Debug, Default)]