diff --git a/docs/source/BestPractices/Elastic.md b/docs/source/BestPractices/Elastic.md new file mode 100644 index 0000000000..9293aa91fc --- /dev/null +++ b/docs/source/BestPractices/Elastic.md @@ -0,0 +1,209 @@ +# Elastic + + + +## 安装依赖 + +集群部署K8S,并在集群中部署DLrover,[DLRover](https://github.com/intelligent-machine-learning/dlrover), +`pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift` + +经过反复测试验证的训练镜像中的其它依赖以及版本: +deepspeed 0.16.5(需参考https://github.com/deepspeedai/DeepSpeed/pull/7585/files 修复universal checkpoint 相关问题) +pytorch 2.6.0 + + +## 如何启动 +命令组成=dlrover-run +dlrover 命令参数+swift 启动命令 +swift参数,dlrover-run除自定义的参数外,其他参数与torchrun一致; +dlrover-run 参数如下: +``` +usage: dlrover-run [-h] [--nnodes NNODES] [--nproc-per-node NPROC_PER_NODE] + [--rdzv-backend RDZV_BACKEND] [--rdzv-endpoint RDZV_ENDPOINT] [--rdzv-id RDZV_ID] + [--rdzv-conf RDZV_CONF] [--standalone] [--max-restarts MAX_RESTARTS] + [--monitor-interval MONITOR_INTERVAL] [--start-method {spawn,fork,forkserver}] + [--role ROLE] [-m] [--no-python] [--run-path] [--log-dir LOG_DIR] [-r REDIRECTS] + [-t TEE] [--local-ranks-filter LOCAL_RANKS_FILTER] [--node-rank NODE_RANK] + [--master-addr MASTER_ADDR] [--master-port MASTER_PORT] [--local-addr LOCAL_ADDR] + [--logs-specs LOGS_SPECS] [--precheck {0,1,2}] [--node_unit NODE_UNIT] + [--auto_config] [--auto_tunning] [--exclude-straggler] [--save_at_breakpoint] + [--accelerator {nvidia.com/gpu,ascend-npu}] [--training_port TRAINING_PORT] + [--switchbox-check] [--box-pairs PAIR [PAIR ...]] [--min-bandwidth MIN_BANDWIDTH] + [--min-channels MIN_CHANNELS] [--numa-affinity] [--network-check] + [--comm-perf-test] [--ucp_device_type UCP_DEVICE_TYPE] + training_script + +``` +在弹性训练中我们需要关注的参数为: + +--nnodes NNODES Number of nodes, or the range of nodes in form + :. + +--nproc-per-node NPROC_PER_NODE Number of processes per node. +示例: + +```bash +model=your model path +dataset=your dataset +output= your output dir +export CUDA_VISIBLE_DEVICES=0 根据实际使用的GPU情况设置 +deepspeed_config_or_type=deepspeed类型或者配置文件的路径,如 zero1 或者/xxx/ms-swift/swift/llm/ds_config/zero1.json + +dlrover-run --nnodes 1:$NODE_NUM --nproc_per_node=1 \ +/opt/conda/lib/python3.10/site-packages/swift/cli/sft.py --model $model \ +--model_type qwen3 \ +--train_type lora \ +--torch_dtype bfloat16 \ +--dataset $dataset \ +--num_train_epochs 4 \ +--per_device_train_batch_size 1 \ +--per_device_eval_batch_size 1 \ +--learning_rate 5e-7 \ +--gradient_accumulation_steps 8 \ +--eval_steps 500 \ +--save_steps 10 \ +--save_total_limit 20 \ +--logging_steps 1 \ +--output_dir $output \ +--warmup_ratio 0.01 \ +--dataloader_num_workers 4 \ +--temperature 1.0 \ +--system You\ are\ a\ helpful\ assistant. 
\ +--lora_rank 8 \ +--lora_alpha 32 \ +--target_modules all-linear \ +--dataset_num_proc 1 \ +--use_flash_ckpt true \ +--deepspeed $deepspeed_config_or_type \ +--elastic +``` + +## 配置文件示例 +默认情况下的zero1为以下示例配置, + +```json +{ + "fp16": { + "enabled": "auto", + "loss_scale": 0, + "loss_scale_window": 1000, + "initial_scale_power": 16, + "hysteresis": 2, + "min_loss_scale": 1 + }, + + "bf16": { + "enabled": "auto" + }, + + "zero_optimization": { + "stage": 1, + "offload_optimizer": { + "device": "none", + "pin_memory": true + }, + "allgather_partitions": true, + "allgather_bucket_size": 2e8, + "overlap_comm": false, + "reduce_scatter": true, + "reduce_bucket_size": 2e8, + "contiguous_gradients": true + }, + + "gradient_accumulation_steps": "auto", + "gradient_clipping": "auto", + "steps_per_print": 2000, + "train_batch_size": "auto", + "train_micro_batch_size_per_gpu": "auto", + "wall_clock_breakdown": false, + "elasticity": { + "ignore_non_elastic_batch_info": true, + "enabled": true, + "max_train_batch_size": 8, + "micro_batch_sizes": [ + 4, + 2 + ], + "min_gpus": 1, + "max_gpus": 4, + "min_time": 20, + "version": 0.1 + } +} +``` + +如果用户需要自定义,可以在启动命令中deepspeed_config_or_type指定自定义的zero1.json的存放路径,其中弹性相关的配置为: +```json +... + + "elasticity": { + "ignore_non_elastic_batch_info": true, + "enabled": true, + "max_train_batch_size": 8, + "micro_batch_sizes": [ + 4, + 2 + ], + "min_gpus": 1, + "max_gpus": 4, + "min_time": 20, + "version": 0.1 + } +``` + +- ignore_non_elastic_batch_info:代表在elasticity里的配置会忽略外层的batch_size相关的配置,训练过程中会根据实际的训练进程个数实时修改batch_size等相关的参数 +计算原则为: + global-training-batch-size = micro-batch-size * gradient-accumulation-steps * world-size +- max_train_batch_size:最大batch_size数 +- micro_batch_sizes:即train_micro_batch_size_per_gpu +- min_gpus:最小gpu数目 +- max_gpus:最大gpu数目 +更详细的内容见:[Deepspeed](https://www.deepspeed.ai/docs/config-json/#elastic-training-config-v01-and-v02) + + +## 启动训练 + +```yaml +--- +apiVersion: elastic.iml.github.io/v1alpha1 +kind: ElasticJob +metadata: + name: deepspeed-elastic-swift + namespace: dlrover +spec: + distributionStrategy: AllreduceStrategy + optimizeMode: single-job + replicaSpecs: + worker: + replicas: 1 #【这里需要与启动命令中的--nnodes NNODES的最大值一致】 + template: + spec: + restartPolicy: Never + containers: + - name: main + image: #【训练镜像,需要安装deepspeed,dlrover 和swift 】 + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - sh start.sh # 启动脚本 + resources: + limits: + cpu: '8' + memory: 16Gi + nvidia.com/gpu: '1' + volumeMounts: + - mountPath: /model + name: volume-model + - mountPath: /dev/shm + name: volume-shm + restartPolicy: Never + volumes: + - hostPath: + path: /model + type: Directory + name: volume-model + - emptyDir: + medium: Memory + sizeLimit: 200Gi + name: volume-shm + +``` diff --git a/docs/source/Instruction/Command-line-parameters.md b/docs/source/Instruction/Command-line-parameters.md index 30c3bfc6ee..a2860a434d 100644 --- a/docs/source/Instruction/Command-line-parameters.md +++ b/docs/source/Instruction/Command-line-parameters.md @@ -484,7 +484,8 @@ Vera使用`target_modules`、`target_regex`、`modules_to_save`三个参数, - eval_dataset_args: 评测数据集参数,json格式,可设置多个数据集的参数。 - eval_limit: 评测数据集采样数。 - eval_generation_config: 评测时模型推理配置,json格式,默认为`{'max_tokens': 512}`。 -- use_flash_ckpt: 是否启用[DLRover Flash Checkpoint](https://github.com/intelligent-machine-learning/dlrover)的flash checkpoint。默认为`false`,启用后,权重会先保存至共享内存,之后异步持久化,目前暂不支持safetensors格式;建议搭配`PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True"` 一起使用,避免训练过程CUDA OOM。 +- use_flash_ckpt: 是否启用[DLRover 
Flash Checkpoint](https://github.com/intelligent-machine-learning/dlrover)的flash checkpoint。默认为`false`，启用后，权重会先保存至共享内存，之后异步持久化；建议搭配`PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True"` 一起使用，避免训练过程CUDA OOM。
+- elastic: 是否启用弹性训练，依赖[DLRover](https://github.com/intelligent-machine-learning/dlrover)，需执行`pip install dlrover && pip install tornado && pip install kubernetes`安装依赖，具体使用参考[示例](../BestPractices/Elastic.md)。
 - early_stop_interval: 早停的间隔，会检验best_metric在early_stop_interval个周期内(基于`save_steps`, 建议`eval_steps`和`save_steps`设为同值)没有提升时终止训练。具体代码在[callback plugin](https://github.com/modelscope/ms-swift/blob/main/swift/plugin/callback.py)中。同时，如果有较为复杂的早停需求，直接覆盖callback.py中的已有实现即可。
 
 #### SWANLAB
diff --git a/docs/source_en/BestPractices/Elastic.md b/docs/source_en/BestPractices/Elastic.md
new file mode 100644
index 0000000000..85fb2350a4
--- /dev/null
+++ b/docs/source_en/BestPractices/Elastic.md
@@ -0,0 +1,211 @@
+# Elastic
+
+
+## Installing Dependencies
+
+Deploy a K8S cluster, deploy [DLRover](https://github.com/intelligent-machine-learning/dlrover) in it, and install the required packages with `pip install dlrover && pip install tornado && pip install kubernetes && pip install ms-swift`.
+
+Other dependencies and their versions in the training image, verified through repeated testing:
+deepspeed 0.16.5 (apply this [PR](https://github.com/deepspeedai/DeepSpeed/pull/7585/files) to fix issues related to universal checkpoints)
+pytorch 2.6.0
+
+
+## How to Start
+
+The launch command consists of `dlrover-run` + dlrover-run arguments + the Swift launch command + Swift arguments. Apart from its custom parameters, `dlrover-run` accepts the same arguments as `torchrun`.
+
+The dlrover-run arguments are as follows:
+
+```
+usage: dlrover-run [-h] [--nnodes NNODES] [--nproc-per-node NPROC_PER_NODE]
+                   [--rdzv-backend RDZV_BACKEND] [--rdzv-endpoint RDZV_ENDPOINT] [--rdzv-id RDZV_ID]
+                   [--rdzv-conf RDZV_CONF] [--standalone] [--max-restarts MAX_RESTARTS]
+                   [--monitor-interval MONITOR_INTERVAL] [--start-method {spawn,fork,forkserver}]
+                   [--role ROLE] [-m] [--no-python] [--run-path] [--log-dir LOG_DIR] [-r REDIRECTS]
+                   [-t TEE] [--local-ranks-filter LOCAL_RANKS_FILTER] [--node-rank NODE_RANK]
+                   [--master-addr MASTER_ADDR] [--master-port MASTER_PORT] [--local-addr LOCAL_ADDR]
+                   [--logs-specs LOGS_SPECS] [--precheck {0,1,2}] [--node_unit NODE_UNIT]
+                   [--auto_config] [--auto_tunning] [--exclude-straggler] [--save_at_breakpoint]
+                   [--accelerator {nvidia.com/gpu,ascend-npu}] [--training_port TRAINING_PORT]
+                   [--switchbox-check] [--box-pairs PAIR [PAIR ...]] [--min-bandwidth MIN_BANDWIDTH]
+                   [--min-channels MIN_CHANNELS] [--numa-affinity] [--network-check]
+                   [--comm-perf-test] [--ucp_device_type UCP_DEVICE_TYPE]
+                   training_script
+
+```
+In elastic training, the parameters to pay attention to are:
+
+--nnodes NNODES
+Number of nodes, or the range of nodes in the form `<minimum_nodes>:<maximum_nodes>`.
+
+--nproc-per-node NPROC_PER_NODE
+Number of processes per node.
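+
+For example, `--nnodes=1:4` lets the job continue training as the number of live nodes varies between 1 and 4, while `--nproc-per-node` is usually set to the number of GPUs on each node.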
+
+Example:
+
+```bash
+model=<your model path>
+dataset=<your dataset>
+output=<your output dir>
+NODE_NUM=<maximum number of nodes>
+export CUDA_VISIBLE_DEVICES=0 # Set according to the actual GPU usage
+deepspeed_config_or_type=<deepspeed type or config file path> # e.g. zero1 or /xxx/ms-swift/swift/llm/ds_config/zero1.json
+
+dlrover-run --nnodes 1:$NODE_NUM --nproc_per_node=1 \
+/opt/conda/lib/python3.10/site-packages/swift/cli/sft.py --model $model \
+--model_type qwen3 \
+--train_type lora \
+--torch_dtype bfloat16 \
+--dataset $dataset \
+--num_train_epochs 4 \
+--per_device_train_batch_size 1 \
+--per_device_eval_batch_size 1 \
+--learning_rate 5e-7 \
+--gradient_accumulation_steps 8 \
+--eval_steps 500 \
+--save_steps 10 \
+--save_total_limit 20 \
+--logging_steps 1 \
+--output_dir $output \
+--warmup_ratio 0.01 \
+--dataloader_num_workers 4 \
+--temperature 1.0 \
+--system You\ are\ a\ helpful\ assistant. \
+--lora_rank 8 \
+--lora_alpha 32 \
+--target_modules all-linear \
+--dataset_num_proc 1 \
+--use_flash_ckpt true \
+--deepspeed $deepspeed_config_or_type \
+--elastic
+```
+
+## Configuration
+By default, the zero1 configuration is as follows:
+
+```json
+{
+    "fp16": {
+        "enabled": "auto",
+        "loss_scale": 0,
+        "loss_scale_window": 1000,
+        "initial_scale_power": 16,
+        "hysteresis": 2,
+        "min_loss_scale": 1
+    },
+
+    "bf16": {
+        "enabled": "auto"
+    },
+
+    "zero_optimization": {
+        "stage": 1,
+        "offload_optimizer": {
+            "device": "none",
+            "pin_memory": true
+        },
+        "allgather_partitions": true,
+        "allgather_bucket_size": 2e8,
+        "overlap_comm": false,
+        "reduce_scatter": true,
+        "reduce_bucket_size": 2e8,
+        "contiguous_gradients": true
+    },
+
+    "gradient_accumulation_steps": "auto",
+    "gradient_clipping": "auto",
+    "steps_per_print": 2000,
+    "train_batch_size": "auto",
+    "train_micro_batch_size_per_gpu": "auto",
+    "wall_clock_breakdown": false,
+    "elasticity": {
+        "ignore_non_elastic_batch_info": true,
+        "enabled": true,
+        "max_train_batch_size": 8,
+        "micro_batch_sizes": [
+            4,
+            2
+        ],
+        "min_gpus": 1,
+        "max_gpus": 4,
+        "min_time": 20,
+        "version": 0.1
+    }
+}
+```
+
+If you need a custom configuration, pass the path of your own zero1.json via the deepspeed_config_or_type parameter. The elasticity-related configuration is as follows:
+```json
+...
+
+    "elasticity": {
+        "ignore_non_elastic_batch_info": true,
+        "enabled": true,
+        "max_train_batch_size": 8,
+        "micro_batch_sizes": [
+            4,
+            2
+        ],
+        "min_gpus": 1,
+        "max_gpus": 4,
+        "min_time": 20,
+        "version": 0.1
+    }
+```
+
+- ignore_non_elastic_batch_info: The batch-size settings outside the `elasticity` block are ignored. During training, the batch size and related parameters are adjusted dynamically according to the actual number of training processes (see the sketch after this list).
+Calculation principle:
+  global-training-batch-size = micro-batch-size * gradient-accumulation-steps * world-size
+- max_train_batch_size: Maximum train batch size.
+- micro_batch_sizes: Valid micro batch sizes, i.e. the candidate values for `train_micro_batch_size_per_gpu`.
+- min_gpus: Minimum number of GPUs.
+- max_gpus: Maximum number of GPUs.
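+
+To make the batch-size arithmetic concrete, here is a minimal sketch (not part of ms-swift; it assumes DeepSpeed is installed and reuses the `compute_elastic_config` helper that the new `DeepspeedElasticCallBack` in this PR calls) showing how the example `elasticity` block resolves to a micro batch size and gradient accumulation steps for a given number of live processes; `ds_version` and `world_size = 2` are illustrative values:
+
+```python
+from deepspeed.elasticity import compute_elastic_config
+from deepspeed.git_version_info import version as ds_version
+
+# The elasticity block from the example configuration above.
+ds_config = {
+    'elasticity': {
+        'enabled': True,
+        'ignore_non_elastic_batch_info': True,
+        'max_train_batch_size': 8,
+        'micro_batch_sizes': [4, 2],
+        'min_gpus': 1,
+        'max_gpus': 4,
+        'min_time': 20,
+        'version': 0.1,
+    }
+}
+
+world_size = 2  # number of training processes currently alive
+final_batch_size, valid_gpus, micro_batch_size = compute_elastic_config(
+    ds_config=ds_config, target_deepspeed_version=ds_version, world_size=world_size)
+
+# global-training-batch-size = micro-batch-size * gradient-accumulation-steps * world-size
+grad_accum_steps = final_batch_size // (micro_batch_size * world_size)
+print(final_batch_size, valid_gpus, micro_batch_size, grad_accum_steps)
+```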
+For more details, see: [Deepspeed](https://www.deepspeed.ai/docs/config-json/#elastic-training-config-v01-and-v02) + +## Starting Training + +```yaml +--- +apiVersion: elastic.iml.github.io/v1alpha1 +kind: ElasticJob +metadata: + name: deepspeed-elastic-swift + namespace: dlrover +spec: + distributionStrategy: AllreduceStrategy + optimizeMode: single-job + replicaSpecs: + worker: + replicas: 1 # This should match the maximum value of --nnodes NNODES in the startup command + template: + spec: + restartPolicy: Never + containers: + - name: main + image: #【Training image, needs to have deepspeed, dlrover, and swift installed】 + imagePullPolicy: IfNotPresent + command: + - /bin/bash + - -c + - sh start.sh # Startup script + resources: + limits: + cpu: '8' + memory: 16Gi + nvidia.com/gpu: '1' + volumeMounts: + - mountPath: /model + name: volume-model + - mountPath: /dev/shm + name: volume-shm + restartPolicy: Never + volumes: + - hostPath: + path: /model + type: Directory + name: volume-model + - emptyDir: + medium: Memory + sizeLimit: 200Gi + name: volume-shm + +``` diff --git a/docs/source_en/Instruction/Command-line-parameters.md b/docs/source_en/Instruction/Command-line-parameters.md index 7afebd4a2c..1d6e5b9773 100644 --- a/docs/source_en/Instruction/Command-line-parameters.md +++ b/docs/source_en/Instruction/Command-line-parameters.md @@ -495,6 +495,7 @@ Training arguments include the [base arguments](#base-arguments), [Seq2SeqTraine - eval_limit: Number of samples from the evaluation dataset - eval_generation_config: Model inference configuration during evaluation, in JSON format, default is `{'max_tokens': 512}` - use_flash_ckpt: Whether to use [DLRover Flash Checkpoint](https://github.com/intelligent-machine-learning/dlrover). Default is `false`. If enabled, checkpoints are saved to memory synchronously, then persisted to storage asynchronously, the safetensors format is not supported currently. It's recommended to use this with the environment variable `PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True"` to avoid CUDA OOM. +elastic: Whether to enable elasticity, which depends on [DLRover](https://github.com/intelligent-machine-learning/dlrover), Install the required packages using `pip install dlrover && pip install tornado && pip install kubernetes`, For specific usage, refer to the [example](../BestPractices/Elastic.md) - early_stop_interval: The interval for early stopping. It will check if the best_metric has not improved within early_stop_interval periods (based on save_steps; it's recommended to set eval_steps and save_steps to the same value) and terminate training when this condition is met. The specific code implementation is in the callback plugin. Additionally, if you have more complex early stopping requirements, you can directly override the existing implementation in [callback.py](https://github.com/modelscope/ms-swift/blob/main/swift/plugin/callback.py). 
diff --git a/swift/llm/argument/train_args.py b/swift/llm/argument/train_args.py index 942c8c07bf..e43b461856 100644 --- a/swift/llm/argument/train_args.py +++ b/swift/llm/argument/train_args.py @@ -286,6 +286,7 @@ def _init_deepspeed(self): 'To use `deepspeed_autotp_size`, you need to additionally set the `--deepspeed` argument.') self.deepspeed['tensor_parallel'] = {'autotp_size': self.deepspeed_autotp_size} self.deepspeed['zero_optimization']['gather_16bit_weights_on_model_save'] = True + logger.info(f'Using deepspeed: {self.deepspeed}') def _init_fsdp(self): diff --git a/swift/llm/train/callback.py b/swift/llm/train/callback.py index 2c466519b9..6e007f28f5 100644 --- a/swift/llm/train/callback.py +++ b/swift/llm/train/callback.py @@ -3,9 +3,10 @@ import numpy as np import torch +import torch.distributed as dist from transformers import TrainerCallback -from swift.utils import get_logger +from swift.utils import ShutdownManager, get_device, get_logger logger = get_logger() @@ -78,3 +79,29 @@ def switch_active_layers(self): for idx in self.active_layers_indices: for param in layers[idx].parameters(): param.requires_grad = True + + +class GracefulExitCallBack(TrainerCallback): + + def __init__(self): + shutdown_manager = ShutdownManager() + shutdown_manager.register() + self.shutdown_manager = shutdown_manager + + def on_step_end(self, args, state, control, **kwargs): + device_type = get_device() + + local_req = 1 if self.shutdown_manager.should_shutdown() else 0 + if dist.is_available() and dist.is_initialized(): + + t = torch.tensor([local_req], dtype=torch.uint8, device=device_type) + # all_reduce with MAX: if any rank has 1 -> result 1 everywhere + dist.all_reduce(t, op=dist.ReduceOp.MAX) + any_req = bool(int(t.item())) + else: + any_req = bool(local_req) + + if any_req: + control.should_save = True + control.should_training_stop = True + return control diff --git a/swift/llm/train/sft.py b/swift/llm/train/sft.py index d6e722cfa5..893d8b2398 100644 --- a/swift/llm/train/sft.py +++ b/swift/llm/train/sft.py @@ -255,17 +255,26 @@ def train(self, trainer): logging_path = os.path.join(trainer.args.output_dir, 'logging.jsonl') logger.info(f'The logging file will be saved in: {logging_path}') try: - trainer.train(trainer.args.resume_from_checkpoint) + resume_checkpoint = None + if self.args.use_flash_ckpt: + resume_checkpoint = trainer.get_resume_checkpoint() + if self.args.elastic and resume_checkpoint and not os.path.exists( + os.path.join(resume_checkpoint, 'latest_universal')): + resume_checkpoint = trainer.get_resume_checkpoint_until_find_ucp() + if self.args.resume_from_checkpoint: + resume_checkpoint = self.args.resume_from_checkpoint + trainer.train(resume_checkpoint) finally: res = self._save_trainer_state(trainer) - if self.args.use_flash_ckpt: - trainer.wait_latest_checkpoint(trainer.FLASH_CKPT_WAIT_TIMEOUT) + if self.args.use_flash_ckpt and hasattr(trainer, 'flash_checkpointer'): + trainer.wait_latest_checkpoint(trainer.FLASH_CKPT_WAIT_TIMEOUT, trainer.state.global_step) return res @RayHelper.function(group='default') def _prepare_callbacks(self): - from .callback import DynamicLayerActivationCallback, TrainerAdapterCallback + from .callback import DynamicLayerActivationCallback, TrainerAdapterCallback, GracefulExitCallBack + from swift.plugin import DeepspeedElasticCallBack args = self.args callbacks = [] if args.lisa_activated_layers > 0: @@ -276,7 +285,10 @@ def _prepare_callbacks(self): model=self.model) lisa_callback.switch_active_layers() # Make trainable parameters 
printing a correct value callbacks.append(lisa_callback) - + if args.elastic: + graceful_exit_callback = GracefulExitCallBack() + callbacks.append(graceful_exit_callback) + callbacks.append(DeepspeedElasticCallBack()) if args.is_adapter and args.train_type == 'adalora': callbacks.append(TrainerAdapterCallback(args)) callbacks += extra_callbacks diff --git a/swift/plugin/__init__.py b/swift/plugin/__init__.py index 8e7afd1277..555a8e9c16 100644 --- a/swift/plugin/__init__.py +++ b/swift/plugin/__init__.py @@ -17,6 +17,7 @@ from .rm_plugin import rm_plugins from .env import envs, Env from .context_manager import context_managers, ContextManager + from swift.plugin.deepspeed_elastic import DeepspeedElasticCallBack else: _import_structure = { @@ -34,6 +35,7 @@ 'rm_plugin': ['rm_plugins'], 'env': ['envs', 'Env'], 'context_manager': ['context_managers', 'ContextManager'], + 'deepspeed_elastic': ['DeepspeedElasticCallBack'], } import sys diff --git a/swift/plugin/deepspeed_elastic.py b/swift/plugin/deepspeed_elastic.py new file mode 100644 index 0000000000..f0d54691cc --- /dev/null +++ b/swift/plugin/deepspeed_elastic.py @@ -0,0 +1,44 @@ +# Copyright (c) Alibaba, Inc. and its affiliates. +import types + +import numpy as np +import torch +import torch.distributed as dist +from packaging import version +from transformers import TrainerCallback, TrainerControl, TrainerState, TrainingArguments + + +class DeepspeedElasticCallBack(TrainerCallback): + + def on_init_end(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs): + """ + Event called at the beginning of training. + """ + # with self.template.forward_context(self.model, inputs),get_act_offloading_ctx_manager(model): + # + + if args.deepspeed and args.elastic: + from deepspeed.elasticity import compute_elastic_config + from deepspeed.git_version_info import version as __version__ + args.deepspeed['checkpoint'] = {'load_universal': True} + if 'elasticity' not in args.deepspeed: + args.deepspeed['elasticity'] = { + 'ignore_non_elastic_batch_info': True, + 'enabled': True, + 'max_train_batch_size': 8, + 'micro_batch_sizes': [2], + 'min_gpus': 1, + 'max_gpus': 4, + 'min_time': 20, + 'version': 0.1 + } + args.deepspeed['checkpoint'] = {'load_universal': True} + final_batch_size, valid_gpus, micro_batch_size = compute_elastic_config( + ds_config=args.deepspeed, + target_deepspeed_version=__version__, + world_size=dist.get_world_size(), + ) + gradient_accu_steps = final_batch_size // (micro_batch_size * dist.get_world_size()) + args.per_device_train_batch_size = micro_batch_size + args.gradient_accumulation_steps = gradient_accu_steps + state.train_batch_size = args.per_device_train_batch_size * max(1, args.n_gpu) diff --git a/swift/trainers/arguments.py b/swift/trainers/arguments.py index 597ad63ae9..3e13f55c17 100644 --- a/swift/trainers/arguments.py +++ b/swift/trainers/arguments.py @@ -142,6 +142,8 @@ class TrainArgumentsMixin: # dlrover flash_checkpoint use_flash_ckpt: bool = False + # elastic + elastic: bool = False @staticmethod def _patch_liger_kernel(): diff --git a/swift/trainers/mixin.py b/swift/trainers/mixin.py index fa2a3f4863..1946a51eab 100644 --- a/swift/trainers/mixin.py +++ b/swift/trainers/mixin.py @@ -16,6 +16,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union import datasets +import json import numpy as np import safetensors import torch @@ -48,7 +49,8 @@ from ..llm.model.patcher import (gather_sequence_parallel_outputs, get_lm_head_model, revert_padding_free, 
                                  transformers_seq_cls_forward)
 from .arguments import TrainingArguments
-from .utils import can_return_loss, find_labels, get_function, is_instance_of_ms_model
+from .utils import (can_return_loss, find_labels, get_function, get_resume_dir, is_instance_of_ms_model,
+                    replace_index_file)
 
 try:
     from trl import AutoModelForCausalLMWithValueHead
@@ -477,13 +479,52 @@ def _get_last_checkpoint_step(self):
             step = int(f.read())
         return step
 
-    def wait_latest_checkpoint(self, timeout=FLASH_CKPT_WAIT_TIMEOUT):
+    def get_resume_checkpoint(self):
+        """
+        Get the path of the last complete checkpoint. Some later directories
+        may not contain a complete checkpoint because the asynchronous
+        persistence may not have finished. The step in `dlrover_latest.txt` is
+        the last step with a complete checkpoint, which gives the path.
+        """
+        resume_dir = get_resume_dir(self.args.output_dir)
+        if resume_dir is None:
+            return None
+        tracer_file = os.path.join(resume_dir, 'dlrover_latest.txt')
+        if not os.path.exists(tracer_file):
+            return None
+        with open(tracer_file, 'r') as f:
+            step = int(f.read())
+        if step == 0:
+            return None
+        checkpoint_folder = f'{PREFIX_CHECKPOINT_DIR}-{step}'
+
+        ckpt_dir = os.path.join(resume_dir, checkpoint_folder)
+        with open(os.path.join(ckpt_dir, TRAINER_STATE_NAME), 'r', encoding='utf-8') as f:
+            train_state = json.load(f)
+        if train_state is not None and train_state.get('max_steps') == step:
+            return None
+        return ckpt_dir
+
+    def get_resume_checkpoint_until_find_ucp(self):
+        resume_dir = get_resume_dir(self.args.output_dir)
+        tracer_file = os.path.join(resume_dir, 'ucp.txt')
+        if not os.path.exists(tracer_file):
+            return None
+        with open(tracer_file, 'r') as f:
+            step = int(f.read())
+        if step == 0:
+            return None
+        checkpoint_folder = f'{PREFIX_CHECKPOINT_DIR}-{step}'
+        ckpt_dir = os.path.join(resume_dir, checkpoint_folder)
+        return ckpt_dir
+
+    def wait_latest_checkpoint(self, timeout=FLASH_CKPT_WAIT_TIMEOUT, max_steps=None):
+        """
+        Wait for the latest checkpoint.
+        Args:
+            timeout (second): The timeout to wait.
""" - self.flash_checkpointer.async_save_engine.wait_latest_checkpoint(timeout) + self.flash_checkpointer.async_save_engine.wait_latest_checkpoint(timeout, max_steps) def _fix_zero3_gather_all_parameters(self) -> None: if is_deepspeed_zero3_enabled() and not hasattr(self.deepspeed, '_zero3_consolidated_16bit_state_dict_origin'): @@ -606,9 +647,16 @@ def _save_flash_checkpoint(self, model, trial, metrics=None): rng_states, os.path.join(output_dir, f'rng_state_{self.args.process_index}.pth'), ) + if self.args.save_safetensors: + torch.save({'safe_serialization': True}, 'safe_serialization') + replace_index_file(output_dir) torch.save = torch_native_save - success = self.flash_checkpointer.save_checkpoint_to_storage(self.state.global_step) + if (self.state.global_step == self.state.max_steps): + success = self.flash_checkpointer.save_checkpoint_to_storage(self.state.global_step, True) + else: + success = self.flash_checkpointer.save_checkpoint_to_storage(self.state.global_step) + if not success: logger.info(f'Skip saving the checkpoint of step {self.state.global_step} ' 'because the latest checkpoint is not finished.') diff --git a/swift/trainers/utils.py b/swift/trainers/utils.py index bae8b79928..ed84d72046 100644 --- a/swift/trainers/utils.py +++ b/swift/trainers/utils.py @@ -3,7 +3,7 @@ import inspect import os from types import FunctionType, MethodType -from typing import List, Union +from typing import List, Optional, Union import torch import torch.nn.functional as F @@ -107,3 +107,66 @@ def per_token_loss_func(outputs, labels, enable_dft_loss: bool = False, **kwargs target_probs = torch.exp(-loss) loss *= target_probs return loss + + +def extract_version(name: str) -> Optional[int]: + if not name.startswith('v'): + return None + try: + num = name[1:].split('-', 1)[0] + return int(num) + except ValueError: + return None + + +def get_previous_version_from_path(current_path: str) -> Optional[str]: + from pathlib import Path + current = Path(current_path) + parent = current.parent + current_name = current.name + + candidates = [d for d in parent.iterdir() if d.is_dir()] + + valid = [(d.name, extract_version(d.name)) for d in candidates] + valid = [(name, ver) for name, ver in valid if ver is not None] + + valid.sort(key=lambda x: x[1]) + names = [name for name, _ in valid] + + if current_name not in names: + return None + + idx = names.index(current_name) + if idx == 0: + return None + + prev_name = names[idx - 1] + return str(parent / prev_name) + + +def get_resume_dir(output_dir): + return get_previous_version_from_path(output_dir) + + +def replace_index_file(output_dir: str): + from transformers.utils import WEIGHTS_INDEX_NAME, SAFE_WEIGHTS_INDEX_NAME + import os + import json + index_file = os.path.join(output_dir, WEIGHTS_INDEX_NAME) + + if not os.path.exists(index_file): + return + with open(index_file, 'r', encoding='utf-8') as f: + bin_data = json.load(f) + if 'weight_map' not in bin_data: + return + bin_data['weight_map'] = { + k: v.replace('pytorch_model', 'model').replace('.bin', '.safetensors') + for k, v in bin_data['weight_map'].items() + } + safe_path = os.path.join(output_dir, SAFE_WEIGHTS_INDEX_NAME) + with open(safe_path, 'w', encoding='utf-8') as f: + json.dump(bin_data, f, indent=2) + from contextlib import suppress + with suppress(FileNotFoundError): + os.remove(os.path.join(output_dir, WEIGHTS_INDEX_NAME)) diff --git a/swift/utils/__init__.py b/swift/utils/__init__.py index 86ea0c4525..df27182250 100644 --- a/swift/utils/__init__.py +++ b/swift/utils/__init__.py @@ 
-8,6 +8,7 @@ from .io_utils import JsonlWriter, append_to_jsonl, download_ms_file, get_file_mm_type, read_from_jsonl, write_to_jsonl from .logger import get_logger, ms_logger_context from .np_utils import get_seed, stat_array, transform_jsonl_to_df +from .shutdown_manager import ShutdownManager from .tb_utils import TB_COLOR, TB_COLOR_SMOOTH, plot_images, read_tensorboard_file, tensorboard_smoothing from .torch_utils import (Serializer, activate_parameters, check_shared_disk, disable_safe_ddp_context_use_barrier, empty_cache, find_all_linears, find_embedding, find_layers, find_norm, freeze_parameters, diff --git a/swift/utils/shutdown_manager.py b/swift/utils/shutdown_manager.py new file mode 100644 index 0000000000..349debcd89 --- /dev/null +++ b/swift/utils/shutdown_manager.py @@ -0,0 +1,24 @@ +import signal + + +class ShutdownManager: + + def __init__(self, signals=None): + if signals is None: + signals = [signal.SIGUSR1, signal.SIGUSR2] + self._signals = signals + + self._shutdown_requested = False + + def _handler(self, signum, frame): + self._shutdown_requested = True + + def register(self): + for s in self._signals: + signal.signal(s, self._handler) + + def should_shutdown(self) -> bool: + return self._shutdown_requested + + def reset(self): + self._shutdown_requested = False diff --git a/swift/utils/utils.py b/swift/utils/utils.py index c697866d5b..83803c23ee 100644 --- a/swift/utils/utils.py +++ b/swift/utils/utils.py @@ -23,6 +23,7 @@ import torch import torch.distributed as dist from transformers import HfArgumentParser, enable_full_determinism, set_seed +from transformers.trainer import PREFIX_CHECKPOINT_DIR, TRAINER_STATE_NAME from transformers.utils import strtobool from .env import is_dist, is_master @@ -476,6 +477,41 @@ def get_modules_to_not_convert(model): return res if res else None +def extract_version(name: str) -> Optional[int]: + if not name.startswith('v'): + return None + try: + num = name[1:].split('-', 1)[0] + return int(num) + except ValueError: + return None + + +def get_previous_version_from_path(current_path: str) -> Optional[str]: + from pathlib import Path + current = Path(current_path) + parent = current.parent + current_name = current.name + + candidates = [d for d in parent.iterdir() if d.is_dir()] + + valid = [(d.name, extract_version(d.name)) for d in candidates] + valid = [(name, ver) for name, ver in valid if ver is not None] + + valid.sort(key=lambda x: x[1]) + names = [name for name, _ in valid] + + if current_name not in names: + return None + + idx = names.index(current_name) + if idx == 0: + return None + + prev_name = names[idx - 1] + return str(parent / prev_name) + + def retry_decorator(retry=3): def _retry(func):