[to #42322933] add single and multiple human parsing models

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11508413
This commit is contained in:
hejunjie.hjj
2023-02-10 08:01:23 +00:00
parent 677e49eaf3
commit 6fc15926a3
16 changed files with 1587 additions and 2 deletions

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:176c824d99af119b36f743d3d90b44529167b0e4fc6db276da60fa140ee3f4a9
size 87228

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:2a1976ea249b4ad5409cdae403dcd154fac3c628909b6b1874cc968960e2c62d
size 8259

View File

@@ -99,6 +99,7 @@ class Models(object):
ddpm = 'ddpm'
ocr_recognition = 'OCRRecognition'
image_quality_assessment_mos = 'image-quality-assessment-mos'
m2fp = 'm2fp'
nerf_recon_acc = 'nerf-recon-acc'
bts_depth_estimation = 'bts-depth-estimation'
vision_efficient_tuning = 'vision-efficient-tuning'
@@ -363,6 +364,7 @@ class Pipelines(object):
video_colorization = 'video-colorization'
motion_generattion = 'mdm-motion-generation'
mobile_image_super_resolution = 'mobile-image-super-resolution'
image_human_parsing = 'm2fp-image-human-parsing'
object_detection_3d_depe = 'object-detection-3d-depe'
bad_image_detecting = 'bad-image-detecting'
nerf_recon_acc = 'nerf-recon-acc'

View File

@@ -0,0 +1,23 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import TYPE_CHECKING

from modelscope.utils.import_utils import LazyImportModule

if TYPE_CHECKING:
    # Real imports, visible only to static type checkers.
    from .m2fp_net import M2FP
    # Fixed: was `from parsing_utils import ...` (absolute), which refers to a
    # non-existent top-level module; `parsing_utils` is a sibling of this
    # package, so a relative import is required (matching `.m2fp_net` above
    # and the 'parsing_utils' key in _import_structure below).
    from .parsing_utils import center_to_target_size_test
else:
    # Map submodule name -> exported symbols for lazy loading at runtime.
    _import_structure = {
        'm2fp_net': ['M2FP'],
        'parsing_utils': ['center_to_target_size_test']
    }

    import sys

    # Replace this module object with a proxy that performs the real imports
    # on first attribute access.
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,22 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import TYPE_CHECKING

from modelscope.utils.import_utils import LazyImportModule

if TYPE_CHECKING:
    # Real imports are seen only by static type checkers.
    from .deeplab_resnet import build_resnet_deeplab_backbone
else:
    # Map submodule name -> exported symbols for lazy loading.
    _import_structure = {
        'deeplab_resnet': ['build_resnet_deeplab_backbone'],
    }

    import sys

    # Replace this module object with a proxy that imports the submodules
    # on first attribute access.
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,377 @@
# Part of the implementation is borrowed and modified from Detectron2, publicly available at
# https://github.com/facebookresearch/detectron2/blob/main/projects/DeepLab/deeplab/resnet.py
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from modelscope.models.cv.image_instance_segmentation.maskdino.utils import \
Conv2d
def get_norm(norm, out_channels):
    """Instantiate a normalization layer for ``out_channels`` channels.

    Args:
        norm: ``None`` or ``''`` for no normalization; one of the string
            keys ``'BN'`` / ``'GN'`` / ``'nnSyncBN'``; or a callable that
            takes a channel count and returns an ``nn.Module``.
        out_channels: number of channels the layer will normalize.

    Returns:
        An ``nn.Module`` instance, or ``None`` when no norm is requested.
    """
    if norm is None:
        return None
    if isinstance(norm, str):
        if not norm:
            return None
        factories = {
            'BN': torch.nn.BatchNorm2d,
            'GN': lambda c: nn.GroupNorm(32, c),
            'nnSyncBN': nn.SyncBatchNorm,
        }
        norm = factories[norm]
    return norm(out_channels)
class BasicBlock(nn.Module):
    """ResNet basic residual block: two 3x3 convolutions plus identity.

    A 1x1 projection shortcut is created only when the input and output
    channel counts differ.

    Args:
        in_channels: channels of the input tensor.
        out_channels: channels of the output tensor.
        stride: stride applied by the first conv (and by the shortcut).
        norm: normalization spec understood by ``get_norm``.
    """

    def __init__(self, in_channels, out_channels, *, stride=1, norm='BN'):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride

        self.shortcut = None
        if in_channels != out_channels:
            # Project the residual branch to match the new channel count.
            self.shortcut = Conv2d(
                in_channels,
                out_channels,
                kernel_size=1,
                stride=stride,
                bias=False,
                norm=get_norm(norm, out_channels))

        self.conv1 = Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            stride=stride,
            padding=1,
            bias=False,
            norm=get_norm(norm, out_channels))
        self.conv2 = Conv2d(
            out_channels,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
            norm=get_norm(norm, out_channels))

    def forward(self, x):
        """conv1 -> relu -> conv2, add the (possibly projected) input, relu."""
        residual = x if self.shortcut is None else self.shortcut(x)
        y = F.relu_(self.conv1(x))
        y = self.conv2(y)
        y += residual
        return F.relu_(y)
class BottleneckBlock(nn.Module):
    """ResNet bottleneck residual block: 1x1 reduce, 3x3, 1x1 expand.

    Args:
        in_channels: channels of the input tensor.
        out_channels: channels of the output tensor.
        bottleneck_channels: channels of the intermediate 3x3 conv.
        stride: spatial stride of the block.
        num_groups: groups for the 3x3 convolution.
        norm: normalization spec understood by ``get_norm``.
        stride_in_1x1: place the stride on the first 1x1 conv instead of
            the 3x3 conv.
        dilation: dilation of the 3x3 conv (padding scales with it so the
            spatial size is preserved).
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 *,
                 bottleneck_channels,
                 stride=1,
                 num_groups=1,
                 norm='BN',
                 stride_in_1x1=False,
                 dilation=1):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride

        self.shortcut = None
        if in_channels != out_channels:
            # 1x1 projection so the residual addition is well-defined.
            self.shortcut = Conv2d(
                in_channels,
                out_channels,
                kernel_size=1,
                stride=stride,
                bias=False,
                norm=get_norm(norm, out_channels),
            )

        if stride_in_1x1:
            stride_1x1, stride_3x3 = stride, 1
        else:
            stride_1x1, stride_3x3 = 1, stride

        self.conv1 = Conv2d(
            in_channels,
            bottleneck_channels,
            kernel_size=1,
            stride=stride_1x1,
            bias=False,
            norm=get_norm(norm, bottleneck_channels))
        self.conv2 = Conv2d(
            bottleneck_channels,
            bottleneck_channels,
            kernel_size=3,
            stride=stride_3x3,
            padding=1 * dilation,
            bias=False,
            groups=num_groups,
            dilation=dilation,
            norm=get_norm(norm, bottleneck_channels))
        self.conv3 = Conv2d(
            bottleneck_channels,
            out_channels,
            kernel_size=1,
            bias=False,
            norm=get_norm(norm, out_channels))

    def forward(self, x):
        """Run the three-conv bottleneck path, add the residual, relu."""
        residual = x if self.shortcut is None else self.shortcut(x)
        y = F.relu_(self.conv1(x))
        y = F.relu_(self.conv2(y))
        y = self.conv3(y)
        y += residual
        return F.relu_(y)
class DeepLabStem(nn.Module):
    """DeepLab variant of the ResNet stem: three 3x3 convs then max-pool,
    for an overall stride of 4.

    Args:
        in_channels: channels of the input image (usually 3).
        out_channels: channels produced by the stem.
        norm: normalization spec understood by ``get_norm``.
    """

    def __init__(self, in_channels=3, out_channels=128, norm='BN'):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        # Fixed downsampling factor: conv1 stride 2 times pool stride 2.
        self.stride = 4
        half = out_channels // 2
        self.conv1 = Conv2d(
            in_channels,
            half,
            kernel_size=3,
            stride=2,
            padding=1,
            bias=False,
            norm=get_norm(norm, half))
        self.conv2 = Conv2d(
            half,
            half,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
            norm=get_norm(norm, half))
        self.conv3 = Conv2d(
            half,
            out_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            bias=False,
            norm=get_norm(norm, out_channels))

    def forward(self, x):
        """Apply the three convs (each with in-place relu), then max-pool."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu_(conv(x))
        return F.max_pool2d(x, kernel_size=3, stride=2, padding=1)
