Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/docs/core/flow_def.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ If nothing changed during the last refresh cycle, only list operations will be p

:::

#### Concurrency control

You can pass the following arguments to `add_source()` to control the concurrency of the source operation:

* `max_inflight_count`: the maximum number of concurrent inflight requests for the source operation.

The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables).

### Transform

Expand Down
10 changes: 9 additions & 1 deletion docs/docs/core/settings.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ You have two ways to launch CocoIndex:

* `app_namespace` (type: `str`, required): The namespace of the application.
* `database` (type: `DatabaseConnectionSpec`, required): The connection to the Postgres database.
* `default_execution_options` (type: `DefaultExecutionOptions`, optional): The default execution options for the flow.

### App Namespace

Expand Down Expand Up @@ -104,13 +105,20 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl

:::

### DefaultExecutionOptions

`DefaultExecutionOptions` is used to configure the default execution options for the flow. It has the following fields:

* `source_max_inflight_count` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations. This only provides an default value, and can be overridden by the `max_inflight_count` argument passed to `FlowBuilder.add_source()` on per-source basis.

## List of Environment Variables

This is the list of environment variables, each of which has a corresponding field in `Settings`:

| environment variable | corresponding field in `Settings` | required? |
|---------------------|-------------------|----------|
| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No |
| `COCOINDEX_DATABASE_URL` | `database.url` | Yes |
| `COCOINDEX_DATABASE_USER` | `database.user` | No |
| `COCOINDEX_DATABASE_PASSWORD` | `database.password` | No |
| `COCOINDEX_APP_NAMESPACE` | `app_namespace` | No |
| `COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT` | `default_execution_options.source_max_inflight_count` | No |
34 changes: 32 additions & 2 deletions python/cocoindex/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ class DatabaseConnectionSpec:
password: str | None = None


@dataclass
class DefaultExecutionOptions:
"""Default execution options."""

# The maximum number of concurrent inflight requests.
source_max_inflight_count: int | None = 256


