support vision efficient tuning finetune

## 查看改动点 ↓↓↓
### vision efficient tuning finetune
- Model模块改造成适配训练的
- Model模块在支持训练同时向下兼容之前发布的modelcard
- Pipeline兼容modelcard加载的preprocessor或直接定义的
- 添加 ImageClassificationPreprocessor (非mmcv版本)
- 添加 VisionEfficientTuningTrainer
- ~~添加 opencv_transforms==0.0.6~~ (以源代码引入必要)

### Modelcard
- test pipeline和trainer合并到一起
- 新增3个模型的test
- 新增demo service

### 公共组件
- ms_dataset.py: fix warning, [UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or xxx]
- preprocessor添加common:ToNumpy、Rename、Identity
- preprocessor common对于dict进行key判断再取值。
- ~~修复learning rate在iter级别变化的逻辑。~~ (本次不做了)
- ~~修复非dist状态下train data没有进行shuffle的bug。~~ (Master已有人改了)
- 修复训练时调用util中非cv包的异常 zhconv。

### 其他
- 为防止新引入的preprocessor模块在config中被原代码加载,导致在其他人做CI时会报错;所以暂时没有添加新的tag,等CR完成后,会进行打tag再rerun CI。
        Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11762108

* support vision efficient tuning finetune

* update test case

* update shuffle on IterableDataset

* update bitfit & sidetuning

* compatible with base trainer
This commit is contained in:
zeyinzi.jzyz
2023-03-08 16:42:23 +08:00
committed by wenmeng.zwm
parent 8298a3c31d
commit bf3a2b6c09
25 changed files with 2096 additions and 259 deletions

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:407d70db9f01bc7a6f34377e36c3f2f5eefdfca8bd3c578226bf5b31b73325dc
size 127213

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:9c67733db75dc7fd773561a5091329fd5ee919b2268a3a65718261722607698f
size 226882

View File

@@ -808,6 +808,7 @@ class CVTrainers(object):
image_classification = 'image-classification'
image_fewshot_detection = 'image-fewshot-detection'
nerf_recon_acc = 'nerf-recon-acc'
vision_efficient_tuning = 'vision-efficient-tuning'
class NLPTrainers(object):
@@ -919,6 +920,7 @@ class Preprocessors(object):
bad_image_detecting_preprocessor = 'bad-image-detecting-preprocessor'
nerf_recon_acc_preprocessor = 'nerf-recon-acc-preprocessor'
controllable_image_generation_preprocessor = 'controllable-image-generation-preprocessor'
image_classification_preprocessor = 'image-classification-preprocessor'
# nlp preprocessor
sen_sim_tokenizer = 'sen-sim-tokenizer'

View File

@@ -5,18 +5,11 @@ from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
from .vision_efficient_tuning_adapter import VisionEfficientTuningAdapterModel
from .vision_efficient_tuning_prompt import VisionEfficientTuningPromptModel
from .vision_efficient_tuning_prefix import VisionEfficientTuningPrefixModel
from .vision_efficient_tuning_lora import VisionEfficientTuningLoRAModel
from .model import VisionEfficientTuningModel
else:
_import_structure = {
'vision_efficient_tuning_adapter':
['VisionEfficientTuningAdapterModel'],
'vision_efficient_tuning_prompt': ['VisionEfficientTuningPromptModel'],
'vision_efficient_tuning_prefix': ['VisionEfficientTuningPrefixModel'],
'vision_efficient_tuning_lora': ['VisionEfficientTuningLoRAModel'],
'model': ['VisionEfficientTuningModel'],
}
import sys

View File

@@ -7,9 +7,10 @@ import torch
import torch.nn as nn
import torch.nn.functional as F
from .petl import Adapter, LoRA, Prefix, Prompt
from .petl import Adapter, LoRA, Prefix, Prompt, SideTune
from .timm_vision_transformer import (Attention, Block, DropPath, LayerScale,
Mlp, PatchEmbed, VisionTransformer)
Mlp, PatchEmbed, VisionTransformer,
checkpoint_seq)
class AttentionPETL(nn.Module):
@@ -212,40 +213,74 @@ class VisionTransformerPETL(VisionTransformer):
The implementation of several tuning methods (prompt, prefix, adapter, and LoRA) based on ViT.
"""
def __init__(
self,
img_size=224,
patch_size=16,
in_chans=3,
num_classes=1000,
global_pool='token',
embed_dim=768,
depth=12,
num_heads=12,
mlp_ratio=4.,
qkv_bias=True,
init_values=None,
class_token=True,
no_embed_class=False,
pre_norm=False,
fc_norm=None,
drop_rate=0.,
attn_drop_rate=0.,
drop_path_rate=0.,
weight_init='',
embed_layer=PatchEmbed,
norm_layer=None,
act_layer=None,
block_fn=Block,
prompt_length=None,
prompt_type=None,
prefix_length=None,
prefix_type=None,
adapter_length=None,
adapter_type=None,
lora_length=None,
lora_type=None,
):
def __init__(self,
img_size=224,
patch_size=16,
in_chans=3,
num_classes=1000,
global_pool='token',
embed_dim=768,
depth=12,
num_heads=12,
mlp_ratio=4.,
qkv_bias=True,
init_values=None,
class_token=True,
no_embed_class=False,
pre_norm=False,
fc_norm=None,
drop_rate=0.,
attn_drop_rate=0.,
drop_path_rate=0.,
weight_init='',
embed_layer=PatchEmbed,
norm_layer=None,
act_layer=None,
block_fn=Block,
prompt_length=None,
prompt_type=None,
prefix_length=None,
prefix_type=None,
adapter_length=None,
adapter_type=None,
lora_length=None,
lora_type=None,
sidetune_length=None,
sidetune_type=None):
""" Initialize a Parameter-efficient Transfer Learning Method based on Vision Transformer.
Args:
img_size (int, tuple): input image size
patch_size (int, tuple): patch size
in_chans (int): number of input channels
num_classes (int): number of classes for classification head
global_pool (str): type of global pooling for final sequence (default: 'token')
embed_dim (int): embedding dimension
depth (int): depth of transformer
num_heads (int): number of attention heads
mlp_ratio (int): ratio of mlp hidden dim to embedding dim
qkv_bias (bool): enable bias for qkv if True
init_values: (float): layer-scale init values
class_token (bool): use class token
fc_norm (Optional[bool]): pre-fc norm after pool, set if global_pool == 'avg' if None (default: None)
drop_rate (float): dropout rate
attn_drop_rate (float): attention dropout rate
drop_path_rate (float): stochastic depth rate
weight_init (str): weight init scheme
embed_layer (nn.Module): patch embedding layer
norm_layer: (nn.Module): normalization layer
act_layer: (nn.Module): MLP activation layer
prompt_length: An integer indicating the length of prompt tuning.
prompt_type: A string indicating the type of prompt tuning.
prefix_length: An integer indicating the length of prefix tuning.
prefix_type: A string indicating the type of prefix tuning.
adapter_length: An integer indicating the length of adapter tuning.
adapter_type: A string indicating the type of adapter tuning.
lora_length: An integer indicating the length of LoRA tuning.
lora_type: A string indicating the type of LoRA tuning.
sidetune_length: An integer indicating the linear dimension.
sidetune_type: A string indicating the type of side network.
"""
super().__init__()
assert global_pool in ('', 'avg', 'token')
@@ -349,3 +384,49 @@ class VisionTransformerPETL(VisionTransformer):
if weight_init != 'skip':
self.init_weights(weight_init)
if sidetune_type is not None:
self.sidetune = SideTune(sidetune_length, sidetune_type)
else:
self.sidetune = None
def forward_features(self, x):
""" feature forward function of VisionTransformer.
Args:
x (Tensor): the input data.
Returns:
res (Dict): the output data, contains:
- inputs: the original input.
- x: the intermediate feature.
"""
res = dict(inputs=x)
x = self.patch_embed(x)
x = self._pos_embed(x)
x = self.norm_pre(x)
if self.grad_checkpointing and not torch.jit.is_scripting():
x = checkpoint_seq(self.blocks, x)
else:
x = self.blocks(x)
x = self.norm(x)
res['x'] = x
return res
def forward_head(self, res, pre_logits: bool = False):
""" head forward function of VisionTransformer.
Args:
res (Dict): the input data, contains:
- inputs: the original input.
- x: the intermediate feature.
Returns:
x (Tensor): the output data.
"""
x = res['x']
if self.global_pool:
x = x[:, self.num_prefix_tokens:].mean(
dim=1) if self.global_pool == 'avg' else x[:, 0]
if self.sidetune and 'inputs' in res:
x = self.sidetune(res['inputs'], x)
x = self.fc_norm(x)
return x if pre_logits else self.head(x)

View File

@@ -0,0 +1,49 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import Any, Dict
import torch
from modelscope.metainfo import Models
from modelscope.models.base.base_torch_model import TorchModel
from modelscope.models.builder import MODELS
from modelscope.utils.constant import Tasks
from .vision_efficient_tuning import VisionEfficientTuning
@MODELS.register_module(
    Tasks.vision_efficient_tuning, module_name=Models.vision_efficient_tuning)
class VisionEfficientTuningModel(TorchModel):
    """TorchModel wrapper around the vision efficient tuning network.

    The wrapped network is composed of:
    - 'backbone': pre-trained backbone model with parameters.
    - 'head': classification head with fine-tuning.
    """

    def __init__(self, model_dir: str, **kwargs):
        """Build and place the wrapped network on the available device.

        Args:
            model_dir: model id or path; ``model_dir/pytorch_model.pt``
                is expected to contain:
                - 'backbone_weight': parameters of the backbone.
                - 'head_weight': parameters of the head.
        """
        super().__init__(model_dir)
        self.model = VisionEfficientTuning(model_dir=model_dir, **kwargs)
        # Expose the label list of the inner model on the wrapper.
        self.CLASSES = self.model.CLASSES
        # Prefer GPU when one is visible; otherwise fall back to CPU.
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')
        self.model.to(self.device)

    def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Dynamic forward function of vision efficient tuning model.

        Args:
            input: the input data dict containing:
                - imgs: (B, 3, H, W).
                - labels: (B), only present at training stage.
        """
        return self.model(**input)

View File