class DeeplabResNet(nn.Module):
    """ResNet backbone assembled from a stem and residual stages.

    Mirrors detectron2's ResNet with DeepLab modifications. ``forward``
    returns the intermediate feature maps named in ``out_features``.

    Args:
        stem: downsampling module; its stride is read from ``stem.stride``.
        stages: list of lists of residual blocks; stage i becomes 'res{i+2}'.
        num_classes: if not None, append a global-pool + linear classifier.
        out_features: names of outputs to return ('stem', 'res2'..'res5',
            'linear'); when None, only the classifier output is returned.
    """

    def __init__(self, stem, stages, num_classes=None, out_features=None):
        super().__init__()
        self.stem = stem
        self.num_classes = num_classes
        current_stride = self.stem.stride
        # Bookkeeping of stride/channels for every named output feature.
        self._out_feature_strides = {'stem': current_stride}
        self._out_feature_channels = {'stem': self.stem.out_channels}
        self.stage_names, self.stages = [], []
        if out_features is not None:
            # Drop trailing stages that no requested output depends on.
            num_stages = max([{
                'res2': 1,
                'res3': 2,
                'res4': 3,
                'res5': 4
            }.get(f, 0) for f in out_features])
            stages = stages[:num_stages]
        for i, blocks in enumerate(stages):
            assert len(blocks) > 0, len(blocks)
            for block in blocks:
                assert isinstance(block, nn.Module), block
            name = 'res' + str(i + 2)
            stage = nn.Sequential(*blocks)
            self.add_module(name, stage)
            self.stage_names.append(name)
            self.stages.append(stage)
            # Overall stride of this output = product of all block strides.
            self._out_feature_strides[name] = current_stride = int(
                current_stride * np.prod([k.stride for k in blocks]))
            self._out_feature_channels[name] = curr_channels = blocks[
                -1].out_channels
        self.stage_names = tuple(
            self.stage_names)  # Make it static for scripting
        if num_classes is not None:
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
            # NOTE(review): 'curr_channels' is only bound inside the stage
            # loop, so a classifier with zero stages would raise NameError.
            self.linear = nn.Linear(curr_channels, num_classes)
            nn.init.normal_(self.linear.weight, std=0.01)
            name = 'linear'
        if out_features is None:
            out_features = [name]
        self._out_features = out_features
        assert len(self._out_features)
        children = [x[0] for x in self.named_children()]
        for out_feature in self._out_features:
            assert out_feature in children, 'Available children: {}'.format(
                ', '.join(children))

    def forward(self, x):
        """Run the backbone on an (N, C, H, W) input.

        Returns:
            dict mapping each requested feature name to its tensor.
        """
        assert x.dim(
        ) == 4, f'ResNet takes an input of shape (N, C, H, W). Got {x.shape} instead!'
        outputs = {}
        x = self.stem(x)
        if 'stem' in self._out_features:
            outputs['stem'] = x
        for name, stage in zip(self.stage_names, self.stages):
            x = stage(x)
            if name in self._out_features:
                outputs[name] = x
        if self.num_classes is not None:
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            x = self.linear(x)
            if 'linear' in self._out_features:
                outputs['linear'] = x
        return outputs

    def output_shape(self):
        """Channel count and stride of every requested output feature."""
        return {
            name: dict(
                channels=self._out_feature_channels[name],
                stride=self._out_feature_strides[name])
            for name in self._out_features
        }

    @property
    def size_divisibility(self) -> int:
        # This backbone imposes no padding constraint on input sizes.
        return 0

    @staticmethod
    def make_stage(block_class, num_blocks, *, in_channels, out_channels,
                   **kwargs):
        """Build one residual stage of ``num_blocks`` blocks.

        Keyword arguments ending in '_per_block' must be sequences of
        length ``num_blocks``; one value is handed to each block. All
        other kwargs are passed unchanged to every block.

        Returns:
            list of instantiated blocks (not wrapped in nn.Sequential).
        """
        blocks = []
        for i in range(num_blocks):
            curr_kwargs = {}
            for k, v in kwargs.items():
                if k.endswith('_per_block'):
                    assert len(v) == num_blocks, (
                        f"Argument '{k}' of make_stage should have the "
                        f'same length as num_blocks={num_blocks}.')
                    newk = k[:-len('_per_block')]
                    assert newk not in kwargs, f'Cannot call make_stage with both {k} and {newk}!'
                    curr_kwargs[newk] = v[i]
                else:
                    curr_kwargs[k] = v
            blocks.append(
                block_class(
                    in_channels=in_channels,
                    out_channels=out_channels,
                    **curr_kwargs))
            # Only the first block of a stage changes the channel count.
            in_channels = out_channels
        return blocks
def build_resnet_deeplab_backbone(out_features, depth, num_groups,
                                  width_per_group, norm, stem_out_channels,
                                  res2_out_channels, stride_in_1x1,
                                  res4_dilation, res5_dilation,
                                  res5_multi_grid, input_shape):
    """Build a ResNet backbone with the DeepLab stem and dilated late stages.

    Args:
        out_features: names of stage outputs to expose ('res2'..'res5').
        depth: ResNet depth, one of 50 / 101 / 152.
        num_groups: number of groups for the grouped 3x3 convolutions.
        width_per_group: channels per group (bottleneck width is the product).
        norm: normalization spec passed through to ``get_norm``.
        stem_out_channels: output channels of the DeepLab stem.
        res2_out_channels: output channels of res2; doubled for each
            subsequent stage.
        stride_in_1x1: put the stage stride on the 1x1 conv of each
            bottleneck block.
        res4_dilation: dilation of res4 (1 or 2).
        res5_dilation: dilation of res5 (1, 2 or 4).
        res5_multi_grid: per-block dilation multipliers applied to res5.
        input_shape: dict providing at least 'channels' of the input image.

    Returns:
        A ``DeeplabResNet`` instance exposing ``out_features``.
    """
    stem = DeepLabStem(
        in_channels=input_shape['channels'],
        out_channels=stem_out_channels,
        norm=norm)
    bottleneck_channels = num_groups * width_per_group
    in_channels = stem_out_channels
    out_channels = res2_out_channels
    assert res4_dilation in {
        1, 2
    }, 'res4_dilation cannot be {}.'.format(res4_dilation)
    assert res5_dilation in {
        1, 2, 4
    }, 'res5_dilation cannot be {}.'.format(res5_dilation)
    if res4_dilation == 2:
        # Always dilate res5 if res4 is dilated.
        assert res5_dilation == 4
    num_blocks_per_stage = {
        50: [3, 4, 6, 3],
        101: [3, 4, 23, 3],
        152: [3, 8, 36, 3]
    }[depth]
    stages = []
    out_stage_idx = [{
        'res2': 2,
        'res3': 3,
        'res4': 4,
        'res5': 5
    }[f] for f in out_features]
    # Only build stages up to the deepest requested output.
    max_stage_idx = max(out_stage_idx)
    for idx, stage_idx in enumerate(range(2, max_stage_idx + 1)):
        if stage_idx == 4:
            dilation = res4_dilation
        elif stage_idx == 5:
            dilation = res5_dilation
        else:
            dilation = 1
        # Dilated stages keep spatial resolution: no stride when dilation > 1.
        first_stride = 1 if idx == 0 or dilation > 1 else 2
        stride_per_block = [first_stride]
        stride_per_block += [1] * (num_blocks_per_stage[idx] - 1)
        stage_kargs = {
            'num_blocks': num_blocks_per_stage[idx],
            'stride_per_block': stride_per_block,
            'in_channels': in_channels,
            'out_channels': out_channels,
            'norm': norm,
            'bottleneck_channels': bottleneck_channels,
            'stride_in_1x1': stride_in_1x1,
            'dilation': dilation,
            'num_groups': num_groups,
            'block_class': BottleneckBlock
        }
        if stage_idx == 5:
            # res5 uses the multi-grid trick: a distinct dilation per block.
            stage_kargs.pop('dilation')
            stage_kargs['dilation_per_block'] = [
                dilation * mg for mg in res5_multi_grid
            ]
        blocks = DeeplabResNet.make_stage(**stage_kargs)
        in_channels = out_channels
        out_channels *= 2
        bottleneck_channels *= 2
        stages.append(blocks)
    return DeeplabResNet(stem, stages, out_features=out_features)

