
Commit 7996528

Merge branch 'cocoindex-io:main' into main

2 parents: 541e693 + d18dd6e

File tree: 14 files changed, +409 −51 lines

docs/docs/custom_ops/custom_functions.mdx

Lines changed: 22 additions & 0 deletions
@@ -145,6 +145,8 @@ Custom functions take the following additional parameters:
 * `batching: bool`: Whether the executor will consume requests in batch.
   See the [Batching](#batching) section below for details.

+* `max_batch_size: int | None`: The maximum batch size for the executor.
+
 * `behavior_version: int`: The version of the behavior of the function.
   When the version is changed, the function will be re-executed even if cache is enabled.
   It's required to be set if `cache` is `True`.
@@ -221,5 +223,25 @@ class ComputeSomethingExecutor:
     ...
 ```

+### Controlling Batch Size
+
+You can control the maximum batch size using the `max_batch_size` parameter. This is useful for:
+* Limiting memory usage when processing large batches
+* Reducing latency by flushing batches before they grow too large
+* Working with APIs that have request size limits
+
+```python
+@cocoindex.op.function(batching=True, max_batch_size=32)
+def compute_something(args: list[str]) -> list[str]:
+    ...
+```
+
+With `max_batch_size` set, a batch will be flushed when either:
+
+1. No ongoing batches are running
+2. The pending batch size reaches `max_batch_size`
+
+This ensures that requests don't wait indefinitely for a batch to fill up, while still allowing efficient batch processing.
+
 </TabItem>
 </Tabs>
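The flush policy added to the docs above is compact enough to restate as a tiny standalone sketch. This is not CocoIndex's internal batcher; the class and method names below are hypothetical, used only to illustrate the two flush conditions.

```python
# Hypothetical sketch of the flush policy described in the docs above --
# not CocoIndex internals. Requests accumulate in `pending`; the batch is
# flushed when no batch is in flight, or when `pending` reaches the cap.
class SketchBatcher:
    def __init__(self, max_batch_size: int | None = None):
        self.max_batch_size = max_batch_size
        self.pending: list[str] = []
        self.in_flight = False  # True while a previous batch is still executing

    def submit(self, request: str) -> bool:
        """Queue a request; return True if the pending batch should flush now."""
        self.pending.append(request)
        if not self.in_flight:
            return True  # condition 1: no ongoing batches are running
        return (
            self.max_batch_size is not None
            and len(self.pending) >= self.max_batch_size
        )  # condition 2: pending batch size reached max_batch_size
```

Condition 1 keeps latency low while the executor is idle; condition 2 bounds memory use and request size once work is already queued.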

docs/docs/sources/amazons3.md

Lines changed: 3 additions & 0 deletions
@@ -131,6 +131,9 @@ The spec takes the following fields:

 :::

+* `max_file_size` (`int`, optional): if provided, files exceeding this size in bytes will be treated as non-existent and skipped during processing.
+  This is useful to avoid processing large files that are not relevant to your use case, such as videos or backups.
+  If not specified, no size limit is applied.
 * `sqs_queue_url` (`str`, optional): if provided, the source will receive change event notifications from Amazon S3 via this SQS queue.

 :::info
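For illustration, a hedged usage snippet of the new field: only `max_file_size` and the sibling spec fields shown in the `_engine_builtin_specs.py` diff further below are confirmed by this commit; `bucket_name` and the surrounding flow setup are assumptions.

```python
import cocoindex

# Skip anything over 10 MiB; larger files are treated as non-existent.
# `bucket_name` is an assumed field name -- the diff only shows the fields below it.
source = cocoindex.sources.AmazonS3(
    bucket_name="my-bucket",
    included_patterns=["*.md", "*.pdf"],
    max_file_size=10 * 1024 * 1024,  # bytes
)
```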

examples/code_embedding/main.py

Lines changed: 6 additions & 6 deletions
@@ -16,12 +16,12 @@ def code_to_embedding(
     Embed the text using a SentenceTransformer model.
     """
     # You can also switch to Voyage embedding model:
-    # return text.transform(
-    #     cocoindex.functions.EmbedText(
-    #         api_type=cocoindex.LlmApiType.VOYAGE,
-    #         model="voyage-code-3",
-    #     )
-    # )
+    # return text.transform(
+    #     cocoindex.functions.EmbedText(
+    #         api_type=cocoindex.LlmApiType.GEMINI,
+    #         model="text-embedding-004",
+    #     )
+    # )
     return text.transform(
         cocoindex.functions.SentenceTransformerEmbed(
             model="sentence-transformers/all-MiniLM-L6-v2"

python/cocoindex/functions/colpali.py

Lines changed: 2 additions & 0 deletions
@@ -125,6 +125,7 @@ class ColPaliEmbedImage(op.FunctionSpec):
     gpu=True,
     cache=True,
     batching=True,
+    max_batch_size=32,
     behavior_version=1,
 )
 class ColPaliEmbedImageExecutor:
@@ -204,6 +205,7 @@ class ColPaliEmbedQuery(op.FunctionSpec):
     cache=True,
     behavior_version=1,
     batching=True,
+    max_batch_size=32,
 )
 class ColPaliEmbedQueryExecutor:
     """Executor for ColVision query embedding (ColPali, ColQwen2, ColSmol, etc.)."""

python/cocoindex/functions/sbert.py

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ class SentenceTransformerEmbed(op.FunctionSpec):
     gpu=True,
     cache=True,
     batching=True,
+    max_batch_size=512,
     behavior_version=1,
     arg_relationship=(op.ArgRelationship.EMBEDDING_ORIGIN_TEXT, "text"),
 )

python/cocoindex/op.py

Lines changed: 11 additions & 3 deletions
@@ -151,6 +151,7 @@ class OpArgs:
     - gpu: Whether the executor will be executed on GPU.
     - cache: Whether the executor will be cached.
     - batching: Whether the executor will be batched.
+    - max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True.
     - behavior_version: The behavior version of the executor. Cache will be invalidated if it
       changes. Must be provided if `cache` is True.
     - arg_relationship: It specifies the relationship between an input argument and the output,
@@ -161,6 +162,7 @@
     gpu: bool = False
     cache: bool = False
     batching: bool = False
+    max_batch_size: int | None = None
     behavior_version: int | None = None
     arg_relationship: tuple[ArgRelationship, str] | None = None

@@ -389,11 +391,17 @@ def enable_cache(self) -> bool:
         def behavior_version(self) -> int | None:
             return op_args.behavior_version

+        def batching_options(self) -> dict[str, Any] | None:
+            if op_args.batching:
+                return {
+                    "max_batch_size": op_args.max_batch_size,
+                }
+            else:
+                return None
+
     if category == OpCategory.FUNCTION:
         _engine.register_function_factory(
-            op_kind,
-            _EngineFunctionExecutorFactory(spec_loader, _WrappedExecutor),
-            op_args.batching,
+            op_kind, _EngineFunctionExecutorFactory(spec_loader, _WrappedExecutor)
         )
     else:
         raise ValueError(f"Unsupported executor type {category}")
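The new `batching_options()` accessor is small enough to restate standalone. The sketch below mirrors the diff's logic with a trimmed `OpArgs` and a hypothetical free function; it is not the actual module.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class OpArgs:  # trimmed to the two fields relevant here
    batching: bool = False
    max_batch_size: int | None = None


def batching_options(op_args: OpArgs) -> dict[str, Any] | None:
    """Return the batching knobs handed to the engine, or None when batching is off."""
    if op_args.batching:
        return {"max_batch_size": op_args.max_batch_size}
    return None


assert batching_options(OpArgs(batching=True, max_batch_size=32)) == {"max_batch_size": 32}
assert batching_options(OpArgs()) is None  # non-batched executors get no options
```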

python/cocoindex/sources/_engine_builtin_specs.py

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ class AmazonS3(op.SourceSpec):
     binary: bool = False
     included_patterns: list[str] | None = None
     excluded_patterns: list[str] | None = None
+    max_file_size: int | None = None
     sqs_queue_url: str | None = None
     redis: RedisNotification | None = None

src/execution/source_indexer.rs

Lines changed: 4 additions & 1 deletion
@@ -304,7 +304,10 @@ impl SourceIndexingContext {
                 rows_to_retry,
             }),
             setup_execution_ctx,
-            update_once_batcher: batching::Batcher::new(UpdateOnceRunner),
+            update_once_batcher: batching::Batcher::new(
+                UpdateOnceRunner,
+                batching::BatchingOptions::default(),
+            ),
         }))
     }

src/ops/factory_bases.rs

Lines changed: 4 additions & 1 deletion
@@ -381,6 +381,8 @@ pub trait BatchedFunctionExecutor: Send + Sync + Sized + 'static {
     fn into_fn_executor(self) -> impl SimpleFunctionExecutor {
         BatchedFunctionExecutorWrapper::new(self)
     }
+
+    fn batching_options(&self) -> batching::BatchingOptions;
 }

 #[async_trait]
@@ -404,10 +406,11 @@ struct BatchedFunctionExecutorWrapper<E: BatchedFunctionExecutor> {

 impl<E: BatchedFunctionExecutor> BatchedFunctionExecutorWrapper<E> {
     fn new(executor: E) -> Self {
+        let batching_options = executor.batching_options();
         Self {
             enable_cache: executor.enable_cache(),
             behavior_version: executor.behavior_version(),
-            batcher: batching::Batcher::new(executor),
+            batcher: batching::Batcher::new(executor, batching_options),
         }
     }
 }

src/ops/functions/embed_text.rs

Lines changed: 8 additions & 0 deletions
@@ -36,6 +36,14 @@ impl BatchedFunctionExecutor for Executor {
         true
     }

+    fn batching_options(&self) -> batching::BatchingOptions {
+        // A safe default for most embedding providers.
+        // May be tuned for specific providers later.
+        batching::BatchingOptions {
+            max_batch_size: Some(64),
+        }
+    }
+
     async fn evaluate_batch(&self, args: Vec<Vec<Value>>) -> Result<Vec<Value>> {
         let texts = args
             .iter()