@@ -1,8 +1,10 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import math
from collections import OrderedDict
import torch
import torch.nn as nn
import torchvision
class Prompt(nn.Module):
@@ -172,3 +174,101 @@ class Prefix(nn.Module):
k, v = torch.cat((k, prefix_key), dim=2), torch.cat((v, prefix_value),
dim=2)
return q, k, v
class SideTune(nn.Module):
    """The implementation of vision side-tuning method.

    Side-Tuning only needs to train one side network and
    weights the output of pre-trained model and side network.

    'Side-Tuning: A Baseline for Network Adaptation via Additive Side Networks'
    by Zhang et al.(2019)
    See https://arxiv.org/abs/1912.13503

    Attributes:
        sidetune_length: An integer indicating the linear dimension.
        sidetune_type: A string indicating the type of side network,
            one of {'fcn4', 'alexnet'} (case-insensitive).
    """

    def __init__(self, sidetune_length=None, sidetune_type=None):
        super().__init__()
        self.sidetune_length = sidetune_length
        self.sidetune_type = sidetune_type
        # Fail fast on a missing/unsupported type instead of leaving
        # ``self.side`` undefined and crashing later inside ``forward``.
        if sidetune_type is None or sidetune_type.lower() not in ('fcn4',
                                                                  'alexnet'):
            raise ValueError(
                f'Unsupported sidetune_type: {sidetune_type}, '
                "expected 'fcn4' or 'alexnet'.")
        if sidetune_type.lower() == 'fcn4':
            self.side = FCN4(out_dims=self.sidetune_length)
        else:  # 'alexnet'
            mm = torchvision.models.alexnet(pretrained=True)
            self.side = nn.Sequential(
                OrderedDict([
                    ('features', mm.features), ('avgpool', mm.avgpool),
                    ('flatten', nn.Flatten()),
                    ('fc', nn.Linear(9216, self.sidetune_length, bias=False))
                ]))
        # Learnable blending coefficient; squashed to (0, 1) in forward.
        self.alpha = nn.Parameter(torch.tensor(0.0))

    def forward(self, x, x_base):
        """Blend side-network output with the base feature.

        Args:
            x: raw network input, fed to the side network.
            x_base: feature produced by the (pre-trained) base network.

        Returns:
            alpha * x_base + (1 - alpha) * side(x),
            where alpha = sigmoid(self.alpha).
        """
        alpha_squashed = torch.sigmoid(self.alpha)
        x_side = self.side(x)
        x_out = alpha_squashed * x_base + (1 - alpha_squashed) * x_side
        return x_out