View File

@@ -0,0 +1,24 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import TYPE_CHECKING

from modelscope.utils.import_utils import LazyImportModule

if TYPE_CHECKING:
    # Real imports are seen only by static type checkers.
    from .m2fp_encoder import MSDeformAttnPixelDecoder
    from .m2fp_decoder import MultiScaleMaskedTransformerDecoder
else:
    # Map submodule name -> exported symbols for lazy loading.
    _import_structure = {
        'm2fp_encoder': ['MSDeformAttnPixelDecoder'],
        'm2fp_decoder': ['MultiScaleMaskedTransformerDecoder'],
    }

    import sys

    # Replace this module object with a proxy that imports the submodules
    # on first attribute access.
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,221 @@
# The implementation is adopted from Mask2Former, made publicly available under the MIT License at
# https://github.com/facebookresearch/Mask2Former
import torch
from torch import nn
from torch.nn import functional as F
from modelscope.models.cv.image_colorization.ddcolor.utils.transformer_utils import (
MLP, CrossAttentionLayer, FFNLayer, SelfAttentionLayer)
from modelscope.models.cv.image_instance_segmentation.maskdino.position_encoding import \
PositionEmbeddingSine
from modelscope.models.cv.image_instance_segmentation.maskdino.utils import \
Conv2d
class MultiScaleMaskedTransformerDecoder(nn.Module):
    """Mask2Former transformer decoder producing class logits and masks
    from multi-scale pixel-decoder features via masked attention."""

    def __init__(
        self,
        in_channels,
        mask_classification=True,
        *,
        num_classes: int,
        hidden_dim: int,
        num_queries: int,
        nheads: int,
        dim_feedforward: int,
        dec_layers: int,
        pre_norm: bool,
        mask_dim: int,
        enforce_input_project: bool,
    ):
        """
        NOTE: this interface is experimental.
        Args:
            in_channels: channels of the input features
            mask_classification: whether to add mask classifier or not
            num_classes: number of classes
            hidden_dim: Transformer feature dimension
            num_queries: number of queries
            nheads: number of heads
            dim_feedforward: feature dimension in feedforward network
            dec_layers: number of Transformer decoder layers
            pre_norm: whether to use pre-LayerNorm or not
            mask_dim: mask feature dimension
            enforce_input_project: add input project 1x1 conv even if input
                channels and hidden dim is identical
        """
        super().__init__()
        assert mask_classification, 'Only support mask classification model'
        self.mask_classification = mask_classification
        # positional encoding
        N_steps = hidden_dim // 2
        self.pe_layer = PositionEmbeddingSine(N_steps, normalize=True)
        # define Transformer decoder here
        self.num_heads = nheads
        self.num_layers = dec_layers
        self.num_classes = num_classes
        self.transformer_self_attention_layers = nn.ModuleList()
        self.transformer_cross_attention_layers = nn.ModuleList()
        self.transformer_ffn_layers = nn.ModuleList()
        # One (self-attention, cross-attention, FFN) triple per decoder layer.
        for _ in range(self.num_layers):
            self.transformer_self_attention_layers.append(
                SelfAttentionLayer(
                    d_model=hidden_dim,
                    nhead=nheads,
                    dropout=0.0,
                    normalize_before=pre_norm,
                ))
            self.transformer_cross_attention_layers.append(
                CrossAttentionLayer(
                    d_model=hidden_dim,
                    nhead=nheads,
                    dropout=0.0,
                    normalize_before=pre_norm,
                ))
            self.transformer_ffn_layers.append(
                FFNLayer(
                    d_model=hidden_dim,
                    dim_feedforward=dim_feedforward,
                    dropout=0.0,
                    normalize_before=pre_norm,
                ))
        self.decoder_norm = nn.LayerNorm(hidden_dim)
        self.num_queries = num_queries
        # learnable query features
        self.query_feat = nn.Embedding(num_queries, hidden_dim)
        # learnable query p.e.
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        # level embedding (we always use 3 scales)
        self.num_feature_levels = 3
        self.level_embed = nn.Embedding(self.num_feature_levels, hidden_dim)
        self.input_proj = nn.ModuleList()
        for _ in range(self.num_feature_levels):
            if in_channels != hidden_dim or enforce_input_project:
                self.input_proj.append(
                    Conv2d(in_channels, hidden_dim, kernel_size=1))
            else:
                # Channels already match the hidden dim: identity projection.
                self.input_proj.append(nn.Sequential())
        # output FFNs
        if self.mask_classification:
            self.class_embed = nn.Linear(hidden_dim, num_classes + 1)
        self.mask_embed = MLP(hidden_dim, hidden_dim, mask_dim, 3)

    def forward(self, x, mask_features, mask=None):
        """Decode the learnable queries against multi-scale features.

        Args:
            x: list of ``num_feature_levels`` feature maps, shape
                (N, C, H_i, W_i) each — assumed; confirm against the
                pixel decoder's output.
            mask_features: per-pixel embedding map used to produce masks.
            mask: unused; kept for interface compatibility.

        Returns:
            dict with 'pred_logits', 'pred_masks' (from the last layer) and
            'aux_outputs' holding every intermediate layer's predictions.
        """
        # x is a list of multi-scale feature
        assert len(x) == self.num_feature_levels
        src = []
        pos = []
        size_list = []
        # disable mask, it does not affect performance
        del mask
        for i in range(self.num_feature_levels):
            size_list.append(x[i].shape[-2:])
            pos.append(self.pe_layer(x[i], None).flatten(2))
            # Add a learned per-level embedding to disambiguate scales.
            src.append(self.input_proj[i](x[i]).flatten(2)
                       + self.level_embed.weight[i][None, :, None])
            # flatten NxCxHxW to HWxNxC
            pos[-1] = pos[-1].permute(2, 0, 1)
            src[-1] = src[-1].permute(2, 0, 1)
        _, bs, _ = src[0].shape
        # QxNxC
        query_embed = self.query_embed.weight.unsqueeze(1).repeat(1, bs, 1)
        output = self.query_feat.weight.unsqueeze(1).repeat(1, bs, 1)
        predictions_class = []
        predictions_mask = []
        # prediction heads on learnable query features
        outputs_class, outputs_mask, attn_mask = self.forward_prediction_heads(
            output, mask_features, attn_mask_target_size=size_list[0])
        predictions_class.append(outputs_class)
        predictions_mask.append(outputs_mask)
        for i in range(self.num_layers):
            # Cycle through the feature levels, one level per decoder layer.
            level_index = i % self.num_feature_levels
            # A query masked at every position would have no valid keys;
            # unmask such rows entirely so attention stays well-defined.
            attn_mask[torch.where(
                attn_mask.sum(-1) == attn_mask.shape[-1])] = False
            # attention: cross-attention first
            output = self.transformer_cross_attention_layers[i](
                output,
                src[level_index],
                memory_mask=attn_mask,
                memory_key_padding_mask=
                None,  # here we do not apply masking on padded region
                pos=pos[level_index],
                query_pos=query_embed)
            output = self.transformer_self_attention_layers[i](
                output,
                tgt_mask=None,
                tgt_key_padding_mask=None,
                query_pos=query_embed)
            # FFN
            output = self.transformer_ffn_layers[i](output)
            # New predictions (and the attention mask for the next layer).
            outputs_class, outputs_mask, attn_mask = \
                self.forward_prediction_heads(
                    output, mask_features, attn_mask_target_size=size_list[(i + 1) % self.num_feature_levels])
            predictions_class.append(outputs_class)
            predictions_mask.append(outputs_mask)
        assert len(predictions_class) == self.num_layers + 1
        out = {
            'pred_logits':
            predictions_class[-1],
            'pred_masks':
            predictions_mask[-1],
            'aux_outputs':
            self._set_aux_loss(
                predictions_class if self.mask_classification else None,
                predictions_mask)
        }
        return out

    def forward_prediction_heads(self, output, mask_features,
                                 attn_mask_target_size):
        """Predict class logits and masks for the current query features,
        plus the attention mask for the next decoder layer.

        Returns:
            (outputs_class, outputs_mask, attn_mask); attn_mask is True at
            positions whose predicted mask probability is below 0.5, i.e.
            positions the next cross-attention should ignore.
        """
        decoder_output = self.decoder_norm(output)
        decoder_output = decoder_output.transpose(0, 1)
        outputs_class = self.class_embed(decoder_output)
        mask_embed = self.mask_embed(decoder_output)
        # Dot product of query embeddings with per-pixel features -> masks.
        outputs_mask = torch.einsum('bqc,bchw->bqhw', mask_embed,
                                    mask_features)
        attn_mask = F.interpolate(
            outputs_mask,
            size=attn_mask_target_size,
            mode='bilinear',
            align_corners=False)
        # Replicate per attention head, then binarize at probability 0.5.
        attn_mask = (attn_mask.sigmoid().flatten(2).unsqueeze(1).repeat(
            1, self.num_heads, 1, 1).flatten(0, 1) < 0.5).bool()
        # The attention mask carries no gradient.
        attn_mask = attn_mask.detach()
        return outputs_class, outputs_mask, attn_mask

    @torch.jit.unused
    def _set_aux_loss(self, outputs_class, outputs_seg_masks):
        """Package intermediate-layer predictions for auxiliary losses
        (the final layer's prediction is excluded)."""
        if self.mask_classification:
            return [{
                'pred_logits': a,
                'pred_masks': b
            } for a, b in zip(outputs_class[:-1], outputs_seg_masks[:-1])]
        else:
            return [{'pred_masks': b} for b in outputs_seg_masks[:-1]]

