---
title: Preview-Batched Inference
createTime: 2025/12/30 11:47:05
permalink: /en/guide/batch/
---


# Beta: Checkpoint Resume

> This feature is currently in beta and may contain bugs. If you encounter any problems, please report them by opening an issue. Thank you for your understanding.

## Overview

During inference, if an operator holds a large amount of data (for example, thousands of records) and an unexpected interruption occurs midway, the results that have already been inferred are lost, and the API calls that produced them are wasted.

To address this problem, we designed a **batched inference** interface with the following workflow:

1. Start inference for the first operator
2. The first operator processes only one batch at a time
3. The output of the current step is stored in an appendable output step file (e.g., JSONL or CSV), as sketched below
4. Once the entire dataset has been processed by the current operator, proceed to the next operator
5. The entire pipeline is completed using this approach

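As a rough illustration of step 3, the sketch below shows why appendable step files make partial progress durable: each finished batch is flushed to disk before the next one starts, so an interruption loses at most the batch in flight. This is a minimal, standalone Python sketch of the idea, not the library's actual implementation; `infer_batch` is a hypothetical callback standing in for whatever the operator does per batch.

```python
import json

def run_operator_in_batches(records, infer_batch, step_file, batch_size=100):
    """Process `records` batch by batch and append each finished batch to
    `step_file` (JSONL), so an interruption loses at most the batch in flight."""
    with open(step_file, "a", encoding="utf-8") as f:
        for start in range(0, len(records), batch_size):
            batch = records[start:start + batch_size]
            for result in infer_batch(batch):  # e.g. one API call per record
                f.write(json.dumps(result, ensure_ascii=False) + "\n")
            f.flush()  # make this batch durable before starting the next one

def completed_count(step_file):
    """Count how many records were already written, so a resumed run can
    skip that many input records instead of re-running them."""
    try:
        with open(step_file, encoding="utf-8") as f:
            return sum(1 for _ in f)
    except FileNotFoundError:
        return 0
```
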
## Usage

Usage is similar to [Framework Design – Resume Pipeline](/en/guide/basicinfo/framework/#breakpoint-resume-pipelines-resume), with a few differences: the pipeline must inherit from a different base class, `BatchedPipelineABC`; it must be compiled before running; and its `forward` function takes additional parameters. In addition, the storage currently needs to use the dedicated `BatchedFileStorage` class.

```python
import re
from dataflow.pipeline import BatchedPipelineABC # [!code highlight]
from dataflow.operators.general_text import (
    LLMLanguageFilter,
)
from dataflow.operators.text_pt import MetaSampleEvaluator
from dataflow.operators.core_text import PromptedGenerator
from dataflow.serving import APILLMServing_request
from dataflow.utils.storage import BatchedFileStorage # [!code highlight]

class AutoOPPipeline(BatchedPipelineABC): # [!code highlight]

    def __init__(self):
        super().__init__()
        self.storage = BatchedFileStorage( # [!code highlight]
            first_entry_file_name="./dataflow/example/GeneralTextPipeline/pt_input.jsonl",
            cache_path="./cache_autoop",
            file_name_prefix="dataflow_cache_auto_run",
            cache_type="jsonl",
        )
        self.llm_serving = APILLMServing_request(
            api_url="https://api.openai.com/v1/chat/completions",
            model_name="gpt-4o",
            max_workers=30
        )
        self.op1 = PromptedGenerator(
            llm_serving=self.llm_serving,
            system_prompt="Please translate the following content into Chinese:",
        )
        self.op2 = PromptedGenerator(
            llm_serving=self.llm_serving,
            system_prompt="Please translate the following content into Korean:",
        )
        self.op3 = PromptedGenerator(
            llm_serving=self.llm_serving,
            system_prompt="Please translate the following content into Japanese:"
        )

    def forward(self):
        # Each operator runs over the whole dataset batch by batch before
        # the next operator starts (see the workflow in the Overview).
        self.op1.run(
            self.storage.step(),
            input_key='raw_content',
            output_key='content_cn1'
        )
        self.op2.run(
            self.storage.step(),
            input_key='raw_content',
            output_key='content_cn2'
        )
        self.op3.run(
            self.storage.step(),
            input_key='raw_content',
            output_key='content_cn3'
        )

if __name__ == "__main__":
    pipeline = AutoOPPipeline()
    pipeline.compile() # [!code highlight]
    pipeline.forward(
        batch_size=2, # [!code highlight]
        resume_from_last=True
    )
```

`resume_from_last` allows the pipeline to automatically continue from the last step file found in the current cache path.
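
For example, a typical interrupt-and-resume workflow looks roughly like the sketch below. It assumes that omitting `resume_from_last` starts a fresh run; in practice the two halves would live in separate process invocations.

```python
# First run: may be interrupted partway through an operator.
# Batches already written to ./cache_autoop are kept on disk.
pipeline = AutoOPPipeline()
pipeline.compile()
pipeline.forward(batch_size=2)

# Later run (e.g. after a crash or restart): continues from the last
# step file found under the cache path instead of starting over.
pipeline = AutoOPPipeline()
pipeline.compile()
pipeline.forward(batch_size=2, resume_from_last=True)
```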

Alternatively, you can use `resume_step` to resume from a specific previous operator step. Note that **only one of these parameters can be set at a time** to avoid logical conflicts.

```python
# The following invocation is also valid
...
pipeline.compile()
pipeline.forward(
    batch_size=2,
    resume_step=2  # Resume from the operator with index 2; the first operator has index 0
)
```
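
If you are unsure which step index to pass, you can look at what `BatchedFileStorage` has already written under the cache path from the example above. The exact file-naming scheme is up to the storage class, so treat this as an illustrative sketch rather than a documented interface:

```python
from pathlib import Path

# Cache path configured in the pipeline above.
cache_dir = Path("./cache_autoop")

# List the JSONL step files written so far; the latest ones show how far
# the previous run got before it was interrupted.
for step_file in sorted(cache_dir.glob("*.jsonl")):
    print(step_file.name)
```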

---