class FCN4(nn.Module):
    """A simple 4-layer fully-convolutional network used as a side network.

    Args:
        out_dims: dimension of the final linear projection; when <= 0 the
            pooled 64-d feature is returned without projection.
    """

    def __init__(self, out_dims=-1, **kwargs):
        super(FCN4, self).__init__(**kwargs)

        def _stage(in_ch, out_ch, stride, padding):
            # conv (bias-free) -> GroupNorm with 2 groups -> ReLU
            return nn.Sequential(
                nn.Conv2d(
                    in_ch,
                    out_ch,
                    kernel_size=3,
                    stride=stride,
                    padding=padding,
                    bias=False,
                    dilation=1), nn.GroupNorm(2, out_ch), nn.ReLU())

        self.conv1 = _stage(3, 16, stride=1, padding=1)
        self.conv2 = _stage(16, 16, stride=2, padding=0)
        self.conv3 = _stage(16, 32, stride=2, padding=0)
        self.conv4 = _stage(32, 64, stride=1, padding=0)
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        # Optional linear head on top of the pooled feature.
        self.fc = nn.Linear(64, out_dims) if out_dims > 0 else None

    def forward(self, x):
        """Extract a pooled (and optionally projected) feature vector."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = stage(x)
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        return x if self.fc is None else self.fc(x)

View File

@@ -1,65 +1,154 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import os
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
from modelscope.metainfo import Models
from modelscope.models.base.base_torch_model import TorchModel
from modelscope.models.builder import MODELS
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.outputs import OutputKeys
from modelscope.utils.constant import ModelFile
@MODELS.register_module(
Tasks.vision_efficient_tuning, module_name=Models.vision_efficient_tuning)
class VisionEfficientTuningModel(TorchModel):
class VisionEfficientTuning(nn.Module):
""" The implementation of vision efficient tuning.
This model is constructed with the following parts:
- 'backbone': pre-trained backbone model with parameters.
- 'head': classification head with fine-tuning.
- 'loss': loss function for training.
"""
def __init__(self, model_dir: str, **kwargs):
def __init__(self,
backbone=None,
head=None,
loss=None,
pretrained=True,
finetune=False,
**kwargs):
""" Initialize a vision efficient tuning model.
Args:
model_dir: model id or path, where model_dir/pytorch_model.pt contains:
- 'backbone_cfg': config of backbone.
- 'backbone_weight': parameters of backbone.
- 'head_cfg': config of head.
- 'head_weight': parameters of head.
- 'CLASSES': list of label name.
backbone: config of backbone.
head: config of head.
loss: config of loss.
pretrained: whether to load the pretrained model.
finetune: whether to finetune the model.
"""
from .backbone import VisionTransformerPETL
from .head import ClassifierHead
super().__init__(model_dir)
model_path = os.path.join(model_dir, ModelFile.TORCH_MODEL_FILE)
model_dict = torch.load(model_path)
super(VisionEfficientTuning, self).__init__()
backbone_cfg = model_dict['backbone_cfg']
if 'type' in backbone_cfg:
backbone_cfg.pop('type')
self.backbone_model = VisionTransformerPETL(**backbone_cfg)
self.backbone_model.load_state_dict(
model_dict['backbone_weight'], strict=True)
if backbone and 'type' in backbone:
backbone.pop('type')
self.backbone = VisionTransformerPETL(**backbone)
else:
self.backbone = None
head_cfg = model_dict['head_cfg']
if 'type' in head_cfg:
head_cfg.pop('type')
self.head_model = ClassifierHead(**head_cfg)
self.head_model.load_state_dict(model_dict['head_weight'], strict=True)
# TODO Use a more elegant method to build the model.
if head and 'type' in head:
head.pop('type')
self.head = ClassifierHead(**head)
else:
self.head = None
self.CLASSES = model_dict['CLASSES']
if loss and 'type' in loss:
self.loss = getattr(torch.nn, loss['type'])()
else:
self.loss = torch.nn.CrossEntropyLoss()
def forward(self, inputs):
self.CLASSES = kwargs.pop('CLASSES', None)
self.pretrained_cfg = kwargs.pop('pretrained_cfg', None)
if pretrained:
assert 'model_dir' in kwargs, 'pretrained model dir is missing.'
model_path = os.path.join(kwargs['model_dir'],
ModelFile.TORCH_MODEL_FILE)
model_dict = torch.load(model_path, map_location='cpu')
if self.backbone is None and 'backbone_cfg' in model_dict:
model_dict['backbone_cfg'].pop('type')
self.backbone = VisionTransformerPETL(
**model_dict['backbone_cfg'])
if self.head is None and 'head_cfg' in model_dict:
model_dict['head_cfg'].pop('type')
self.head = ClassifierHead(**model_dict['head_cfg'])
if 'backbone_weight' in model_dict:
backbone_weight = model_dict['backbone_weight']
if finetune and self.pretrained_cfg and 'unload_part' in self.pretrained_cfg \
and 'backbone' in self.pretrained_cfg['unload_part']:
backbone_weight = self.filter_weight(
backbone_weight,
self.pretrained_cfg['unload_part']['backbone'])
self.backbone.load_state_dict(backbone_weight, strict=False)
if 'head_weight' in model_dict:
head_weight = model_dict['head_weight']
if finetune and self.pretrained_cfg and 'unload_part' in self.pretrained_cfg \
and 'head' in self.pretrained_cfg['unload_part']:
head_weight = self.filter_weight(
head_weight,
self.pretrained_cfg['unload_part']['head'])
self.head.load_state_dict(head_weight, strict=False)
self.CLASSES = model_dict[
'CLASSES'] if 'CLASSES' in model_dict else self.CLASSES
def filter_weight(self, weights, unload_part=[]):
""" Filter parameters that the model does not need to load.
Args:
weights: the parameters of the model.
unload_part: the config of unloading parameters.
"""
ret_dict = {}
for key, value in weights.items():
flag = sum([p in key for p in unload_part]) > 0
if not flag:
ret_dict[key] = value
return ret_dict
def forward(self, imgs, labels=None, **kwargs):
""" Dynamic forward function of vision efficient tuning.
Args:
inputs: the input images (B, 3, H, W).
imgs: (B, 3, H, W).
labels: (B), when training stage.
"""
return self.forward_train(imgs, labels, **kwargs) \
if self.training else self.forward_test(imgs, labels, **kwargs)
backbone_output = self.backbone_model(inputs)
head_output = self.head_model(backbone_output)
return head_output
def forward_train(self, imgs, labels=None):
""" Dynamic forward function of training stage.
Args:
imgs: (B, 3, H, W).
labels: (B), when training stage.
"""
output = OrderedDict()
backbone_output = self.backbone(imgs)
head_output = self.head(backbone_output)
loss = self.loss(head_output, labels)
output = {OutputKeys.LOSS: loss}
return output
def forward_test(self, imgs, labels=None):
""" Dynamic forward function of testing stage.
Args:
imgs: (B, 3, H, W).
labels: (B), when training stage.
"""
output = OrderedDict()
backbone_output = self.backbone(imgs)
head_output = self.head(backbone_output)
scores = F.softmax(head_output, dim=1)
preds = scores.topk(1, 1, True, True)[-1].squeeze(-1)
output = {OutputKeys.SCORES: scores, OutputKeys.LABELS: preds}
return output

View File

@@ -314,7 +314,7 @@ class MsDataset:
def type_converter(self, x):
import torch
if self.to_tensor:
if self.to_tensor and not isinstance(x, torch.Tensor):
return torch.tensor(x)
else:
return x

View File

@@ -10,7 +10,7 @@ from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.preprocessors import LoadImage
from modelscope.preprocessors import LoadImage, Preprocessor
from modelscope.utils.constant import Tasks
from modelscope.utils.logger import get_logger
@@ -40,25 +40,55 @@ class VisionEfficientTuningPipeline(Pipeline):
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.model = self.model.to(self.device)
self.model.eval()
self.transform = transforms.Compose([
transforms.Resize(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
def preprocess(self, input: Input) -> Dict[str, Any]:
img = LoadImage.convert_to_img(input)
data = self.transform(img).unsqueeze(0).to(self.device)
return data
self.preprocessor = Preprocessor.from_pretrained(
self.model.model_dir, **kwargs)
def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
if self.preprocessor is None:
self.preprocessor = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop((224, 224)),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
""" Preprocess method build from transforms or Preprocessor """
in_key = 'img_path:FILE'
other_in_keys = ['image']
out_key = 'imgs'
if isinstance(self.preprocessor, Preprocessor):
if not isinstance(inputs, dict):
inputs = {in_key: inputs}
elif in_key not in inputs:
for ik in other_in_keys:
if ik in inputs and isinstance(inputs[ik], str):
inputs = {in_key: inputs[ik]}
break
data = self.preprocessor(inputs)
result = {out_key: data[out_key].unsqueeze(0).to(self.device)}
else:
if isinstance(inputs, dict):
for ik in [in_key] + other_in_keys:
if ik in inputs:
inputs = inputs[ik]
break
img = LoadImage.convert_to_img(inputs)
data = self.preprocessor(img)
result = {out_key: data.unsqueeze(0).to(self.device)}
return result
def forward(self, inputs: Dict[str, Any],
**forward_params) -> Dict[str, Any]:
with torch.no_grad():
results = self.model(input)
results = self.model(inputs)
return results
def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
scores = F.softmax(inputs, dim=1).cpu().numpy()
def postprocess(self, inputs: Dict[str, Any],
**post_params) -> Dict[str, Any]:
""" Postprocess for classification """
scores = inputs[OutputKeys.SCORES].cpu().numpy()
pred_scores = np.sort(scores, axis=1)[0][::-1][:5]
pred_labels = np.argsort(scores, axis=1)[0][::-1][:5]

View File

@@ -7,6 +7,7 @@ from typing import Mapping
import numpy as np
import torch
from modelscope.utils.registry import default_group
from .builder import PREPROCESSORS, build_preprocessor
@@ -28,13 +29,14 @@ class Compose(object):
for transform in transforms:
if isinstance(transform, dict):
if self.field_name is None:
transform = build_preprocessor(transform, field_name)
transform = build_preprocessor(transform, default_group)
else:
# if not found key in field_name, try field_name=None(default_group)
try:
transform = build_preprocessor(transform, field_name)
except KeyError:
transform = build_preprocessor(transform, None)
transform = build_preprocessor(transform,
default_group)
elif callable(transform):
pass
else:
@@ -108,7 +110,8 @@ class ToTensor(object):
self.keys = list(data.keys())
for key in self.keys:
data[key] = to_tensor(data[key])
if key in data:
data[key] = to_tensor(data[key])
else:
data = to_tensor(data)
@@ -135,9 +138,93 @@ class Filter(object):
reserved_data = {}
for key in self.reserved_keys:
reserved_data[key] = data[key]
if key in data:
reserved_data[key] = data[key]
return reserved_data
def __repr__(self):
return self.__class__.__name__ + f'(keys={self.reserved_keys})'
def to_numpy(data):
    """Convert objects of various python types to `numpy.ndarray`.

    Args:
        data (torch.Tensor | numpy.ndarray | Sequence | int | float): Data to
            be converted.

    Returns:
        numpy.ndarray: the converted data. An existing ndarray is returned
        as-is (no copy).

    Raises:
        TypeError: if ``data`` is of an unsupported type (e.g. str).
    """
    if isinstance(data, torch.Tensor):
        return data.numpy()
    elif isinstance(data, np.ndarray):
        return data
    elif isinstance(data, Sequence) and not isinstance(data, str):
        return np.asarray(data)
    elif isinstance(data, int):
        return np.asarray(data, dtype=np.int64)
    elif isinstance(data, float):
        return np.asarray(data, dtype=np.float64)
    else:
        # Fixed message: this helper produces ndarrays, not tensors.
        raise TypeError(
            f'type {type(data)} cannot be converted to numpy.ndarray.')
@PREPROCESSORS.register_module()
class ToNumpy(object):
    """Convert target object to numpy.ndarray.

    Args:
        keys (Sequence[str]): Key of data to be converted to numpy.ndarray.
            Only valid when data is type of `Mapping`. If `keys` is None,
            all values of keys will be converted to numpy.ndarray by default.
    """

    def __init__(self, keys=None):
        self.keys = keys

    def __call__(self, data):
        if isinstance(data, Mapping):
            # Resolve the key list per call: caching ``list(data.keys())``
            # into ``self.keys`` would silently reuse the first sample's
            # keys for every subsequent (possibly different) sample.
            keys = self.keys if self.keys is not None else list(data.keys())
            for key in keys:
                if key in data:
                    data[key] = to_numpy(data[key])
        else:
            data = to_numpy(data)
        return data

    def __repr__(self):
        return self.__class__.__name__ + f'(keys={self.keys})'
@PREPROCESSORS.register_module()
class Rename(object):
    """Change the name of the input keys to output keys, respectively.

    Args:
        input_keys (Sequence[str]): keys to rename.
        output_keys (Sequence[str]): new names, matched positionally
            with ``input_keys``.
    """

    def __init__(self, input_keys=[], output_keys=[]):
        self.input_keys = input_keys
        self.output_keys = output_keys

    def __call__(self, data):
        if isinstance(data, Mapping):
            for in_key, out_key in zip(self.input_keys, self.output_keys):
                # Only rename when the target name is still free, so an
                # existing entry is never clobbered or dropped.
                if in_key in data and out_key not in data:
                    data[out_key] = data[in_key]
                    data.pop(in_key)
        return data

    def __repr__(self):
        # Fixed: previously referenced the non-existent attribute
        # ``self.keys`` and raised AttributeError.
        return (self.__class__.__name__
                + f'(input_keys={self.input_keys}, '
                f'output_keys={self.output_keys})')
@PREPROCESSORS.register_module()
class Identity(object):
    """A pass-through transform that returns its input unchanged."""

    def __init__(self):
        pass

    def __call__(self, item):
        """Return ``item`` as-is."""
        return item

View File

@@ -12,6 +12,7 @@ if TYPE_CHECKING:
from .image_restoration_preprocessor import ImageRestorationPreprocessor
from .bad_image_detecting_preprocessor import BadImageDetectingPreprocessor
from .controllable_image_generation import ControllableImageGenerationPreprocessor
from .image_classification_preprocessor import ImageClassificationPreprocessor
else:
_import_structure = {
@@ -24,6 +25,8 @@ else:
'bad_image_detecting_preprocessor': ['BadImageDetectingPreprocessor'],
'controllable_image_generation':
['ControllableImageGenerationPreprocessor'],
'image_classification_preprocessor':
['ImageClassificationPreprocessor']
}
import sys

View File

@@ -0,0 +1,559 @@
# The implementation is adopted from opencv_transforms,
# made publicly available under the MIT license at
# https://github.com/jbohnslav/opencv_transforms/blob/master/opencv_transforms/functional.py
# https://github.com/jbohnslav/opencv_transforms/blob/master/opencv_transforms/transforms.py
import collections
import math
import numbers
import random
import cv2
import numpy as np
import torch
# Map torchvision-style padding-mode names to OpenCV border types.
_cv2_pad_to_str = {
    'constant': cv2.BORDER_CONSTANT,
    'edge': cv2.BORDER_REPLICATE,
    'reflect': cv2.BORDER_REFLECT_101,
    'symmetric': cv2.BORDER_REFLECT
}
# Map interpolation names to OpenCV interpolation flags.
_cv2_interpolation_to_str = {
    'nearest': cv2.INTER_NEAREST,
    'bilinear': cv2.INTER_LINEAR,
    'area': cv2.INTER_AREA,
    'bicubic': cv2.INTER_CUBIC,
    'lanczos': cv2.INTER_LANCZOS4
}
# Reverse mapping (OpenCV flag -> name), e.g. for reprs and messages.
_cv2_interpolation_from_str = {
    v: k
    for k, v in _cv2_interpolation_to_str.items()
}
def _is_tensor_image(img):
return torch.is_tensor(img) and img.ndimension() == 3
def _is_numpy_image(img):
return isinstance(img, np.ndarray) and (img.ndim in {2, 3})
def to_tensor(pic):
    """Convert a ``numpy.ndarray`` image to tensor.

    See ``ToTensor`` for more details.

    Args:
        pic (numpy.ndarray): Image (H x W x C) to be converted to tensor.

    Returns:
        Tensor: (C, H, W) tensor; uint8 inputs are rescaled to [0.0, 1.0].
    """
    if not (isinstance(pic, np.ndarray) and pic.ndim in (2, 3)):
        raise TypeError('pic should be ndarray. Got {}'.format(type(pic)))
    # HWC -> CHW; torch.from_numpy shares memory with the transposed view.
    img = torch.from_numpy(pic.transpose((2, 0, 1)))
    # Backward compatibility: byte images become floats in [0, 1].
    if img.dtype == torch.uint8:
        return img.float().div(255)
    return img
def normalize(tensor, mean, std):
    """Normalize a tensor image with mean and standard deviation, in place.

    .. note::
        This transform acts in-place, i.e., it mutates the input tensor.

    See :class:`~torchvision.transforms.Normalize` for more details.

    Args:
        tensor (Tensor): Tensor image of size (C, H, W) to be normalized.
        mean (sequence): Sequence of means for each channel.
        std (sequence): Sequence of standard deviations for each channel.

    Returns:
        Tensor: the same tensor object, normalized channel by channel.
    """
    if not (torch.is_tensor(tensor) and tensor.ndimension() == 3):
        raise TypeError('tensor is not a torch image.')
    # Per-channel in-place update; faster than broadcasting here.
    for channel, channel_mean, channel_std in zip(tensor, mean, std):
        channel.sub_(channel_mean).div_(channel_std)
    return tensor
def resize(img, size, interpolation=cv2.INTER_LINEAR):
    r"""Resize the input numpy ndarray to the given size.

    Args:
        img (numpy ndarray): Image to be resized.
        size (sequence or int): Desired output size. If size is a sequence like
            (h, w), the output size will be matched to this. If size is an int,
            the smaller edge of the image will be matched to this number
            maintaining the aspect ratio. i.e, if height > width, the image is
            rescaled to
            :math:`\left(\text{size} \times \frac{\text{height}}{\text{width}}, \text{size}\right)`
        interpolation (int, optional): Desired interpolation. Default is
            ``cv2.INTER_LINEAR``

    Returns:
        numpy ndarray: Resized image.
    """
    if not _is_numpy_image(img):
        raise TypeError('img should be numpy image. Got {}'.format(type(img)))
    if not (isinstance(size, int) or  # noqa: W504
            (isinstance(size, collections.abc.Iterable) and len(size) == 2)):
        raise TypeError('Got inappropriate size arg: {}'.format(size))

    h, w = img.shape[0], img.shape[1]
    if isinstance(size, int):
        # Scale the shorter edge to ``size``, preserving the aspect ratio;
        # a no-op when the shorter edge already matches.
        if min(h, w) == size:
            return img
        if w < h:
            ow, oh = size, int(size * h / w)
        else:
            ow, oh = int(size * w / h), size
    else:
        oh, ow = size[0], size[1]

    output = cv2.resize(img, dsize=(ow, oh), interpolation=interpolation)
    # cv2.resize drops a trailing singleton channel dimension; restore it.
    if img.shape[2] == 1:
        return output[:, :, np.newaxis]
    return output
def pad(img, padding, fill=0, padding_mode='constant'):
    r"""Pad the given numpy ndarray on all sides with specified padding mode and fill value.

    Args:
        img (numpy ndarray): image to be padded.
        padding (int or tuple): Padding on each border. If a single int is provided this
            is used to pad all borders. If tuple of length 2 is provided this is the padding
            on left/right and top/bottom respectively. If a tuple of length 4 is provided
            this is the padding for the left, top, right and bottom borders
            respectively.
        fill: Pixel fill value for constant fill. Default is 0. If a tuple of
            length 3, it is used to fill R, G, B channels respectively.
            This value is only used when the padding_mode is constant
        padding_mode: Type of padding. Should be: constant, edge, reflect or symmetric. Default is constant.
            - constant: pads with a constant value, this value is specified with fill
            - edge: pads with the last value on the edge of the image
            - reflect: pads with reflection of image (without repeating the last value on the edge)
              padding [1, 2, 3, 4] with 2 elements on both sides in reflect mode
              will result in [3, 2, 1, 2, 3, 4, 3, 2]
            - symmetric: pads with reflection of image (repeating the last value on the edge)
              padding [1, 2, 3, 4] with 2 elements on both sides in symmetric mode
              will result in [2, 1, 1, 2, 3, 4, 4, 3]

    Returns:
        Numpy image: padded image.
    """
    if not _is_numpy_image(img):
        raise TypeError('img should be numpy ndarray. Got {}'.format(
            type(img)))
    if not isinstance(padding, (numbers.Number, tuple, list)):
        raise TypeError('Got inappropriate padding arg')
    if not isinstance(fill, (numbers.Number, str, tuple)):
        raise TypeError('Got inappropriate fill arg')
    if not isinstance(padding_mode, str):
        raise TypeError('Got inappropriate padding_mode arg')
    # Fixed: ``collections.Sequence`` was removed in Python 3.10; use the
    # ``collections.abc`` ABC (consistent with ``resize`` above).
    if isinstance(padding,
                  collections.abc.Sequence) and len(padding) not in [2, 4]:
        raise ValueError(
            'Padding must be an int or a 2, or 4 element tuple, not a '
            + '{} element tuple'.format(len(padding)))
    assert padding_mode in ['constant', 'edge', 'reflect', 'symmetric'], \
        'Padding mode should be either constant, edge, reflect or symmetric'

    if isinstance(padding, int):
        pad_left = pad_right = pad_top = pad_bottom = padding
    if isinstance(padding, collections.abc.Sequence) and len(padding) == 2:
        pad_left = pad_right = padding[0]
        pad_top = pad_bottom = padding[1]
    if isinstance(padding, collections.abc.Sequence) and len(padding) == 4:
        pad_left = padding[0]
        pad_top = padding[1]
        pad_right = padding[2]
        pad_bottom = padding[3]

    padded = cv2.copyMakeBorder(
        img,
        top=pad_top,
        bottom=pad_bottom,
        left=pad_left,
        right=pad_right,
        borderType=_cv2_pad_to_str[padding_mode],
        value=fill)
    # cv2.copyMakeBorder drops a trailing singleton channel; restore it.
    if img.shape[2] == 1:
        return padded[:, :, np.newaxis]
    return padded
def crop(img, i, j, h, w):
    """Crop the given numpy ndarray.

    Args:
        img (numpy ndarray): Image to be cropped.
        i: Upper pixel coordinate.
        j: Left pixel coordinate.
        h: Height of the cropped image.
        w: Width of the cropped image.

    Returns:
        numpy ndarray: Cropped image (a view, not a copy).
    """
    if not (isinstance(img, np.ndarray) and img.ndim in (2, 3)):
        raise TypeError('img should be numpy image. Got {}'.format(type(img)))
    top, left = i, j
    return img[top:top + h, left:left + w, :]
def center_crop(img, output_size):
    """Crop the center region of ``img`` to ``output_size`` (h, w)."""
    if isinstance(output_size, numbers.Number):
        output_size = (int(output_size), int(output_size))
    th, tw = output_size
    h, w = img.shape[0:2]
    # Round half-pixel offsets so the crop stays centered.
    top = int(round((h - th) / 2.))
    left = int(round((w - tw) / 2.))
    return crop(img, top, left, th, tw)
def resized_crop(img, i, j, h, w, size, interpolation=cv2.INTER_LINEAR):
    """Crop the given numpy ndarray and resize it to desired size.

    Notably used in :class:`~torchvision.transforms.RandomResizedCrop`.

    Args:
        img (numpy ndarray): Image to be cropped.
        i: Upper pixel coordinate.
        j: Left pixel coordinate.
        h: Height of the cropped image.
        w: Width of the cropped image.
        size (sequence or int): Desired output size. Same semantics as ``scale``.
        interpolation (int, optional): Desired interpolation. Default is
            ``cv2.INTER_LINEAR``.

    Returns:
        numpy ndarray: Cropped and resized image.
    """
    assert _is_numpy_image(img), 'img should be numpy image'
    cropped = crop(img, i, j, h, w)
    return resize(cropped, size, interpolation=interpolation)
def hflip(img):
    """Horizontally flip the given numpy ndarray.

    Args:
        img (numpy ndarray): Image to be flipped, laid out as (H, W, C).

    Returns:
        numpy ndarray: Horizontally flipped image.
    """
    if not _is_numpy_image(img):
        raise TypeError('img should be numpy image. Got {}'.format(type(img)))
    # img[:, ::-1] would be much faster, but the resulting negative-stride
    # view doesn't work with torch.from_numpy(), so use cv2.flip instead.
    flipped = cv2.flip(img, 1)
    if img.shape[2] == 1:
        # cv2.flip drops the trailing singleton channel axis; restore it.
        flipped = flipped[:, :, np.newaxis]
    return flipped
class ToTensor(object):
    """Convert a ``PIL Image`` or ``numpy.ndarray`` to a tensor.

    Converts an image of shape (H x W x C) with values in [0, 255] to a
    ``torch.FloatTensor`` of shape (C x H x W) with values in [0.0, 1.0].
    """

    def __call__(self, pic):
        """Convert ``pic`` to a tensor; delegates to :func:`to_tensor`.

        Args:
            pic (PIL Image or numpy.ndarray): Image to be converted.

        Returns:
            Tensor: Converted image.
        """
        return to_tensor(pic)

    def __repr__(self):
        return '{}()'.format(self.__class__.__name__)
class Normalize(object):
    """Normalize a tensor image with mean and standard deviation.

    Given mean ``(M1,...,Mn)`` and std ``(S1,..,Sn)`` for ``n`` channels,
    each channel is transformed as
    ``input[channel] = (input[channel] - mean[channel]) / std[channel]``.

    .. note::
        This transform acts in-place, i.e., it mutates the input tensor.

    Args:
        mean (sequence): Sequence of means for each channel.
        std (sequence): Sequence of standard deviations for each channel.
    """

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """Normalize a (C, H, W) tensor image; delegates to :func:`normalize`.

        Args:
            tensor (Tensor): Tensor image to be normalized.

        Returns:
            Tensor: Normalized tensor image.
        """
        return normalize(tensor, self.mean, self.std)

    def __repr__(self):
        return '{}(mean={}, std={})'.format(self.__class__.__name__,
                                            self.mean, self.std)
class Resize(object):
    """Resize the input numpy ndarray to the given size.

    Args:
        size (sequence or int): Desired output size. If size is a sequence like
            (h, w), output size will be matched to this. If size is an int,
            the smaller edge of the image will be matched to this number.
            i.e, if height > width, then image will be rescaled to
            (size * height / width, size)
        interpolation (int, optional): Desired cv2 interpolation flag.
            Default is ``cv2.INTER_LINEAR`` (bilinear interpolation).
    """

    def __init__(self, size, interpolation=cv2.INTER_LINEAR):
        if isinstance(size, int):
            self.size = size
        elif isinstance(size, collections.abc.Iterable) and len(size) == 2:
            # Normalize any 2-element iterable (list, etc.) to an immutable
            # tuple; the original only converted lists, leaving other
            # mutable sequences as-is.
            self.size = tuple(size)
        else:
            raise ValueError('Unknown inputs for size: {}'.format(size))
        self.interpolation = interpolation

    def __call__(self, img):
        """Rescale ``img`` to ``self.size``.

        Args:
            img (numpy ndarray): Image to be scaled.

        Returns:
            numpy ndarray: Rescaled image.
        """
        return resize(img, self.size, self.interpolation)

    def __repr__(self):
        interpolate_str = _cv2_interpolation_from_str[self.interpolation]
        return self.__class__.__name__ + '(size={0}, interpolation={1})'.format(
            self.size, interpolate_str)
class CenterCrop(object):
    """Crop the given numpy ndarray at the center.

    Args:
        size (sequence or int): Desired output size of the crop. An int
            produces a square crop (size, size).
    """

    def __init__(self, size):
        is_scalar = isinstance(size, numbers.Number)
        self.size = (int(size), int(size)) if is_scalar else size

    def __call__(self, img):
        """Center-crop ``img`` to ``self.size``.

        Args:
            img (numpy ndarray): Image to be cropped.

        Returns:
            numpy ndarray: Cropped image.
        """
        return center_crop(img, self.size)

    def __repr__(self):
        return '{}(size={})'.format(self.__class__.__name__, self.size)
class RandomCrop(object):
    """Crop the given numpy ndarray at a random location.
    Args:
        size (sequence or int): Desired output size of the crop. If size is an
            int instead of sequence like (h, w), a square crop (size, size) is
            made.
        padding (int or sequence, optional): Optional padding on each border
            of the image. Default is None, i.e no padding. If a sequence of length
            4 is provided, it is used to pad left, top, right, bottom borders
            respectively. If a sequence of length 2 is provided, it is used to
            pad left/right, top/bottom borders, respectively.
        pad_if_needed (boolean): It will pad the image if smaller than the
            desired size to avoid raising an exception.
        fill: Pixel fill value for constant fill. Default is 0. If a tuple of
            length 3, it is used to fill R, G, B channels respectively.
            This value is only used when the padding_mode is constant
        padding_mode: Type of padding. Should be: constant, edge, reflect or symmetric. Default is constant.
            - constant: pads with a constant value, this value is specified with fill
            - edge: pads with the last value on the edge of the image
            - reflect: pads with reflection of image (without repeating the last value on the edge)
                padding [1, 2, 3, 4] with 2 elements on both sides in reflect mode
                will result in [3, 2, 1, 2, 3, 4, 3, 2]
            - symmetric: pads with reflection of image (repeating the last value on the edge)
                padding [1, 2, 3, 4] with 2 elements on both sides in symmetric mode
                will result in [2, 1, 1, 2, 3, 4, 4, 3]
    """

    def __init__(self,
                 size,
                 padding=None,
                 pad_if_needed=False,
                 fill=0,
                 padding_mode='constant'):
        # A single number means a square crop of (size, size).
        if isinstance(size, numbers.Number):
            self.size = (int(size), int(size))
        else:
            self.size = size
        self.padding = padding
        self.pad_if_needed = pad_if_needed
        self.fill = fill
        self.padding_mode = padding_mode

    @staticmethod
    def get_params(img, output_size):
        """Get parameters for ``crop`` for a random crop.
        Args:
            img (numpy ndarray): Image to be cropped.
            output_size (tuple): Expected output size of the crop.
        Returns:
            tuple: params (i, j, h, w) to be passed to ``crop`` for random crop.
        """
        h, w = img.shape[0:2]
        th, tw = output_size
        if w == tw and h == th:
            # Crop size equals the image size: nothing random to sample.
            return 0, 0, h, w
        # random.randint is inclusive on both ends, so the offsets cover
        # the full valid range [0, dim - target].
        i = random.randint(0, h - th)
        j = random.randint(0, w - tw)
        return i, j, th, tw

    def __call__(self, img):
        """
        Args:
            img (numpy ndarray): Image to be cropped.
        Returns:
            numpy ndarray: Cropped image.
        """
        if self.padding is not None:
            img = pad(img, self.padding, self.fill, self.padding_mode)
        # pad the width if needed; a 2-sequence pads left/right by the full
        # deficit (see ``pad``), so the result is guaranteed wide enough.
        if self.pad_if_needed and img.shape[1] < self.size[1]:
            img = pad(img, (self.size[1] - img.shape[1], 0), self.fill,
                      self.padding_mode)
        # pad the height if needed (same reasoning for top/bottom).
        if self.pad_if_needed and img.shape[0] < self.size[0]:
            img = pad(img, (0, self.size[0] - img.shape[0]), self.fill,
                      self.padding_mode)
        i, j, h, w = self.get_params(img, self.size)
        return crop(img, i, j, h, w)

    def __repr__(self):
        return self.__class__.__name__ + '(size={0}, padding={1})'.format(
            self.size, self.padding)
class RandomResizedCrop(object):
    """Crop the given numpy ndarray to random size and aspect ratio.
    A crop of random size (default: of 0.08 to 1.0) of the original size and a random
    aspect ratio (default: of 3/4 to 4/3) of the original aspect ratio is made. This crop
    is finally resized to given size.
    This is popularly used to train the Inception networks.
    Args:
        size: expected output size of each edge
        scale: range of size of the origin size cropped
        ratio: range of aspect ratio of the origin aspect ratio cropped
        interpolation: Default: cv2.INTER_LINEAR
    """

    def __init__(self,
                 size,
                 scale=(0.08, 1.0),
                 ratio=(3. / 4., 4. / 3.),
                 interpolation=cv2.INTER_LINEAR):
        self.size = (size, size)
        self.interpolation = interpolation
        self.scale = scale
        self.ratio = ratio

    @staticmethod
    def get_params(img, scale, ratio):
        """Get parameters for ``crop`` for a random sized crop.
        Args:
            img (numpy ndarray): Image to be cropped.
            scale (tuple): range of size of the origin size cropped
            ratio (tuple): range of aspect ratio of the origin aspect ratio cropped
        Returns:
            tuple: params (i, j, h, w) to be passed to ``crop`` for a random
                sized crop.
        """
        # Rejection-sample a crop whose area and aspect ratio fall in the
        # requested ranges; give up after 10 attempts.
        for attempt in range(10):
            area = img.shape[0] * img.shape[1]
            target_area = random.uniform(*scale) * area
            aspect_ratio = random.uniform(*ratio)
            w = int(round(math.sqrt(target_area * aspect_ratio)))
            h = int(round(math.sqrt(target_area / aspect_ratio)))
            # Randomly swap w/h so the sampled ratio range is effectively
            # symmetric around 1.
            if random.random() < 0.5:
                w, h = h, w
            if w <= img.shape[1] and h <= img.shape[0]:
                i = random.randint(0, img.shape[0] - h)
                j = random.randint(0, img.shape[1] - w)
                return i, j, h, w
        # Fallback: center square crop of the smaller image dimension
        # (ignores the requested scale/ratio ranges).
        w = min(img.shape[0], img.shape[1])
        i = (img.shape[0] - w) // 2
        j = (img.shape[1] - w) // 2
        return i, j, w, w

    def __call__(self, img):
        """
        Args:
            img (numpy ndarray): Image to be cropped and resized.
        Returns:
            numpy ndarray: Randomly cropped and resized image.
        """
        i, j, h, w = self.get_params(img, self.scale, self.ratio)
        return resized_crop(img, i, j, h, w, self.size, self.interpolation)

    def __repr__(self):
        interpolate_str = _cv2_interpolation_from_str[self.interpolation]
        format_string = self.__class__.__name__ + '(size={0}'.format(self.size)
        format_string += ', scale={0}'.format(
            tuple(round(s, 4) for s in self.scale))
        format_string += ', ratio={0}'.format(
            tuple(round(r, 4) for r in self.ratio))
        format_string += ', interpolation={0})'.format(interpolate_str)
        return format_string
class RandomHorizontalFlip(object):
    """Horizontally flip the given image randomly with a given probability.

    Args:
        p (float): probability of the image being flipped. Default value is 0.5
    """

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, img):
        """Flip ``img`` with probability ``self.p``; otherwise return it unchanged.

        Args:
            img (numpy ndarray): Image to be flipped.

        Returns:
            numpy ndarray: Randomly flipped image.
        """
        do_flip = random.random() < self.p
        return hflip(img) if do_flip else img

    def __repr__(self):
        return '{}(p={})'.format(self.__class__.__name__, self.p)

View File

@@ -0,0 +1,340 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
# The part implementation is also open-sourced by the authors,
# and available at https://github.com/alibaba/EssentialMC2
import os
from typing import Any, Dict
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image
from torchvision.transforms.functional import InterpolationMode
import modelscope.preprocessors.cv.cv2_transforms as cv2_transforms
from modelscope.fileio import File
from modelscope.metainfo import Preprocessors
from modelscope.preprocessors.base import Preprocessor
from modelscope.preprocessors.builder import PREPROCESSORS, build_preprocessor
from modelscope.utils.constant import Fields, ModeKeys
from modelscope.utils.registry import default_group
# Supported image-processing backends for the transforms below.
BACKEND_TORCHVISION = 'torchvision'
BACKEND_PILLOW = 'pillow'
BACKEND_CV2 = 'cv2'
BACKENDS = (BACKEND_PILLOW, BACKEND_CV2, BACKEND_TORCHVISION)
# Interpolation name -> torchvision InterpolationMode
# (used by the pillow/torchvision backends).
INTERPOLATION_STYLE = {
    'bilinear': InterpolationMode('bilinear'),
    'nearest': InterpolationMode('nearest'),
    'bicubic': InterpolationMode('bicubic'),
}
# Interpolation name -> OpenCV interpolation flag (used by the cv2 backend).
INTERPOLATION_STYLE_CV2 = {
    'bilinear': cv2.INTER_LINEAR,
    'nearest': cv2.INTER_NEAREST,
    'bicubic': cv2.INTER_CUBIC,
}
def is_pil_image(img):
    """Return True if ``img`` is a ``PIL.Image.Image`` instance."""
    return isinstance(img, Image.Image)
def is_cv2_image(img):
    """Return True if ``img`` is an OpenCV-style image: a uint8 numpy array."""
    if not isinstance(img, np.ndarray):
        return False
    return img.dtype == np.uint8
def is_tensor(t):
    """Return True if ``t`` is a ``torch.Tensor``."""
    return isinstance(t, torch.Tensor)
class ImageTransform(object):
    """Base class for keyed image transforms that operate on sample dicts.

    Subclasses read the image from ``item[self.input_key]`` and write the
    transformed result to ``item[self.output_key]``.

    Args:
        backend (str): one of 'pillow', 'cv2' or 'torchvision'.
        input_key (str): dict key to read the image from; defaults to 'img'.
        output_key (str): dict key to write the result to; defaults to 'img'.
    """

    def __init__(self,
                 backend=BACKEND_PILLOW,
                 input_key=None,
                 output_key=None):
        self.backend = backend
        self.input_key = input_key or 'img'
        self.output_key = output_key or 'img'

    def check_image_type(self, input_img):
        """Assert ``input_img`` matches the configured backend's image type."""
        if self.backend == BACKEND_PILLOW:
            assert is_pil_image(input_img), 'input should be PIL Image'
        elif self.backend == BACKEND_CV2:
            assert is_cv2_image(
                input_img), 'input should be cv2 image(uint8 np.ndarray)'
