Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/core/flow_def.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ If nothing changed during the last refresh cycle, only list operations will be p

You can pass the following arguments to `add_source()` to control the concurrency of the source operation:

* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation.
* `max_inflight_rows`: the maximum number of rows that may be in flight concurrently for the source operation.

The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables).

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/core/settings.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl

`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:

* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides a default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on a per-source basis.
* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides a default value, and can be overridden by the `max_inflight_rows` argument passed to `FlowBuilder.add_source()` on a per-source basis.

## List of Environment Variables

Expand All @@ -121,4 +121,4 @@ This is the list of environment variables, each of which has a corresponding fie
| `COCOINDEX_DATABASE_URL` | `database.url` | Yes |
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No |
| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `default_execution_options.source_max_inflight_rows` | No |
6 changes: 3 additions & 3 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ class _SourceRefreshOptions:

@dataclass
class _ExecutionOptions:
max_inflight_count: int | None = None
max_inflight_rows: int | None = None


class FlowBuilder:
Expand All @@ -444,7 +444,7 @@ def add_source(
*,
name: str | None = None,
refresh_interval: datetime.timedelta | None = None,
max_inflight_count: int | None = None,
max_inflight_rows: int | None = None,
) -> DataSlice[T]:
"""
Import a source to the flow.
Expand All @@ -464,7 +464,7 @@ def add_source(
_SourceRefreshOptions(refresh_interval=refresh_interval)
),
execution_options=dump_engine_object(
_ExecutionOptions(max_inflight_count=max_inflight_count)
_ExecutionOptions(max_inflight_rows=max_inflight_rows)
),
),
name,
Expand Down
6 changes: 3 additions & 3 deletions python/cocoindex/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DefaultExecutionOptions:
"""Default execution options."""

# The maximum number of concurrent inflight requests.
source_max_inflight_count: int | None = 256
source_max_inflight_rows: int | None = 256


def _load_field(
Expand Down Expand Up @@ -99,8 +99,8 @@ def from_env(cls) -> Self:
exec_kwargs: dict[str, Any] = dict()
_load_field(
exec_kwargs,
"source_max_inflight_count",
"COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT",
"source_max_inflight_rows",
"COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS",
parse=int,
)
default_execution_options = DefaultExecutionOptions(**exec_kwargs)
Expand Down
2 changes: 1 addition & 1 deletion src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl SpecFormatter for OpSpec {

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutionOptions {
pub max_inflight_count: Option<u32>,
pub max_inflight_rows: Option<u32>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
Expand Down
8 changes: 4 additions & 4 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,11 +689,11 @@ impl AnalyzerContext {
.clone();
let output = op_scope.add_op_output(import_op.name, output_type)?;

let max_inflight_count =
(import_op.spec.execution_options.max_inflight_count).or_else(|| {
let max_inflight_rows =
(import_op.spec.execution_options.max_inflight_rows).or_else(|| {
self.lib_ctx
.default_execution_options
.source_max_inflight_count
.source_max_inflight_rows
});
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
Expand All @@ -705,7 +705,7 @@ impl AnalyzerContext {
primary_key_type,
name: op_name,
refresh_options: import_op.spec.refresh_options,
concurrency_controller: utils::ConcurrencyController::new(max_inflight_count),
concurrency_controller: utils::ConcurrencyController::new(max_inflight_rows),
})
};
Ok(result_fut)
Expand Down
2 changes: 1 addition & 1 deletion src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct DatabaseConnectionSpec {

#[derive(Deserialize, Debug, Default)]
pub struct DefaultExecutionOptions {
pub source_max_inflight_count: Option<u32>,
pub source_max_inflight_rows: Option<u32>,
}

#[derive(Deserialize, Debug, Default)]
Expand Down