View File

@@ -0,0 +1,215 @@
# The implementation is adopted from Mask2Former, made publicly available under the MIT License at
# https://github.com/facebookresearch/Mask2Former
from typing import Any, Dict, List
import numpy as np
import torch
from torch import nn
from torch.cuda.amp import autocast
from torch.nn import functional as F
from modelscope.models.cv.image_instance_segmentation.maskdino.maskdino_encoder import \
MSDeformAttnTransformerEncoderOnly
from modelscope.models.cv.image_instance_segmentation.maskdino.position_encoding import \
PositionEmbeddingSine
from modelscope.models.cv.image_instance_segmentation.maskdino.utils import \
Conv2d
class MSDeformAttnPixelDecoder(nn.Module):
    """Pixel decoder that fuses backbone features with a multi-scale
    deformable-attention transformer encoder plus a small FPN tail."""

    def __init__(
        self,
        input_shape: Dict[str, Any],
        *,
        transformer_dropout: float,
        transformer_nheads: int,
        transformer_dim_feedforward: int,
        transformer_enc_layers: int,
        conv_dim: int,
        mask_dim: int,
        # deformable transformer encoder args
        transformer_in_features: List[str],
        common_stride: int,
    ):
        """
        NOTE: this interface is experimental.
        Args:
            input_shape: shapes (channels and stride) of the input features
            transformer_dropout: dropout probability in transformer
            transformer_nheads: number of heads in transformer
            transformer_dim_feedforward: dimension of feedforward network
            transformer_enc_layers: number of transformer encoder layers
            conv_dim: number of output channels for the intermediate conv layers.
            mask_dim: number of output channels for the final conv layer.
            transformer_in_features: names of the features fed to the
                transformer encoder (may be a subset of input_shape).
            common_stride: stride of the finest output level; determines how
                many extra FPN levels are built.
        """
        super().__init__()
        self.conv_dim = conv_dim
        transformer_input_shape = {
            k: v
            for k, v in input_shape.items() if k in transformer_in_features
        }
        # this is the input shape of pixel decoder
        input_shape = sorted(input_shape.items(), key=lambda x: x[1]['stride'])
        self.in_features = [k for k, v in input_shape
                            ]  # starting from "res2" to "res5"
        self.feature_strides = [v['stride'] for k, v in input_shape]
        self.feature_channels = [v['channels'] for k, v in input_shape]
        # this is the input shape of transformer encoder (could use less features than pixel decoder
        transformer_input_shape = sorted(
            transformer_input_shape.items(), key=lambda x: x[1]['stride'])
        self.transformer_in_features = [k for k, v in transformer_input_shape
                                        ]  # starting from "res2" to "res5"
        transformer_in_channels = [
            v['channels'] for k, v in transformer_input_shape
        ]
        self.transformer_feature_strides = [
            v['stride'] for k, v in transformer_input_shape
        ]  # to decide extra FPN layers
        self.transformer_num_feature_levels = len(self.transformer_in_features)
        # 1x1 conv + GroupNorm projections bringing every encoder input to
        # conv_dim channels.
        if self.transformer_num_feature_levels > 1:
            input_proj_list = []
            # from low resolution to high resolution (res5 -> res2)
            for in_channels in transformer_in_channels[::-1]:
                input_proj_list.append(
                    nn.Sequential(
                        nn.Conv2d(in_channels, conv_dim, kernel_size=1),
                        nn.GroupNorm(32, conv_dim),
                    ))
            self.input_proj = nn.ModuleList(input_proj_list)
        else:
            self.input_proj = nn.ModuleList([
                nn.Sequential(
                    nn.Conv2d(
                        transformer_in_channels[-1], conv_dim, kernel_size=1),
                    nn.GroupNorm(32, conv_dim),
                )
            ])
        for proj in self.input_proj:
            nn.init.xavier_uniform_(proj[0].weight, gain=1)
            nn.init.constant_(proj[0].bias, 0)
        self.transformer = MSDeformAttnTransformerEncoderOnly(
            d_model=conv_dim,
            dropout=transformer_dropout,
            nhead=transformer_nheads,
            dim_feedforward=transformer_dim_feedforward,
            num_encoder_layers=transformer_enc_layers,
            num_feature_levels=self.transformer_num_feature_levels,
        )
        N_steps = conv_dim // 2
        self.pe_layer = PositionEmbeddingSine(N_steps, normalize=True)
        self.mask_dim = mask_dim
        # use 1x1 conv instead
        self.mask_features = Conv2d(
            conv_dim,
            mask_dim,
            kernel_size=1,
            stride=1,
            padding=0,
        )
        self.maskformer_num_feature_levels = 3  # always use 3 scales
        self.common_stride = common_stride
        # extra fpn levels
        stride = min(self.transformer_feature_strides)
        self.num_fpn_levels = int(
            np.log2(stride) - np.log2(self.common_stride))
        lateral_convs = []
        output_convs = []
        use_bias = False
        for idx, in_channels in enumerate(
                self.feature_channels[:self.num_fpn_levels]):
            lateral_norm = nn.GroupNorm(32, conv_dim)
            output_norm = nn.GroupNorm(32, conv_dim)
            lateral_conv = Conv2d(
                in_channels,
                conv_dim,
                kernel_size=1,
                bias=use_bias,
                norm=lateral_norm)
            output_conv = Conv2d(
                conv_dim,
                conv_dim,
                kernel_size=3,
                stride=1,
                padding=1,
                bias=use_bias,
                norm=output_norm,
                activation=F.relu,
            )
            self.add_module('adapter_{}'.format(idx + 1), lateral_conv)
            self.add_module('layer_{}'.format(idx + 1), output_conv)
            lateral_convs.append(lateral_conv)
            output_convs.append(output_conv)
        # Place convs into top-down order (from low to high resolution)
        # to make the top-down computation in forward clearer.
        self.lateral_convs = lateral_convs[::-1]
        self.output_convs = output_convs[::-1]

    @autocast(enabled=False)
    def forward_features(self, features):
        """Fuse the backbone feature dict into mask features and
        multi-scale decoder features.

        Args:
            features: dict mapping feature names (e.g. 'res2'..'res5') to
                tensors, as produced by the backbone.

        Returns:
            (mask_features, transformer_encoder_features,
            multi_scale_features): the mask-embedding map, the coarsest
            encoder output, and the first ``maskformer_num_feature_levels``
            maps ordered low to high resolution.
        """
        srcs = []
        pos = []
        # Reverse feature maps into top-down order (from low to high resolution)
        for idx, f in enumerate(self.transformer_in_features[::-1]):
            x = features[f].float(
            )  # deformable detr does not support half precision
            srcs.append(self.input_proj[idx](x))
            pos.append(self.pe_layer(x))
        y, spatial_shapes, level_start_index = self.transformer(
            srcs, None, pos)
        bs = y.shape[0]
        # Split the flattened encoder output back into per-level chunks.
        split_size_or_sections = [None] * self.transformer_num_feature_levels
        for i in range(self.transformer_num_feature_levels):
            if i < self.transformer_num_feature_levels - 1:
                split_size_or_sections[i] = level_start_index[
                    i + 1] - level_start_index[i]
            else:
                split_size_or_sections[i] = y.shape[1] - level_start_index[i]
        y = torch.split(y, split_size_or_sections, dim=1)
        out = []
        multi_scale_features = []
        num_cur_levels = 0
        # Reshape each chunk back to (N, C, H, W).
        for i, z in enumerate(y):
            out.append(
                z.transpose(1, 2).view(bs, -1, spatial_shapes[i][0],
                                       spatial_shapes[i][1]))
        # append `out` with extra FPN levels
        # Reverse feature maps into top-down order (from low to high resolution)
        for idx, f in enumerate(self.in_features[:self.num_fpn_levels][::-1]):
            x = features[f].float()
            lateral_conv = self.lateral_convs[idx]
            output_conv = self.output_convs[idx]
            cur_fpn = lateral_conv(x)
            # Following FPN implementation, we use nearest upsampling here
            # (note: `y` is rebound here; the split chunks are already in `out`)
            y = cur_fpn + F.interpolate(
                out[-1],
                size=cur_fpn.shape[-2:],
                mode='bilinear',
                align_corners=False)
            y = output_conv(y)
            out.append(y)
        # Keep only the first maskformer_num_feature_levels maps.
        for o in out:
            if num_cur_levels < self.maskformer_num_feature_levels:
                multi_scale_features.append(o)
                num_cur_levels += 1
        return self.mask_features(out[-1]), out[0], multi_scale_features