@PREPROCESSORS.register_module(Fields.cv)
class RandomCrop(ImageTransform):
    """Crop a random portion of an image stored in a sample dict.

    If the image is torch Tensor, it is expected to have [..., H, W] shape.

    Args:
        size (sequence or int): Desired output size.
            If size is a sequence like (h, w), the output size will be matched to this.
            If size is an int, the output size will be matched to (size, size).
        padding (sequence or int): Optional padding on each border of the image. Default is None.
        pad_if_needed (bool): It will pad the image if smaller than the desired size to avoid raising an exception.
        fill (number or str or tuple): Pixel fill value for constant fill. Default is 0.
        padding_mode (str): Type of padding. Should be: constant, edge, reflect or symmetric.
            Default is constant.
    """

    def __init__(self,
                 size,
                 padding=None,
                 pad_if_needed=False,
                 fill=0,
                 padding_mode='constant',
                 **kwargs):
        super(RandomCrop, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        # torchvision implements both the pillow and torchvision backends;
        # the cv2 backend uses the local cv2_transforms port.
        use_torchvision = self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION)
        impl = transforms.RandomCrop if use_torchvision \
            else cv2_transforms.RandomCrop
        self.callable = impl(
            size,
            padding=padding,
            pad_if_needed=pad_if_needed,
            fill=fill,
            padding_mode=padding_mode)

    def __call__(self, item):
        self.check_image_type(item[self.input_key])
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class RandomResizedCrop(ImageTransform):
    """Crop a random portion of an image in a sample dict and resize it.

    If the image is torch Tensor, it is expected to have [..., H, W] shape.

    Args:
        size (int or sequence): Desired output size.
            If size is a sequence like (h, w), the output size will be matched to this.
            If size is an int, the output size will be matched to (size, size).
        scale (tuple of float): Specifies the lower and upper bounds for the random area of the crop,
            before resizing. The scale is defined with respect to the area of the original image.
        ratio (tuple of float): lower and upper bounds for the random aspect ratio of the crop, before
            resizing.
        interpolation (str): Desired interpolation string, 'bilinear', 'nearest', 'bicubic' are supported.
    """

    def __init__(self,
                 size,
                 scale=(0.08, 1.0),
                 ratio=(3. / 4., 4. / 3.),
                 interpolation='bilinear',
                 **kwargs):
        super(RandomResizedCrop, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        self.interpolation = interpolation
        if self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION):
            assert interpolation in INTERPOLATION_STYLE
            self.callable = transforms.RandomResizedCrop(
                size, scale, ratio, INTERPOLATION_STYLE[interpolation])
        else:
            assert interpolation in INTERPOLATION_STYLE_CV2
            self.callable = cv2_transforms.RandomResizedCrop(
                size, scale, ratio, INTERPOLATION_STYLE_CV2[interpolation])

    def __call__(self, item):
        self.check_image_type(item[self.input_key])
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class Resize(ImageTransform):
    """Resize an image stored in a sample dict to a given size.

    If the image is torch Tensor, it is expected to have [..., H, W] shape.

    Args:
        size (int or sequence): Desired output size.
            If size is a sequence like (h, w), the output size will be matched to this.
            If size is an int, the smaller edge of the image will be matched to this
            number maintaining the aspect ratio.
        interpolation (str): Desired interpolation string, 'bilinear', 'nearest', 'bicubic' are supported.
    """

    def __init__(self, size, interpolation='bilinear', **kwargs):
        super(Resize, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        self.size = size
        self.interpolation = interpolation
        if self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION):
            assert interpolation in INTERPOLATION_STYLE
            self.callable = transforms.Resize(
                size, INTERPOLATION_STYLE[interpolation])
        else:
            assert interpolation in INTERPOLATION_STYLE_CV2
            self.callable = cv2_transforms.Resize(
                size, INTERPOLATION_STYLE_CV2[interpolation])

    def __call__(self, item):
        self.check_image_type(item[self.input_key])
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class CenterCrop(ImageTransform):
    """Crop an image stored in a sample dict at the center.

    If the image is torch Tensor, it is expected to have [..., H, W] shape.

    Args:
        size (sequence or int): Desired output size.
            If size is a sequence like (h, w), the output size will be matched to this.
            If size is an int, the output size will be matched to (size, size).
    """

    def __init__(self, size, **kwargs):
        super(CenterCrop, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        self.size = size
        if self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION):
            self.callable = transforms.CenterCrop(size)
        else:
            self.callable = cv2_transforms.CenterCrop(size)

    def __call__(self, item):
        self.check_image_type(item[self.input_key])
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class RandomHorizontalFlip(ImageTransform):
    """Horizontally flip an image in a sample dict with a given probability.

    If the image is torch Tensor, it is expected to have [..., H, W] shape.

    Args:
        p (float): probability of the image being flipped. Default value is 0.5
    """

    def __init__(self, p=0.5, **kwargs):
        super(RandomHorizontalFlip, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        if self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION):
            self.callable = transforms.RandomHorizontalFlip(p)
        else:
            self.callable = cv2_transforms.RandomHorizontalFlip(p)

    def __call__(self, item):
        self.check_image_type(item[self.input_key])
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class Normalize(ImageTransform):
    """Normalize a tensor image in a sample dict with mean and std.

    This transform only supports tensor images.

    Args:
        mean (sequence): Sequence of means for each channel.
        std (sequence): Sequence of standard deviations for each channel.
    """

    def __init__(self, mean, std, **kwargs):
        super(Normalize, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)
        if self.backend in (BACKEND_PILLOW, BACKEND_TORCHVISION):
            self.callable = transforms.Normalize(self.mean, self.std)
        else:
            self.callable = cv2_transforms.Normalize(self.mean, self.std)

    def __call__(self, item):
        # No check_image_type here: normalization operates on tensors,
        # not backend-native images.
        item[self.output_key] = self.callable(item[self.input_key])
        return item
