Skip to content

Commit 9350045

Browse files
authored
docs(flow-control): revise the doc for flow control (#713)
1 parent 2c6a30e commit 9350045

File tree

2 files changed

+33
-39
lines changed

2 files changed

+33
-39
lines changed

docs/docs/core/flow_def.mdx

Lines changed: 32 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -151,25 +151,6 @@ If nothing changed during the last refresh cycle, only list operations will be p
151151

152152
:::
153153

154-
#### Concurrency control
155-
156-
You can pass the following arguments to `add_source()` to control the concurrency of the source operation:
157-
158-
* `max_inflight_rows`: the maximum number of concurrent inflight requests for the source operation.
159-
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the source operation.
160-
161-
For example:
162-
163-
```py
164-
@cocoindex.flow_def(name="DemoFlow")
165-
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
166-
data_scope["documents"] = flow_builder.add_source(
167-
DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024)
168-
......
169-
```
170-
171-
The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variable](/docs/core/settings#list-of-environment-variables).
172-
173154
### Transform
174155

175156
`transform()` method transforms the data slice by a function, which creates another data slice.
@@ -195,7 +176,7 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco
195176
</TabItem>
196177
</Tabs>
197178

198-
### For each row
179+
### For-each-row
199180

200181
If the data slice has [table type](/docs/core/data_types#table-types), you can call `row()` method to obtain a child scope representing each row, to apply operations on each row.
201182

@@ -214,24 +195,6 @@ def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataSco
214195
</TabItem>
215196
</Tabs>
216197

217-
#### Concurrency control
218-
219-
You can pass the following arguments to `row()` to control the concurrency of the for-each operation:
220-
221-
* `max_inflight_rows`: the maximum number of concurrent inflight requests for the for-each operation.
222-
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the for-each operation.
223-
We only take the number of bytes from this row before this for-each operation into account.
224-
225-
For example:
226-
227-
```python
228-
@cocoindex.flow_def(name="DemoFlow")
229-
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
230-
...
231-
with data_scope["table1"].row(max_inflight_rows=10, max_inflight_bytes=10*1024*1024) as table1_row:
232-
# Children operations
233-
table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))
234-
```
235198

236199
### Get a sub field
237200

@@ -380,6 +343,37 @@ doc_embeddings.export(
380343

381344
It will use `Staging__doc_embeddings` as the collection name if the current app namespace is `Staging`, and use `doc_embeddings` if the app namespace is empty.
382345

346+
### Control Processing Concurrency
347+
348+
You can control the concurrency of the processing by setting the following options:
349+
350+
* `max_inflight_rows`: the maximum number of concurrent inflight requests for the processing.
351+
* `max_inflight_bytes`: the maximum number of concurrent inflight bytes for the processing.
352+
353+
These options can be passed in to the following APIs:
354+
355+
* [`FlowBuilder.add_source()`](#import-from-source): The options above control the processing concurrency of multiple rows from a source. New rows will not be loaded in memory if it'll be over the limit.
356+
357+
The default value can be specified by [`DefaultExecutionOptions`](/docs/core/settings#defaultexecutionoptions) or corresponding [environment variables](/docs/core/settings#list-of-environment-variables).
358+
359+
* [`DataSlice.row()`](#for-each-row): The options above provides a finer-grained control, to limit the processing concurrency of multiple rows within a table at any level.
360+
361+
`max_inflight_bytes` only counts the number of bytes already existing in the current row before any further processing.
362+
363+
For example:
364+
365+
```py
366+
@cocoindex.flow_def(name="DemoFlow")
367+
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
368+
data_scope["documents"] = flow_builder.add_source(
369+
DemoSourceSpec(...), max_inflight_rows=10, max_inflight_bytes=100*1024*1024)
370+
371+
with data_scope["documents"].row() as doc:
372+
doc["chunks"] = doc["content"].transform(SplitRecursively(...))
373+
with doc["chunks"].row(max_inflight_rows=100) as chunk:
374+
......
375+
```
376+
383377
### Target Declarations
384378

385379
Most time a target is created by calling `export()` method on a collector, and this `export()` call comes with configurations needed for the target, e.g. options for storage indexes.

docs/docs/core/settings.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ If you use the Postgres database hosted by [Supabase](https://supabase.com/), pl
112112
* `source_max_inflight_rows` (type: `int`, optional): The maximum number of concurrent inflight requests for source operations.
113113
* `source_max_inflight_bytes` (type: `int`, optional): The maximum number of concurrent inflight bytes for source operations.
114114

115-
The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on per-source basis ([details](/docs/core/flow_def#concurrency-control)).
115+
The options provide default values, and can be overridden by arguments passed to `FlowBuilder.add_source()` on per-source basis ([details](/docs/core/flow_def#control-processing-concurrency)).
116116

117117
## List of Environment Variables
118118

0 commit comments

Comments
 (0)