
Commit 9b102a0

feat: Add conditions for conditional task execution (#195)

Authored by Raphael Mitsch (rmitsch).

* refactor: Move `inference_mode` into `GenerationSettings`.
* docs: Update docs.
* feat: Add conditional task execution logic.
* docs: Update docs.

Co-authored-by: Raphael Mitsch <raphael@climatiq.com>

1 parent dd9a4da · commit 9b102a0


43 files changed: +657 additions, −115 deletions

README.md (2 additions, 1 deletion)

```diff
@@ -86,7 +86,7 @@ build modern NLP applications. It provides:
   - [`langchain`](https://github.com/langchain-ai/langchain)
   - [`outlines`](https://github.com/dottxt-ai/outlines)
   - [`transformer`](https://github.com/huggingface/transformers)
-- :arrow_forward: **Observable Pipelines:** Easy debugging and monitoring
+- :arrow_forward: **Observable Pipelines:** Easy debugging and monitoring with conditional task execution
 - :hammer_and_wrench: **Integrated Tools:**
   - Document parsing (optional via `ingestion` extra): [`docling`](https://github.com/DS4SD/docling), [`marker`](https://github.com/VikParuchuri/marker)
   - Text chunking: [`chonkie`](https://github.com/chonkie-ai/chonkie)
@@ -246,6 +246,7 @@ Encapsulates a single processing step in a pipeline.
 - Defines input arguments
 - Wraps and initializes `Bridge` instances handling task-engine-specific logic
 - Implements task-specific dataset export
+- Supports **conditional execution**: skip documents based on custom logic without materializing all docs upfront
 
 #### `GenerationSettings`
 Controls behavior of structured generation across tasks.
```

docs/guides/optimization.md (2 additions, 2 deletions)

```diff
@@ -22,7 +22,7 @@ Optimization is valuable when:
 > Optimization involves **multiple LLM calls** during the search process. Costs depend on:
 > - Dataset size (more examples = more evaluations)
 > - DSPy optimizer configuration (`num_candidates`, `num_trials`)
-> - Model pricing (larger models cost more per call)
+> - Model pricing (larger models cost more per call)
 >
 > Start with small datasets and conservative optimizer settings to control costs.
 
@@ -112,7 +112,7 @@ These tasks use a **generic LLM-as-judge evaluator** that compares ground truth
 - **Translation** - Evaluates translation quality
 - **Question Answering** - Evaluates answer correctness
 
-> **Note**: LLM-based evaluation adds additional costs since each evaluation requires an extra LLM call.
+> **Note**: LLM-based evaluation adds additional costs since each evaluation requires an extra LLM call.
 
 ## Optimizer Configuration
 
```

docs/pipeline.md (80 additions, 0 deletions)

````diff
@@ -40,4 +40,84 @@ Note: Ingestion libraries (e.g., `docling`) are optional and not installed by default.
 pip install "sieves[ingestion]"
 ```
 
+## Conditional Task Execution
+
+Tasks support optional conditional execution via the `condition` parameter. This allows you to skip processing certain documents based on custom logic, without materializing all documents upfront.
+
+### Basic Usage
+
+Pass a callable with signature `Callable[[Doc], bool]` to any task to conditionally process documents:
+
+```python
+from sieves import Pipeline, tasks, Doc
+
+docs = [
+    Doc(text="short"),
+    Doc(text="this is a much longer document that will be processed"),
+    Doc(text="med"),
+]
+
+# Define a condition function
+def is_long(doc: Doc) -> bool:
+    return len(doc.text or "") > 20
+
+# Create a task with a condition
+task = tasks.Classification(
+    labels=["science", "politics"],
+    model=model,
+    condition=is_long
+)
+
+# Run pipeline
+pipe = Pipeline([task])
+for doc in pipe(docs):
+    # doc.results[task.id] will be None for documents that failed the condition
+    print(doc.results[task.id])
+```
+
+### Key Behaviors
+
+- **Per-document evaluation**: The condition is evaluated for each document individually
+- **Lazy evaluation**: Documents are not materialized upfront; passing documents are batched together for efficient processing
+- **Result tracking**: Skipped documents have `results[task_id] = None`
+- **Order preservation**: Document order is always maintained, regardless of which documents are skipped
+- **No-op when None**: If `condition=None`, all documents are processed
+
+### Multiple Tasks with Different Conditions
+
+Different tasks in a pipeline can have different conditions:
+
+```python
+from sieves import Pipeline, tasks, Doc
+
+docs = [
+    Doc(text="short"),
+    Doc(text="this is a much longer document"),
+    Doc(text="medium text here"),
+]
+
+# Task 1: Process only documents longer than 10 characters
+task1 = tasks.Chunking(chunker, condition=lambda d: len(d.text or "") > 10)
+
+# Task 2: Process only documents longer than 20 characters
+task2 = tasks.Classification(
+    labels=["science", "politics"],
+    model=model,
+    condition=lambda d: len(d.text or "") > 20
+)
+
+# First doc: skipped by both tasks (too short)
+# Second doc: processed by both tasks (long enough)
+# Third doc: processed by task1, skipped by task2
+pipe = Pipeline([task1, task2])
+for doc in pipe(docs):
+    print(doc.results[task1.id], doc.results[task2.id])
+```
+
+### Use Cases
+
+- **Skip expensive processing** for documents that don't meet quality criteria
+- **Segment processing** by document properties (size, language, format)
+- **Optimize pipelines** by processing subsets of data through specific tasks
+
 ::: sieves.pipeline.core
````
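Since conditions are plain callables, they compose with ordinary Python. As a sketch (the helpers `all_of`/`any_of` and the `FakeDoc` stand-in below are illustrations, not part of sieves), one condition can be built from several:

```python
from collections.abc import Callable
from typing import Any

def all_of(*conditions: Callable[[Any], bool]) -> Callable[[Any], bool]:
    """Return a condition that passes only if every sub-condition passes."""
    return lambda doc: all(cond(doc) for cond in conditions)

def any_of(*conditions: Callable[[Any], bool]) -> Callable[[Any], bool]:
    """Return a condition that passes if at least one sub-condition passes."""
    return lambda doc: any(cond(doc) for cond in conditions)

# Lightweight stand-in object; a sieves Doc would work the same way here.
class FakeDoc:
    def __init__(self, text: str, lang: str):
        self.text, self.lang = text, lang

is_long = lambda d: len(d.text or "") > 20
is_english = lambda d: d.lang == "en"

cond = all_of(is_long, is_english)
print(cond(FakeDoc("short", "en")))                         # False
print(cond(FakeDoc("a sufficiently long document", "en")))  # True
```

The combined condition can then be passed to any task via `condition=cond`, exactly like a hand-written lambda.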

docs/tasks/task.md (90 additions, 1 deletion)

````diff
@@ -1,3 +1,92 @@
 # Task
 
-::: sieves.tasks.core.Task
+## Conditional Execution
+
+All tasks support optional conditional execution through the `condition` parameter. This feature allows you to skip processing certain documents based on custom criteria without materializing all documents upfront.
+
+### Overview
+
+The `condition` parameter accepts an optional callable with signature `Callable[[Doc], bool]`:
+
+```python
+def condition(doc: Doc) -> bool:
+    # Return True to process the document
+    # Return False to skip it
+    return True
+```
+
+### Implementation Details
+
+When a task is executed with a condition:
+
+1. **Per-Document Evaluation**: Each document is evaluated against the condition individually
+2. **Lazy Batching**: Only documents that pass the condition are batched together and sent to the task's `_call()` method
+3. **Order Preservation**: Documents are returned in their original order, even if some were skipped
+4. **Result Storage**: Skipped documents have `results[task_id] = None`
+
+### Examples
+
+#### Skip Documents by Size
+
+```python
+from sieves import tasks, Pipeline, Doc
+
+# Only process documents longer than 100 characters
+task = tasks.Classification(
+    labels=["positive", "negative"],
+    model=model,
+    condition=lambda doc: len(doc.text or "") > 100
+)
+
+pipe = Pipeline([task])
+docs = [Doc(text="short"), Doc(text="a very long document " * 10)]
+results = list(pipe(docs))
+
+# First doc: results[task.id] == None (skipped)
+# Second doc: results[task.id] contains classification results
+```
+
+#### Skip Documents Based on Metadata
+
+```python
+# Only process documents from specific sources
+def should_process(doc: Doc) -> bool:
+    return doc.meta.get("source") in ["source_a", "source_b"]
+
+task = tasks.NER(
+    entities=["PERSON", "LOCATION"],
+    model=model,
+    condition=should_process
+)
+```
+
+#### Multiple Conditions in Pipeline
+
+```python
+# Different conditions for different tasks
+import_task = tasks.Ingestion(export_format="markdown")
+
+# Only chunk long documents
+chunking_task = tasks.Chunking(
+    chunker,
+    condition=lambda doc: len(doc.text or "") > 500
+)
+
+# Only classify chunked documents
+classification_task = tasks.Classification(
+    labels=["science", "fiction"],
+    model=model,
+    condition=lambda doc: len(doc.text or "") > 500
+)
+
+pipe = Pipeline([import_task, chunking_task, classification_task])
+```
+
+### Technical Notes
+
+- **No Materialization**: Documents are processed using iterators; passing documents are batched together without materializing the entire document collection upfront
+- **Index-Based Tracking**: The implementation uses document indices for efficient filtering and reordering
+- **All Engines Supported**: Conditional execution works with all supported engines (DSPy, LangChain, Outlines, HuggingFace, GLiNER, etc.)
+- **Serialization**: Non-callable condition values (like `None`) serialize naturally; callable conditions are serialized as placeholders
+
+::: sieves.tasks.core.Task
````
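A condition like the metadata predicate in the diff above is plain Python, so it can be unit-tested without running any pipeline. A minimal sketch, using a `StubDoc` stand-in (hypothetical; it only exposes the `meta` attribute the predicate reads):

```python
class StubDoc:
    """Stand-in exposing only the `meta` attribute used by the predicate."""
    def __init__(self, meta: dict):
        self.meta = meta

def should_process(doc) -> bool:
    """Predicate from the docs above: keep docs from specific sources."""
    return doc.meta.get("source") in ["source_a", "source_b"]

print(should_process(StubDoc({"source": "source_a"})))  # True
print(should_process(StubDoc({"source": "source_c"})))  # False
print(should_process(StubDoc({})))                      # False
```

Testing predicates this way catches mistakes (e.g. a missing `meta` key) before they silently skip documents in a full run.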

sieves/engines/huggingface_.py (1 addition, 1 deletion)

```diff
@@ -47,7 +47,7 @@ def build_executable(
     assert isinstance(prompt_signature, list)
 
     # Render template with few-shot examples. Note that we don't use extracted document values here, as HF zero-shot
-    # pipelines only support one hypothesis template per call - and we want to batch, so our hypothesis template
+    # pipelines only support one hypothesis template per call - and we want to batch, so our hypothesis template
     # will be document-invariant.
     fewshot_examples_dict = HuggingFace.convert_fewshot_examples(fewshot_examples)
     # Render hypothesis template with everything but text.
```

sieves/serialization.py (2 additions, 2 deletions)

```diff
@@ -92,7 +92,7 @@ def create(cls, cls_obj: type, attributes: dict[str, Attribute]) -> Config:
     :param attributes: Attributes to include in config.
     :return Config: Instance of dynamic config class.
     """
-    config_type = pydantic.create_model(  # type: ignore[call-overload]
+    config_type = pydantic.create_model(  # type: ignore[call-overload]
         f"{cls_obj}Config",
         __base__=Config,
         **{attr_id: (Attribute, ...) for attr_id in attributes},
@@ -186,7 +186,7 @@ def load(cls, path: Path | str) -> Config:
     with open(path) as file:
         data = yaml.safe_load(file)
 
-    config = pydantic.create_model(  # type: ignore[call-overload]
+    config = pydantic.create_model(  # type: ignore[call-overload]
         f"{data['cls_name']}Config",
         __base__=Config,
         cls_name=(str, ...),
```

sieves/tasks/core.py (53 additions, 4 deletions)

```diff
@@ -3,7 +3,8 @@
 from __future__ import annotations
 
 import abc
-from collections.abc import Iterable
+import itertools
+from collections.abc import Callable, Iterable, Iterator
 from typing import TYPE_CHECKING, Any
 
 from sieves.data import Doc
@@ -17,17 +18,27 @@
 class Task(abc.ABC):
     """Abstract base class for tasks that can be executed on documents."""
 
-    def __init__(self, task_id: str | None, include_meta: bool, batch_size: int):
+    def __init__(
+        self,
+        task_id: str | None,
+        include_meta: bool,
+        batch_size: int,
+        condition: Callable[[Doc], bool] | None = None,
+    ):
         """
         Initiate new Task.
 
         :param task_id: Task ID.
         :param include_meta: Whether to include meta information generated by the task.
         :param batch_size: Batch size for processing documents. Use -1 to process all documents at once.
+        :param condition: Optional callable that determines whether to process each document.
+            If provided, called with each Doc; if returns False, document is skipped
+            and results[task_id] is set to None.
         """
         self._task_id = task_id if task_id else self.__class__.__name__
         self._include_meta = include_meta
         self._batch_size = batch_size
+        self._condition = condition
 
     @property
     def id(self) -> str:
@@ -39,9 +50,46 @@ def id(self) -> str:
         """
         return self._task_id
 
-    @abc.abstractmethod
     def __call__(self, docs: Iterable[Doc]) -> Iterable[Doc]:
-        """Execute task.
+        """Execute task with conditional logic.
+
+        Checks the condition for each document without materializing all docs upfront.
+        Passes all documents that pass the condition to _call() for proper batching.
+        Documents that fail the condition have results[task_id] set to None.
+
+        :param docs: Docs to process.
+        :return: Processed docs (in original order).
+        """
+        # Create three independent iterators:
+        # 1. Check which docs pass condition.
+        # 2. Yield only passing docs to _call().
+        # 3. Iterate and yield results in order.
+        docs_iters = itertools.tee(docs, 3)
+
+        # First pass: determine which docs pass the condition by index
+        passing_indices: set[int] = set()
+
+        for idx, doc in enumerate(docs_iters[0]):
+            if self._condition is None or self._condition(doc):
+                passing_indices.add(idx)
+
+        # Process all passing docs together.
+        processed = self._call(d for i, d in enumerate(docs_iters[1]) if i in passing_indices)
+        processed_iter = iter(processed) if not isinstance(processed, Iterator) else processed
+
+        # Iterate through original docs in order and yield results
+        for idx, doc in enumerate(docs_iters[2]):
+            if idx in passing_indices:
+                # Doc passed condition - use processed result.
+                yield next(processed_iter)
+            else:
+                # Doc failed condition - set None result and yield original.
+                doc.results[self.id] = None
+                yield doc
+
+    @abc.abstractmethod
+    def _call(self, docs: Iterable[Doc]) -> Iterable[Doc]:
+        """Execute task logic (to be implemented by subclasses).
 
         :param docs: Docs to process.
         :return: Processed docs.
@@ -83,6 +131,7 @@ def _state(self) -> dict[str, Any]:
             "task_id": self._task_id,
             "include_meta": self._include_meta,
             "batch_size": self._batch_size,
+            "condition": self._condition,
         }
 
     def serialize(self) -> Config:
```

sieves/tasks/predictive/classification/bridges.py (1 addition, 1 deletion)

```diff
@@ -415,7 +415,7 @@ def _prompt_conclusion(self) -> str | None:
     @cached_property
     def prompt_signature(self) -> type[pydantic.BaseModel] | list[str]:
         if self._multi_label:
-            prompt_sig = pydantic.create_model(  # type: ignore[call-overload]
+            prompt_sig = pydantic.create_model(  # type: ignore[call-overload]
                 "MultilabelClassification",
                 __base__=pydantic.BaseModel,
                 __doc__="Result of multi-label classification.",
```
sieves/tasks/predictive/classification/core.py (4 additions, 1 deletion)

```diff
@@ -3,7 +3,7 @@
 from __future__ import annotations
 
 import json
-from collections.abc import Iterable, Sequence
+from collections.abc import Callable, Iterable, Sequence
 from pathlib import Path
 from typing import Any, override
 
@@ -97,6 +97,7 @@ def __init__(
         label_descriptions: dict[str, str] | None = None,
         multi_label: bool = True,
         generation_settings: GenerationSettings = GenerationSettings(),
+        condition: Callable[[Doc], bool] | None = None,
     ) -> None:
         """Initialize new PredictiveTask.
 
@@ -112,6 +113,7 @@ def __init__(
             most likely class label. In the latter case label forcing mechanisms are utilized, which can lead to higher
             accuracy.
         :param generation_settings: Generation settings.
+        :param condition: Optional callable that determines whether to process each document.
         """
         self._labels = labels
         self._label_descriptions = label_descriptions or {}
@@ -127,6 +129,7 @@ def __init__(
             prompt_instructions=prompt_instructions,
             fewshot_examples=fewshot_examples,
             generation_settings=generation_settings,
+            condition=condition,
         )
         self._fewshot_examples: Sequence[FewshotExample]
```