@PREPROCESSORS.register_module(Fields.cv)
class ImageToTensor(ImageTransform):
    """Convert the image in a sample dict to a float32 tensor in [0.0, 1.0].

    Accepts a ``PIL Image``, ``numpy.ndarray`` or uint8-type tensor,
    depending on the configured backend.
    """

    def __init__(self, **kwargs):
        super(ImageToTensor, self).__init__(**kwargs)
        assert self.backend in BACKENDS
        converters = {
            BACKEND_PILLOW: transforms.ToTensor,
            BACKEND_CV2: cv2_transforms.ToTensor,
        }
        if self.backend in converters:
            self.callable = converters[self.backend]()
        else:
            # torchvision backend: input is already a tensor, just cast/scale.
            self.callable = transforms.ConvertImageDtype(torch.float)

    def __call__(self, item):
        item[self.output_key] = self.callable(item[self.input_key])
        return item
def build_preprocess_pipeline(pipeline, group_name=Fields.cv):
    """Build a preprocessor pipeline from a config.

    Args:
        pipeline: a single preprocessor config dict, a list of such dicts
            (composed in order), or None.
        group_name: registry group used to resolve the transform types.

    Returns:
        A built preprocessor. None or an empty list yields an 'Identity'
        preprocessor; a single-element list is unwrapped; a longer list is
        wrapped in a 'Compose' preprocessor.
    """
    if pipeline is None:
        return build_preprocessor(
            dict(type='Identity'), field_name=default_group)
    if isinstance(pipeline, dict):
        return build_preprocessor(pipeline, field_name=group_name)
    if isinstance(pipeline, list):
        if not pipeline:
            return build_preprocessor(
                dict(type='Identity'), field_name=default_group)
        if len(pipeline) == 1:
            return build_preprocess_pipeline(pipeline[0])
        compose_cfg = dict(
            type='Compose', transforms=pipeline, field_name=group_name)
        return build_preprocessor(compose_cfg, field_name=default_group)
    raise TypeError(
        f'Expect pipeline_cfg to be dict or list or None, got {type(pipeline)}'
    )
