# Copyright (c) Alibaba, Inc. and its affiliates.
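# Tests for the optimizer hooks: the plain optimizer hook, the torch AMP hook
# and the Apex AMP hook. Each test drives the trainer's hook stages manually
# over a tiny synthetic dataset and model.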
import json
import os
import shutil
import tempfile
import unittest

import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.optim import SGD
from torch.optim.lr_scheduler import MultiStepLR

from modelscope.metainfo import Trainers
from modelscope.models.base import Model
from modelscope.trainers import build_trainer
from modelscope.utils.constant import ModelFile, TrainerStages
from modelscope.utils.test_utils import create_dummy_test_dataset
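
# A tiny synthetic dataset: 10 samples of 2-dim features with binary labels.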
dummy_dataset = create_dummy_test_dataset(
np.random.random(size=(2, )), np.random.randint(0, 2, (1, )), 10)
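

# Minimal model: a Linear layer followed by BatchNorm1d, returning logits and
# a cross-entropy loss (four trainable parameters in total).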
class DummyModel(nn.Module, Model):
def __init__(self):
super().__init__()
self.linear = nn.Linear(2, 2)
self.bn = nn.BatchNorm1d(2)

    def forward(self, feat, labels):
x = self.linear(feat)
x = self.bn(x)
loss = F.cross_entropy(x, labels.to(torch.long).squeeze())
return dict(logits=x, loss=loss)
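

# Runs two epochs with the default trainer and checks that all four model
# parameters stay in the optimizer's param group and remain trainable.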
class OptimizerHookTest(unittest.TestCase):
def setUp(self):
        print('Testing %s.%s' % (type(self).__name__, self._testMethodName))
self.tmp_dir = tempfile.TemporaryDirectory().name
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)

    def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmp_dir)

    def test_optimizer_hook(self):
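        # Minimal trainer config: a work dir plus dataloader settings.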
json_cfg = {
'task': 'image_classification',
'train': {
'work_dir': self.tmp_dir,
'dataloader': {
'batch_size_per_gpu': 2,
'workers_per_gpu': 1
}
}
}
config_path = os.path.join(self.tmp_dir, ModelFile.CONFIGURATION)
with open(config_path, 'w') as f:
json.dump(json_cfg, f)
model = DummyModel()
optimizer = SGD(model.parameters(), lr=0.01)
lr_scheduler = MultiStepLR(optimizer, milestones=[1, 2])
trainer_name = Trainers.default
kwargs = dict(
cfg_file=config_path,
model=model,
train_dataset=dummy_dataset,
optimizers=(optimizer, lr_scheduler),
max_epochs=2,
device='cpu')
trainer = build_trainer(trainer_name, kwargs)
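        # Build the dataloader directly so the loop below can drive the hook
        # stages by hand.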
train_dataloader = trainer._build_dataloader_with_dataset(
trainer.train_dataset, **trainer.cfg.train.get('dataloader', {}))
trainer.register_optimizers_hook()
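        # Keep only the optimizer-related hooks; checkpoint, logging and timer
        # hooks are irrelevant for this test.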
trainer._hooks = [
hook for hook in trainer._hooks if hook.__class__.__name__ not in
['CheckpointHook', 'TextLoggerHook', 'IterTimerHook']
]
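        # Drive the trainer's lifecycle hooks manually instead of calling
        # trainer.train().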
trainer.invoke_hook(TrainerStages.before_run)
for _ in range(trainer._epoch, trainer._max_epochs):
trainer.invoke_hook(TrainerStages.before_train_epoch)
for _, data_batch in enumerate(train_dataloader):
trainer.invoke_hook(TrainerStages.before_train_iter)
trainer.train_step(trainer.model, data_batch)
trainer.invoke_hook(TrainerStages.after_train_iter)
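                # linear.weight, linear.bias, bn.weight and bn.bias give four
                # parameters, all of which must stay trainable.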
self.assertEqual(
len(trainer.optimizer.param_groups[0]['params']), 4)
for i in range(4):
self.assertTrue(trainer.optimizer.param_groups[0]['params']
[i].requires_grad)
trainer.invoke_hook(TrainerStages.after_train_epoch)
trainer._epoch += 1
trainer.invoke_hook(TrainerStages.after_run)
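

# Mixed-precision variant: during the hook-managed iteration the forward pass
# produces fp16 logits, and the model is restored to fp32 once
# `after_train_iter` has run.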
class TorchAMPOptimizerHookTest(unittest.TestCase):
def setUp(self):
        print('Testing %s.%s' % (type(self).__name__, self._testMethodName))
self.tmp_dir = tempfile.TemporaryDirectory().name
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)

    def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmp_dir)

    @unittest.skipIf(not torch.cuda.is_available(),
'skip this test when cuda is not available')
def test_amp_optimizer_hook(self):
json_cfg = {
'task': 'image_classification',
'train': {
'work_dir': self.tmp_dir,
'dataloader': {
'batch_size_per_gpu': 2,
'workers_per_gpu': 1
}
}
}
config_path = os.path.join(self.tmp_dir, ModelFile.CONFIGURATION)
with open(config_path, 'w') as f:
json.dump(json_cfg, f)
model = DummyModel().cuda()
optimizer = SGD(model.parameters(), lr=0.01)
lr_scheduler = MultiStepLR(optimizer, milestones=[1, 2])
trainer_name = Trainers.default
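        # use_fp16=True asks the trainer to enable mixed-precision training.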
kwargs = dict(
cfg_file=config_path,
model=model,
train_dataset=dummy_dataset,
optimizers=(optimizer, lr_scheduler),
max_epochs=2,
use_fp16=True)
trainer = build_trainer(trainer_name, kwargs)
train_dataloader = trainer._build_dataloader_with_dataset(
trainer.train_dataset, **trainer.cfg.train.get('dataloader', {}))
trainer.register_optimizers_hook()
trainer.register_processors()
trainer._hooks = [
hook for hook in trainer._hooks if hook.__class__.__name__ not in
['CheckpointHook', 'TextLoggerHook', 'IterTimerHook']
]
trainer.invoke_hook(TrainerStages.before_run)
for _ in range(trainer._epoch, trainer._max_epochs):
trainer.invoke_hook(TrainerStages.before_train_epoch)
for _, data_batch in enumerate(train_dataloader):
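                # Move every tensor in the batch onto the GPU.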
for k, v in data_batch.items():
data_batch[k] = v.cuda()
trainer.invoke_hook(TrainerStages.before_train_iter)
trainer.train_step(trainer.model, data_batch)
trainer.invoke_hook(TrainerStages.after_train_iter)
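                # The train_step above ran between before/after_train_iter,
                # so AMP produced fp16 logits.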
self.assertEqual(trainer.train_outputs['logits'].dtype,
torch.float16)
                # `after_train_iter` should have restored the model to fp32,
                # so this extra train_step now produces fp32 logits.
trainer.train_step(trainer.model, data_batch)
self.assertEqual(trainer.train_outputs['logits'].dtype,
torch.float32)
self.assertEqual(
len(trainer.optimizer.param_groups[0]['params']), 4)
for i in range(4):
self.assertTrue(trainer.optimizer.param_groups[0]['params']
[i].requires_grad)
trainer.invoke_hook(TrainerStages.after_train_epoch)
trainer._epoch += 1
trainer.invoke_hook(TrainerStages.after_run)
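

# Apex variant of the AMP test using the ApexAMPOptimizerHook
# (currently skipped, see the decorator below).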
class TorchApexOptimizerHookTest(unittest.TestCase):
def setUp(self):
        print('Testing %s.%s' % (type(self).__name__, self._testMethodName))
self.tmp_dir = tempfile.TemporaryDirectory().name
if not os.path.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)

    def tearDown(self):
super().tearDown()
shutil.rmtree(self.tmp_dir)

    @unittest.skip('Apex works abnormally with torch 1.13')
def test_apex_optimizer_hook(self):
json_cfg = {
'task': 'image_classification',
'train': {
'work_dir': self.tmp_dir,
'dataloader': {
'batch_size_per_gpu': 2,
'workers_per_gpu': 1
},
}
}
config_path = os.path.join(self.tmp_dir, ModelFile.CONFIGURATION)
with open(config_path, 'w') as f:
json.dump(json_cfg, f)
model = DummyModel().cuda()
optimizer = SGD(model.parameters(), lr=0.01)
lr_scheduler = MultiStepLR(optimizer, milestones=[1, 2])
trainer_name = Trainers.default
kwargs = dict(
cfg_file=config_path,
model=model,
train_dataset=dummy_dataset,
optimizers=(optimizer, lr_scheduler),
max_epochs=2)
trainer = build_trainer(trainer_name, kwargs)
train_dataloader = trainer._build_dataloader_with_dataset(
trainer.train_dataset, **trainer.cfg.train.get('dataloader', {}))
trainer.register_optimizers_hook()
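        # Register the Apex AMP optimizer hook explicitly from its config.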
trainer.register_hook_from_cfg([{'type': 'ApexAMPOptimizerHook'}])
trainer._hooks = [
hook for hook in trainer._hooks if hook.__class__.__name__ not in
['CheckpointHook', 'TextLoggerHook', 'IterTimerHook']
]
trainer.invoke_hook(TrainerStages.before_run)
for _ in range(trainer._epoch, trainer._max_epochs):
trainer.invoke_hook(TrainerStages.before_train_epoch)
for _, data_batch in enumerate(train_dataloader):
for k, v in data_batch.items():
data_batch[k] = v.cuda()
trainer.invoke_hook(TrainerStages.before_train_iter)
trainer.train_step(trainer.model, data_batch)
trainer.invoke_hook(TrainerStages.after_train_iter)
self.assertEqual(trainer.train_outputs['logits'].dtype,
torch.float16)
                # `after_train_iter` should have restored the model to fp32;
                # run one more step with the restored model.
trainer.train_step(trainer.model, data_batch)
self.assertEqual(
len(trainer.optimizer.param_groups[0]['params']), 4)
for i in range(4):
self.assertTrue(trainer.optimizer.param_groups[0]['params']
[i].requires_grad)
trainer.invoke_hook(TrainerStages.after_train_epoch)
trainer._epoch += 1
trainer.invoke_hook(TrainerStages.after_run)


if __name__ == '__main__':
unittest.main()