diff --git a/examples/pytorch/text_generation/finetune_llama.py b/examples/pytorch/text_generation/finetune_llama.py
new file mode 100644
index 00000000..9b5ffe62
--- /dev/null
+++ b/examples/pytorch/text_generation/finetune_llama.py
@@ -0,0 +1,234 @@
+# Copyright 2023 Rohan Taori, Ishaan Gulrajani, Tianyi Zhang, Yann Dubois, Xuechen Li
+# Copyright (c) Alibaba, Inc. and its affiliates.
+
+import os
+import utils
+import copy
+import logging
+import shutil
+import torch
+import tempfile
+import unittest
+from dataclasses import dataclass
+
+from modelscope.metainfo import Trainers
+from modelscope.models.nlp.llama import (LlamaForTextGeneration,
+                                         LlamaTokenizerFast)
+from modelscope.msdatasets.dataset_cls.custom_datasets.torch_custom_dataset import \
+    TorchCustomDataset
+from modelscope.trainers import build_trainer
+from modelscope.utils.test_utils import DistributedTestCase, test_level
+
+IGNORE_INDEX = -100
+DEFAULT_PAD_TOKEN = "[PAD]"
+DEFAULT_EOS_TOKEN = "</s>"
+DEFAULT_BOS_TOKEN = "<s>"
+DEFAULT_UNK_TOKEN = "<unk>"
+PROMPT_DICT = {
+    "prompt_input": (
+        "Below is an instruction that describes a task, paired with an input that provides further context. "
+        "Write a response that appropriately completes the request.\n\n"
+        "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response:"
+    ),
+    "prompt_no_input": (
+        "Below is an instruction that describes a task. "
+        "Write a response that appropriately completes the request.\n\n"
+        "### Instruction:\n{instruction}\n\n### Response:"
+    ),
+}
+
+
+def _tokenize_fn(strings, tokenizer):
+    """Tokenize a list of strings."""
+    tokenized_list = [
+        tokenizer(
+            text,
+            return_tensors="pt",
+            padding="longest",
+            max_length=tokenizer.model_max_length,
+            truncation=True,
+        )
+        for text in strings
+    ]
+    input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
+    input_ids_lens = labels_lens = [
+        tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item() for tokenized in tokenized_list
+    ]
+    return dict(
+        input_ids=input_ids,
+        labels=labels,
+        input_ids_lens=input_ids_lens,
+        labels_lens=labels_lens,
+    )
+
+
+def preprocess(sources, targets, tokenizer):
+    """Preprocess the data by tokenizing."""
+    examples = [s + t for s, t in zip(sources, targets)]
+    examples_tokenized, sources_tokenized = [_tokenize_fn(strings, tokenizer) for strings in (examples, sources)]
+    input_ids = examples_tokenized["input_ids"]
+    labels = copy.deepcopy(input_ids)
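+    # Mask the prompt (source) tokens with IGNORE_INDEX so that the loss is
+    # computed only on the response (target) tokens.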
+    for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
+        label[:source_len] = IGNORE_INDEX
+    return dict(input_ids=input_ids, labels=labels)
+
+
+def smart_tokenizer_and_embedding_resize(special_tokens_dict, tokenizer, model):
+    """Resize tokenizer and embedding.
+
+    Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
+    """
+    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict)
+    model.resize_token_embeddings(len(tokenizer))
+
+    if num_new_tokens > 0:
+        input_embeddings = model.get_input_embeddings().weight.data
+        output_embeddings = model.get_output_embeddings().weight.data
+
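+        # Initialize the newly added embedding rows with the mean of the
+        # existing embeddings instead of leaving them randomly initialized.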
+        input_embeddings_avg = input_embeddings[:-num_new_tokens].mean(dim=0, keepdim=True)
+        output_embeddings_avg = output_embeddings[:-num_new_tokens].mean(dim=0, keepdim=True)
+
+        input_embeddings[-num_new_tokens:] = input_embeddings_avg
+        output_embeddings[-num_new_tokens:] = output_embeddings_avg
+
+
+class SupervisedDataset(TorchCustomDataset):
+    """Dataset for supervised fine-tuning."""
+
+    def __init__(self, data_path: str, tokenizer):
+        logging.warning('Loading data...')
+        list_data_dict = utils.jload(data_path)
+
+        logging.warning('Formatting inputs...')
+        prompt_input, prompt_no_input = PROMPT_DICT[
+            'prompt_input'], PROMPT_DICT['prompt_no_input']
+        sources = [
+            prompt_input.format_map(example) if example.get('input', '') != ''
+            else prompt_no_input.format_map(example)
+            for example in list_data_dict
+        ]
+        targets = [
+            f"{example['output']}{tokenizer.eos_token}"
+            for example in list_data_dict
+        ]
+
+        logging.warning('Tokenizing inputs... This may take some time...')
+        data_dict = preprocess(sources, targets, tokenizer)
+
+        self.input_ids = data_dict['input_ids']
+        self.labels = data_dict['labels']
+
+    def __len__(self):
+        return len(self.input_ids)
+
+    def __getitem__(self, i):
+        return dict(input_ids=self.input_ids[i], labels=self.labels[i])
+
+
+@dataclass
+class DataCollatorForSupervisedDataset(object):
+    """Collate examples for supervised fine-tuning."""
+
+    tokenizer: LlamaTokenizerFast
+
+    def __call__(self, instances):
+        input_ids, labels = tuple([instance[key] for instance in instances]
+                                  for key in ("input_ids", "labels"))
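+        # Pad input_ids with the tokenizer's pad token and labels with
+        # IGNORE_INDEX, so padded positions do not contribute to the loss.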
+        input_ids = torch.nn.utils.rnn.pad_sequence(
+            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
+        )
+        labels = torch.nn.utils.rnn.pad_sequence(
+            labels, batch_first=True, padding_value=IGNORE_INDEX)
+        return dict(
+            input_ids=input_ids,
+            labels=labels,
+            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
+        )
+
+
+if __name__ == '__main__':
+
+    def cfg_modify_fn(cfg):
+        cfg.train.lr_scheduler = {
+            'type': 'CosineAnnealingLR',
+            'T_max': 1,
+            'options': {
+                'by_epoch': False
+            }
+        }
+        cfg.train.optimizer = {
+            'type': 'AdamW',
+            'lr': 2e-5,
+            'weight_decay': 0.0,
+            'options': {
+                'warmup': {
+                    'type': 'LinearWarmup',
+                    'warmup_ratio': 0.03
+                }
+            }
+        }
+        cfg.train['bf16'] = True
+        cfg.train['gradient_accumulation_steps'] = 8
+        cfg.train['checkpoint'] = {'interval': 1, 'by_epoch': False}
+        cfg.train.dataloader = {'batch_size_per_gpu': 4, 'workers_per_gpu': 2}
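+        # The DeepSpeed JSON referenced below is assumed to be an Alpaca-style
+        # ZeRO/offload config with `auto` placeholders that DeepspeedHook fills
+        # in from the trainer configuration; adjust the path to your environment.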
+        cfg.train.hooks.append({
+            'type': 'DeepspeedHook',
+            'config':
+            '/root/work/stanford_alpaca/configs/default_offload_opt_param.json',
+            'save_zero_checkpoint': True,
+            'with_mpu': False,
+        })
+
+        cfg.preprocessor.sequence_length = 512
+        return cfg
+
+    model_name_or_path = '/run/model/llama-7b'
+    model = LlamaForTextGeneration.from_pretrained(
+        model_name_or_path,
+        cache_dir='/run/model/ms_out',
+    )
+
+    tokenizer = LlamaTokenizerFast.from_pretrained(
+        model_name_or_path,
+        cache_dir='/run/model/ms_out',
+        model_max_length=512,
+        padding_side='right',
+        use_fast=False,
+    )
+
+    special_tokens_dict = dict()
+    if tokenizer.pad_token is None:
+        special_tokens_dict['pad_token'] = DEFAULT_PAD_TOKEN
+    if tokenizer.eos_token is None:
+        special_tokens_dict['eos_token'] = DEFAULT_EOS_TOKEN
+    if tokenizer.bos_token is None:
+        special_tokens_dict['bos_token'] = DEFAULT_BOS_TOKEN
+    if tokenizer.unk_token is None:
+        special_tokens_dict['unk_token'] = DEFAULT_UNK_TOKEN
+
+    smart_tokenizer_and_embedding_resize(
+        special_tokens_dict=special_tokens_dict,
+        tokenizer=tokenizer,
+        model=model,
+    )
+
+    train_dataset = SupervisedDataset(
+        tokenizer=tokenizer,
+        data_path='/root/work/stanford_alpaca/alpaca_data.json')
+    data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
+
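+    # The trainer loads `configuration.json` from the model directory and then
+    # applies `cfg_modify_fn` to it, so the overrides above (optimizer, DeepSpeed
+    # hook, sequence length) are expected to take effect before training starts.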
+    kwargs = dict(
+        model=model,
+        cfg_file=os.path.join(model_name_or_path, 'configuration.json'),
+        train_dataset=train_dataset,
+        eval_dataset=None,
+        data_collator=data_collator,
+        max_epochs=1,
+        launcher='pytorch',
+        work_dir='/run/model/ms_out',
+        cfg_modify_fn=cfg_modify_fn)
+
+    # Construct trainer and train
+    trainer = build_trainer(
+        name=Trainers.text_generation_trainer, default_args=kwargs)
+    trainer.train()
diff --git a/examples/pytorch/text_generation/run_train_llama.sh b/examples/pytorch/text_generation/run_train_llama.sh
new file mode 100644
index 00000000..dbc8c7cb
--- /dev/null
+++ b/examples/pytorch/text_generation/run_train_llama.sh
@@ -0,0 +1,2 @@
+torchrun --nproc_per_node=2 --master_port=1666 examples/pytorch/text_generation/finetune_llama.py
+#torchrun tst_train.py
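+# Note: --nproc_per_node should match the number of local GPUs on the node and
+# --master_port can be any free port.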
diff --git a/modelscope/trainers/hooks/deepspeed_hook.py b/modelscope/trainers/hooks/deepspeed_hook.py
index 8fe300f0..4020aed3 100644
--- a/modelscope/trainers/hooks/deepspeed_hook.py
+++ b/modelscope/trainers/hooks/deepspeed_hook.py
@@ -1,27 +1,26 @@
 # Copyright 2020 The HuggingFace Team. All rights reserved.
 # Copyright (c) Alibaba, Inc. and its affiliates.
+import math
 import os
 import shutil
+from functools import partialmethod
 
-import math
 import deepspeed
 import torch
-from functools import partialmethod
 from deepspeed import DeepSpeedEngine
 from megatron_util import mpu, print_rank_0
+from transformers.deepspeed import HfTrainerDeepSpeedConfig
 
-from modelscope.utils.torch_utils import get_local_rank, get_dist_info
 from modelscope.metainfo import Hooks
 from modelscope.trainers.hooks.builder import HOOKS
 from modelscope.trainers.hooks.hook import Hook
 from modelscope.trainers.hooks.priority import Priority
 from modelscope.utils.checkpoint import save_checkpoint
-from modelscope.utils.logger import get_logger
-from .checkpoint_hook import CheckpointHook, LoadCheckpointHook
 from modelscope.utils.constant import DistributedParallelType
+from modelscope.utils.logger import get_logger
+from modelscope.utils.torch_utils import get_dist_info, get_local_rank
+from .checkpoint_hook import CheckpointHook, LoadCheckpointHook
 
-# from accelerate.utils.deepspeed import HfDeepSpeedConfig
-from transformers.deepspeed import HfTrainerDeepSpeedConfig
 
 class DeepSpeedConfig(HfTrainerDeepSpeedConfig):
     """
@@ -39,66 +38,73 @@ class DeepSpeedConfig(HfTrainerDeepSpeedConfig):
         # deal with config keys that use `auto` value and rely on model's hidden_size
         hidden_size_based_keys = [
-            "zero_optimization.reduce_bucket_size",
-            "zero_optimization.stage3_prefetch_bucket_size",
-            "zero_optimization.stage3_param_persistence_threshold",
+            'zero_optimization.reduce_bucket_size',
+            'zero_optimization.stage3_prefetch_bucket_size',
+            'zero_optimization.stage3_param_persistence_threshold',
+        ]
+        hidden_size_auto_keys = [
+            x for x in hidden_size_based_keys if self.is_auto(x)
         ]
-        hidden_size_auto_keys = [x for x in hidden_size_based_keys if self.is_auto(x)]
 
         if len(hidden_size_auto_keys) > 0:
-            if hasattr(model.config, "hidden_size"):
+            if hasattr(model.config, 'hidden_size'):
                 hidden_size = model.config.hidden_size
-            elif hasattr(model.config, "hidden_sizes"):
+            elif hasattr(model.config, 'hidden_sizes'):
                 # if there are many hidden sizes pick the largest one
                 hidden_size = max(model.config.hidden_sizes)
             else:
                 raise ValueError(
                     "The model's config file has neither `hidden_size` nor `hidden_sizes` entry, "
                     "therefore it's not possible to automatically fill out the following `auto` entries "
-                    f"in the DeepSpeed config file: {hidden_size_auto_keys}. You can fix that by replacing "
-                    "`auto` values for these keys with an integer value of your choice."
+                    f'in the DeepSpeed config file: {hidden_size_auto_keys}. You can fix that by replacing '
+                    '`auto` values for these keys with an integer value of your choice.'
                 )
 
-            self.fill_only("zero_optimization.reduce_bucket_size", hidden_size * hidden_size)
+            self.fill_only('zero_optimization.reduce_bucket_size',
+                           hidden_size * hidden_size)
             if self.is_zero3():
                 # automatically assign the optimal config values based on model config
-                self.fill_only("zero_optimization.stage3_prefetch_bucket_size", 0.9 * hidden_size * hidden_size)
-                self.fill_only("zero_optimization.stage3_param_persistence_threshold", 10 * hidden_size)
+                self.fill_only('zero_optimization.stage3_prefetch_bucket_size',
+                               0.9 * hidden_size * hidden_size)
+                self.fill_only(
+                    'zero_optimization.stage3_param_persistence_threshold',
+                    10 * hidden_size)
 
         # scheduler
-        warmup = args.train.optimizer["options"].get("warmup", {})
-        warmup_steps = warmup.get("warmup_steps", 0)
-        warmup_ratio = warmup.get("warmup_ratio", 0.0)
-        warmup_steps = warmup_steps if warmup_steps > 0 else math.ceil(num_training_steps * warmup_ratio)
-        self.fill_match("scheduler.params.total_num_steps", num_training_steps)
-        self.fill_match("scheduler.params.warmup_num_steps", warmup_steps)
-
+        warmup = args.train.optimizer['options'].get('warmup', {})
+        warmup_steps = warmup.get('warmup_steps', 0)
+        warmup_ratio = warmup.get('warmup_ratio', 0.0)
+        warmup_steps = warmup_steps if warmup_steps > 0 else math.ceil(
+            num_training_steps * warmup_ratio)
+        self.fill_match('scheduler.params.total_num_steps', num_training_steps)
+        self.fill_match('scheduler.params.warmup_num_steps', warmup_steps)
 
         if len(self.mismatches) > 0:
-            mismatches = "\n".join(self.mismatches)
+            mismatches = '\n'.join(self.mismatches)
             raise ValueError(
-                "Please correct the following DeepSpeed config values that mismatch TrainingArguments"
+                'Please correct the following DeepSpeed config values that mismatch TrainingArguments'
                 f" values:\n{mismatches}\nThe easiest method is to set these DeepSpeed config values to 'auto'."
             )
 
+
 def deepspeed_optim_sched(trainer, hf_deepspeed_config, num_training_steps):
     config = hf_deepspeed_config.config
     optimizer = None
-    if "optimizer" not in config:
+    if 'optimizer' not in config:
         if hf_deepspeed_config.is_offload():
             logger.info(
-                "Detected ZeRO Offload and non-DeepSpeed optimizers: This combination should work as long as the"
-                " custom optimizer has both CPU and GPU implementation (except LAMB)"
+                'Detected ZeRO Offload and non-DeepSpeed optimizers: This combination should work as long as the'
+                ' custom optimizer has both CPU and GPU implementation (except LAMB)'
             )
 
         # ds supports Adam, OneBitAdam, and Lamb optimizers and can import other optimizers from torch.
         # But trainer uses AdamW by default.
         optimizer = trainer.optimizer
         # To use other optimizers requires voiding warranty with: `zero_allow_untested_optimizer`
-        config["zero_allow_untested_optimizer"] = True
+        config['zero_allow_untested_optimizer'] = True
 
     lr_scheduler = None
-    if "scheduler" not in config:
+    if 'scheduler' not in config:
         lr_scheduler = trainer.scheduler
 
     return optimizer, lr_scheduler
@@ -119,14 +125,15 @@ class DeepspeedHook(Hook):
         # TODO without mpu
        self.with_mpu = with_mpu
         self.deepspeed_config = config
-        #assert with_mpu, 'DeepspeedHook now is only for mpu models.'
 
     def register_strategy(self):
         Hook.overload(name='OptimizerHook.backward', function=self.backward)
         Hook.overload(
             name='OptimizerHook.initialize_optimizer', function=self.idle)
         Hook.overload(name='LrSchedulerHook.step', function=self.idle)
-        Hook.overload(name='LrSchedulerHook.get_current_lr', function=self.get_current_lr)
+        Hook.overload(
+            name='LrSchedulerHook.get_current_lr',
+            function=self.get_current_lr)
         Hook.overload(
             name='CheckpointHook.save_checkpoints',
             function=self.save_checkpoints)
@@ -138,17 +145,18 @@ class DeepspeedHook(Hook):
             function=self.remove_checkpoints)
         Hook.overload(
             name='CheckpointHook.prepare_output', function=self.prepare_output)
+        Hook.overload(name='DDPHook.wrap_module', function=self.wrap_module)
         Hook.overload(
-            name='DDPHook.wrap_module', function=self.wrap_module)
-        if self.with_mpu:
-            Hook.overload(
-                name='CheckpointHook.should_save_on_rank',
-                function=self.should_save_on_rank)
+            name='CheckpointHook.should_save_on_rank',
+            function=self.should_save_on_rank)
 
     def should_save_on_rank(self, trainer):
-        # TODO
-        return (not torch.distributed.is_initialized()
-                ) or mpu.get_data_parallel_rank() == 0
+        return True
+
+    def get_bin_file(self):
+        mp_rank = mpu.get_tensor_model_parallel_rank()
+        rank = '{:02d}'.format(mp_rank)
+        return f'mp_rank_{rank}_model_states.pt'
 
     def wrap_module(self, trainer):
         # deepspeed initializes its own ddp
@@ -180,7 +188,8 @@ class DeepspeedHook(Hook):
         pass
 
     def get_current_lr(self, trainer):
-        if isinstance(trainer.optimizer, torch.optim.Optimizer) or isinstance(trainer.optimizer, deepspeed.DeepSpeedOptimizer):
+        if isinstance(trainer.optimizer, torch.optim.Optimizer) or isinstance(
+                trainer.optimizer, deepspeed.DeepSpeedOptimizer):
             lr = [group['lr'] for group in trainer.optimizer.param_groups]
         elif isinstance(trainer.optimizer, dict):
             lr = dict()
@@ -191,7 +200,6 @@ class DeepspeedHook(Hook):
                 'lr is not applicable because optimizer does not exist.')
         return lr
 
-
     def save_checkpoints(self,
                          trainer,
                          checkpoint_path_prefix,
@@ -208,15 +216,6 @@ class DeepspeedHook(Hook):
         prefix = os.path.basename(checkpoint_path_prefix)
         trainer.model.save_checkpoint(save_dir, prefix)
 
-        bin_file = self.get_bin_file()
-        src_file = os.path.join(checkpoint_path_prefix, bin_file)
-        dest_file = os.path.join(save_dir, output_sub_dir, self._BIN_FILE_DIR,
-                                 bin_file)
-        if os.path.isfile(dest_file):
-            os.unlink(dest_file)
-
-        os.link(src_file, dest_file)
-
     def remove_checkpoints(self, trainer, checkpoint_path_prefix):
         _train_state_file = checkpoint_path_prefix + self.rank_name(
         ) + CheckpointHook.TRAINER_STATE_SUFFIX
@@ -274,30 +273,28 @@ class DeepspeedHook(Hook):
     def before_val(self, trainer):
         pass
 
-    def after_init(self, trainer):
-        device_id = get_local_rank()
-        trainer.device = f'cuda:{device_id}'
-        #trainer.parallel_groups[DistributedParallelType.DP] = None
-
     def prepare_args(self, args):
-        args.per_device_train_batch_size = args.train.dataloader.get("batch_size_per_gpu", 4)
-        args.gradient_accumulation_steps = args.train.get("gradient_accumulation_steps", 1)
-        args.max_grad_norm = args.train.get("clip_grad", 1.0)
-        args.learning_rate = args.train.optimizer.get("lr", 2e-5)
-        args.adam_beta1 = args.train.optimizer.get("adam_beta1", 0.9)
-        args.adam_beta2 = args.train.optimizer.get("adam_beta2", 0.999)
-        args.adam_epsilon = args.train.optimizer.get("adam_epsilon", 1e-8)
-        args.weight_decay = args.train.optimizer.get("weight_decay", 0.0)
-        args.fp16 = args.train.get("use_fp16", False)
-        args.fp16_full_eval = args.train.get("use_fp16", False)
-        args.fp16_backend = args.train.get("fp16_backend", "amp")
-        args.save_on_each_node = args.train.get("save_on_each_node", False)
-        args.fp16_opt_level = args.train.get("fp16_opt_level", None)
-        args.fp16_opt_level = next((item["opt_level"] for item in args.train.hooks if item["type"] == "ApexAMPOptimizerHook"), args.fp16_opt_level)
+        args.per_device_train_batch_size = args.train.dataloader.get(
+            'batch_size_per_gpu', 4)
+        args.gradient_accumulation_steps = args.train.get(
+            'gradient_accumulation_steps', 1)
+        args.max_grad_norm = args.train.get('clip_grad', 1.0)
+        args.learning_rate = args.train.optimizer.get('lr', 2e-5)
+        args.adam_beta1 = args.train.optimizer.get('adam_beta1', 0.9)
+        args.adam_beta2 = args.train.optimizer.get('adam_beta2', 0.999)
+        args.adam_epsilon = args.train.optimizer.get('adam_epsilon', 1e-8)
+        args.weight_decay = args.train.optimizer.get('weight_decay', 0.0)
+        args.fp16 = args.train.get('use_fp16', False)
+        args.fp16_full_eval = args.train.get('use_fp16', False)
+        args.fp16_backend = args.train.get('fp16_backend', 'amp')
+        args.save_on_each_node = args.train.get('save_on_each_node', False)
+        args.fp16_opt_level = args.train.get('fp16_opt_level', None)
+        args.fp16_opt_level = next(
+            (item['opt_level'] for item in args.train.hooks
+             if item['type'] == 'ApexAMPOptimizerHook'), args.fp16_opt_level)
         if not args.fp16_opt_level:
-            args.fp16_opt_level = "O1"
-        args.bf16 = args.train.get("bf16", False)
-
+            args.fp16_opt_level = 'O1'
+        args.bf16 = args.train.get('bf16', False)
 
     def get_deepspeed_config(self, trainer, max_steps):
         args = trainer.cfg
@@ -308,7 +305,7 @@ class DeepspeedHook(Hook):
         else:
             deepspeed_config = os.path.join(trainer.model_dir,
                                             self.deepspeed_config)
-        self.logger.info(f"Loading deepspeed config from {deepspeed_config}")
+        self.logger.info(f'Loading deepspeed config from {deepspeed_config}')
         ds_config = DeepSpeedConfig(deepspeed_config)
 
         ds_config.trainer_config_process(args)
@@ -316,18 +313,6 @@ class DeepspeedHook(Hook):
         ds_config.trainer_config_finalize(args, trainer.model, max_steps)
         return ds_config
 
-    # def prepare_for_init(self, trainer):
-
-    #     gradient_accumulation_steps = trainer.cfg.train.get("gradient_accumulation_steps", 1)
-    #     num_update_steps_per_epoch = trainer.iters_per_epoch // gradient_accumulation_steps
-    #     max_steps = math.ceil(trainer._max_epochs * num_update_steps_per_epoch)
-
-    #     ds_config = self.get_deepspeed_config(trainer, max_steps)
-
-    #     optimizer, lr_scheduler = deepspeed_optim_sched(trainer, ds_config, max_steps)
-    #     config = ds_config.config
-    #     return config, optimizer, lr_scheduler
-
     def before_run(self, trainer):
         if not hasattr(trainer, 'logger'):
             self.logger = get_logger()
@@ -335,19 +320,19 @@
             self.logger = trainer.logger
 
         # deepspeed init
-        gradient_accumulation_steps = trainer.cfg.train.get("gradient_accumulation_steps", 1)
+        gradient_accumulation_steps = trainer.cfg.train.get(
+            'gradient_accumulation_steps', 1)
         num_update_steps_per_epoch = trainer.iters_per_epoch // gradient_accumulation_steps
         max_steps = math.ceil(trainer._max_epochs * num_update_steps_per_epoch)
 
         ds_config = self.get_deepspeed_config(trainer, max_steps)
-        optimizer, lr_scheduler = deepspeed_optim_sched(trainer, ds_config, max_steps)
+        optimizer, lr_scheduler = deepspeed_optim_sched(
+            trainer, ds_config, max_steps)
         config = ds_config.config
-
-        # TODO: 判断是否需要dist_init 和 mpu 而非写死;
+
         trainer.model, trainer.optimizer, _, trainer.lr_scheduler = deepspeed.initialize(
             model=trainer.model,
             optimizer=optimizer,
             config=config,
             lr_scheduler=lr_scheduler)
-        trainer.model.save_zero_checkpoint = self.save_zero_checkpoint
diff --git a/modelscope/trainers/trainer.py b/modelscope/trainers/trainer.py
index 5b608c25..75c272a1 100644
--- a/modelscope/trainers/trainer.py
+++ b/modelscope/trainers/trainer.py
@@ -990,7 +990,8 @@ class EpochBasedTrainer(BaseTrainer):
             optimizer = self.build_optimizer(cfg=optimizer_cfg)
 
         if lr_scheduler is None:
-            lr_scheduler_cfg = deepcopy(self.cfg.train.get('lr_scheduler', None))
+            lr_scheduler_cfg = deepcopy(
+                self.cfg.train.get('lr_scheduler', None))
         else:
             lr_scheduler_cfg = None