Skip to content

Commit 3e19f7a

Browse files
authored
refactor(naming): max_inflight_rows instead of max_inflight_count (#707)
1 parent 97059da commit 3e19f7a

File tree

7 files changed

+15
-15
lines changed

7 files changed

+15
-15
lines changed

docs/docs/core/flow_def.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ If nothing changed during the last refresh cycle, only list operations will be p
155155

156156
You can pass the following arguments to `add_source()` to control the concurrency of the source operation:
157157

158-
* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation.
158+
* `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation.
159159

160160
The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables).
161161

docs/docs/core/settings.mdx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl
109109

110110
`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:
111111

112-
* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on per-source basis.
112+
* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_rows` argument passed to `FlowBuilder.add_source()` on per-source basis.
113113

114114
## List of Environment Variables
115115

@@ -121,4 +121,4 @@ This is the list of environment variables, each of which has a corresponding fie
121121
| `COCOINDEX_DATABASE_URL` | `database.url` | Yes |
122122
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
123123
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
124-
| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No |
124+
| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `default_execution_options.source_max_inflight_rows` | No |

python/cocoindex/flow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ class _SourceRefreshOptions:
418418

419419
@dataclass
420420
class _ExecutionOptions:
421-
max_inflight_count: int | None = None
421+
max_inflight_rows: int | None = None
422422

423423

424424
class FlowBuilder:
@@ -444,7 +444,7 @@ def add_source(
444444
*,
445445
name: str | None = None,
446446
refresh_interval: datetime.timedelta | None = None,
447-
max_inflight_count: int | None = None,
447+
max_inflight_rows: int | None = None,
448448
) -> DataSlice[T]:
449449
"""
450450
Import a source to the flow.
@@ -464,7 +464,7 @@ def add_source(
464464
_SourceRefreshOptions(refresh_interval=refresh_interval)
465465
),
466466
execution_options=dump_engine_object(
467-
_ExecutionOptions(max_inflight_count=max_inflight_count)
467+
_ExecutionOptions(max_inflight_rows=max_inflight_rows)
468468
),
469469
),
470470
name,

python/cocoindex/setting.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class DefaultExecutionOptions:
4848
"""Default execution options."""
4949

5050
# The maximum number of concurrent inflight requests.
51-
source_max_inflight_count: int | None = 256
51+
source_max_inflight_rows: int | None = 256
5252

5353

5454
def _load_field(
@@ -99,8 +99,8 @@ def from_env(cls) -> Self:
9999
exec_kwargs: dict[str, Any] = dict()
100100
_load_field(
101101
exec_kwargs,
102-
"source_max_inflight_count",
103-
"COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT",
102+
"source_max_inflight_rows",
103+
"COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS",
104104
parse=int,
105105
)
106106
default_execution_options = DefaultExecutionOptions(**exec_kwargs)

src/base/spec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ impl SpecFormatter for OpSpec {
255255

256256
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
257257
pub struct ExecutionOptions {
258-
pub max_inflight_count: Option<u32>,
258+
pub max_inflight_rows: Option<u32>,
259259
}
260260

261261
#[derive(Debug, Clone, Serialize, Deserialize, Default)]

src/builder/analyzer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -689,11 +689,11 @@ impl AnalyzerContext {
689689
.clone();
690690
let output = op_scope.add_op_output(import_op.name, output_type)?;
691691

692-
let max_inflight_count =
693-
(import_op.spec.execution_options.max_inflight_count).or_else(|| {
692+
let max_inflight_rows =
693+
(import_op.spec.execution_options.max_inflight_rows).or_else(|| {
694694
self.lib_ctx
695695
.default_execution_options
696-
.source_max_inflight_count
696+
.source_max_inflight_rows
697697
});
698698
let result_fut = async move {
699699
trace!("Start building executor for source op `{}`", op_name);
@@ -705,7 +705,7 @@ impl AnalyzerContext {
705705
primary_key_type,
706706
name: op_name,
707707
refresh_options: import_op.spec.refresh_options,
708-
concurrency_controller: utils::ConcurrencyController::new(max_inflight_count),
708+
concurrency_controller: utils::ConcurrencyController::new(max_inflight_rows),
709709
})
710710
};
711711
Ok(result_fut)

src/settings.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub struct DatabaseConnectionSpec {
99

1010
#[derive(Deserialize, Debug, Default)]
1111
pub struct DefaultExecutionOptions {
12-
pub source_max_inflight_count: Option<u32>,
12+
pub source_max_inflight_rows: Option<u32>,
1313
}
1414

1515
#[derive(Deserialize, Debug, Default)]

0 commit comments

Comments
 (0)