@PREPROCESSORS.register_module(
    Fields.cv, module_name=Preprocessors.image_classification_preprocessor)
class ImageClassificationPreprocessor(Preprocessor):
    """Image classification preprocessor for the fine-tune scenario.

    Builds separate train/eval transform pipelines from the 'train' and
    'val' sections of the preprocessor config and dispatches on
    ``self.mode`` at call time.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the preprocessor.

        Recognized kwargs: ``training`` (bool), ``train`` (pipeline cfg),
        ``val`` (pipeline cfg).
        """
        super().__init__(*args, **kwargs)
        # NOTE(review): these pops happen after super().__init__ already
        # received **kwargs; the base class is assumed to tolerate the
        # extra keys — kept as-is to preserve the original behavior.
        self.training = kwargs.pop('training', True)
        self.preprocessor_train_cfg = kwargs.pop('train', None)
        self.preprocessor_test_cfg = kwargs.pop('val', None)
        # Default both pipelines to None so a missing config section fails
        # at call time with a clear error instead of an AttributeError.
        self.train_preprocess_pipeline = None
        self.test_preprocess_pipeline = None
        if self.preprocessor_train_cfg is not None:
            self.train_preprocess_pipeline = build_preprocess_pipeline(
                self.preprocessor_train_cfg)
        if self.preprocessor_test_cfg is not None:
            self.test_preprocess_pipeline = build_preprocess_pipeline(
                self.preprocessor_test_cfg)

    def __call__(self, results: Dict[str, Any]):
        """Process the raw input data.

        Args:
            results (dict): Result dict from the loading pipeline.

        Returns:
            Dict[str, Any] | None: the preprocessed data.

        Raises:
            ValueError: if no pipeline was configured for the current mode.
        """
        if self.mode == ModeKeys.TRAIN:
            pipeline = self.train_preprocess_pipeline
        else:
            pipeline = self.test_preprocess_pipeline
        if pipeline is None:
            raise ValueError(
                f'No preprocess pipeline configured for mode {self.mode}; '
                'expected a "train" or "val" section in the preprocessor '
                'config.')
        return pipeline(results)

View File

@@ -24,10 +24,12 @@ class LoadImage:
"scale_factor" (1.0) and "img_norm_cfg" (means=0 and stds=1).
Args:
mode (str): See :ref:`PIL.Mode<https://pillow.readthedocs.io/en/stable/handbook/concepts.html#modes>`.
backend (str): Type of loading image. Should be: cv2 or pillow. Default is pillow.
"""
def __init__(self, mode='rgb'):
def __init__(self, mode='rgb', backend='pillow'):
self.mode = mode.upper()
self.backend = backend
def __call__(self, input: Union[str, Dict[str, str]]):
"""Call functions to load image and get image meta information.
@@ -42,21 +44,38 @@ class LoadImage:
else:
image_path_or_url = input
bytes = File.read(image_path_or_url)
# TODO @wenmeng.zwm add opencv decode as optional
# we should also look at the input format which is the most commonly
# used in Mind' image related models
with io.BytesIO(bytes) as infile:
img = Image.open(infile)
img = ImageOps.exif_transpose(img)
img = img.convert(self.mode)
if self.backend == 'cv2':
storage = File._get_storage(image_path_or_url)
with storage.as_local_path(image_path_or_url) as img_path:
img = cv2.imread(img_path, cv2.IMREAD_COLOR)
if self.mode == 'RGB':
cv2.cvtColor(img, cv2.COLOR_BGR2RGB, img)
img_h, img_w, img_c = img.shape[0], img.shape[1], img.shape[2]
img_shape = (img_h, img_w, img_c)
elif self.backend == 'pillow':
bytes = File.read(image_path_or_url)
# TODO @wenmeng.zwm add opencv decode as optional
# we should also look at the input format which is the most commonly
# used in Mind' image related models
with io.BytesIO(bytes) as infile:
img = Image.open(infile)
img = ImageOps.exif_transpose(img)
img = img.convert(self.mode)
img_shape = (img.size[1], img.size[0], 3)
else:
raise TypeError(f'backend should be either cv2 or pillow,'
f'but got {self.backend}')
results = {
'filename': image_path_or_url,
'img': img,
'img_shape': (img.size[1], img.size[0], 3),
'img_shape': img_shape,
'img_field': 'img',
}
if isinstance(input, dict):
input_ret = input.copy()
input_ret.update(results)
results = input_ret
return results
def __repr__(self):