View File

@@ -0,0 +1,363 @@
# Part of the implementation is borrowed and modified from M2FP, made publicly available
# under the CC BY-NC 4.0 License at https://github.com/soeaver/M2FP
import os
from typing import Any, Dict
import torch
import torch.nn as nn
import torch.nn.functional as F
from modelscope.metainfo import Models
from modelscope.models.base import TorchModel
from modelscope.models.builder import MODELS
from modelscope.models.cv.image_instance_segmentation.maskdino_swin import \
ImageList
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger
from .backbone import build_resnet_deeplab_backbone
from .m2fp.m2fp_decoder import MultiScaleMaskedTransformerDecoder
from .m2fp.m2fp_encoder import MSDeformAttnPixelDecoder
logger = get_logger()
@MODELS.register_module(Tasks.image_segmentation, module_name=Models.m2fp)
class M2FP(TorchModel):
    def __init__(self,
                 model_dir,
                 backbone=None,
                 encoder=None,
                 decoder=None,
                 pretrained=None,
                 input_single_human=None,
                 classes=None,
                 num_parsing=None,
                 single_human=True,
                 parsing_ins_score_thr=0.5,
                 parsing_on=False,
                 semantic_on=True,
                 sem_seg_postprocess_before_inference=True,
                 **kwargs):
        """
        Deep Learning Technique for Human Parsing: A Survey and Outlook. See https://arxiv.org/abs/2301.00394
        Args:
            backbone (dict): backbone config.
            encoder (dict): encoder config.
            decoder (dict): decoder config.
            pretrained (bool): whether to use pretrained model
            input_single_human (dict): input size config for single human parsing
            classes (list): class names
            num_parsing (int): total number of parsing instances, only for multiple human parsing
            single_human (bool): whether the task is single human parsing
            parsing_ins_score_thr: instance score threshold for multiple human parsing
            parsing_on (bool): whether to parse results, only for multiple human parsing
            semantic_on (bool): whether to output semantic map
            sem_seg_postprocess_before_inference: whether to resize the prediction back
                to original input size before semantic segmentation inference or after.
        """
        super(M2FP, self).__init__(model_dir, **kwargs)
        # ImageNet mean/std, registered as non-persistent buffers (third
        # arg False) so they move with the model but are not saved in the
        # state_dict.
        self.register_buffer(
            'pixel_mean',
            torch.Tensor([123.675, 116.28, 103.53]).view(-1, 1, 1), False)
        self.register_buffer(
            'pixel_std',
            torch.Tensor([58.395, 57.12, 57.375]).view(-1, 1, 1), False)
        self.size_divisibility = 32
        self.backbone = build_resnet_deeplab_backbone(
            **backbone, input_shape={'channels': 3})
        # The pixel decoder only consumes the backbone outputs named in the
        # encoder config's 'in_features' entry.
        in_features = encoder.pop('in_features')
        input_shape = {
            k: v
            for k, v in self.backbone.output_shape().items()
            if k in in_features
        }
        encoder = MSDeformAttnPixelDecoder(input_shape=input_shape, **encoder)
        decoder = MultiScaleMaskedTransformerDecoder(
            in_channels=encoder.conv_dim, **decoder)
        self.sem_seg_head = M2FPHead(
            pixel_decoder=encoder, transformer_predictor=decoder)
        self.num_classes = decoder.num_classes
        self.num_queries = decoder.num_queries
        self.test_topk_per_image = 100
        self.input_single_human = input_single_human
        self.classes = classes
        self.num_parsing = num_parsing
        self.single_human = single_human
        self.parsing_ins_score_thr = parsing_ins_score_thr
        self.parsing_on = parsing_on
        self.semantic_on = semantic_on
        # Parsing inference requires full-resolution masks, so it forces
        # resizing before inference.
        self.sem_seg_postprocess_before_inference = sem_seg_postprocess_before_inference or parsing_on
        if not self.semantic_on:
            assert self.sem_seg_postprocess_before_inference
        if pretrained:
            # Load checkpoint weights, skipping any entry whose shape does
            # not match the freshly built modules (partial loading).
            model_path = os.path.join(model_dir, ModelFile.TORCH_MODEL_FILE)
            logger.info(f'loading model from {model_path}')
            weight = torch.load(model_path, map_location='cpu')['model']
            tgt_weight = self.state_dict()
            for name in list(weight.keys()):
                if name in tgt_weight:
                    load_size = weight[name].size()
                    tgt_size = tgt_weight[name].size()
                    mis_match = False
                    if len(load_size) != len(tgt_size):
                        mis_match = True
                    else:
                        for n1, n2 in zip(load_size, tgt_size):
                            if n1 != n2:
                                mis_match = True
                                break
                    if mis_match:
                        logger.info(
                            f'size mismatch for {name} '
                            f'({load_size} -> {tgt_size}), skip loading.')
                        del weight[name]
                else:
                    logger.info(
                        f'{name} doesn\'t exist in current model, skip loading.'
                    )
            self.load_state_dict(weight, strict=False)
            logger.info('load model done')
