Commit db8bed6

Split Storage Config (agentscope-ai#338)

1 parent ac5f8f4 · commit db8bed6

Note: this is a large commit; only a subset of the 52 changed files is shown below.

52 files changed: +627 additions, −478 deletions

.github/workflows/unittest.yaml

Lines changed: 7 additions & 0 deletions
```diff
@@ -97,6 +97,13 @@ jobs:
           fi
         fi
 
+      - name: Clean checkpoint dir
+        working-directory: trinity-${{ github.run_id }}/.github/workflows/docker
+        if: always()
+        run: |
+          docker compose exec trinity-node-1 rm -rf /mnt/checkpoints/*
+        continue-on-error: true
+
       - name: Upload test results
         if: env.tests_run == 'true' || failure()
         uses: actions/upload-artifact@v4
```

benchmark/config/countdown-template.yaml

Lines changed: 4 additions & 3 deletions
```diff
@@ -42,10 +42,11 @@ buffer:
   experience_buffer:
     name: experience_buffer
     storage_type: queue
-    use_priority_queue: true
-    replay_buffer_kwargs:
+    replay_buffer:
+      enable: true
       priority_fn: linear_decay
-      decay: 0.1
+      priority_fn_args:
+        decay: 0.1
 explorer:
   runner_per_model: 8
   max_timeout: 900
```
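For anyone migrating their own configs, here is the old and new shape of this setting side by side, a minimal sketch assembled purely from the template diffs above:

```yaml
# Old shape (before this commit):
# experience_buffer:
#   name: experience_buffer
#   storage_type: queue
#   use_priority_queue: true
#   replay_buffer_kwargs:
#     priority_fn: linear_decay
#     decay: 0.1

# New shape (after this commit):
experience_buffer:
  name: experience_buffer
  storage_type: queue
  replay_buffer:
    enable: true
    priority_fn: linear_decay
    priority_fn_args:
      decay: 0.1
```

The flat `use_priority_queue` flag becomes `replay_buffer.enable`, and the function arguments move from being inlined under `replay_buffer_kwargs` into a dedicated `priority_fn_args` mapping.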

benchmark/config/gsm8k-template.yaml

Lines changed: 4 additions & 3 deletions
```diff
@@ -47,10 +47,11 @@ buffer:
   experience_buffer:
     name: experience_buffer
     storage_type: queue
-    use_priority_queue: true
-    replay_buffer_kwargs:
+    replay_buffer:
+      enable: true
       priority_fn: linear_decay
-      decay: 0.1
+      priority_fn_args:
+        decay: 0.1
 explorer:
   runner_per_model: 8
   max_timeout: 900
```

docs/sphinx_doc/source/tutorial/develop_operator.md

Lines changed: 4 additions & 4 deletions
````diff
@@ -6,9 +6,9 @@
 In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It supports existing data processing capabilities from [Data-Juicer](https://github.com/modelscope/data-juicer) naturally, and allows developers to implement their own operators as well.
 By customizing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantages/returns calculation as operators, as shown in {ref}`Algorithms <Algorithms>` section.
 
-- **DataJuicerOperator** ({class}`trinity.data.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
-- **ExperienceOperator** ({class}`trinity.data.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.
-- **ExperiencePipeline** ({class}`trinity.data.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.
+- **DataJuicerOperator** ({class}`trinity.buffer.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
+- **ExperienceOperator** ({class}`trinity.buffer.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.
+- **ExperiencePipeline** ({class}`trinity.buffer.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.
 
 ```{note}
 Except for `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing.
@@ -56,7 +56,7 @@ class RewardFilter(ExperienceOperator):
         return filtered_exps, metrics
 ```
 
-After implementation, you need to register this module through {class}`trinity.data.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using the registered name.
+After implementation, you need to register this module through {class}`trinity.buffer.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using the registered name.
 
 ### Step 2: Use Your Operator
````
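Once registered, an operator is referenced by its registered name in the YAML config. The exact placement is not shown in this diff, so the keys below (`experience_pipeline.operators`) are hypothetical and illustrate the pattern only:

```yaml
buffer:
  trainer_input:
    experience_pipeline:          # hypothetical key; check the tutorial for the real placement
      operators:
        - name: reward_filter     # registered name of the custom ExperienceOperator
          args:
            min_reward: 0.0       # illustrative operator argument
```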

docs/sphinx_doc/source/tutorial/develop_selector.md

Lines changed: 0 additions & 1 deletion
````diff
@@ -1,4 +1,3 @@
-
 # 🧪 Experimental: Task Selection & Scheduling System
 
 ```{note}
````

docs/sphinx_doc/source/tutorial/example_mix_algo.md

Lines changed: 7 additions & 8 deletions
```diff
@@ -85,22 +85,21 @@ class MixSampleStrategy(SampleStrategy):
         expert_batch_size = ceil(self.expert_data_ratio * tot_batch_size)
 
         # experience buffer
-        usual_buffer_config = copy.deepcopy(buffer_config)
-        usual_buffer_config.train_batch_size = tot_batch_size - expert_batch_size
-        self.usual_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.experience_buffer, usual_buffer_config  # type: ignore
-        )
+        usual_buffer_config = copy.deepcopy(buffer_config.trainer_input.experience_buffer)
+        usual_buffer_config.batch_size = tot_batch_size - expert_batch_size
+        self.usual_exp_buffer = get_buffer_reader(usual_buffer_config)
 
         if buffer_config.trainer_input.auxiliary_buffers is None:
             raise ValueError(
                 "`buffer_config.trainer_input.auxiliary_buffers` is required in MIX algorithm"
             )
 
         # expert experience buffer
-        expert_buffer_config = copy.deepcopy(buffer_config)
-        expert_buffer_config.train_batch_size = expert_batch_size
+        expert_buffer_config = copy.deepcopy(
+            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name]
+        )
+        expert_buffer_config.batch_size = expert_batch_size
         self.expert_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name],
             expert_buffer_config,
         )
 
```
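With the new single-argument `get_buffer_reader(config)`, the expert data becomes just another named entry under `auxiliary_buffers`, each entry carrying its own complete storage config. A hedged sketch of the matching YAML, where the key `sft_dataset` and the path are illustrative and must match `sft_dataset_name` in the strategy:

```yaml
buffer:
  trainer_input:
    experience_buffer:
      name: experience_buffer
      storage_type: queue
    auxiliary_buffers:
      sft_dataset:                   # illustrative key; must equal sft_dataset_name
        name: sft_dataset
        storage_type: file
        path: /path/to/expert_data   # placeholder path
```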

docs/sphinx_doc/source/tutorial/example_step_wise.md

Lines changed: 6 additions & 2 deletions
```diff
@@ -81,7 +81,7 @@ In general multi-step scenarios, each run may generate various number of experie
 
 - `buffer.train_batch_size`: The number of experiences to be sampled from the buffer for training, which can be different from the number of generated experiences in each explore step.
 
-- `buffer.trainer_input.use_priority_queue = true`: Using `PriorityQueue` allows the model to use the experiences with higher priority, which prefers newly-generated experiences by default.
+- `buffer.trainer_input.experience_buffer.replay_buffer`: Using `PriorityQueue` allows the model to use the experiences with higher priority, which prefers newly-generated experiences by default.
 
 - `synchronizer.sync_style = dynamic_by_explorer`: The explorer determines when to synchronize the model weights with the trainer.
 
@@ -126,7 +126,11 @@ buffer:
   experience_buffer:
     name: alfworld_buffer
     storage_type: queue
-    use_priority_queue: true
+    replay_buffer:
+      enable: true
+      priority_fn: linear_decay
+      priority_fn_args:
+        decay: 0.1
 explorer:
   max_repeat_times_per_runner: 1
   runner_per_model: 32
```
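Taken together, the three settings highlighted in this tutorial combine into one buffer/synchronizer section. A sketch, with `train_batch_size` as an illustrative value and the rest taken from the template above:

```yaml
buffer:
  train_batch_size: 96              # illustrative; decoupled from per-step experience count
  trainer_input:
    experience_buffer:
      name: alfworld_buffer
      storage_type: queue
      replay_buffer:                # prefers newly-generated experiences by default
        enable: true
        priority_fn: linear_decay
        priority_fn_args:
          decay: 0.1
synchronizer:
  sync_style: dynamic_by_explorer   # explorer decides when to sync model weights
```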

docs/sphinx_doc/source/tutorial/trinity_configs.md

Lines changed: 8 additions & 7 deletions
```diff
@@ -273,14 +273,12 @@ The configuration for each task dataset is defined as follows:
 - `name`: Name of the dataset. This name will be used as the Ray actor's name, so it must be unique.
 - `storage_type`: How the dataset is stored. Options: `file`, `queue`, `sql`.
   - `file`: The dataset is stored in `jsonl`/`parquet` files. The data file organization is required to meet the huggingface standard. *We recommand using this storage type for most cases.*
-  - `queue`: The dataset is stored in a queue. The queue is a simple FIFO queue that stores the task dataset. *Do not use this storage type for task dataset unless you know what you are doing.*
   - `sql`: The dataset is stored in a SQL database. *This type is unstable and will be optimized in the future versions.*
 - `path`: The path to the task dataset.
   - For `file` storage type, the path points to the directory that contains the task dataset files.
-  - For `queue` storage type, the path is optional. You can back up the data in the queue by specifying a sqlite database path here.
   - For `sql` storage type, the path points to the sqlite database file.
-- `subset_name`: The subset name of the task dataset. Default is `None`.
-- `split`: The split of the task dataset. Default is `train`.
+- `subset_name`: The subset name of the task dataset, corresponding to the `name` parameter in huggingface datasets `load_dataset` function. Default is `None`.
+- `split`: The split of the task dataset, corresponding to the `split` parameter in huggingface datasets `load_dataset` function. Default is `train`.
 - `repeat_times`: The number of rollouts generated for a task. If not set, it will be automatically set to `algorithm.repeat_times` for `taskset`, and `1` for `eval_tasksets`.
 - `rollout_args`: The parameters for rollout.
   - `temperature`: The temperature for sampling.
@@ -324,7 +322,7 @@ buffer:
   - For `queue` storage type, this field is optional. You can specify a SQLite database or JSON file path here to back up the queue data.
   - For `file` storage type, the path points to the directory containing the dataset files.
   - For `sql` storage type, the path points to the SQLite database file.
-- `format`: Defines keys for prompts and responses in the dataset.
+- `format`: Mainly for SFT and DPO algorithm datasets, used to format the extracted data.
   - `prompt_type`: Specifies the type of prompts in the dataset. We support `plaintext`, `messages` for now.
     - `plaintext`: The prompt is in string format.
     - `messages`: The prompt is organized as a message list.
@@ -339,8 +337,11 @@
   - `enable_concatenated_multi_turn`: Enable concatenated multi-turn SFT data preprocess. Only for `messages` and only take effect with SFT algorithm.
   - `chat_template`: Specifies the chat template in string format. If not provided, use `model.custom_chat_template`.
 - `max_read_timeout`: The maximum waiting time (in seconds) to read new experience data. If exceeded, an incomplete batch will be returned directly. Only take effect when `storage_type` is `queue`. Default is 1800 seconds (30 minutes).
-- `use_priority_queue`: Only take effect when `storage_type` is `queue`. If set to `True`, the queue will be a priority queue, which allows for prioritizing certain experiences over others. Default is `False`.
-- `reuse_cooldown_time`: Only take effect when `storage_type` is `queue` and `use_priority_queue` is `True`. If set, it specifies the cooldown time (in seconds) for reusing experiences. If not specified, the default value is `None`, meaning experiences can not be reused.
+- `replay_buffer`: Only take effect when `storage_type` is `queue`. Used to configure the replay buffer for experience reuse.
+  - `enable`: Whether to enable the replay buffer. Default is `false`.
+  - `reuse_cooldown_time`: Cooldown time (in seconds) for reusing experiences. If not specified, the default value is `None`, meaning experiences can not be reused.
+  - `priority_fn`: Experience priority function used to determine the order of experience reuse. Currently supports `linear_decay` and `linear_decay_use_count_control_randomization`.
+  - `priority_fn_args`: A dictionary of arguments passed to the priority function, specific parameters depend on the selected priority function.
 - `auxiliary_buffers`: Optional buffers used for trainer. It is a dictionary where each key is the buffer name and the value is the buffer configuration. Each buffer configuration is similar to the `experience_buffer`.
 
 ---
```
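Putting the documented fields together, a reference sketch of an experience buffer using every new `replay_buffer` option; values other than the stated defaults are illustrative:

```yaml
experience_buffer:
  name: experience_buffer
  storage_type: queue              # replay_buffer only takes effect for `queue`
  max_read_timeout: 1800           # documented default: 30 minutes
  replay_buffer:
    enable: true                   # default: false
    reuse_cooldown_time: 60        # illustrative; default None disables reuse
    priority_fn: linear_decay      # or linear_decay_use_count_control_randomization
    priority_fn_args:
      decay: 0.1                   # arguments depend on the chosen priority_fn
```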

docs/sphinx_doc/source_zh/tutorial/develop_operator.md

Lines changed: 4 additions & 4 deletions
````diff
@@ -7,9 +7,9 @@
 The Operator module is responsible for processing the trajectory data generated by the Explorer (which we call `Experience`). It natively supports data processing capabilities from [Data-Juicer](https://github.com/modelscope/data-juicer), and also allows developers to implement their own operators.
 By customizing data processing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as an Operator, as shown in the {ref}`Algorithms <Algorithms>` section.
 
-- **DataJuicerOperator** ({class}`trinity.data.operators.DataJuicerOperator`): A wrapped Data-Juicer operator; to use it, simply list the Data-Juicer operators you want in the configuration file. The full list of Data-Juicer operators is available [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
-- **ExperienceOperator** ({class}`trinity.data.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionality that every operator should have. Each operator processes a batch of experience data and returns the processed data along with metrics for logging.
-- **ExperiencePipeline** ({class}`trinity.data.pipelines.ExperiencePipeline`): An experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, processes them through each operator in the pipeline, and finally writes the processed experiences into the `Trainer`'s input buffer.
+- **DataJuicerOperator** ({class}`trinity.buffer.operators.DataJuicerOperator`): A wrapped Data-Juicer operator; to use it, simply list the Data-Juicer operators you want in the configuration file. The full list of Data-Juicer operators is available [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
+- **ExperienceOperator** ({class}`trinity.buffer.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionality that every operator should have. Each operator processes a batch of experience data and returns the processed data along with metrics for logging.
+- **ExperiencePipeline** ({class}`trinity.buffer.pipelines.ExperiencePipeline`): An experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, processes them through each operator in the pipeline, and finally writes the processed experiences into the `Trainer`'s input buffer.
 
 ```{note}
 In addition to `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing.
@@ -57,7 +57,7 @@ class RewardFilter(ExperienceOperator):
         return filtered_exps, metrics
 ```
 
-After implementation, you need to register this module through {class}`trinity.data.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using its registered name.
+After implementation, you need to register this module through {class}`trinity.buffer.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using its registered name.
 
 ### Step 2: Use Your Operator
````

docs/sphinx_doc/source_zh/tutorial/example_mix_algo.md

Lines changed: 7 additions & 8 deletions
```diff
@@ -77,22 +77,21 @@ class MixSampleStrategy(SampleStrategy):
         expert_batch_size = ceil(self.expert_data_ratio * tot_batch_size)
 
         # experience buffer
-        usual_buffer_config = copy.deepcopy(buffer_config)
-        usual_buffer_config.train_batch_size = tot_batch_size - expert_batch_size
-        self.usual_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.experience_buffer, usual_buffer_config  # type: ignore
-        )
+        usual_buffer_config = copy.deepcopy(buffer_config.trainer_input.experience_buffer)
+        usual_buffer_config.batch_size = tot_batch_size - expert_batch_size
+        self.usual_exp_buffer = get_buffer_reader(usual_buffer_config)
 
         if buffer_config.trainer_input.auxiliary_buffers is None:
             raise ValueError(
                 "`buffer_config.trainer_input.auxiliary_buffers` is required in MIX algorithm"
             )
 
         # expert experience buffer
-        expert_buffer_config = copy.deepcopy(buffer_config)
-        expert_buffer_config.train_batch_size = expert_batch_size
+        expert_buffer_config = copy.deepcopy(
+            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name]
+        )
+        expert_buffer_config.batch_size = expert_batch_size
         self.expert_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name],
             expert_buffer_config,
         )
 
```