View File

@@ -13,6 +13,7 @@ if TYPE_CHECKING:
from .image_defrcn_fewshot_detection_trainer import ImageDefrcnFewshotTrainer
from .cartoon_translation_trainer import CartoonTranslationTrainer
from .nerf_recon_acc_trainer import NeRFReconAccTrainer
from .vision_efficient_tuning_trainer import VisionEfficientTuningTrainer
else:
_import_structure = {
@@ -28,6 +29,7 @@ else:
['ImageDefrcnFewshotTrainer'],
'cartoon_translation_trainer': ['CartoonTranslationTrainer'],
'nerf_recon_acc_trainer': ['NeRFReconAccTrainer'],
'vision_efficient_tuning_trainer': ['VisionEfficientTuningTrainer'],
}
import sys

View File

@@ -0,0 +1,114 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import Union
from torch import nn
from modelscope.metainfo import Trainers
from modelscope.models.base import Model, TorchModel
from modelscope.trainers.builder import TRAINERS
from modelscope.trainers.default_config import merge_hooks
from modelscope.trainers.trainer import EpochBasedTrainer
from modelscope.utils.constant import ModeKeys
@TRAINERS.register_module(module_name=Trainers.vision_efficient_tuning)
class VisionEfficientTuningTrainer(EpochBasedTrainer):
    """Vision Efficient Tuning Trainer based on EpochBasedTrainer.

    The trainer freezes the parameters of the pre-trained model and tunes
    only the extra parameters introduced by the parameter-efficient
    transfer learning (PETL) method.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def build_model(self) -> Union[nn.Module, TorchModel]:
        """Instantiate a pytorch model and return it.

        By default, creates a model from the configuration file and applies
        the optional ``model.freeze_cfg`` section to set ``requires_grad``
        flags. You can override this method in a subclass.

        Raises:
            TypeError: if the created object is neither an ``nn.Module``
                nor a wrapper exposing one via a ``model`` attribute
                (the original code silently returned None here).
        """
        model = Model.from_pretrained(self.model_dir, cfg_dict=self.cfg)
        if 'freeze_cfg' in self.cfg['model']:
            model = self.freeze(model, **self.cfg['model']['freeze_cfg'])
        if isinstance(model, nn.Module):
            return model
        if hasattr(model, 'model'):
            return model.model
        raise TypeError(
            f'Expected an nn.Module or a wrapper with a `model` attribute, '
            f'got {type(model)}')

    def train(self, *args, **kwargs):
        """Log trainable-parameter statistics, then run standard training."""
        self.print_model_params_status()
        super().train(*args, **kwargs)

    def evaluate(self, *args, **kwargs):
        """Run standard evaluation and return the metric values."""
        metric_values = super().evaluate(*args, **kwargs)
        return metric_values

    def freeze(self, model, freeze_part=None, train_part=None):
        """Freeze or unfreeze model parameters based on the config.

        Args:
            model: the current model (possibly wrapped, e.g. DataParallel
                exposing the real model via ``.module``).
            freeze_part (dict): maps 'backbone'/'head' to lists of
                parameter-name fragments whose parameters are frozen.
            train_part (dict): maps 'backbone'/'head' to lists of
                parameter-name fragments whose parameters are unfrozen.

        Returns:
            The same model object, with ``requires_grad`` updated in place.
        """
        # Fix: the original used mutable default arguments ([]).
        target = model.module if hasattr(model, 'module') else model

        def _set_requires_grad(module, fragments, trainable):
            # A parameter matches if any configured fragment occurs in its
            # dotted name.
            for name, param in module.named_parameters():
                if any(frag in name for frag in fragments):
                    param.requires_grad = trainable

        # Fix: the original used `elif 'head' ...`, which silently ignored
        # the 'head' section whenever 'backbone' was also configured.
        if freeze_part:
            if 'backbone' in freeze_part:
                _set_requires_grad(target.model.backbone,
                                   freeze_part['backbone'], False)
            if 'head' in freeze_part:
                _set_requires_grad(target.model.head, freeze_part['head'],
                                   False)
        if train_part:
            if 'backbone' in train_part:
                _set_requires_grad(target.model.backbone,
                                   train_part['backbone'], True)
            if 'head' in train_part:
                _set_requires_grad(target.model.head, train_part['head'],
                                   True)
        return model

    def print_model_params_status(self, model=None, logger=None):
        """Log how many parameters are trainable, grouped by module prefix."""
        if model is None:
            model = self.model
        if logger is None:
            logger = self.logger
        train_param_dict = {}
        all_param_numel = 0
        for key, val in model.named_parameters():
            if val.requires_grad:
                # Group by the first two components after the top-level
                # prefix, e.g. 'model.backbone.layer1.weight' -> 'backbone.layer1'.
                sub_key = '.'.join(key.split('.', 1)[-1].split('.', 2)[:2])
                train_param_dict[sub_key] = train_param_dict.get(
                    sub_key, 0) + val.numel()
            # Count every parameter toward the total, trainable or not.
            all_param_numel += val.numel()
        train_param_numel = sum(train_param_dict.values())
        # Guard against division by zero for parameterless models.
        ratio = (train_param_numel / all_param_numel
                 if all_param_numel else 0.0)
        logger.info(
            f'Load trainable params {train_param_numel} / {all_param_numel} = '
            f'{ratio:.2%}, '
            f'train part: {train_param_dict}.')

View File

@@ -3,8 +3,6 @@
import re
import string
from zhconv import convert
CHINESE_PUNCTUATION = '"#$%&'()*+,-/:;<=>@[\]^_`{|}~⦅⦆「」、\u3000、〃〈〉《》「」『』【】〔〕〖〗〘〙〚〛〜〝〞〟〰〾〿–—‘’‛“”„‟…‧﹏﹑﹔·!?。。'
ENGLISH_PUNCTUATION = string.punctuation
@@ -58,6 +56,8 @@ def _is_chinese_char(cp: str) -> bool:
def normalize_chinese_number(text):
from zhconv import convert
chinese_number = ['', '', '', '', '', '', '', '', '', '']
new_text = ''
for x in text:

View File

@@ -30,6 +30,7 @@ TASKS_INPUT_TEMPLATES = {
Tasks.ocr_detection: TasksIODescriptions.image_to_text,
Tasks.ocr_recognition: TasksIODescriptions.image_to_text,
Tasks.body_2d_keypoints: TasksIODescriptions.image_to_text,
Tasks.vision_efficient_tuning: TasksIODescriptions.image_to_text,
# nlp tasks
Tasks.text_classification: TasksIODescriptions.text_to_text,

View File

@@ -0,0 +1,154 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
from modelscope.models import Model
from modelscope.models.cv.vision_efficient_tuning.model import \
VisionEfficientTuningModel
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class VisionEfficientTuningTest(unittest.TestCase, DemoCompatibilityCheck):
def setUp(self) -> None:
self.task = Tasks.vision_efficient_tuning
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_adapter_run_pipeline(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-adapter'
img_path = 'data/test/images/vision_efficient_tuning_test_1.png'
petl_pipeline = pipeline(self.task, model_id)
result = petl_pipeline(img_path)
print(f'Vision-efficient-tuning-adapter output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_adapter_load_model_from_pretrained(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-adapter'
model = Model.from_pretrained(model_id)
self.assertTrue(model.__class__ == VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_adapter_demo_compatibility(self):
self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-adapter'
self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_lora_run_pipeline(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-lora'
img_path = 'data/test/images/vision_efficient_tuning_test_1.png'
petl_pipeline = pipeline(self.task, model_id)
result = petl_pipeline(img_path)
print(f'Vision-efficient-tuning-lora output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_lora_load_model_from_pretrained(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-lora'
model = Model.from_pretrained(model_id)
self.assertTrue(model.__class__ == VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_lora_demo_compatibility(self):
self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-lora'
self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_prefix_run_pipeline(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prefix'
img_path = 'data/test/images/vision_efficient_tuning_test_1.png'
petl_pipeline = pipeline(self.task, model_id)
result = petl_pipeline(img_path)
print(f'Vision-efficient-tuning-prefix output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_prefix_load_model_from_pretrained(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prefix'
model = Model.from_pretrained(model_id)
self.assertTrue(model.__class__ == VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_prefix_demo_compatibility(self):
self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prefix'
self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_prompt_run_pipeline(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prompt'
img_path = 'data/test/images/vision_efficient_tuning_test_1.png'
petl_pipeline = pipeline(self.task, model_id)
result = petl_pipeline(img_path)
print(f'Vision-efficient-tuning-prompt output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_prompt_load_model_from_pretrained(self):
model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prompt'
model = Model.from_pretrained(model_id)
self.assertTrue(model.__class__ == VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_prompt_demo_compatibility(self):
self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prompt'
self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_bitfit_run_pipeline(self):
    """Smoke-test bitfit-tuning inference through the pipeline API."""
    classifier = pipeline(
        self.task,
        'damo/cv_vitb16_classification_vision-efficient-tuning-bitfit')
    result = classifier('data/test/images/vision_efficient_tuning_test_1.png')
    print(f'Vision-efficient-tuning-bitfit output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_bitfit_load_model_from_pretrained(self):
    """Loading the bitfit model card must yield VisionEfficientTuningModel."""
    model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-bitfit'
    model = Model.from_pretrained(model_id)
    # assertIs on the type reports both sides on failure, unlike
    # assertTrue(model.__class__ == ...), and still checks the exact class.
    self.assertIs(type(model), VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_bitfit_demo_compatibility(self):
    """Run the demo-service compatibility suite against the bitfit model."""
    self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-bitfit'
    self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_sidetuning_run_pipeline(self):
    """Smoke-test side-tuning inference through the pipeline API."""
    classifier = pipeline(
        self.task,
        'damo/cv_vitb16_classification_vision-efficient-tuning-sidetuning')
    result = classifier('data/test/images/vision_efficient_tuning_test_1.png')
    print(f'Vision-efficient-tuning-sidetuning output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_sidetuning_load_model_from_pretrained(
        self):
    """Loading the side-tuning model card must yield VisionEfficientTuningModel."""
    model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-sidetuning'
    model = Model.from_pretrained(model_id)
    # assertIs on the type reports both sides on failure, unlike
    # assertTrue(model.__class__ == ...), and still checks the exact class.
    self.assertIs(type(model), VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_sidetuning_demo_compatibility(self):
    """Run the demo-service compatibility suite against the side-tuning model."""
    self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-sidetuning'
    self.compatibility_check()
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_utuning_run_pipeline(self):
    """Smoke-test u-tuning inference through the pipeline API."""
    classifier = pipeline(
        self.task,
        'damo/cv_vitb16_classification_vision-efficient-tuning-utuning')
    result = classifier('data/test/images/vision_efficient_tuning_test_1.png')
    print(f'Vision-efficient-tuning-utuning output: {result}.')
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_vision_efficient_tuning_utuning_load_model_from_pretrained(self):
    """Loading the u-tuning model card must yield VisionEfficientTuningModel."""
    model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-utuning'
    model = Model.from_pretrained(model_id)
    # assertIs on the type reports both sides on failure, unlike
    # assertTrue(model.__class__ == ...), and still checks the exact class.
    self.assertIs(type(model), VisionEfficientTuningModel)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_vision_efficient_tuning_utuning_demo_compatibility(self):
    """Run the demo-service compatibility suite against the u-tuning model."""
    self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-utuning'
    self.compatibility_check()
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

View File

@@ -1,37 +0,0 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
from modelscope.models import Model
from modelscope.models.cv.vision_efficient_tuning.vision_efficient_tuning import \
VisionEfficientTuningModel
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class VisionEfficientTuningAdapterTest(unittest.TestCase,
                                       DemoCompatibilityCheck):
    """Pipeline and model-loading tests for the adapter tuning variant."""

    def setUp(self) -> None:
        self.task = Tasks.vision_efficient_tuning
        self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-adapter'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_pipeline(self):
        """Smoke-test adapter inference through the pipeline API."""
        petl_pipeline = pipeline(self.task, self.model_id)
        result = petl_pipeline(
            'data/test/images/vision_efficient_tuning_test_1.png')
        print(f'Vision-efficient-tuning-adapter output: {result}.')

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_load_model_from_pretrained(self):
        """Loading by model id must construct a VisionEfficientTuningModel."""
        model = Model.from_pretrained(
            'damo/cv_vitb16_classification_vision-efficient-tuning-adapter')
        # assertIs on the type reports both sides on failure, unlike
        # assertTrue(model.__class__ == ...), and still checks the exact class.
        self.assertIs(type(model), VisionEfficientTuningModel)
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

View File

@@ -1,36 +0,0 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
from modelscope.models import Model
from modelscope.models.cv.vision_efficient_tuning.vision_efficient_tuning import \
VisionEfficientTuningModel
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class VisionEfficientTuningLoRATest(unittest.TestCase, DemoCompatibilityCheck):
    """Pipeline and model-loading tests for the LoRA tuning variant."""

    def setUp(self) -> None:
        self.task = Tasks.vision_efficient_tuning
        self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-lora'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_pipeline(self):
        """Smoke-test LoRA inference through the pipeline API."""
        petl_pipeline = pipeline(self.task, self.model_id)
        result = petl_pipeline(
            'data/test/images/vision_efficient_tuning_test_1.png')
        print(f'Vision-efficient-tuning-lora output: {result}.')

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_load_model_from_pretrained(self):
        """Loading by model id must construct a VisionEfficientTuningModel."""
        model = Model.from_pretrained(
            'damo/cv_vitb16_classification_vision-efficient-tuning-lora')
        # assertIs on the type reports both sides on failure, unlike
        # assertTrue(model.__class__ == ...), and still checks the exact class.
        self.assertIs(type(model), VisionEfficientTuningModel)
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

View File

@@ -1,37 +0,0 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
from modelscope.models import Model
from modelscope.models.cv.vision_efficient_tuning.vision_efficient_tuning import \
VisionEfficientTuningModel
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class VisionEfficientTuningPrefixTest(unittest.TestCase,
                                      DemoCompatibilityCheck):
    """Pipeline and model-loading tests for the prefix tuning variant."""

    def setUp(self) -> None:
        self.task = Tasks.vision_efficient_tuning
        self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prefix'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_pipeline(self):
        """Smoke-test prefix inference through the pipeline API."""
        petl_pipeline = pipeline(self.task, self.model_id)
        result = petl_pipeline(
            'data/test/images/vision_efficient_tuning_test_1.png')
        print(f'Vision-efficient-tuning-prefix output: {result}.')

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_load_model_from_pretrained(self):
        """Loading by model id must construct a VisionEfficientTuningModel."""
        model = Model.from_pretrained(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prefix')
        # assertIs on the type reports both sides on failure, unlike
        # assertTrue(model.__class__ == ...), and still checks the exact class.
        self.assertIs(type(model), VisionEfficientTuningModel)
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

View File

@@ -1,37 +0,0 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
from modelscope.models import Model
from modelscope.models.cv.vision_efficient_tuning.vision_efficient_tuning import \
VisionEfficientTuningModel
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class VisionEfficientTuningPromptTest(unittest.TestCase,
                                      DemoCompatibilityCheck):
    """Pipeline and model-loading tests for the prompt tuning variant."""

    def setUp(self) -> None:
        self.task = Tasks.vision_efficient_tuning
        self.model_id = 'damo/cv_vitb16_classification_vision-efficient-tuning-prompt'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_pipeline(self):
        """Smoke-test prompt inference through the pipeline API."""
        petl_pipeline = pipeline(self.task, self.model_id)
        result = petl_pipeline(
            'data/test/images/vision_efficient_tuning_test_1.png')
        print(f'Vision-efficient-tuning-prompt output: {result}.')

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_load_model_from_pretrained(self):
        """Loading by model id must construct a VisionEfficientTuningModel."""
        model = Model.from_pretrained(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prompt')
        # assertIs on the type reports both sides on failure, unlike
        # assertTrue(model.__class__ == ...), and still checks the exact class.
        self.assertIs(type(model), VisionEfficientTuningModel)
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()

View File

@@ -0,0 +1,355 @@
# Copyright 2022-2023 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import os
import shutil
import tempfile
import unittest
from modelscope.metainfo import Trainers
from modelscope.msdatasets import MsDataset
from modelscope.trainers import build_trainer
from modelscope.utils.test_utils import test_level
class TestVisionEfficientTuningTrainer(unittest.TestCase):
    """Train/eval smoke tests for the vision-efficient-tuning trainer.

    Each tuning variant (adapter, LoRA, prefix, prompt, bitfit,
    side-tuning, u-tuning) is exercised twice: a one-epoch fine-tune run
    that checks the produced log/checkpoint files, and an eval-only run.
    The per-variant bodies were identical except for the model id, the
    printed tag and one optional backbone attribute, so the common logic
    lives in the `_train_and_check` / `_eval_only` helpers.
    """

    def setUp(self):
        print(('Testing %s.%s' % (type(self).__name__, self._testMethodName)))
        self.train_dataset = MsDataset.load(
            'foundation_model_evaluation_benchmark',
            namespace='damo',
            subset_name='OxfordFlowers',
            split='train')
        self.eval_dataset = MsDataset.load(
            'foundation_model_evaluation_benchmark',
            namespace='damo',
            subset_name='OxfordFlowers',
            split='eval')
        self.max_epochs = 1
        self.num_classes = 102  # OxfordFlowers category count
        self.tune_length = 10
        # mkdtemp creates the directory and leaves its lifetime to us.
        # The previous `tempfile.TemporaryDirectory().name` discarded the
        # TemporaryDirectory object, whose finalizer may delete the work
        # dir mid-test when the object is garbage collected.
        self.tmp_dir = tempfile.mkdtemp()

    def tearDown(self):
        # ignore_errors: the directory may already be gone if a test
        # failed before producing any output.
        shutil.rmtree(self.tmp_dir, ignore_errors=True)
        super().tearDown()

    def _train_and_check(self, model_id, tag, tune_attr=None):
        """Fine-tune ``model_id`` for one epoch and verify the artifacts.

        Args:
            model_id: hub id of the model card to fine-tune.
            tag: short variant name used in the printed output.
            tune_attr: optional backbone attribute (e.g. 'adapter_length')
                to set to ``self.tune_length`` before training.
        """

        def cfg_modify_fn(cfg):
            cfg.model.head.num_classes = self.num_classes
            cfg.model.finetune = True
            cfg.train.max_epochs = self.max_epochs
            cfg.train.lr_scheduler.T_max = self.max_epochs
            if tune_attr is not None:
                setattr(cfg.model.backbone, tune_attr, self.tune_length)
            return cfg

        trainer = build_trainer(
            name=Trainers.vision_efficient_tuning,
            default_args=dict(
                model=model_id,
                work_dir=self.tmp_dir,
                train_dataset=self.train_dataset,
                eval_dataset=self.eval_dataset,
                cfg_modify_fn=cfg_modify_fn))
        trainer.train()
        result = trainer.evaluate()
        print(f'Vision-efficient-tuning-{tag} train output: {result}.')
        # The trainer is expected to leave a json log and one checkpoint
        # per epoch in the work dir.
        results_files = os.listdir(self.tmp_dir)
        self.assertIn(f'{trainer.timestamp}.log.json', results_files)
        for i in range(self.max_epochs):
            self.assertIn(f'epoch_{i + 1}.pth', results_files)

    def _eval_only(self, model_id, tag):
        """Run evaluation for ``model_id`` without any training."""
        trainer = build_trainer(
            name=Trainers.vision_efficient_tuning,
            default_args=dict(
                model=model_id,
                work_dir=self.tmp_dir,
                train_dataset=None,
                eval_dataset=self.eval_dataset))
        result = trainer.evaluate()
        print(f'Vision-efficient-tuning-{tag} eval output: {result}.')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_adapter_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-adapter',
            'adapter',
            tune_attr='adapter_length')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_adapter_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-adapter',
            'adapter')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_lora_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-lora',
            'lora',
            tune_attr='lora_length')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_lora_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-lora',
            'lora')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_prefix_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prefix',
            'prefix',
            tune_attr='prefix_length')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_prefix_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prefix',
            'prefix')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_prompt_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prompt',
            'prompt',
            tune_attr='prompt_length')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_prompt_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-prompt',
            'prompt')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_bitfit_train(self):
        # bitfit tunes biases only; no backbone length attribute to set.
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-bitfit',
            'bitfit')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_bitfit_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-bitfit',
            'bitfit')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_sidetuning_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-sidetuning',
            'sidetuning')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_sidetuning_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-sidetuning',
            'sidetuning')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_utuning_train(self):
        self._train_and_check(
            'damo/cv_vitb16_classification_vision-efficient-tuning-utuning',
            'utuning')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_vision_efficient_tuning_utuning_eval(self):
        self._eval_only(
            'damo/cv_vitb16_classification_vision-efficient-tuning-utuning',
            'utuning')
# Allow running this test module directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()