[to #42322933] add files

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/13158565
* [to #42322933] add files

* [to #42322933] add files

* [to #42322933] add files

* [to #42322933] add files

* [to #42322933] add files

* update test data

* [to #42322933] add files

* Merge remote-tracking branch 'origin' into feature/sal_try_on

* [to #42322933] add files

* Merge remote-tracking branch 'origin' into feature/sal_try_on
This commit is contained in:
tingwei.gtw
2023-07-24 10:16:29 +08:00
parent 13e345f6d9
commit d16522723a
11 changed files with 2529 additions and 0 deletions

View File

@@ -119,6 +119,7 @@ class Models(object):
longshortnet = 'longshortnet'
fastinst = 'fastinst'
pedestrian_attribute_recognition = 'pedestrian-attribute-recognition'
image_try_on = 'image-try-on'
# nlp models
bert = 'bert'
@@ -415,6 +416,7 @@ class Pipelines(object):
vision_efficient_tuning = 'vision-efficient-tuning'
image_bts_depth_estimation = 'image-bts-depth-estimation'
pedestrian_attribute_recognition = 'resnet50_pedestrian-attribute-recognition_image'
image_try_on = 'image-try-on'
# nlp tasks
automatic_post_editing = 'automatic-post-editing'
@@ -852,6 +854,8 @@ DEFAULT_MODEL_FOR_PIPELINE = {
Tasks.pedestrian_attribute_recognition: (
Pipelines.pedestrian_attribute_recognition,
'damo/cv_resnet50_pedestrian-attribute-recognition_image'),
Tasks.image_try_on: (Pipelines.image_try_on,
'damo/cv_SAL-VTON_virtual-try-on')
}

View File

@@ -0,0 +1,20 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import TYPE_CHECKING
from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
    # Static type checkers follow the real import so the symbol resolves.
    from .try_on_infer import SALForImageTryOn

else:
    # Maps each submodule name to the public names it exposes.
    _import_structure = {'try_on_infer': ['SALForImageTryOn']}

    import sys

    # Replace this package's entry in sys.modules with a LazyImportModule built
    # from the mapping above (presumably so `try_on_infer` is only imported on
    # first attribute access - confirm against LazyImportModule).
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,474 @@
# The implementation here is modified based on spade,
# originally Apache 2.0 License and publicly available at https://github.com/NVlabs/SPADE
import functools
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.parallel
from torchvision import models
class ResidualBlock(nn.Module):
    """Residual unit: conv3x3 -> [norm] -> PReLU -> conv3x3 -> [norm], plus skip.

    Args:
        in_features: number of input (and output) channels.
        norm_layer: callable building a normalisation layer from a channel
            count, or ``None`` to build the block without normalisation.
    """

    def __init__(self, in_features=64, norm_layer=nn.BatchNorm2d):
        super(ResidualBlock, self).__init__()
        self.relu = nn.PReLU()

        def conv3x3():
            # Channel-preserving 3x3 convolution, stride 1, padding 1, no bias.
            return nn.Conv2d(in_features, in_features, 3, 1, 1, bias=False)

        with_norm = norm_layer is not None
        layers = [conv3x3()]
        if with_norm:
            layers.append(norm_layer(in_features))
        layers.append(nn.PReLU())
        layers.append(conv3x3())
        if with_norm:
            layers.append(norm_layer(in_features))
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        """Return ``PReLU(block(x) + x)``."""
        return self.relu(self.block(x) + x)
# Defines the submodule with skip connection.
# X -------------------identity---------------------- X
# |-- downsampling -- |submodule| -- upsampling --|
class ResUnetSkipConnectionBlock(nn.Module):
    """One level of a U-Net built from residual blocks.

    The block downsamples with a stride-2 convolution, runs ``submodule`` (if
    any), then upsamples with nearest-neighbour interpolation followed by a
    3x3 convolution. Unless it is the outermost block, ``forward`` concatenates
    the block input with the block output along dim 1.

    Args:
        outer_nc: number of channels produced by the up path of this block.
        inner_nc: number of channels produced by the down path of this block.
        input_nc: number of channels of the block input; defaults to
            ``outer_nc`` when ``None``.
        submodule: the nested block placed between the down and up paths
            (not used by the innermost block).
        outermost: build the outermost variant (no norm, no skip concat).
        innermost: build the innermost variant (no submodule).
        norm_layer: callable building a normalisation layer from a channel
            count, or ``None`` to omit normalisation.
        use_dropout: append ``nn.Dropout(0.5)``; only honoured by the
            intermediate (neither outermost nor innermost) variant.
    """

    def __init__(self,
                 outer_nc,
                 inner_nc,
                 input_nc=None,
                 submodule=None,
                 outermost=False,
                 innermost=False,
                 norm_layer=nn.BatchNorm2d,
                 use_dropout=False):
        super(ResUnetSkipConnectionBlock, self).__init__()
        self.outermost = outermost
        # Convolutions carry a bias only when the norm layer is InstanceNorm2d.
        use_bias = norm_layer == nn.InstanceNorm2d

        if input_nc is None:
            input_nc = outer_nc
        downconv = nn.Conv2d(
            input_nc,
            inner_nc,
            kernel_size=3,
            stride=2,
            padding=1,
            bias=use_bias)
        # three residual blocks after the down conv and after the up conv
        res_downconv = [
            ResidualBlock(inner_nc, norm_layer),
            ResidualBlock(inner_nc, norm_layer),
            ResidualBlock(inner_nc, norm_layer)
        ]
        res_upconv = [
            ResidualBlock(outer_nc, norm_layer),
            ResidualBlock(outer_nc, norm_layer),
            ResidualBlock(outer_nc, norm_layer)
        ]

        downrelu = nn.PReLU()
        uprelu = nn.PReLU()
        # downnorm / upnorm only exist when a norm layer was requested; every
        # use below is guarded by the same condition.
        if norm_layer is not None:
            downnorm = norm_layer(inner_nc)
            upnorm = norm_layer(outer_nc)

        if outermost:
            # The up conv takes inner_nc * 2 channels because the submodule
            # returns its input concatenated with its output.
            upsample = nn.Upsample(scale_factor=2, mode='nearest')
            upconv = nn.Conv2d(
                inner_nc * 2,
                outer_nc,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=use_bias)
            down = [downconv, downrelu] + res_downconv
            up = [upsample, upconv]
            model = down + [submodule] + up
        elif innermost:
            # No submodule here, so the up conv takes inner_nc channels.
            upsample = nn.Upsample(scale_factor=2, mode='nearest')
            upconv = nn.Conv2d(
                inner_nc,
                outer_nc,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=use_bias)
            down = [downconv, downrelu] + res_downconv
            if norm_layer is None:
                up = [upsample, upconv, uprelu] + res_upconv
            else:
                up = [upsample, upconv, upnorm, uprelu] + res_upconv
            model = down + up
        else:
            upsample = nn.Upsample(scale_factor=2, mode='nearest')
            upconv = nn.Conv2d(
                inner_nc * 2,
                outer_nc,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=use_bias)
            if norm_layer is None:
                down = [downconv, downrelu] + res_downconv
                up = [upsample, upconv, uprelu] + res_upconv
            else:
                down = [downconv, downnorm, downrelu] + res_downconv
                up = [upsample, upconv, upnorm, uprelu] + res_upconv

            if use_dropout:
                model = down + [submodule] + up + [nn.Dropout(0.5)]
            else:
                model = down + [submodule] + up

        self.model = nn.Sequential(*model)

    def forward(self, x):
        """Run the block; non-outermost blocks return ``cat([x, model(x)], 1)``."""
        if self.outermost:
            return self.model(x)
        else:
            return torch.cat([x, self.model(x)], 1)
class LandmarkNorm(nn.Module):
    """SPADE-style normalisation modulated by a conditioning map.

    The input is normalised by a parameter-free norm layer, then scaled and
    shifted per pixel by ``gamma`` and ``beta`` maps predicted from the
    conditioning map with a small convolutional network.

    Args:
        param_free_norm_type: ``'instance'``, ``'syncbatch'`` or ``'batch'``.
        norm_nc: number of channels of the tensor being normalised.
        label_nc: number of channels of the conditioning map.

    Raises:
        ValueError: if ``param_free_norm_type`` is not one of the above.
    """

    def __init__(self, param_free_norm_type, norm_nc, label_nc):
        super().__init__()

        if param_free_norm_type == 'instance':
            self.param_free_norm = nn.InstanceNorm2d(norm_nc, affine=False)
        elif param_free_norm_type == 'syncbatch':
            # The original referenced an undefined `SynchronizedBatchNorm2d`
            # (NameError); torch's built-in synchronised batch norm is used.
            self.param_free_norm = nn.SyncBatchNorm(norm_nc, affine=False)
        elif param_free_norm_type == 'batch':
            self.param_free_norm = nn.BatchNorm2d(norm_nc, affine=False)
        else:
            raise ValueError(
                '%s is not a recognized param-free norm type in LandmarkNorm'
                % param_free_norm_type)

        # Width of the hidden layer shared by the gamma / beta heads.
        nhidden = 128
        ks = 3
        pw = ks // 2  # padding that keeps the spatial size unchanged
        self.mlp_shared = nn.Sequential(
            nn.Conv2d(label_nc, nhidden, kernel_size=ks, padding=pw),
            nn.ReLU())
        self.mlp_gamma = nn.Conv2d(
            nhidden, norm_nc, kernel_size=ks, padding=pw)
        self.mlp_beta = nn.Conv2d(nhidden, norm_nc, kernel_size=ks, padding=pw)

    def forward(self, x, segmap):
        """Return ``norm(x) * (1 + gamma(segmap)) + beta(segmap)``.

        ``segmap`` is resized (nearest neighbour) to the spatial size of ``x``
        before the modulation maps are computed.
        """
        # Part 1. generate parameter-free normalized activations
        normalized = self.param_free_norm(x)

        # Part 2. produce scaling and bias conditioned on the map
        segmap = F.interpolate(segmap, size=x.size()[2:], mode='nearest')
        actv = self.mlp_shared(segmap)
        gamma = self.mlp_gamma(actv)
        beta = self.mlp_beta(actv)

        # apply scale and bias
        return normalized * (1 + gamma) + beta
class LandmarkNormResnetBlock(nn.Module):
    """Residual block whose normalisation layers are ``LandmarkNorm`` modules.

    Args:
        fin: number of input channels.
        fout: number of output channels. When it differs from ``fin`` the skip
            path uses a learned 1x1 convolution instead of the identity.
    """

    def __init__(self, fin, fout):
        super().__init__()
        self.learned_shortcut = (fin != fout)
        hidden = min(fin, fout)

        # Convolutions of the main path (and of the skip path when learned).
        self.conv_0 = nn.Conv2d(fin, hidden, kernel_size=3, padding=1)
        self.conv_1 = nn.Conv2d(hidden, fout, kernel_size=3, padding=1)
        if self.learned_shortcut:
            self.conv_s = nn.Conv2d(fin, fout, kernel_size=1, bias=False)

        # All norm layers use batch statistics and a 32-channel condition map.
        norm_type, cond_nc = 'batch', 32
        self.norm_0 = LandmarkNorm(norm_type, fin, cond_nc)
        self.norm_1 = LandmarkNorm(norm_type, hidden, cond_nc)
        if self.learned_shortcut:
            self.norm_s = LandmarkNorm(norm_type, fin, cond_nc)

    def forward(self, x, seg):
        """Return the skip path plus two norm -> leaky-ReLU -> conv stages."""
        skip = self.shortcut(x, seg)
        hidden = self.conv_0(self.actvn(self.norm_0(x, seg)))
        hidden = self.conv_1(self.actvn(self.norm_1(hidden, seg)))
        return skip + hidden

    def shortcut(self, x, seg):
        """Return ``x`` unchanged, or normalised and projected when learned."""
        if not self.learned_shortcut:
            return x
        return self.conv_s(self.norm_s(x, seg))

    def actvn(self, x):
        """Leaky ReLU with negative slope 0.2."""
        return F.leaky_relu(x, 2e-1)
class VTONGenerator(nn.Module):
    """Try-on generator: an encoder/decoder with skip connections.

    Each decoder stage is followed by a ``LandmarkNormResnetBlock`` conditioned
    on the person landmark heatmap passed to ``forward``.

    Args:
        input_nc: number of channels of the generator input.
        output_nc: accepted but not used; the output convolution always
            produces 3 channels.
        num_downs: number of encoder (and decoder) stages. Stage ``i`` indexes
            the 5-entry ``ngf_list``, so values above 5 raise IndexError.
        ngf: base channel width.
        norm_layer: callable building a normalisation layer from a channel
            count (called unconditionally, so it must not be ``None``).
        use_dropout: accepted but not used.
    """

    def __init__(self,
                 input_nc,
                 output_nc,
                 num_downs,
                 ngf=64,
                 norm_layer=nn.BatchNorm2d,
                 use_dropout=False):
        super(VTONGenerator, self).__init__()
        # Convolutions carry a bias only when the norm layer is InstanceNorm2d.
        use_bias = norm_layer == nn.InstanceNorm2d
        # Encoder output widths per stage.
        ngf_list = [ngf * 1, ngf * 2, ngf * 4, ngf * 8, ngf * 8]
        self.num_downs = num_downs
        # Collected as plain lists first, wrapped in ModuleList after the loop.
        self.Encoder = []
        self.Decoder = []
        self.LMnorm = []
        for i in range(num_downs):
            # Encoder
            if i == 0:
                in_nc = input_nc
                inner_nc = ngf_list[i]
            else:
                in_nc, inner_nc = ngf_list[i - 1], ngf_list[i]

            downconv = nn.Conv2d(
                in_nc,
                inner_nc,
                kernel_size=3,
                stride=2,
                padding=1,
                bias=use_bias)
            downnorm = norm_layer(inner_nc)
            downrelu = nn.PReLU()
            res_downconv = [
                ResidualBlock(inner_nc, norm_layer),
                ResidualBlock(inner_nc, norm_layer),
                ResidualBlock(inner_nc, norm_layer)
            ]

            # Decoder
            # NOTE: inner_nc is reused from here on for the decoder input
            # width. Decoder stages after the first receive their input
            # concatenated with an encoder feature map, hence the factor 2.
            if i == (num_downs - 1):
                outer_nc = ngf // 2
                inner_nc = 2 * ngf_list[0]
            elif i == 0:
                inner_nc, outer_nc = ngf_list[num_downs - i
                                              - 1], ngf_list[num_downs - i - 1]
            else:
                inner_nc, outer_nc = 2 * ngf_list[num_downs - i
                                                  - 1], ngf_list[num_downs - i
                                                                 - 2]

            upsample = nn.Upsample(scale_factor=2, mode='nearest')
            upconv = nn.Conv2d(
                inner_nc,
                outer_nc,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=use_bias)
            upnorm = norm_layer(outer_nc)
            uprelu = nn.PReLU()
            res_upconv = [
                ResidualBlock(outer_nc, norm_layer),
                ResidualBlock(outer_nc, norm_layer),
                ResidualBlock(outer_nc, norm_layer)
            ]

            if i == 0:
                # First stage: encoder without norm.
                encoderLayer = [downconv, downrelu] + res_downconv
                decoderLayer = [upsample, upconv, upnorm, uprelu] + res_upconv
            elif i == (num_downs - 1):
                # Last stage: encoder without norm, decoder is upsample + conv.
                encoderLayer = [downconv, downrelu] + res_downconv
                decoderLayer = [upsample, upconv]
            else:
                encoderLayer = [downconv, downnorm, downrelu] + res_downconv
                decoderLayer = [upsample, upconv, upnorm, uprelu] + res_upconv

            encoderLayer = nn.Sequential(*encoderLayer)
            decoderLayer = nn.Sequential(*decoderLayer)

            self.Encoder.append(encoderLayer)
            self.Decoder.append(decoderLayer)
            # One landmark-conditioned block per decoder stage.
            LMnorm = LandmarkNormResnetBlock(outer_nc, outer_nc)
            self.LMnorm.append(LMnorm)

        self.Encoder = nn.ModuleList(self.Encoder)
        self.Decoder = nn.ModuleList(self.Decoder)
        self.LMnorm = nn.ModuleList(self.LMnorm)

        # Maps the last decoder width (ngf // 2) to a 3-channel image.
        self.conv_img = nn.Conv2d(ngf // 2, 3, kernel_size=3, padding=1)
        self.act = nn.PReLU()
        self.tanh = nn.Tanh()

    def forward(self, inputs, p_point_heatmap):
        """Generate the try-on image.

        Args:
            inputs: generator input tensor with ``input_nc`` channels.
            p_point_heatmap: conditioning map passed to every
                ``LandmarkNormResnetBlock`` (those blocks are built for a
                32-channel map).

        Returns:
            A 3-channel tensor squashed by ``tanh``.
        """
        en_fea = []
        x = inputs
        for i in range(self.num_downs):
            x = self.Encoder[i](x)
            # Keep every encoder output except the deepest one for the skips.
            if i < (self.num_downs - 1):
                en_fea.append(x)

        for i in range(self.num_downs):
            if i != 0:
                # en_fea[-i] walks the stored encoder outputs deepest-first.
                x = torch.cat([en_fea[-i], x], 1)
            x = self.Decoder[i](x)
            x = self.LMnorm[i](x, p_point_heatmap)

        x = self.conv_img(self.act(x))
        x = self.tanh(x)
        return x
class ResUnetGenerator(nn.Module):
    """U-Net generator assembled from nested ``ResUnetSkipConnectionBlock``s.

    The network is built inside-out: an innermost block at width ``ngf * 8``,
    ``num_downs - 5`` intermediate blocks at the same width, three blocks that
    halve the outer width each time, and finally the outermost block mapping
    ``input_nc`` to ``output_nc`` channels.

    Args:
        input_nc: number of input channels.
        output_nc: number of output channels.
        num_downs: total number of downsampling stages.
        ngf: base channel width.
        norm_layer: normalisation layer factory forwarded to every block.
        use_dropout: enable dropout in the ``ngf * 8`` intermediate blocks.
    """

    def __init__(self,
                 input_nc,
                 output_nc,
                 num_downs,
                 ngf=64,
                 norm_layer=nn.BatchNorm2d,
                 use_dropout=False):
        super(ResUnetGenerator, self).__init__()

        widest = ngf * 8
        net = ResUnetSkipConnectionBlock(
            widest,
            widest,
            input_nc=None,
            submodule=None,
            norm_layer=norm_layer,
            innermost=True)

        # Extra bottleneck levels; these are the only ones that take dropout.
        for _ in range(num_downs - 5):
            net = ResUnetSkipConnectionBlock(
                widest,
                widest,
                input_nc=None,
                submodule=net,
                norm_layer=norm_layer,
                use_dropout=use_dropout)

        # Levels that progressively narrow the outer width towards ngf.
        for outer_mult, inner_mult in ((4, 8), (2, 4), (1, 2)):
            net = ResUnetSkipConnectionBlock(
                ngf * outer_mult,
                ngf * inner_mult,
                input_nc=None,
                submodule=net,
                norm_layer=norm_layer)

        self.model = ResUnetSkipConnectionBlock(
            output_nc,
            ngf,
            input_nc=input_nc,
            submodule=net,
            outermost=True,
            norm_layer=norm_layer)

    def forward(self, input):
        """Run the nested U-Net on ``input``."""
        return self.model(input)
class Vgg19(nn.Module):
    """VGG-19 feature extractor split into five consecutive slices.

    Args:
        requires_grad: when ``False`` (default) every parameter is frozen.
        vgg_path: optional path to a VGG-19 ``state_dict`` file. The original
            code read an undefined module-level ``vgg_path`` and therefore
            always raised NameError; the path is now an explicit, optional
            argument. When it is ``None`` no weights are loaded and the
            network keeps torchvision's random initialisation.
    """

    def __init__(self, requires_grad=False, vgg_path=None):
        super(Vgg19, self).__init__()
        vgg = models.vgg19(pretrained=False)
        # for torchvision >= 0.4.0 or torch >= 1.2.0
        for module in vgg.modules():
            if isinstance(module, (nn.MaxPool2d, nn.AdaptiveAvgPool2d)):
                module.ceil_mode = True

        if vgg_path is not None:
            # Load onto the CPU; callers move the module to its device later.
            vgg.load_state_dict(torch.load(vgg_path, map_location='cpu'))
        features = vgg.features

        # Index ranges of `vgg.features` forming each slice. Layers keep their
        # original index as name, so state_dict keys are unchanged.
        slice_ranges = ((0, 2), (2, 7), (7, 12), (12, 21), (21, 30))
        for number, (start, stop) in enumerate(slice_ranges, start=1):
            stage = nn.Sequential()
            for index in range(start, stop):
                stage.add_module(str(index), features[index])
            setattr(self, 'slice%d' % number, stage)

        if not requires_grad:
            for param in self.parameters():
                param.requires_grad = False

    def forward(self, X):
        """Return the list of the five slice outputs, shallowest first."""
        h_relu1 = self.slice1(X)
        h_relu2 = self.slice2(h_relu1)
        h_relu3 = self.slice3(h_relu2)
        h_relu4 = self.slice4(h_relu3)
        h_relu5 = self.slice5(h_relu4)
        return [h_relu1, h_relu2, h_relu3, h_relu4, h_relu5]
class VGGLoss(nn.Module):
    """Weighted L1 distance between VGG-19 feature maps of two images.

    Args:
        layids: indices of the ``Vgg19`` outputs to compare; ``None`` means
            all of them.
    """

    def __init__(self, layids=None):
        super(VGGLoss, self).__init__()
        self.vgg = Vgg19()
        # Only move to the GPU when one exists; the unconditional `.cuda()`
        # crashed on CPU-only hosts.
        if torch.cuda.is_available():
            self.vgg.cuda()
        self.criterion = nn.L1Loss()
        # Per-slice weights, shallowest slice first.
        self.weights = [1.0 / 32, 1.0 / 16, 1.0 / 8, 1.0 / 4, 1.0]
        self.layids = layids

    def forward(self, x, y):
        """Return the weighted sum of L1 distances; ``y`` features are detached."""
        x_vgg, y_vgg = self.vgg(x), self.vgg(y)
        # Resolve the default locally instead of mutating self.layids.
        layids = self.layids
        if layids is None:
            layids = range(len(x_vgg))
        loss = 0
        for i in layids:
            loss += self.weights[i] * self.criterion(x_vgg[i],
                                                     y_vgg[i].detach())
        return loss
def load_checkpoint_parallel(model, checkpoint_path):
    """Load into ``model`` exactly the entries its own state dict names.

    The checkpoint is deserialised onto the CPU. Extra checkpoint entries are
    ignored; an entry missing from the checkpoint raises ``KeyError``.
    """
    saved = torch.load(
        checkpoint_path, map_location=lambda storage, loc: storage)
    target = model.state_dict()
    # Build the replacement values first, then overwrite in one step.
    target.update({key: saved[key] for key in target})
    model.load_state_dict(target)

View File

@@ -0,0 +1,431 @@
# The implementation here is modified based on hrnet,
# originally Apache 2.0 License and publicly available at https://github.com/HRNet/HigherHRNet-Human-Pose-Estimation
import logging
import os
import torch
import torch.nn as nn
from torch.nn import functional as F
from modelscope.models.cv.body_2d_keypoints.hrnet_basic_modules import (
BasicBlock, Bottleneck, HighResolutionModule, conv3x3)
BN_MOMENTUM = 0.1
logger = logging.getLogger(__name__)
blocks_dict = {'BASIC': BasicBlock, 'BOTTLENECK': Bottleneck}
class DownSample(nn.Module):
    """Stride-2 3x3 convolution followed by batch norm and PReLU.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
    """

    def __init__(self, in_channels, out_channels):
        super(DownSample, self).__init__()
        self.block = nn.Sequential(
            nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=3,
                stride=2,
                padding=1,
                bias=False),
            # Normalise the conv output, which has out_channels channels; the
            # original used in_channels and failed whenever the two differed.
            nn.BatchNorm2d(out_channels),
            nn.PReLU())

    def forward(self, x):
        """Return the downsampled feature map."""
        return self.block(x)
class LandmarkNet(nn.Module):
    """HRNet-style network predicting landmark heatmaps and a property tensor.

    Args:
        cfg: nested dict. Only ``cfg['MODEL']['EXTRA']`` (keys ``STAGE2``,
            ``STAGE3``, ``STAGE4``, ``FINAL_CONV_KERNEL``,
            ``PRETRAINED_LAYERS``) and ``cfg['MODEL']['NUM_JOINTS']`` are read.
        in_channel: number of channels of the input image tensor.
        class_num: width of the adaptive pooling output in the property head.
        **kwargs: accepted and ignored.
    """

    def __init__(self, cfg, in_channel=3, class_num=3, **kwargs):
        # Running channel count consumed and updated by _make_layer.
        self.inplanes = 64
        extra = cfg['MODEL']['EXTRA']
        super(LandmarkNet, self).__init__()

        # stem net
        self.conv1 = nn.Conv2d(
            in_channel, 64, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(
            64, 64, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(Bottleneck, 64, 4)

        self.stage2_cfg = extra['STAGE2']
        num_channels = self.stage2_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage2_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion
            for i in range(len(num_channels))
        ]
        # layer1 is passed here as producing 256 channels.
        self.transition1 = self._make_transition_layer([256], num_channels)
        self.stage2, pre_stage_channels = self._make_stage(
            self.stage2_cfg, num_channels)

        self.stage3_cfg = extra['STAGE3']
        num_channels = self.stage3_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage3_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion
            for i in range(len(num_channels))
        ]
        self.transition2 = self._make_transition_layer(pre_stage_channels,
                                                       num_channels)
        self.stage3, pre_stage_channels = self._make_stage(
            self.stage3_cfg, num_channels)

        self.stage4_cfg = extra['STAGE4']
        num_channels = self.stage4_cfg['NUM_CHANNELS']
        block = blocks_dict[self.stage4_cfg['BLOCK']]
        num_channels = [
            num_channels[i] * block.expansion
            for i in range(len(num_channels))
        ]
        self.transition3 = self._make_transition_layer(pre_stage_channels,
                                                       num_channels)
        # All branch outputs of stage 4 are kept: forward() reads branch 0
        # and branch 3.
        self.stage4, pre_stage_channels = self._make_stage(
            self.stage4_cfg, num_channels, multi_scale_output=True)

        # Heatmap head on the first branch.
        self.final_layer = nn.Conv2d(
            in_channels=pre_stage_channels[0],
            out_channels=cfg['MODEL']['NUM_JOINTS'],
            kernel_size=extra['FINAL_CONV_KERNEL'],
            stride=1,
            padding=1 if extra['FINAL_CONV_KERNEL'] == 3 else 0)

        self.pretrained_layers = extra['PRETRAINED_LAYERS']
        self.active_func = nn.Sigmoid()
        # Property head on the fourth branch; the hard-coded 384 must equal
        # that branch's channel count.
        self.downsample = nn.Sequential(
            DownSample(384, 384), DownSample(384, 384),
            nn.AdaptiveAvgPool2d((1, class_num)))
        self.property_conv = nn.Sequential(
            nn.Conv2d(
                384, out_channels=192, kernel_size=1, stride=1, padding=0),
            nn.Conv2d(
                192, out_channels=32, kernel_size=1, stride=1, padding=0))

    def _make_transition_layer(self, num_channels_pre_layer,
                               num_channels_cur_layer):
        """Build the per-branch adapters between two consecutive stages.

        Existing branches get a 3x3 conv + BN + ReLU when their channel count
        changes and ``None`` otherwise; each new branch gets a chain of
        stride-2 convolutions starting from the last previous branch.
        """
        num_branches_cur = len(num_channels_cur_layer)
        num_branches_pre = len(num_channels_pre_layer)

        transition_layers = []
        for i in range(num_branches_cur):
            if i < num_branches_pre:
                if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
                    transition_layers.append(
                        nn.Sequential(
                            nn.Conv2d(
                                num_channels_pre_layer[i],
                                num_channels_cur_layer[i],
                                3,
                                1,
                                1,
                                bias=False),
                            nn.BatchNorm2d(num_channels_cur_layer[i]),
                            nn.ReLU(inplace=True)))
                else:
                    transition_layers.append(None)
            else:
                conv3x3s = []
                for j in range(i + 1 - num_branches_pre):
                    inchannels = num_channels_pre_layer[-1]
                    # Only the last conv of the chain changes the width.
                    outchannels = num_channels_cur_layer[i] \
                        if j == i - num_branches_pre else inchannels
                    conv3x3s.append(
                        nn.Sequential(
                            nn.Conv2d(
                                inchannels, outchannels, 3, 2, 1, bias=False),
                            nn.BatchNorm2d(outchannels),
                            nn.ReLU(inplace=True)))
                transition_layers.append(nn.Sequential(*conv3x3s))

        return nn.ModuleList(transition_layers)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Stack ``blocks`` instances of ``block`` and update ``self.inplanes``.

        A 1x1 conv + BN projection is passed to the first block when the
        stride or the channel count changes.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.inplanes,
                    planes * block.expansion,
                    kernel_size=1,
                    stride=stride,
                    bias=False),
                nn.BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def _make_stage(self,
                    layer_config,
                    num_inchannels,
                    multi_scale_output=True):
        """Build one stage as a sequence of ``HighResolutionModule``s.

        Returns:
            ``(stage, num_inchannels)`` where ``num_inchannels`` is reported by
            the last module via ``get_num_inchannels()``.
        """
        num_modules = layer_config['NUM_MODULES']
        num_branches = layer_config['NUM_BRANCHES']
        num_blocks = layer_config['NUM_BLOCKS']
        num_channels = layer_config['NUM_CHANNELS']
        block = blocks_dict[layer_config['BLOCK']]
        fuse_method = layer_config['FUSE_METHOD']

        modules = []
        for i in range(num_modules):
            # multi_scale_output is only used last module
            if not multi_scale_output and i == num_modules - 1:
                reset_multi_scale_output = False
            else:
                reset_multi_scale_output = True

            modules.append(
                HighResolutionModule(num_branches, block, num_blocks,
                                     num_inchannels, num_channels, fuse_method,
                                     reset_multi_scale_output))
            num_inchannels = modules[-1].get_num_inchannels()

        return nn.Sequential(*modules), num_inchannels

    def forward(self, x):
        """Run the network.

        Returns:
            ``(heatmaps, property_x)``: ``heatmaps`` is the sigmoid of the
            final conv applied to the first stage-4 branch; ``property_x`` is
            the property head output with dim 2 squeezed and the last two
            dims swapped.
        """
        # Stem: two stride-2 convolutions, then layer1.
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.layer1(x)

        x_list = []
        for i in range(self.stage2_cfg['NUM_BRANCHES']):
            if self.transition1[i] is not None:
                x_list.append(self.transition1[i](x))
            else:
                x_list.append(x)
        y_list = self.stage2(x_list)

        x_list = []
        for i in range(self.stage3_cfg['NUM_BRANCHES']):
            if self.transition2[i] is not None:
                # Transitions are fed from the last branch of the previous stage.
                x_list.append(self.transition2[i](y_list[-1]))
            else:
                x_list.append(y_list[i])
        y_list = self.stage3(x_list)

        x_list = []
        for i in range(self.stage4_cfg['NUM_BRANCHES']):
            if self.transition3[i] is not None:
                x_list.append(self.transition3[i](y_list[-1]))
            else:
                x_list.append(y_list[i])
        y_list = self.stage4(x_list)

        # Fourth branch feeds the property head; requires >= 4 branches.
        property_x = y_list[3]

        x = self.final_layer(y_list[0])
        x = self.active_func(x)

        property_x = self.downsample(property_x)
        property_x = torch.squeeze(self.property_conv(property_x),
                                   2).permute(0, 2, 1)

        return x, property_x

    def init_weights(self, pretrained=''):
        """Initialise weights and optionally load a pretrained state dict.

        Conv / transposed-conv weights are drawn from N(0, 0.001^2) with zero
        bias, batch-norm weights are set to 1 and biases to 0. When
        ``pretrained`` is an existing file, the entries whose first name
        component is in ``self.pretrained_layers`` (or all entries when the
        first item is ``'*'``) are loaded with ``strict=False``.

        Raises:
            ValueError: if ``pretrained`` is non-empty but not a file.
        """
        logger.info('=> init weights from normal distribution')
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ['bias']:
                        nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.ConvTranspose2d):
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ['bias']:
                        nn.init.constant_(m.bias, 0)

        if os.path.isfile(pretrained):
            pretrained_state_dict = torch.load(pretrained)
            logger.info('=> loading pretrained model {}'.format(pretrained))

            need_init_state_dict = {}
            for name, m in pretrained_state_dict.items():
                if name.split('.')[0] in self.pretrained_layers \
                   or self.pretrained_layers[0] == '*':
                    need_init_state_dict[name] = m
            self.load_state_dict(need_init_state_dict, strict=False)
        elif pretrained:
            logger.error('=> please download pre-trained models first!')
            raise ValueError('{} is not exist!'.format(pretrained))
class VTONLandmark(nn.Module):
    """Two-stage landmark predictor for the try-on model.

    Stage 1 (``stage1Net``) predicts landmarks on the garment image. Stage 2
    (``stage2Net``) takes the person image, the garment image and the masked,
    upsampled garment heatmaps (3 + 3 + 32 = 38 channels) and predicts
    landmarks on the person.

    Args:
        **kwargs: accepted and ignored.
    """

    def __init__(self, **kwargs):
        super(VTONLandmark, self).__init__()
        # HRNet-style configuration. LandmarkNet only reads
        # cfg['MODEL']['EXTRA'] and cfg['MODEL']['NUM_JOINTS'].
        cfg = {
            'AUTO_RESUME': True,
            'CUDNN': {
                'BENCHMARK': True,
                'DETERMINISTIC': False,
                'ENABLED': True
            },
            'DATA_DIR': '',
            'GPUS': '(0,1,2,3)',
            'OUTPUT_DIR': 'output',
            'LOG_DIR': 'log',
            'WORKERS': 24,
            'PRINT_FREQ': 100,
            'DATASET': {
                'COLOR_RGB': True,
                'DATASET': 'mpii',
                'DATA_FORMAT': 'jpg',
                'FLIP': True,
                'NUM_JOINTS_HALF_BODY': 8,
                'PROB_HALF_BODY': -1.0,
                'ROOT': 'data/mpii/',
                'ROT_FACTOR': 30,
                'SCALE_FACTOR': 0.25,
                'TEST_SET': 'valid',
                'TRAIN_SET': 'train'
            },
            'MODEL': {
                'INIT_WEIGHTS': True,
                'NAME': 'pose_hrnet',
                'NUM_JOINTS': 32,
                'PRETRAINED': 'models/pytorch/imagenet/hrnet_w48-8ef0771d.pth',
                'TARGET_TYPE': 'gaussian',
                'IMAGE_SIZE': [256, 256],
                'HEATMAP_SIZE': [64, 64],
                'SIGMA': 2,
                'EXTRA': {
                    'PRETRAINED_LAYERS': [
                        'conv1', 'bn1', 'conv2', 'bn2', 'layer1',
                        'transition1', 'stage2', 'transition2', 'stage3',
                        'transition3', 'stage4'
                    ],
                    'FINAL_CONV_KERNEL':
                    1,
                    'STAGE2': {
                        'NUM_MODULES': 1,
                        'NUM_BRANCHES': 2,
                        'BLOCK': 'BASIC',
                        'NUM_BLOCKS': [4, 4],
                        'NUM_CHANNELS': [48, 96],
                        'FUSE_METHOD': 'SUM'
                    },
                    'STAGE3': {
                        'NUM_MODULES': 4,
                        'NUM_BRANCHES': 3,
                        'BLOCK': 'BASIC',
                        'NUM_BLOCKS': [4, 4, 4],
                        'NUM_CHANNELS': [48, 96, 192],
                        'FUSE_METHOD': 'SUM'
                    },
                    'STAGE4': {
                        'NUM_MODULES': 3,
                        'NUM_BRANCHES': 4,
                        'BLOCK': 'BASIC',
                        'NUM_BLOCKS': [4, 4, 4, 4],
                        'NUM_CHANNELS': [48, 96, 192, 384],
                        'FUSE_METHOD': 'SUM'
                    }
                }
            },
            'LOSS': {
                'USE_TARGET_WEIGHT': True
            },
            'TRAIN': {
                'BATCH_SIZE_PER_GPU': 32,
                'SHUFFLE': True,
                'BEGIN_EPOCH': 0,
                'END_EPOCH': 210,
                'OPTIMIZER': 'adam',
                'LR': 0.001,
                'LR_FACTOR': 0.1,
                'LR_STEP': [170, 200],
                'WD': 0.0001,
                'GAMMA1': 0.99,
                'GAMMA2': 0.0,
                'MOMENTUM': 0.9,
                'NESTEROV': False
            },
            'TEST': {
                'BATCH_SIZE_PER_GPU': 32,
                'MODEL_FILE': '',
                'FLIP_TEST': True,
                'POST_PROCESS': True,
                'SHIFT_HEATMAP': True
            },
            'DEBUG': {
                'DEBUG': True,
                'SAVE_BATCH_IMAGES_GT': True,
                'SAVE_BATCH_IMAGES_PRED': True,
                'SAVE_HEATMAPS_GT': True,
                'SAVE_HEATMAPS_PRED': True
            }
        }
        # Stage 1 sees the 3-channel garment; stage 2 sees 38 channels.
        self.stage1Net = LandmarkNet(cfg, in_channel=3, class_num=2)
        self.stage2Net = LandmarkNet(cfg, in_channel=38)
        # 2: run both stages in forward(); any other value: stage 1 only.
        self.stage = 2

    def forward(self, cloth, person):
        """Predict garment landmarks and, in stage 2, person landmarks.

        Args:
            cloth: garment image tensor (3 channels).
            person: person image tensor (3 channels), same spatial size.

        Returns:
            ``(c_landmark, c_property, p_landmark, p_property)`` when
            ``self.stage == 2``, otherwise ``(c_landmark, c_property)``.
        """
        c_landmark, c_property = self.stage1Net(cloth)
        if self.stage == 2:
            pred_class = torch.argmax(c_property, dim=1)
            # The stem downsamples by 4, so scale the heatmaps back up.
            # F.interpolate replaces the deprecated F.upsample alias.
            c_heatmap = F.interpolate(
                c_landmark,
                scale_factor=4,
                mode='bilinear',
                align_corners=True)
            # Zero the heatmaps of landmarks whose predicted class is 0.
            c_heatmap = c_heatmap * pred_class.unsqueeze(2).unsqueeze(2)
            input2 = torch.cat([person, cloth, c_heatmap], 1)
            p_landmark, p_property = self.stage2Net(input2)
            return c_landmark, c_property, p_landmark, p_property
        else:
            return c_landmark, c_property

    def init_weights(self, pretrained=''):
        """Initialise weights and optionally load a pretrained state dict.

        Conv / transposed-conv weights are drawn from N(0, 0.001^2) with zero
        bias, batch-norm weights are set to 1 and biases to 0.

        Raises:
            ValueError: if ``pretrained`` is non-empty but not a file.
        """
        logger.info('=> init weights from normal distribution')
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ['bias']:
                        nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.ConvTranspose2d):
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ['bias']:
                        nn.init.constant_(m.bias, 0)

        if os.path.isfile(pretrained):
            pretrained_state_dict = torch.load(pretrained)
            logger.info('=> loading pretrained model {}'.format(pretrained))

            # NOTE(review): this class never assigns `self.pretrained_layers`
            # (only the LandmarkNet sub-modules do), so this branch looks like
            # it raises AttributeError. The keys of this module are also
            # prefixed with 'stage1Net.' / 'stage2Net.'. TODO confirm the
            # intended checkpoint layout before changing the filter.
            need_init_state_dict = {}
            for name, m in pretrained_state_dict.items():
                if name.split('.')[0] in self.pretrained_layers \
                   or self.pretrained_layers[0] == '*':
                    need_init_state_dict[name] = m
            self.load_state_dict(need_init_state_dict, strict=False)
        elif pretrained:
            logger.error('=> please download pre-trained models first!')
            raise ValueError('{} is not exist!'.format(pretrained))

View File

@@ -0,0 +1,222 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import argparse
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import yaml
from PIL import Image
from torch.nn import functional as F
from modelscope.metainfo import Models
from modelscope.models.base import TorchModel
from modelscope.models.builder import MODELS
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger
from .generator import VTONGenerator
from .landmark import VTONLandmark
from .warping import Warping
logger = get_logger()
def load_checkpoint(model, checkpoint_path, device):
    """Load weights non-strictly, move ``model`` to ``device``, set eval mode.

    Args:
        model: the module to populate.
        checkpoint_path: path of a file holding a state dict.
        device: target device; also used as ``map_location`` when loading.

    Returns:
        The same ``model`` instance.
    """
    state = torch.load(checkpoint_path, map_location=device)
    # strict=False: missing or unexpected keys are tolerated.
    model.load_state_dict(state, strict=False)
    return model.to(device).eval()
@MODELS.register_module(Tasks.image_try_on, module_name=Models.image_try_on)
class SALForImageTryOn(TorchModel):
    """Try-on generator wrapper registered for the image-try-on task.

    Builds a ``VTONGenerator`` and loads its weights from the model directory.

    Args:
        model_dir (str): directory containing the generator weights file.
        device_id (int): forwarded to the base class.
    """

    def __init__(self, model_dir, device_id=0, *args, **kwargs):
        super().__init__(
            model_dir=model_dir, device_id=device_id, *args, **kwargs)

        use_gpu = torch.cuda.is_available()
        self.device = 'cuda' if use_gpu else 'cpu'
        logger.info('Use GPU' if use_gpu else 'Use CPU')

        # 12 input channels, 3 output channels, 5 down/up stages.
        self.model = VTONGenerator(12, 3, 5, ngf=96, norm_layer=nn.BatchNorm2d)
        weights_file = model_dir + '/' + ModelFile.TORCH_MODEL_BIN_FILE
        self.model = load_checkpoint(self.model, weights_file, self.device)

    def forward(self, x, y):
        """Run the generator on input ``x`` with conditioning map ``y``."""
        return self.model(x, y)
def _read_image(path, rgb=True):
    """Read an image with OpenCV, optionally converting BGR to RGB."""
    image = cv2.imread(path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise ValueError('failed to read image: {}'.format(path))
    if rgb:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    return image


def _decode_landmarks(heatmap, stride=8):
    """Return ``(xs, ys)`` arrays with the arg-max position of each heatmap.

    Positions are multiplied by ``stride`` to map heatmap cells to pixels.
    """
    batch, channels, _, width = heatmap.shape
    flat_index = torch.argmax(
        heatmap.view(batch, channels, -1), dim=2).cpu().numpy()
    return stride * (flat_index % width), stride * (flat_index // width)


def _paint_box(point_heatmap, k, x, y, radius, value):
    """Fill a clipped square around ``(x, y)`` in channel ``k`` with ``value``."""
    _, height, width = point_heatmap.shape
    x_min, y_min = max(x - radius - 1, 0), max(y - radius - 1, 0)
    x_max, y_max = min(x + radius, width), min(y + radius, height)
    point_heatmap[k, y_min:y_max, x_min:x_max] = value


def infer(ourgen_model, model_path, person_img, garment_img, mask_img, device):
    """Run the full try-on pipeline on one person / garment pair.

    Steps: predict landmarks on garment and person, rasterise them into
    point heatmaps, warp the garment with ``Warping``, then render the result
    with ``ourgen_model``.

    Args:
        ourgen_model: the try-on generator, called as
            ``ourgen_model(inputs, p_point_heatmap)``.
        model_path: directory containing ``warp.pth`` and ``landmark.pth``.
        person_img: path of the person image.
        garment_img: path of the garment image.
        mask_img: path of the garment mask image; its first channel is
            thresholded at 128.
        device: torch device used for all models and tensors.

    Returns:
        A BGR ``uint8`` image with the size of the original person image.

    Raises:
        ValueError: if one of the images cannot be read.
    """
    out_h, out_w = 1024, 768  # working resolution of the pipeline
    num_points = 32  # number of landmarks predicted by the landmark model
    radius = 20  # half-size of the square painted around each landmark
    input_scale = 4  # the warping model runs at 1 / input_scale resolution

    # NOTE: both auxiliary models are rebuilt and reloaded on every call.
    ourwarp_model = load_checkpoint(Warping(), model_path + '/warp.pth',
                                    device)
    landmark_model = VTONLandmark()
    landmark_model.load_state_dict(
        torch.load(model_path + '/landmark.pth', map_location=device))
    landmark_model.to(device).eval()

    # Maps uint8 HWC images to float CHW tensors in [-1, 1].
    input_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    with torch.no_grad():
        garment = _read_image(garment_img)
        mask = _read_image(mask_img, rgb=False)
        person = _read_image(person_img)
        h_ori, w_ori = person.shape[0:2]

        clothes = cv2.resize(garment, (out_w, out_h))
        clothes = input_transform(clothes).unsqueeze(0).to(device)

        # Resize the mask like the garment so `cm * clothes` lines up
        # (a no-op when the mask already has the working resolution).
        mask = cv2.resize(
            mask, (out_w, out_h), interpolation=cv2.INTER_NEAREST)
        # np.float32 replaces the `np.float` alias removed in NumPy 1.24.
        cm_array = (mask[:, :, 0] >= 128).astype(np.float32)
        cm = torch.from_numpy(cm_array).unsqueeze(0).unsqueeze(0).to(device)

        im = cv2.resize(person, (out_w, out_h))
        im = input_transform(im).unsqueeze(0).to(device)

        # Landmarks are predicted at half the working resolution.
        h, w = 512, 384
        p_down = F.interpolate(im, size=(h, w), mode='bilinear')
        c_down = F.interpolate(clothes, size=(h, w), mode='bilinear')
        c_heatmap, c_property, p_heatmap, p_property = landmark_model(
            c_down, p_down)

        cloth_property = torch.argmax(c_property, dim=1)
        person_property = torch.argmax(p_property, dim=1)
        cloth_x, cloth_y = _decode_landmarks(c_heatmap)
        person_x, person_y = _decode_landmarks(p_heatmap)

        valid_c_point = np.zeros((num_points, 2)).astype(np.float32)
        valid_p_point = np.zeros((num_points, 2)).astype(np.float32)
        # -1 marks "no landmark"; boxes are painted with the landmark class.
        c_point_heatmap = -1 * torch.ones(num_points, out_h, out_w)
        p_point_heatmap = -1 * torch.ones(num_points, out_h, out_w)

        for k in range(num_points):
            property_c = int(cloth_property[0, k])
            # Person classes are shifted by one: -1 / 0 / 1.
            property_p = int(person_property[0, k]) - 1
            if property_c > 0.1:
                c_x, c_y = int(cloth_x[0, k]), int(cloth_y[0, k])
                _paint_box(c_point_heatmap, k, c_x, c_y, radius, property_c)
                valid_c_point[k, 0], valid_c_point[k, 1] = c_x, c_y
            if property_p > -0.99:
                p_x, p_y = int(person_x[0, k]), int(person_y[0, k])
                _paint_box(p_point_heatmap, k, p_x, p_y, radius, property_p)
                # Only the highest class contributes a point coordinate.
                if property_p > 0:
                    valid_p_point[k, 0], valid_p_point[k, 1] = p_x, p_y

        c_point_plane = torch.tensor(valid_c_point).unsqueeze(0).to(device)
        p_point_plane = torch.tensor(valid_p_point).unsqueeze(0).to(device)
        c_point_heatmap = c_point_heatmap.unsqueeze(0).to(device)
        p_point_heatmap = p_point_heatmap.unsqueeze(0).to(device)

        if input_scale > 1:
            h, w = out_h // input_scale, out_w // input_scale
            c_point_plane = c_point_plane // input_scale
            p_point_plane = p_point_plane // input_scale
            c_point_heatmap = F.interpolate(
                c_point_heatmap, size=(h, w), mode='nearest')
            p_point_heatmap = F.interpolate(
                p_point_heatmap, size=(h, w), mode='nearest')

        masked_clothes = cm * clothes
        im_down = F.interpolate(im, size=(h, w), mode='bilinear')
        c_down = F.interpolate(masked_clothes, size=(h, w), mode='bilinear')
        cm_down = F.interpolate(cm, size=(h, w), mode='nearest')

        warping_input = [
            c_down, im_down, c_point_heatmap, p_point_heatmap, c_point_plane,
            p_point_plane, cm_down, masked_clothes, device
        ]
        # Only the last output (the upsampled warped garment) is needed.
        *_, up_cloth = ourwarp_model(warping_input)

        gen_inputs = torch.cat([im, up_cloth], 1)
        gen_outputs = ourgen_model(gen_inputs, p_point_heatmap)

        # Map the first batch item from [-1, 1] to uint8 HWC.
        combine = gen_outputs[0].squeeze()
        cv_img = (combine.permute(1, 2, 0).detach().cpu().numpy() + 1) / 2
        rgb = (cv_img * 255).astype(np.uint8)
        bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
        bgr = cv2.resize(bgr, (w_ori, h_ori))

    return bgr

File diff suppressed because it is too large Load Diff

View File

@@ -1493,6 +1493,12 @@ TASK_OUTPUTS = {
Tasks.document_grounded_dialog_retrieval: [OutputKeys.OUTPUT],
Tasks.video_temporal_grounding: [OutputKeys.SCORES, OutputKeys.TBOUNDS],
Tasks.text_to_video_synthesis: [OutputKeys.OUTPUT_VIDEO],
# Tasks.image_try_on result for a single sample
# {
# "output_img": np.ndarray with shape [height, width, 3]
# }
Tasks.image_try_on: [OutputKeys.OUTPUT_IMG],
}

View File

@@ -215,6 +215,11 @@ TASK_INPUTS = {
InputType.VIDEO,
Tasks.bad_image_detecting:
InputType.IMAGE,
Tasks.image_try_on: {
InputKeys.IMAGE: InputType.IMAGE,
InputKeys.IMAGE: InputType.IMAGE,
InputKeys.IMAGE: InputType.IMAGE
},
# ============ nlp tasks ===================
Tasks.chat: {

View File

@@ -0,0 +1,62 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import Any, Dict
import numpy as np
import torch
from modelscope.metainfo import Pipelines
from modelscope.models.cv.image_try_on import try_on_infer
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.preprocessors import LoadImage
from modelscope.utils.constant import Tasks
from modelscope.utils.logger import get_logger
logger = get_logger()
@PIPELINES.register_module(
    Tasks.image_try_on, module_name=Pipelines.image_try_on)
class SALForImageTryOnPipeline(Pipeline):
    r""" Image Try On Pipeline.

    Examples:

    >>> image_try_on = pipeline(Tasks.image_try_on, model='damo/cv_SAL-VTON_virtual-try-on', revision='v1.0.1')
    >>> input_images = {'person_input_path': '/your_path/image_try_on_person.jpg',
    ...                 'garment_input_path': '/your_path/image_try_on_garment.jpg',
    ...                 'mask_input_path': '/your_path/image_try_on_mask.jpg'}
    >>> result = image_try_on(input_images)
    >>> result[OutputKeys.OUTPUT_IMG]
    """

    def __init__(self, model: str, **kwargs):
        """Create the image try-on pipeline.

        Args:
            model: model id on modelscope hub; also kept as ``model_path`` and
                used as the directory of the auxiliary weight files.
        """
        super().__init__(model=model, **kwargs)
        self.model_path = model
        logger.info('load model done')

        gpu_available = torch.cuda.is_available()
        self.device = 'cuda' if gpu_available else 'cpu'
        logger.info('Use GPU' if gpu_available else 'Use CPU')

    def preprocess(self, input: Input) -> Dict[str, Any]:
        """Pass the input through unchanged."""
        return input

    def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run try-on inference on the three image paths held by ``input``."""
        person_path = input['person_input_path']
        garment_path = input['garment_input_path']
        mask_path = input['mask_input_path']
        output_image = try_on_infer.infer(self.model, self.model_path,
                                          person_path, garment_path,
                                          mask_path, self.device)
        return {OutputKeys.OUTPUT_IMG: output_image}

    def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Pass the forward result through unchanged."""
        return inputs

View File

@@ -96,6 +96,7 @@ class CVTasks(object):
image_face_fusion = 'image-face-fusion'
product_retrieval_embedding = 'product-retrieval-embedding'
controllable_image_generation = 'controllable-image-generation'
image_try_on = 'image-try-on'
# video recognition
live_category = 'live-category'

View File

@@ -0,0 +1,44 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
import unittest
import cv2
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.base import Pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.logger import get_logger
from modelscope.utils.test_utils import test_level
logger = get_logger()
class ImageTryOnTest(unittest.TestCase):
    """Smoke tests for the image try-on pipeline."""

    def setUp(self) -> None:
        self.model_id = 'damo/cv_SAL-VTON_virtual-try-on'
        # One test image per role, keyed as the pipeline expects.
        self.input = {
            f'{role}_input_path': f'data/test/images/image_try_on_{role}.jpg'
            for role in ('person', 'garment', 'mask')
        }

    def pipeline_inference(self, pipeline: Pipeline, input: dict):
        """Run ``pipeline`` on ``input``, log the result, save the image."""
        outputs = pipeline(input)
        logger.info(outputs)
        cv2.imwrite('result.jpg', outputs[OutputKeys.OUTPUT_IMG])

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_modelhub(self):
        # Explicit model id and revision.
        try_on = pipeline(
            Tasks.image_try_on, model=self.model_id, revision='v1.0.1')
        self.pipeline_inference(try_on, self.input)

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_run_modelhub_default_model(self):
        # Relies on the default model registered for the task.
        try_on = pipeline(Tasks.image_try_on)
        self.pipeline_inference(try_on, self.input)
if __name__ == '__main__':
unittest.main()