1 change: 1 addition & 0 deletions docs/docs/core/flow_def.mdx
@@ -156,6 +156,7 @@ If nothing changed during the last refresh cycle, only list operations will be p
You can pass the following arguments to `add_source()` to control the concurrency of the source operation:

* `max_inflight_rows`: the maximum number of source rows that may be processed concurrently (in flight) for this source operation.
* `max_inflight_bytes`: the maximum number of bytes of source data that may be in flight for this source operation.

Default values can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or the corresponding [environment variables](/docs/core/settings#list-of-environment-variables).
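As a concrete illustration, both caps can be set when declaring a source. This is a minimal sketch assuming a local-file source; the source type, path, and numbers are placeholders, not part of the change above:

```python
import cocoindex

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope) -> None:
    # Keep at most 64 rows and 64 MiB of source data in flight at once.
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="markdown_files"),  # placeholder source
        max_inflight_rows=64,
        max_inflight_bytes=64 * 1024 * 1024,
    )
```

Per the `source_indexer` changes later in this PR, a row whose size is not yet known counts against the row cap immediately and against the byte cap once its payload has been loaded.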

6 changes: 5 additions & 1 deletion docs/docs/core/settings.mdx
@@ -109,7 +109,10 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl

`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:

* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_rows` argument passed to `FlowBuilder.add_source()` on per-source basis.
* `source_max_inflight_rows` (type: `int`, optional): the default maximum number of source rows that may be processed concurrently (in flight) for each source operation.
* `source_max_inflight_bytes` (type: `int`, optional): the default maximum number of bytes of source data that may be in flight for each source operation.

These options only provide defaults; each can be overridden by the corresponding argument passed to `FlowBuilder.add_source()` on a per-source basis ([details](/docs/core/flow_def#concurrency-control)).
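For instance, a process-wide default could be set at initialization time. This is a sketch only: it assumes `Settings`, `DatabaseConnectionSpec`, and `DefaultExecutionOptions` are importable as shown (matching this repo's `python/cocoindex/setting.py`), and the database URL is a placeholder:

```python
import cocoindex
from cocoindex.setting import DatabaseConnectionSpec, DefaultExecutionOptions, Settings

settings = Settings(
    database=DatabaseConnectionSpec(url="postgres://localhost:5432/cocoindex"),  # placeholder
    default_execution_options=DefaultExecutionOptions(
        source_max_inflight_rows=512,
        source_max_inflight_bytes=2 * 1024**3,  # 2 GiB per source operation
    ),
)
cocoindex.init(settings)  # assumes init() accepts an explicit Settings object
```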

## List of Environment Variables

@@ -122,3 +125,4 @@ This is the list of environment variables, each of which has a corresponding fie
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
| `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS` | `default_execution_options.source_max_inflight_rows` | No |
| `COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES` | `default_execution_options.source_max_inflight_bytes` | No |
7 changes: 6 additions & 1 deletion python/cocoindex/flow.py
@@ -419,6 +419,7 @@ class _SourceRefreshOptions:
@dataclass
class _ExecutionOptions:
max_inflight_rows: int | None = None
max_inflight_bytes: int | None = None


class FlowBuilder:
@@ -445,6 +446,7 @@ def add_source(
name: str | None = None,
refresh_interval: datetime.timedelta | None = None,
max_inflight_rows: int | None = None,
max_inflight_bytes: int | None = None,
) -> DataSlice[T]:
"""
Import a source to the flow.
@@ -464,7 +466,10 @@
_SourceRefreshOptions(refresh_interval=refresh_interval)
),
execution_options=dump_engine_object(
_ExecutionOptions(max_inflight_rows=max_inflight_rows)
_ExecutionOptions(
max_inflight_rows=max_inflight_rows,
max_inflight_bytes=max_inflight_bytes,
)
),
),
name,
7 changes: 7 additions & 0 deletions python/cocoindex/setting.py
@@ -49,6 +49,7 @@ class DefaultExecutionOptions:

# The maximum number of concurrent inflight requests.
source_max_inflight_rows: int | None = 256
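# The maximum number of concurrent inflight bytes.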
source_max_inflight_bytes: int | None = 1024 * 1024 * 1024


def _load_field(
@@ -103,6 +104,12 @@ def from_env(cls) -> Self:
"COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS",
parse=int,
)
_load_field(
exec_kwargs,
"source_max_inflight_bytes",
"COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES",
parse=int,
)
default_execution_options = DefaultExecutionOptions(**exec_kwargs)

app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", "")
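A hypothetical round-trip through the new loader (the values are arbitrary; unset variables fall back to the dataclass defaults above):

```python
import os
from cocoindex.setting import Settings

os.environ["COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS"] = "128"
os.environ["COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES"] = str(512 * 1024 * 1024)

settings = Settings.from_env()
assert settings.default_execution_options.source_max_inflight_bytes == 512 * 1024 * 1024
```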
3 changes: 2 additions & 1 deletion src/base/spec.rs
@@ -255,7 +255,8 @@ impl SpecFormatter for OpSpec {

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ExecutionOptions {
pub max_inflight_rows: Option<u32>,
pub max_inflight_rows: Option<usize>,
pub max_inflight_bytes: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
12 changes: 8 additions & 4 deletions src/base/value.rs
@@ -361,7 +361,7 @@ impl KeyValue {
}
}

pub fn estimated_detached_byte_size(&self) -> usize {
fn estimated_detached_byte_size(&self) -> usize {
match self {
KeyValue::Bytes(v) => v.len(),
KeyValue::Str(v) => v.len(),
@@ -568,7 +568,7 @@ impl BasicValue {
}

/// Returns the estimated byte size of the value, for detached data (i.e. allocated on heap).
pub fn estimated_detached_byte_size(&self) -> usize {
fn estimated_detached_byte_size(&self) -> usize {
fn json_estimated_detached_byte_size(val: &serde_json::Value) -> usize {
match val {
serde_json::Value::String(s) => s.len(),
@@ -862,7 +862,7 @@ impl Value<ScopeValue> {
Value::Null => 0,
Value::Basic(v) => v.estimated_detached_byte_size(),
Value::Struct(v) => v.estimated_detached_byte_size(),
(Value::UTable(v) | Value::LTable(v)) => {
Value::UTable(v) | Value::LTable(v) => {
v.iter()
.map(|v| v.estimated_detached_byte_size())
.sum::<usize>()
@@ -955,13 +955,17 @@
}

impl FieldValues<ScopeValue> {
pub fn estimated_detached_byte_size(&self) -> usize {
fn estimated_detached_byte_size(&self) -> usize {
self.fields
.iter()
.map(Value::estimated_byte_size)
.sum::<usize>()
+ self.fields.len() * std::mem::size_of::<Value<ScopeValue>>()
}

pub fn estimated_byte_size(&self) -> usize {
self.estimated_detached_byte_size() + std::mem::size_of::<Self>()
}
}
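To make the detached/total split concrete: `estimated_detached_byte_size` counts heap payloads plus per-field slots, while the new public `estimated_byte_size` adds the container's own shallow size on top; that total is what the byte-based throttle charges later in this PR. A rough Python rendering of the arithmetic (the slot and header constants are stand-ins, not the actual Rust sizes):

```python
VALUE_SLOT_SIZE = 24   # stand-in for std::mem::size_of::<Value<ScopeValue>>()
SELF_HEADER_SIZE = 24  # stand-in for std::mem::size_of::<FieldValues>()

def estimated_detached_byte_size(fields: list[str]) -> int:
    # Heap payloads of all fields, plus one inline slot per field.
    return sum(len(f) for f in fields) + len(fields) * VALUE_SLOT_SIZE

def estimated_byte_size(fields: list[str]) -> int:
    # Total footprint: detached (heap) size plus the container itself.
    return estimated_detached_byte_size(fields) + SELF_HEADER_SIZE
```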

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
11 changes: 10 additions & 1 deletion src/builder/analyzer.rs
@@ -695,6 +695,12 @@ impl AnalyzerContext {
.default_execution_options
.source_max_inflight_rows
});
let max_inflight_bytes =
(import_op.spec.execution_options.max_inflight_bytes).or_else(|| {
self.lib_ctx
.default_execution_options
.source_max_inflight_bytes
});
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
let executor = executor.await?;
@@ -705,7 +711,10 @@
primary_key_type,
name: op_name,
refresh_options: import_op.spec.refresh_options,
concurrency_controller: utils::ConcurrencyController::new(max_inflight_rows),
concurrency_controller: concur_control::ConcurrencyController::new(
max_inflight_rows,
max_inflight_bytes,
),
})
};
Ok(result_fut)
3 changes: 2 additions & 1 deletion src/builder/plan.rs
@@ -56,7 +56,8 @@ pub struct AnalyzedImportOp {
pub output: AnalyzedOpOutput,
pub primary_key_type: schema::ValueType,
pub refresh_options: spec::SourceRefreshOptions,
pub concurrency_controller: utils::ConcurrencyController,

pub concurrency_controller: concur_control::ConcurrencyController,
}

pub struct AnalyzedFunctionExecInfo {
5 changes: 5 additions & 0 deletions src/execution/live_updater.rs
@@ -128,10 +128,15 @@ async fn update_source(
});
for change in change_msg.changes {
let ack_fn = ack_fn.clone();
let concur_permit = import_op
.concurrency_controller
.acquire(concur_control::BYTES_UNKNOWN_YET)
.await?;
tokio::spawn(source_context.clone().process_source_key(
change.key,
change.data,
change_stream_stats.clone(),
concur_permit,
ack_fn.map(|ack_fn| {
move || async move { SharedAckFn::ack(&ack_fn).await }
}),
136 changes: 79 additions & 57 deletions src/execution/source_indexer.rs
@@ -100,6 +100,7 @@ impl SourceIndexingContext {
key: value::KeyValue,
source_data: Option<interface::SourceData>,
update_stats: Arc<stats::UpdateStats>,
_concur_permit: concur_control::ConcurrencyControllerPermit,
ack_fn: Option<AckFn>,
pool: PgPool,
) {
@@ -160,66 +161,76 @@
}
};

let permit = processing_sem.acquire().await?;
let result = row_indexer::update_source_row(
&SourceRowEvaluationContext {
plan: &plan,
import_op,
schema,
key: &key,
import_op_idx: self.source_idx,
},
&self.setup_execution_ctx,
source_data.value,
&source_version,
&pool,
&update_stats,
)
.await?;
let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result
{
Some(existing_source_version)
} else if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
Some(source_version)
} else {
None
};
if let Some(target_source_version) = target_source_version {
let mut state = self.state.lock().unwrap();
let scan_generation = state.scan_generation;
let entry = state.rows.entry(key.clone());
match entry {
hash_map::Entry::Occupied(mut entry) => {
if !entry
.get()
.source_version
.should_skip(&target_source_version, None)
{
if target_source_version.kind
== row_indexer::SourceVersionKind::NonExistence
let _processing_permit = processing_sem.acquire().await?;
let _concur_permit = match &source_data.value {
interface::SourceValue::Existence(value) => {
import_op
.concurrency_controller
.acquire_bytes_with_reservation(|| value.estimated_byte_size())
.await?
}
interface::SourceValue::NonExistence => None,
};
let result = row_indexer::update_source_row(
&SourceRowEvaluationContext {
plan: &plan,
import_op,
schema,
key: &key,
import_op_idx: self.source_idx,
},
&self.setup_execution_ctx,
source_data.value,
&source_version,
&pool,
&update_stats,
)
.await?;
let target_source_version =
if let SkippedOr::Skipped(existing_source_version) = result {
Some(existing_source_version)
} else if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
Some(source_version)
} else {
None
};
if let Some(target_source_version) = target_source_version {
let mut state = self.state.lock().unwrap();
let scan_generation = state.scan_generation;
let entry = state.rows.entry(key.clone());
match entry {
hash_map::Entry::Occupied(mut entry) => {
if !entry
.get()
.source_version
.should_skip(&target_source_version, None)
{
entry.remove();
} else {
let mut_entry = entry.get_mut();
mut_entry.source_version = target_source_version;
mut_entry.touched_generation = scan_generation;
if target_source_version.kind
== row_indexer::SourceVersionKind::NonExistence
{
entry.remove();
} else {
let mut_entry = entry.get_mut();
mut_entry.source_version = target_source_version;
mut_entry.touched_generation = scan_generation;
}
}
}
}
hash_map::Entry::Vacant(entry) => {
if target_source_version.kind
!= row_indexer::SourceVersionKind::NonExistence
{
entry.insert(SourceRowIndexingState {
source_version: target_source_version,
touched_generation: scan_generation,
..Default::default()
});
hash_map::Entry::Vacant(entry) => {
if target_source_version.kind
!= row_indexer::SourceVersionKind::NonExistence
{
entry.insert(SourceRowIndexingState {
source_version: target_source_version,
touched_generation: scan_generation,
..Default::default()
});
}
}
}
}
}
drop(permit);
if let Some(ack_fn) = ack_fn {
ack_fn().await?;
}
@@ -243,6 +254,7 @@
key: value::KeyValue,
source_version: SourceVersion,
update_stats: &Arc<stats::UpdateStats>,
concur_permit: concur_control::ConcurrencyControllerPermit,
pool: &PgPool,
) -> Option<impl Future<Output = ()> + Send + 'static> {
{
Expand All @@ -257,10 +269,14 @@ impl SourceIndexingContext {
return None;
}
}
Some(
self.clone()
.process_source_key(key, None, update_stats.clone(), NO_ACK, pool.clone()),
)
Some(self.clone().process_source_key(
key,
None,
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
))
}

pub async fn update(
@@ -282,15 +298,19 @@
state.scan_generation
};
while let Some(row) = rows_stream.next().await {
let _ = import_op.concurrency_controller.acquire().await?;
for row in row? {
let concur_permit = import_op
.concurrency_controller
.acquire(concur_control::BYTES_UNKNOWN_YET)
.await?;
self.process_source_key_if_newer(
row.key,
SourceVersion::from_current_with_ordinal(
row.ordinal
.ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
),
update_stats,
concur_permit,
pool,
)
.map(|fut| join_set.spawn(fut));
@@ -322,10 +342,12 @@
value: interface::SourceValue::NonExistence,
ordinal: source_ordinal,
});
let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
join_set.spawn(self.clone().process_source_key(
key,
source_data,
update_stats.clone(),
concur_permit,
NO_ACK,
pool.clone(),
));
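Taken together with the live_updater change, the pattern in these hunks is two-phase admission: a row permit is taken while the payload size is still unknown (`BYTES_UNKNOWN_YET`), and the actual bytes are reserved via `acquire_bytes_with_reservation` once the value has been loaded. The sketch below is a toy asyncio model of that idea, not the real `concur_control` API; the split into `acquire_row`/`reserve_bytes`/`release` and the oversized-row admission rule are assumptions:

```python
import asyncio


class ToyConcurrencyController:
    """Caps inflight rows and inflight bytes together (illustrative only)."""

    def __init__(self, max_rows: int | None, max_bytes: int | None) -> None:
        self._row_sem = asyncio.Semaphore(max_rows) if max_rows is not None else None
        self._max_bytes = max_bytes
        self._bytes_inflight = 0
        self._bytes_freed = asyncio.Condition()

    async def acquire_row(self) -> None:
        # Phase 1: claim a row slot; the payload size may still be unknown.
        if self._row_sem is not None:
            await self._row_sem.acquire()

    async def reserve_bytes(self, n: int) -> None:
        # Phase 2: charge the actual estimated size once the value is loaded.
        if self._max_bytes is None:
            return
        async with self._bytes_freed:
            # Always admit a row when nothing is inflight, so a single row
            # larger than the whole budget cannot deadlock the pipeline.
            await self._bytes_freed.wait_for(
                lambda: self._bytes_inflight == 0
                or self._bytes_inflight + n <= self._max_bytes
            )
            self._bytes_inflight += n

    async def release(self, n: int) -> None:
        if self._row_sem is not None:
            self._row_sem.release()
        if self._max_bytes is not None:
            async with self._bytes_freed:
                self._bytes_inflight -= n
                self._bytes_freed.notify_all()
```

Deletions charge zero bytes (the `acquire(Some(|| 0))` call above), so they consume only row slots.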
2 changes: 1 addition & 1 deletion src/prelude.rs
@@ -26,7 +26,7 @@ pub(crate) use crate::ops::interface;
pub(crate) use crate::service::error::{ApiError, invariance_violation};
pub(crate) use crate::setup;
pub(crate) use crate::setup::AuthRegistry;
pub(crate) use crate::utils::{self, retryable};
pub(crate) use crate::utils::{self, concur_control, retryable};
pub(crate) use crate::{api_bail, api_error};

pub(crate) use anyhow::{anyhow, bail};
3 changes: 2 additions & 1 deletion src/settings.rs
@@ -9,7 +9,8 @@ pub struct DatabaseConnectionSpec {

#[derive(Deserialize, Debug, Default)]
pub struct DefaultExecutionOptions {
pub source_max_inflight_rows: Option<u32>,
pub source_max_inflight_rows: Option<usize>,
pub source_max_inflight_bytes: Option<usize>,
}

#[derive(Deserialize, Debug, Default)]