def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
batched_inputs = input['batched_inputs']
images = [x['image'].to(self.device) for x in batched_inputs]
images = [(x - self.pixel_mean) / self.pixel_std for x in images]
images = ImageList.from_tensors(images, self.size_divisibility)
features = self.backbone(images.tensor)
outputs = self.sem_seg_head(features)
return dict(
outputs=outputs, batched_inputs=batched_inputs, images=images)
    def postprocess(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Convert raw decoder outputs into per-image semantic/parsing results.

        Args:
            input: dict produced by ``forward`` with keys 'outputs',
                'batched_inputs' and 'images'.

        Returns:
            dict with key 'eval_result': one dict per image holding
            'sem_seg' and/or 'parsing', depending on configuration.

        Raises:
            NotImplementedError: when called in training mode.
        """
        outputs = input['outputs']
        batched_inputs = input['batched_inputs']
        images = input['images']
        if self.training:
            raise NotImplementedError
        else:
            mask_cls_results = outputs['pred_logits']  # (B, Q, C+1)
            mask_pred_results = outputs['pred_masks']  # (B, Q, H, W)
            # upsample masks
            mask_pred_results = F.interpolate(
                mask_pred_results,
                size=(images.tensor.shape[-2], images.tensor.shape[-1]),
                mode='bilinear',
                align_corners=False,
            )
            del outputs
            processed_results = []
            for mask_cls_result, mask_pred_result, input_per_image, image_size in zip(
                    mask_cls_results, mask_pred_results, batched_inputs,
                    images.image_sizes):
                # Original image size; falls back to the padded size.
                height = input_per_image.get('height', image_size[0])
                width = input_per_image.get('width', image_size[1])
                processed_results.append({})  # for each image
                if self.sem_seg_postprocess_before_inference:
                    # Resize to the original resolution first, then infer.
                    if not self.single_human:
                        mask_pred_result = self.sem_seg_postprocess(
                            mask_pred_result, image_size, height, width)
                    else:
                        mask_pred_result = self.single_human_sem_seg_postprocess(
                            mask_pred_result, image_size,
                            input_per_image['crop_box'], height, width)
                    mask_cls_result = mask_cls_result.to(mask_pred_result)
                # semantic segmentation inference
                if self.semantic_on:
                    r = self.semantic_inference(mask_cls_result,
                                                mask_pred_result)
                    if not self.sem_seg_postprocess_before_inference:
                        # Infer first, then resize the semantic map.
                        if not self.single_human:
                            r = self.sem_seg_postprocess(
                                r, image_size, height, width)
                        else:
                            r = self.single_human_sem_seg_postprocess(
                                r, image_size, input_per_image['crop_box'],
                                height, width)
                    processed_results[-1]['sem_seg'] = r
                # parsing inference
                if self.parsing_on:
                    parsing_r = self.instance_parsing_inference(
                        mask_cls_result, mask_pred_result)
                    processed_results[-1]['parsing'] = parsing_r
        return dict(eval_result=processed_results)
    @property
    def device(self):
        """torch.device the model lives on (taken from a registered buffer)."""
        return self.pixel_mean.device
def single_human_sem_seg_postprocess(self, result, img_size, crop_box,
output_height, output_width):
result = result[:, :img_size[0], :img_size[1]]
result = result[:, crop_box[1]:crop_box[3],
crop_box[0]:crop_box[2]].expand(1, -1, -1, -1)
result = F.interpolate(
result,
size=(output_height, output_width),
mode='bilinear',
align_corners=False)[0]
return result
def sem_seg_postprocess(self, result, img_size, output_height,
output_width):
result = result[:, :img_size[0], :img_size[1]].expand(1, -1, -1, -1)
result = F.interpolate(
result,
size=(output_height, output_width),
mode='bilinear',
align_corners=False)[0]
return result
def semantic_inference(self, mask_cls, mask_pred):
mask_cls = F.softmax(
mask_cls, dim=-1)[..., :-1] # discard non-sense category
mask_pred = mask_pred.sigmoid()
semseg = torch.einsum('qc,qhw->chw', mask_cls, mask_pred)
return semseg
    def instance_parsing_inference(self, mask_cls, mask_pred):
        """Decode query predictions into semantic, part and human outputs.

        Args:
            mask_cls: per-query classification logits, shape
                (num_queries, num_classes + 1); the last channel is the
                "no-object" category.
            mask_pred: per-query mask logits, shape (num_queries, H, W).

        Returns:
            Dict with 'semantic_outputs' (stacked per-category probability
            maps), 'part_outputs' and 'human_outputs' (lists of instance
            dicts with 'category_id', 'score' and 'mask').
        """
        scores = F.softmax(mask_cls, dim=-1)[:, :-1]
        # One label row [0, num_classes) per query, flattened to align with
        # the flattened score matrix below.
        labels = torch.arange(
            self.num_classes,
            device=self.device).unsqueeze(0).repeat(self.num_queries,
                                                    1).flatten(0, 1)
        # Keep the top-k (query, class) pairs across all queries.
        scores_per_image, topk_indices = scores.flatten(0, 1).topk(
            self.test_topk_per_image, sorted=False)
        labels_per_image = labels[topk_indices]
        # Map flattened (query, class) indices back to query indices.
        topk_indices = topk_indices // self.num_classes
        mask_pred = mask_pred[topk_indices]
        binary_pred_masks = (mask_pred > 0).float()
        # Mask quality: mean foreground probability inside the binarized mask.
        mask_scores_per_image = (mask_pred.sigmoid().flatten(1) * binary_pred_masks.flatten(1)).sum(1) / \
            (binary_pred_masks.flatten(1).sum(1) + 1e-6)
        pred_scores = scores_per_image * mask_scores_per_image
        pred_labels = labels_per_image
        pred_masks = mask_pred

        # prepare outputs
        part_instance_res = []
        human_instance_res = []

        # bkg and part instances: label == num_parsing marks a whole-human
        # instance, everything else is background or a body part.
        bkg_part_index = torch.where(pred_labels != self.num_parsing)[0]
        bkg_part_labels = pred_labels[bkg_part_index]
        bkg_part_scores = pred_scores[bkg_part_index]
        bkg_part_masks = pred_masks[bkg_part_index, :, :]
        # human instances
        human_index = torch.where(pred_labels == self.num_parsing)[0]
        human_labels = pred_labels[human_index]
        human_scores = pred_scores[human_index]
        human_masks = pred_masks[human_index, :, :]

        semantic_res = self.paste_instance_to_semseg_probs(
            bkg_part_labels, bkg_part_scores, bkg_part_masks)

        # part instances (drop label 0, which is the background category)
        part_index = torch.where(bkg_part_labels != 0)[0]
        part_labels = bkg_part_labels[part_index]
        part_scores = bkg_part_scores[part_index]
        part_masks = bkg_part_masks[part_index, :, :]

        # part instance results; 0.1 is a fixed low-confidence cutoff
        for idx in range(part_labels.shape[0]):
            if part_scores[idx] < 0.1:
                continue
            part_instance_res.append({
                'category_id':
                part_labels[idx].cpu().tolist(),
                'score':
                part_scores[idx].cpu().tolist(),
                'mask':
                part_masks[idx],
            })
        # human instance results (same 0.1 cutoff, phrased positively)
        for human_idx in range(human_scores.shape[0]):
            if human_scores[human_idx] > 0.1:
                human_instance_res.append({
                    'category_id':
                    human_labels[human_idx].cpu().tolist(),
                    'score':
                    human_scores[human_idx].cpu().tolist(),
                    'mask':
                    human_masks[human_idx],
                })
        return {
            'semantic_outputs': semantic_res,
            'part_outputs': part_instance_res,
            'human_outputs': human_instance_res,
        }
def paste_instance_to_semseg_probs(self, labels, scores, mask_probs):
im_h, im_w = mask_probs.shape[-2:]
semseg_im = []
for cls_ind in range(self.num_parsing):
cate_inds = torch.where(labels == cls_ind)[0]
cate_scores = scores[cate_inds]
cate_mask_probs = mask_probs[cate_inds, :, :].sigmoid()
semseg_im.append(
self.paste_category_probs(cate_scores, cate_mask_probs, im_h,
im_w))
return torch.stack(semseg_im, dim=0)
def paste_category_probs(self, scores, mask_probs, h, w):
category_probs = torch.zeros((h, w),
dtype=torch.float32,
device=mask_probs.device)
paste_times = torch.zeros((h, w),
dtype=torch.float32,
device=mask_probs.device)
index = scores.argsort()
for k in range(len(index)):
if scores[index[k]] < self.parsing_ins_score_thr:
continue
ins_mask_probs = mask_probs[index[k], :, :] * scores[index[k]]
category_probs = torch.where(ins_mask_probs > 0.5,
ins_mask_probs + category_probs,
category_probs)
paste_times += torch.where(ins_mask_probs > 0.5, 1, 0)
paste_times = torch.where(paste_times == 0, paste_times + 1,
paste_times)
category_probs /= paste_times
return category_probs
class M2FPHead(nn.Module):
    """M2FP segmentation head: a pixel decoder feeding a transformer
    predictor.

    Args:
        pixel_decoder: module exposing ``forward_features`` that returns
            (mask_features, transformer_encoder_features,
            multi_scale_features).
        transformer_predictor: module mapping (multi_scale_features,
            mask_features, mask) to the prediction dict.
    """

    def __init__(self, pixel_decoder: nn.Module,
                 transformer_predictor: nn.Module):
        super().__init__()
        self.pixel_decoder = pixel_decoder
        self.predictor = transformer_predictor

    def forward(self, features, mask=None):
        return self.layers(features, mask)

    def layers(self, features, mask=None):
        (mask_features, transformer_encoder_features,
         multi_scale_features) = self.pixel_decoder.forward_features(features)
        return self.predictor(multi_scale_features, mask_features, mask)

View File

@@ -0,0 +1,156 @@
# Part of the implementation is borrowed and modified from M2FP, made publicly available
# under the CC BY-NC 4.0 License at https://github.com/soeaver/M2FP
# Part of the implementation is borrowed and modified from Detectron2, made publicly available
# under the Apache-2.0 License at https://github.com/facebookresearch/detectron2
import copy
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
def center_to_target_size_test(img, target_size):
    """Aspect-preserving resize and/or center-pad `img` to `target_size`.

    Args:
        img: HxW(xC) ndarray.
        target_size: (width, height) of the output canvas.

    Returns:
        Tuple of the transformed image and the [x0, y0, x1, y1] box of the
        valid (non-padded) region inside it.
    """
    src_h, src_w = img.shape[0], img.shape[1]
    trg_h, trg_w = target_size[1], target_size[0]
    tfm_list = []
    if src_h > trg_h and src_w > trg_w:
        # Both sides overflow: shrink along the longer side first, then
        # correct if the other side still overflows.
        # Bug fix: the original only handled src_h > src_w and
        # src_w > src_h, so a square oversized image fell through with
        # new_h = new_w = 0 and requested a 0x0 resize.
        if src_h >= src_w:
            new_h = trg_h
            new_w = int(new_h * src_w / src_h)
            if new_w > trg_w:
                new_w = trg_w
                new_h = int(new_w * src_h / src_w)
        else:
            new_w = trg_w
            new_h = int(new_w * src_h / src_w)
            if new_h > trg_h:
                new_h = trg_h
                new_w = int(new_h * src_w / src_h)
        tfm_list.append(ResizeTransform(src_h, src_w, new_h, new_w))
        tfm_list.append(PadTransform(new_h, new_w, trg_h, trg_w))
    elif src_h > trg_h and src_w <= trg_w:
        # Only the height overflows: scale height down to fit.
        new_h = trg_h
        new_w = int(new_h * src_w / src_h)
        tfm_list.append(ResizeTransform(src_h, src_w, new_h, new_w))
        tfm_list.append(PadTransform(new_h, new_w, trg_h, trg_w))
    elif src_h <= trg_h and src_w > trg_w:
        # Only the width overflows: scale width down to fit.
        new_w = trg_w
        new_h = int(new_w * src_h / src_w)
        tfm_list.append(ResizeTransform(src_h, src_w, new_h, new_w))
        tfm_list.append(PadTransform(new_h, new_w, trg_h, trg_w))
    else:
        # Image already fits inside the target: only padding is needed.
        new_h, new_w = src_h, src_w
        tfm_list.append(PadTransform(new_h, new_w, trg_h, trg_w))

    box = get_box(new_h, new_w, trg_h, trg_w)
    new_img = copy.deepcopy(img)
    for tfm in tfm_list:
        new_img = tfm.apply_image(new_img)
    return new_img, box
def get_box(src_h, src_w, trg_h, trg_w):
    """Return the [x0, y0, x1, y1] box of a (src_h, src_w) region centered
    inside a (trg_h, trg_w) canvas."""
    assert src_h <= trg_h, 'expect src_h <= trg_h'
    assert src_w <= trg_w, 'expect src_w <= trg_w'
    left = (trg_w - src_w) // 2
    top = (trg_h - src_h) // 2
    return [left, top, left + src_w, top + src_h]
class PadTransform:
    """Center-pad an image from (src_h, src_w) up to (trg_h, trg_w).

    Padding is split as evenly as possible per side; for odd differences
    the extra pixel goes to the right/bottom.
    """

    def __init__(self, src_h, src_w, trg_h, trg_w):
        super().__init__()
        assert src_h <= trg_h, 'expect src_h <= trg_h'
        assert src_w <= trg_w, 'expect src_w <= trg_w'
        self.src_h, self.src_w = src_h, src_w
        self.trg_h, self.trg_w = trg_h, trg_w
        self.pad_left = int((trg_w - src_w) / 2)
        self.pad_right = trg_w - src_w - self.pad_left
        self.pad_top = int((trg_h - src_h) / 2)
        self.pad_bottom = trg_h - src_h - self.pad_top

    def apply_image(self, img, pad_value=128):
        """Pad `img` (HxW or HxWxC ndarray) to the target size.

        Returns the input unchanged when no padding is needed.

        Raises:
            ValueError: if `img` is not 2- or 3-dimensional.
        """
        # Bug fix: the original returned early when only the left/top pads
        # were zero, skipping a required right/bottom pad for odd size
        # differences (e.g. trg_w - src_w == 1). Check all four pads.
        if (self.pad_left == 0 and self.pad_right == 0 and self.pad_top == 0
                and self.pad_bottom == 0):
            return img
        if len(img.shape) == 2:
            return np.pad(
                img, ((self.pad_top, self.pad_bottom),
                      (self.pad_left, self.pad_right)),
                'constant',
                constant_values=((pad_value, pad_value), (pad_value,
                                                          pad_value)))
        elif len(img.shape) == 3:
            return np.pad(
                img, ((self.pad_top, self.pad_bottom),
                      (self.pad_left, self.pad_right), (0, 0)),
                'constant',
                constant_values=((pad_value, pad_value),
                                 (pad_value, pad_value), (pad_value,
                                                          pad_value)))
        # Bug fix: the original silently returned None for other ranks.
        raise ValueError(f'expect a 2D or 3D image, got shape {img.shape}')
class ResizeTransform:
    """Resize an image from (h, w) to (new_h, new_w).

    uint8 images go through PIL; other dtypes are resized with
    ``torch.nn.functional.interpolate`` since PIL only supports uint8.
    """

    def __init__(self, h, w, new_h, new_w, interp=None):
        super().__init__()
        self.h, self.w = h, w
        self.new_h, self.new_w = new_h, new_w
        # Default to bilinear interpolation when none is given.
        self.interp = Image.BILINEAR if interp is None else interp

    def apply_image(self, img, interp=None):
        assert img.shape[:2] == (self.h, self.w)
        assert len(img.shape) <= 4
        method = interp if interp is not None else self.interp
        if img.dtype == np.uint8:
            single_channel = len(img.shape) > 2 and img.shape[2] == 1
            if single_channel:
                pil_image = Image.fromarray(img[:, :, 0], mode='L')
            else:
                pil_image = Image.fromarray(img)
            resized = pil_image.resize((self.new_w, self.new_h), method)
            ret = np.asarray(resized)
            if single_channel:
                ret = np.expand_dims(ret, -1)
            return ret
        # PIL only supports uint8: route other dtypes through torch.
        if any(stride < 0 for stride in img.strides):
            img = np.ascontiguousarray(img)
        tensor = torch.from_numpy(img)
        shape = list(tensor.shape)
        shape_4d = shape[:2] + [1] * (4 - len(shape)) + shape[2:]
        tensor = tensor.view(shape_4d).permute(2, 3, 0, 1)  # hw(c) -> nchw
        mode = {
            Image.NEAREST: 'nearest',
            Image.BILINEAR: 'bilinear',
            Image.BICUBIC: 'bicubic',
        }[method]
        align_corners = None if mode == 'nearest' else False
        tensor = F.interpolate(
            tensor, (self.new_h, self.new_w),
            mode=mode,
            align_corners=align_corners)
        shape[:2] = (self.new_h, self.new_w)
        return tensor.permute(2, 3, 0, 1).view(shape).numpy()  # nchw -> hw(c)

View File

@@ -89,7 +89,7 @@ class CascadeMaskRCNNSwin(nn.Module):
model_path = os.path.join(kwargs['model_dir'],
ModelFile.TORCH_MODEL_FILE)
logger.info(f'loading model from {model_path}')
weight = torch.load(model_path)['state_dict']
weight = torch.load(model_path, map_location='cpu')['state_dict']
tgt_weight = self.state_dict()
for name in list(weight.keys()):
if name in tgt_weight:

View File

@@ -61,7 +61,7 @@ class MaskDINOSwin(nn.Module):
model_path = os.path.join(kwargs['model_dir'],
ModelFile.TORCH_MODEL_FILE)
logger.info(f'loading model from {model_path}')
weight = torch.load(model_path)['model']
weight = torch.load(model_path, map_location='cpu')['model']
tgt_weight = self.state_dict()
for name in list(weight.keys()):
if name in tgt_weight:

View File

@@ -105,6 +105,7 @@ if TYPE_CHECKING:
from .image_quality_assessment_mos_pipeline import ImageQualityAssessmentMosPipeline
from .bad_image_detecting_pipeline import BadImageDetecingPipeline
from .mobile_image_super_resolution_pipeline import MobileImageSuperResolutionPipeline
from .image_human_parsing_pipeline import ImageHumanParsingPipeline
from .nerf_recon_acc_pipeline import NeRFReconAccPipeline
else:
@@ -257,6 +258,7 @@ else:
'MobileImageSuperResolutionPipeline'
],
'bad_image_detecting_pipeline': ['BadImageDetecingPipeline'],
'image_human_parsing_pipeline': ['ImageHumanParsingPipeline'],
'nerf_recon_acc_pipeline': ['NeRFReconAccPipeline'],
}

View File

@@ -0,0 +1,126 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import Any, Dict, Optional, Union
import numpy as np
import torch
import torchvision.transforms as T
from modelscope.metainfo import Pipelines
from modelscope.models.cv.image_human_parsing import (
M2FP, center_to_target_size_test)
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.preprocessors import LoadImage
from modelscope.utils.constant import Tasks
from modelscope.utils.logger import get_logger
logger = get_logger()
@PIPELINES.register_module(
    Tasks.image_segmentation, module_name=Pipelines.image_human_parsing)
class ImageHumanParsingPipeline(Pipeline):
    """M2FP-based pipeline for single- and multiple-human parsing models."""

    def __init__(self,
                 model: Union[M2FP, str],
                 preprocessor: Optional = None,
                 **kwargs):
        """use `model` and `preprocessor` to create an image human parsing
        pipeline for prediction

        Args:
            model (M2FPModel | str): a model instance or a model id/path
            preprocessor (None): a preprocessor instance (unused; this
                pipeline performs its own preprocessing)
        """
        super().__init__(model=model, preprocessor=preprocessor, **kwargs)
        # Inference only: disable dropout / batch-norm updates.
        self.model.eval()

    def _get_preprocess_shape(self, oldh, oldw, short_edge_length, max_size):
        """Compute the resize shape that scales the short edge to
        `short_edge_length` while capping the long edge at `max_size`."""
        h, w = oldh, oldw
        size = short_edge_length * 1.0
        scale = size / min(h, w)
        if h < w:
            newh, neww = size, scale * w
        else:
            newh, neww = scale * h, size
        if max(newh, neww) > max_size:
            # Long edge would exceed the cap: rescale both sides down.
            scale = max_size * 1.0 / max(newh, neww)
            newh = newh * scale
            neww = neww * scale
        # Round to the nearest integer pixel sizes.
        neww = int(neww + 0.5)
        newh = int(newh + 0.5)
        return (newh, neww)

    def preprocess(self,
                   input: Input,
                   min_size=640,
                   max_size=1333) -> Dict[str, Any]:
        """Load the image and build the model's `batched_inputs` payload.

        Single-human models get a fixed-size center resize/pad (with the
        valid-region crop box recorded for postprocessing); other models
        get a short-edge resize capped at `max_size`.
        """
        image = LoadImage.convert_to_img(input)
        w, h = image.size[:2]  # PIL size is (width, height)
        dataset_dict = {'width': w, 'height': h}
        if self.model.single_human:
            image = np.asarray(image)
            image, crop_box = center_to_target_size_test(
                image, self.model.input_single_human['sizes'][0])
            # HWC uint8 ndarray -> CHW tensor.
            dataset_dict['image'] = torch.as_tensor(
                np.ascontiguousarray(image.transpose(2, 0, 1)))
            dataset_dict['crop_box'] = crop_box
        else:
            new_h, new_w = self._get_preprocess_shape(h, w, min_size, max_size)
            test_transforms = T.Compose([
                T.Resize((new_h, new_w)),
                T.ToTensor(),
            ])
            image = test_transforms(image)
            # ToTensor scales to [0, 1]; the model expects a [0, 255] range.
            dataset_dict['image'] = image * 255.
        result = {'batched_inputs': [dataset_dict]}
        return result

    def forward(self, input: Dict[str, Any],
                **forward_params) -> Dict[str, Any]:
        """Run model inference without tracking gradients."""
        with torch.no_grad():
            output = self.model(input)
        return output

    def postprocess(self,
                    inputs: Dict[str, Any],
                    score_thr=0.0) -> Dict[str, Any]:
        """Convert model outputs into parallel masks/labels/scores lists.

        Handles either a plain semantic map ('sem_seg') or parsing results
        ('parsing' with part and human instance lists).

        Raises:
            NotImplementedError: if the prediction dict has neither key.
        """
        predictions = inputs['eval_result'][0]
        class_names = self.model.classes
        results_dict = {
            OutputKeys.MASKS: [],
            OutputKeys.LABELS: [],
            OutputKeys.SCORES: []
        }
        if 'sem_seg' in predictions:
            semantic_pred = predictions['sem_seg']
            semantic_seg = semantic_pred.argmax(dim=0).detach().cpu().numpy()
            semantic_pred = semantic_pred.sigmoid().detach().cpu().numpy()
            class_ids = np.unique(semantic_seg)
            for class_id in class_ids:
                label = class_names[class_id]
                mask = np.array(semantic_seg == class_id, dtype=np.float64)
                # Mean class probability over the mask. NOTE(review): the
                # `+ 1` denominator differs from the `1e-6` epsilon used in
                # the model code — confirm it is intentional smoothing.
                score = (mask * semantic_pred[class_id]).sum() / (
                    mask.sum() + 1)
                results_dict[OutputKeys.SCORES].append(score)
                results_dict[OutputKeys.LABELS].append(label)
                results_dict[OutputKeys.MASKS].append(mask)
        elif 'parsing' in predictions:
            parsing_res = predictions['parsing']
            part_outputs = parsing_res['part_outputs']
            human_outputs = parsing_res['human_outputs']
            # process semantic_outputs
            for output in part_outputs + human_outputs:
                score = output['score']
                label = class_names[output['category_id']]
                mask = (output['mask'] > 0).float().detach().cpu().numpy()
                if score > score_thr:
                    results_dict[OutputKeys.SCORES].append(score)
                    results_dict[OutputKeys.LABELS].append(label)
                    results_dict[OutputKeys.MASKS].append(mask)
        else:
            raise NotImplementedError
        return results_dict

View File

@@ -0,0 +1,48 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest
from modelscope.models import Model
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class ImageHumanParsingTest(unittest.TestCase, DemoCompatibilityCheck):
    """Smoke tests for the M2FP single/multiple human parsing pipelines."""

    def setUp(self) -> None:
        self.task = Tasks.image_segmentation
        self.model_id_single = 'damo/cv_resnet101_image-single-human-parsing'
        self.model_id_multiple = 'damo/cv_resnet101_image-multiple-human-parsing'

    # Test fixture image paths (class attributes shared by all tests).
    image_single = 'data/test/images/image_single_human_parsing.jpg'
    image_multiple = 'data/test/images/image_multiple_human_parsing.jpg'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_with_model_name(self):
        """Build pipelines by model id and run one image through each."""
        pipeline_parsing = pipeline(
            task=Tasks.image_segmentation, model=self.model_id_single)
        print(pipeline_parsing(input=self.image_single)[OutputKeys.LABELS])
        pipeline_parsing = pipeline(
            task=Tasks.image_segmentation, model=self.model_id_multiple)
        print(pipeline_parsing(input=self.image_multiple)[OutputKeys.LABELS])

    @unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
    def test_run_with_model_from_modelhub(self):
        """Load models via `Model.from_pretrained`, then run the pipelines."""
        model = Model.from_pretrained(self.model_id_single)
        pipeline_parsing = pipeline(
            task=Tasks.image_segmentation, model=model, preprocessor=None)
        print(pipeline_parsing(input=self.image_single)[OutputKeys.LABELS])
        model = Model.from_pretrained(self.model_id_multiple)
        pipeline_parsing = pipeline(
            task=Tasks.image_segmentation, model=model, preprocessor=None)
        print(pipeline_parsing(input=self.image_multiple)[OutputKeys.LABELS])

    @unittest.skip('demo compatibility test is only enabled on a needed-basis')
    def test_demo_compatibility(self):
        self.compatibility_check()


if __name__ == '__main__':
    unittest.main()