---
title: 'Control Processing Concurrency Best Practices'
description: "Learn how to tune the concurrency control in CocoIndex to optimize data processing performance, prevent system overload, and ensure stable, efficient pipelines at scale."
image: /img/tutorials/control-flow/cover.png
---

![Control Processing Concurrency Tuning Guide](/img/tutorials/control-flow/cover.png)

CocoIndex’s parallelism model boosts speed by processing multiple data items at once, but **more parallelism isn’t always better**. Left unconstrained, excessive concurrency can strain—or even destabilize—your systems. That’s why CocoIndex includes **built-in concurrency control mechanisms** that strike the right balance between **raw performance** and **system stability**, even at massive scale.

Processing too many items simultaneously can cause:

- **Memory exhaustion** – large datasets loaded at once consume massive amounts of RAM.
- **Resource contention** – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations.
- **System instability** – timeouts, degraded performance, or outright crashes.

Unlike generic concurrency features, CocoIndex lets you:

- Constrain both data volume (rows) and memory usage (bytes).
- Set limits at multiple layers: global, per source, and per-row iteration.
- Combine controls: *all* specified constraints must be satisfied before processing proceeds.

This layered approach ensures that resource-heavy sources don’t overwhelm the system, and nested tasks (such as splitting documents into chunks) remain predictable and safe.

![flow-control](/img/tutorials/control-flow/flow-control.png)

You can review the full documentation [here](https://cocoindex.io/docs/core/flow_def#control-processing-concurrency). CocoIndex powers users processing data at the scale of millions in production. :star: [Star us](https://github.com/cocoindex-io/cocoindex) if you like it!

## Concurrency Options

CocoIndex provides two primary settings:

| Option | Purpose | Unit |
| --- | --- | --- |
| **`max_inflight_rows`** | Maximum number of rows processed concurrently. | rows |
| **`max_inflight_bytes`** | Maximum memory footprint of concurrently processed data (before transformations). | bytes |

When a limit is reached, CocoIndex **pauses new processing** until some existing work completes. This keeps throughput high without pushing your system past its limits.

:::note
For simplicity, `max_inflight_bytes` only measures the size of data already in memory before any transformations—it does **not** include temporary memory used during processing steps.
:::

## Where to Apply Concurrency Controls

### 1. Source Level

Controls how many rows from a data source are processed simultaneously. This prevents overwhelming your system when ingesting large datasets.

Source-level control happens at two different granularities:

- Global, in which all sources across all indexing flows share the same budget.
- Per-source, in which each source has its own budget.

Both **global** and **per-source** limits must pass before a new row is processed—providing two layers of safety. For example, with a global limit of 256 rows and a per-source limit of 10 rows on one source, that source never has more than 10 rows in flight, while all sources together never exceed 256.

#### Global Concurrency: One Setting to Shield All Flows

![global-level concurrency](/img/tutorials/control-flow/global-level.png)

Global limits ensure your system never overshoots safe operating thresholds, even if individual flows attempt higher concurrency.

Apply system-wide protections either via environment variables or programmatic control.

The easiest way is to control it via environment variables:

```sh
export COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256
export COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576
```

Programmatically, configure it when calling `cocoindex.init()`, which takes precedence over the environment variables:

```python
import cocoindex
from cocoindex import GlobalExecutionOptions

cocoindex.init(
    cocoindex.Settings(
        ...,
        global_execution_options=GlobalExecutionOptions(
            source_max_inflight_rows=256,
            source_max_inflight_bytes=1_048_576,
        ),
    )
)
```

Currently, CocoIndex uses 1024 as the default value for global max inflight rows if you don't explicitly set it.

#### Per-Source Concurrency: Granular Customization

![per-source concurrency](/img/tutorials/control-flow/per-source.png)

Set different limits for each source according to workload and data characteristics:

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...),
        max_inflight_rows=10,
        max_inflight_bytes=100*1024*1024,  # 100 MB
    )
```

### 2. Nested Iteration Level Concurrency: Deep Structural Control

![nested-iteration concurrency](/img/tutorials/control-flow/nested-level.png)

When processing nested rows, such as processing each chunk of each document, you can configure the maximum concurrent rows and/or bytes:

```python
with data_scope["documents"].row() as doc:
    doc["chunks"] = doc["content"].transform(SplitRecursively(...))
    with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100*1000*1000):
        # Process up to 100 chunks in parallel per document
        ...
```

### Summary Table: Concurrency Configuration in CocoIndex

| Level | Configuration Path | Applies To |
| --- | --- | --- |
| Global | Environment variables, or pass `GlobalExecutionOptions` to `cocoindex.init()` | All sources across all flows, counted against one shared budget |
| Per-Source | Arguments to `FlowBuilder.add_source()` | A specific source within a flow |
| Row Iteration | Arguments to `DataSlice.row(max_inflight_rows=...)` | Nested iterations |
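
For a combined picture, the sketch below layers a per-source budget and a nested-iteration budget inside one flow; the global limit configured earlier still applies on top. It only illustrates how the knobs compose: `DemoSourceSpec` follows the placeholder naming of the snippets above, and the numbers are arbitrary.

```python
@cocoindex.flow_def(name="LayeredDemoFlow")
def layered_demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Per-source budget: this source alone never holds more than 16 rows / 128 MB in flight,
    # and those rows still count against the shared global budget.
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...),
        max_inflight_rows=16,
        max_inflight_bytes=128*1024*1024,  # 128 MB
    )
    with data_scope["documents"].row() as doc:
        doc["chunks"] = doc["content"].transform(SplitRecursively(...))
        # Nested-iteration budget: at most 64 chunks in flight per document.
        with doc["chunks"].row(max_inflight_rows=64):
            ...  # per-chunk processing goes here
```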

## Best Practices

In real incremental pipelines, the processing bottleneck is usually a few heavy operations, such as running inference with an AI model locally or via a remote API. It's common to keep more data in memory than can be processed immediately, so that as soon as the busy backend becomes available, new work can be picked up right away to keep it saturated. However, we need a reasonable bound on this to prevent memory exhaustion and similar issues. That's where concurrency control comes in.

![tuning-guide](/img/tutorials/control-flow/tuning-guide.png)

### Most Situations - Default is Good Enough

In most cases, the default global source max rows limit (`COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`: 1024) already fits the situation described above: loading more than what the heavy operations can consume, but still within a reasonable bound. You don't need to do anything.

### When and What to Tune

Decrease the limit if the pipeline isn't stable enough (e.g., memory overuse or timeouts observed on certain operations), or increase it if the pipeline is very stable but you want it to go faster. What to tune?

- In most cases, tuning the global source row limit (`COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`) will just work.

- Only touch more specific knobs when necessary, usually for complex and unbalanced situations:
  - On highly unbalanced row (file) sizes, set the max bytes limit to prevent a small number of abnormally large inputs from overloading the system, e.g., when your input data sizes follow a long-tail distribution rather than a normal one (see the sizing sketch after this list).
  - On highly unbalanced complexity across sources, set per-source concurrency. This happens when you run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement).
  - On high fanout in nested iterations and unbalanced nested rows, set concurrency control options on nested iterations, e.g., when each row has a large number of nested rows to process and that number varies significantly.
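
As a rough starting point, here is a back-of-the-envelope sizing sketch. The average row size and memory budget are illustrative assumptions, not recommendations; plug in values measured from your own data, then feed the results into `GlobalExecutionOptions` or the `COCOINDEX_SOURCE_MAX_INFLIGHT_*` environment variables shown earlier.

```python
# Back-of-the-envelope sizing (illustrative numbers, not recommendations).
AVG_ROW_BYTES = 1 * 1024 * 1024          # assume rows average ~1 MB
MEMORY_BUDGET_BYTES = 512 * 1024 * 1024  # allow ~512 MB of source data in flight

# Row limit derived from the memory budget; reasonable when row sizes are fairly uniform.
source_max_inflight_rows = MEMORY_BUDGET_BYTES // AVG_ROW_BYTES  # 512

# For long-tailed row sizes, also cap bytes directly so a few huge rows can't blow the budget.
source_max_inflight_bytes = MEMORY_BUDGET_BYTES  # 536870912
```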

This concurrency control framework gives you **safe, scalable, and customizable flow performance**. You gain flexibility (configure per flow), control (set global limits), and the confidence to scale CocoIndex flows smoothly across diverse workloads.

## Support us

We’re constantly improving our runtime. Please ⭐ star [CocoIndex on GitHub](https://github.com/cocoindex-io/cocoindex) and share it with others.

Need help crafting a more detailed code snippet, or figuring out byte-based vs. default concurrency settings? Just let us know!