def _load_field(
target: dict[str, Any],
name: str,
Expand All @@ -55,7 +63,15 @@ def _load_field(
if required:
raise ValueError(f"{env_name} is not set")
else:
target[name] = value if parse is None else parse(value)
if parse is None:
target[name] = value
else:
try:
target[name] = parse(value)
except Exception as e:
raise ValueError(
f"failed to parse environment variable {env_name}: {value}"
) from e


@dataclass
Expand All @@ -64,6 +80,7 @@ class Settings:

database: DatabaseConnectionSpec | None = None
app_namespace: str = ""
default_execution_options: DefaultExecutionOptions | None = None

@classmethod
def from_env(cls) -> Self:
Expand All @@ -79,9 +96,22 @@ def from_env(cls) -> Self:
else:
database = None

exec_kwargs: dict[str, Any] = dict()
_load_field(
exec_kwargs,
"source_max_inflight_count",
"COCOINDEX_SOURCE_MAX_INFLIGHT_COUNT",
parse=int,
)
default_execution_options = DefaultExecutionOptions(**exec_kwargs)

app_namespace = os.getenv("COCOINDEX_APP_NAMESPACE", "")

return cls(database=database, app_namespace=app_namespace)
return cls(
database=database,
app_namespace=app_namespace,
default_execution_options=default_execution_options,
)


@dataclass
Expand Down
22 changes: 17 additions & 5 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ impl DataScopeBuilder {
}

pub(super) struct AnalyzerContext {
pub lib_ctx: Arc<LibContext>,
pub flow_ctx: Arc<FlowInstanceContext>,
}

Expand Down Expand Up @@ -687,6 +688,13 @@ impl AnalyzerContext {
.typ
.clone();
let output = op_scope.add_op_output(import_op.name, output_type)?;

let max_inflight_count =
(import_op.spec.execution_options.max_inflight_count).or_else(|| {
self.lib_ctx
.default_execution_options
.source_max_inflight_count
});
let result_fut = async move {
trace!("Start building executor for source op `{}`", op_name);
let executor = executor.await?;
Expand All @@ -697,9 +705,7 @@ impl AnalyzerContext {
primary_key_type,
name: op_name,
refresh_options: import_op.spec.refresh_options,
concurrency_controller: utils::ConcurrencyController::new(
import_op.spec.execution_options.max_inflight_count,
),
concurrency_controller: utils::ConcurrencyController::new(max_inflight_count),
})
};
Ok(result_fut)
Expand Down Expand Up @@ -1013,7 +1019,10 @@ pub async fn analyze_flow(
AnalyzedSetupState,
impl Future<Output = Result<ExecutionPlan>> + Send + use<>,
)> {
let analyzer_ctx = AnalyzerContext { flow_ctx };
let analyzer_ctx = AnalyzerContext {
lib_ctx: get_lib_context()?,
flow_ctx,
};
let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
let root_op_scope = OpScope::new(ROOT_SCOPE_NAME.to_string(), None, root_data_scope);
let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len());
Expand Down Expand Up @@ -1125,7 +1134,10 @@ pub async fn analyze_transient_flow<'a>(
impl Future<Output = Result<TransientExecutionPlan>> + Send + 'a,
)> {
let mut root_data_scope = DataScopeBuilder::new();
let analyzer_ctx = AnalyzerContext { flow_ctx };
let analyzer_ctx = AnalyzerContext {
lib_ctx: get_lib_context()?,
flow_ctx,
};
let mut input_fields = vec![];
for field in flow_inst.input_fields.iter() {
let analyzed_field = root_data_scope.add_field(field.name.clone(), &field.value_type)?;
Expand Down
3 changes: 3 additions & 0 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl FlowBuilder {
},
};
let analyzer_ctx = AnalyzerContext {
lib_ctx: self.lib_context.clone(),
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
Expand Down Expand Up @@ -417,6 +418,7 @@ impl FlowBuilder {
};

let analyzer_ctx = AnalyzerContext {
lib_ctx: self.lib_context.clone(),
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
Expand Down Expand Up @@ -464,6 +466,7 @@ impl FlowBuilder {
};

let analyzer_ctx = AnalyzerContext {
lib_ctx: self.lib_context.clone(),
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
Expand Down
24 changes: 5 additions & 19 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ pub struct LibContext {
pub db_pools: DbPools,
pub persistence_ctx: Option<PersistenceContext>,
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,

pub default_execution_options: settings::DefaultExecutionOptions,
}

impl LibContext {
Expand Down Expand Up @@ -262,6 +264,7 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
db_pools,
persistence_ctx,
flows: Mutex::new(BTreeMap::new()),
default_execution_options: settings.default_execution_options,
})
}

Expand Down Expand Up @@ -296,26 +299,9 @@ mod tests {
assert!(db_pools.pools.lock().unwrap().is_empty());
}

#[test]
fn test_settings_structure_without_database() {
let settings = settings::Settings {
database: None,
app_namespace: "test".to_string(),
};

// Test that we can create the basic structure
assert!(settings.database.is_none());
assert_eq!(settings.app_namespace, "test");
}

#[test]
fn test_lib_context_without_database() {
let settings = settings::Settings {
database: None,
app_namespace: "test".to_string(),
};

let lib_context = create_lib_context(settings).unwrap();
let lib_context = create_lib_context(settings::Settings::default()).unwrap();
assert!(lib_context.persistence_ctx.is_none());
assert!(lib_context.require_builtin_db_pool().is_err());
}
Expand All @@ -329,7 +315,7 @@ mod tests {
user: None,
password: None,
}),
app_namespace: "test".to_string(),
..Default::default()
};

// This would fail at runtime due to invalid connection, but we're testing the structure
Expand Down
9 changes: 8 additions & 1 deletion src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ pub struct DatabaseConnectionSpec {
pub password: Option<String>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Default)]
pub struct DefaultExecutionOptions {
pub source_max_inflight_count: Option<u32>,
}

#[derive(Deserialize, Debug, Default)]
pub struct Settings {
#[serde(default)]
pub database: Option<DatabaseConnectionSpec>,
#[serde(default)]
#[allow(dead_code)] // Used via serialization/deserialization to Python
pub app_namespace: String,
#[serde(default)]
pub default_execution_options: DefaultExecutionOptions,
}

#[cfg(test)]
Expand Down
Loading