Merge branch 'master' of gitlab.alibaba-inc.com:Ali-MaaS/MaaS-lib into release/1.2

This commit is contained in:
wenmeng.zwm
2023-01-13 10:10:14 +08:00
10 changed files with 377 additions and 2 deletions

View File

@@ -52,6 +52,7 @@ class Models(object):
mogface = 'mogface'
mtcnn = 'mtcnn'
ulfd = 'ulfd'
rts = 'rts'
flir = 'flir'
arcface = 'arcface'
facemask = 'facemask'
@@ -243,6 +244,7 @@ class Pipelines(object):
realtime_object_detection = 'cspnet_realtime-object-detection_yolox'
realtime_video_object_detection = 'cspnet_realtime-video-object-detection_streamyolo'
face_recognition = 'ir101-face-recognition-cfglint'
face_recognition_ood = 'ir-face-recognition-ood-rts'
arc_face_recognition = 'ir50-face-recognition-arcface'
mask_face_recognition = 'resnet-face-recognition-facemask'
image_instance_segmentation = 'cascade-mask-rcnn-swin-image-instance-segmentation'

View File

@@ -0,0 +1,15 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import TYPE_CHECKING

from modelscope.utils.import_utils import LazyImportModule

if TYPE_CHECKING:
    # Real import for static type checkers / IDEs only.
    from .rts_backbone import RTSBackbone
else:
    # Map submodule name -> exported symbols; the submodule is only
    # imported when one of these attributes is first accessed.
    _import_structure = {'rts_backbone': ['RTSBackbone']}

    import sys

    # Replace this module object in sys.modules so attribute access is
    # routed through the lazy loader.
    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__)

View File

@@ -0,0 +1,223 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from collections import namedtuple
from math import lgamma
import torch
import torch.nn as nn
from torch.nn import (AdaptiveAvgPool2d, BatchNorm1d, BatchNorm2d, Conv2d,
Dropout, Linear, MaxPool2d, Module, PReLU, ReLU,
Sequential, Sigmoid)
from torch.nn.modules.flatten import Flatten
from modelscope.models import MODELS
from modelscope.models.base import TorchModel
from modelscope.utils.constant import ModelFile
from modelscope.utils.logger import get_logger
logger = get_logger()
@MODELS.register_module('face-recognition-ood', 'rts-backbone')
class RTSBackbone(TorchModel):
    """IR-SE-64 face-recognition backbone with an RTS uncertainty head.

    The trunk reuses the input layer and residual body of an IR-SE-64
    ``Backbone``, followed by BN/Dropout/Flatten.  Two heads consume the
    flattened 512*7*7 features:

    * ``features_backbone`` -> 512-d face embedding
    * ``logvar_rts_backbone`` (scalar) or ``logvar_rts_plus_backbone``
      (``alpha``-dim) -> log-variance used as an OOD score.
    """

    def __init__(self, *args, **kwargs):
        super(RTSBackbone, self).__init__()
        # Width of the RTS+ log-variance head.  NOTE(review): assumed to be
        # a positive int from the model config; a missing 'alpha' would make
        # Linear(..., None) below fail -- confirm config always supplies it.
        self.alpha = kwargs.get('alpha')
        # When truthy, forward() uses the RTS+ (multi-dim) head.
        self.rts_plus = kwargs.get('rts_plus')
        resnet = Backbone([112, 112], 64, mode='ir_se')
        # Trunk: conv stem + residual body, then BN/Dropout/Flatten
        # (512x7x7 -> 25088).  The resnet's own output_layer is bypassed.
        self.features = nn.Sequential(
            resnet.input_layer, resnet.body,
            Sequential(
                BatchNorm2d(512),
                Dropout(),
                Flatten(),
            ))
        # Embedding head.
        self.features_backbone = nn.Sequential(
            Linear(512 * 7 * 7, 512),
            BatchNorm1d(512),
        )
        # Scalar log-variance head (RTS).
        self.logvar_rts_backbone = nn.Sequential(
            Linear(512 * 7 * 7, 1),
            BatchNorm1d(1),
        )
        # alpha-dimensional log-variance head (RTS+).
        self.logvar_rts_plus_backbone = nn.Sequential(
            Linear(512 * 7 * 7, self.alpha),
            BatchNorm1d(self.alpha),
        )

    def forward(self, img):
        """Return ``(embedding, logvar)`` for a batch of face crops."""
        x = self.features(img)
        image_features = self.features_backbone(x)
        if not self.rts_plus:
            logvar = self.logvar_rts_backbone(x)
        else:
            logvar = self.logvar_rts_plus_backbone(x)
        return image_features, logvar

    @classmethod
    def _instantiate(cls, **kwargs):
        """Build the model and load its checkpoint from ``model_dir``.

        ``model_dir`` is popped so it is not forwarded to ``__init__``;
        ``am_model_name`` optionally overrides the checkpoint file name.
        """
        model_file = kwargs.get('am_model_name', ModelFile.TORCH_MODEL_FILE)
        # Compute the checkpoint path once (the original built it twice,
        # before and after popping 'model_dir').
        ckpt_path = os.path.join(kwargs.pop('model_dir'), model_file)
        logger.info(f'loading model from {ckpt_path}')
        model = cls(**kwargs)
        model.load_state_dict(torch.load(ckpt_path, map_location='cpu'))
        return model
def l2_norm(input, axis=1):
    """Return ``input`` L2-normalized along ``axis``."""
    magnitude = torch.norm(input, 2, axis, True)
    return torch.div(input, magnitude)
class SEModule(Module):
    """Squeeze-and-Excitation block: channel-wise reweighting through a
    two-conv 1x1 bottleneck (attribute names preserved for checkpoints)."""

    def __init__(self, channels, reduction):
        super(SEModule, self).__init__()
        squeezed = channels // reduction
        self.avg_pool = AdaptiveAvgPool2d(1)
        # Squeeze: channels -> channels / reduction.
        self.fc1 = Conv2d(
            channels, squeezed, kernel_size=1, padding=0, bias=False)
        nn.init.xavier_uniform_(self.fc1.weight.data)
        self.relu = ReLU(inplace=True)
        # Excite: channels / reduction -> channels.
        self.fc2 = Conv2d(
            squeezed, channels, kernel_size=1, padding=0, bias=False)
        self.sigmoid = Sigmoid()

    def forward(self, x):
        # Per-channel gate in (0, 1), broadcast over the spatial dims.
        gate = self.sigmoid(self.fc2(self.relu(self.fc1(self.avg_pool(x)))))
        return x * gate
class bottleneck_IR_SE(Module):
    """IR-SE residual unit: BN-Conv-PReLU-Conv-BN plus an SE gate, added
    to an identity (MaxPool) or 1x1-conv projection shortcut."""

    def __init__(self, in_channel, depth, stride):
        super(bottleneck_IR_SE, self).__init__()
        if in_channel == depth:
            # Same width: the shortcut only has to match the stride.
            self.shortcut_layer = MaxPool2d(1, stride)
        else:
            # Width change: project with a strided 1x1 conv + BN.
            self.shortcut_layer = Sequential(
                Conv2d(in_channel, depth, (1, 1), stride, bias=False),
                BatchNorm2d(depth))
        self.res_layer = Sequential(
            BatchNorm2d(in_channel),
            Conv2d(in_channel, depth, (3, 3), (1, 1), 1, bias=False),
            PReLU(depth),
            Conv2d(depth, depth, (3, 3), stride, 1, bias=False),
            BatchNorm2d(depth),
            SEModule(depth, 16))

    def forward(self, x):
        identity = self.shortcut_layer(x)
        residual = self.res_layer(x)
        return residual + identity
class Bottleneck(namedtuple('Block', ['in_channel', 'depth', 'stride'])):
    '''A named tuple describing a ResNet block.'''


def get_block(in_channel, depth, num_units, stride=2):
    """Return one stage: a strided unit then ``num_units - 1`` stride-1 units."""
    return [Bottleneck(in_channel, depth, stride)
            ] + [Bottleneck(depth, depth, 1) for _ in range(num_units - 1)]


# Units per stage for each supported network depth.
_UNITS_PER_STAGE = {
    50: (3, 4, 14, 3),
    64: (3, 8, 16, 3),
    100: (3, 13, 30, 3),
    152: (3, 8, 36, 3),
}


def get_blocks(num_layers):
    """Return the four-stage block configuration for an IR(-SE) ResNet.

    Args:
        num_layers: network depth, one of 50, 64, 100 or 152.

    Raises:
        ValueError: for an unsupported depth (the original fell through
            and raised ``UnboundLocalError`` on ``blocks``).
    """
    try:
        n1, n2, n3, n4 = _UNITS_PER_STAGE[num_layers]
    except KeyError:
        raise ValueError(
            f'num_layers should be one of {sorted(_UNITS_PER_STAGE)}, '
            f'got {num_layers}') from None
    return [
        get_block(in_channel=64, depth=64, num_units=n1),
        get_block(in_channel=64, depth=128, num_units=n2),
        get_block(in_channel=128, depth=256, num_units=n3),
        get_block(in_channel=256, depth=512, num_units=n4),
    ]
class Backbone(Module):
    """IR / IR-SE ResNet trunk for 112x112 or 224x224 face inputs.

    Args:
        input_size: spatial size, [112, 112] or [224, 224].
        num_layers: depth, one of 50, 64, 100, 152.
        mode: 'ir' for plain IR units, 'ir_se' for IR units with SE gates.
    """

    def __init__(self, input_size, num_layers, mode='ir'):
        super(Backbone, self).__init__()
        assert input_size[0] in [
            112, 224
        ], 'input_size should be [112, 112] or [224, 224]'
        assert num_layers in [50, 64, 100,
                              152], 'num_layers should be 50, 64, 100 or 152'
        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
        blocks = get_blocks(num_layers)
        if mode == 'ir':
            # NOTE(review): bottleneck_IR is not defined anywhere in this
            # diff -- mode='ir' would raise NameError unless it exists
            # elsewhere in the file; confirm before using mode='ir'.
            unit_module = bottleneck_IR
        elif mode == 'ir_se':
            unit_module = bottleneck_IR_SE
        # Stem: 3 -> 64 channels, stride 1, spatial size preserved.
        self.input_layer = Sequential(
            Conv2d(3, 64, (3, 3), 1, 1, bias=False), BatchNorm2d(64),
            PReLU(64))
        # Head: flatten the final 512-channel map into a 512-d embedding;
        # the flattened size depends on the input resolution.
        if input_size[0] == 112:
            self.output_layer = Sequential(
                BatchNorm2d(512), Dropout(), Flatten(),
                Linear(512 * 7 * 7, 512), BatchNorm1d(512))
        else:
            self.output_layer = Sequential(
                BatchNorm2d(512), Dropout(), Flatten(),
                Linear(512 * 14 * 14, 512), BatchNorm1d(512))
        # Residual body: all stages' units concatenated in order.
        modules = []
        for block in blocks:
            for bottleneck in block:
                modules.append(
                    unit_module(bottleneck.in_channel, bottleneck.depth,
                                bottleneck.stride))
        self.body = Sequential(*modules)

    def forward(self, x):
        # stem -> residual body -> embedding head
        x = self.input_layer(x)
        x = self.body(x)
        x = self.output_layer(x)
        return x

View File

@@ -193,6 +193,13 @@ TASK_OUTPUTS = {
# }
Tasks.face_recognition: [OutputKeys.IMG_EMBEDDING],
# face recognition ood result for single sample
# {
# "img_embedding": np.array with shape [1, D],
# "ood_score ": [0.95]
# }
Tasks.face_recognition_ood: [OutputKeys.IMG_EMBEDDING, OutputKeys.SCORES],
# human detection result for single sample
# {
# "scores": [0.9, 0.1, 0.05, 0.05]
@@ -723,6 +730,10 @@ TASK_OUTPUTS = {
# { "text": "每一天都要快乐喔"}
Tasks.auto_speech_recognition: [OutputKeys.TEXT],
# itn result for single sample
# {"text": "123"}
Tasks.inverse_text_processing: [OutputKeys.TEXT],
# speaker verification for single compare task
# {'score': 84.2332}
Tasks.speaker_verification: [OutputKeys.SCORES],

View File

@@ -44,14 +44,16 @@ class InverseTextProcessingPipeline(Pipeline):
super().__init__(model=model, **kwargs)
self.model_cfg = self.model.forward()
def __call__(self, text_in: str = None) -> str:
def __call__(self, text_in: str = None) -> Dict[str, Any]:
if len(text_in) == 0:
raise ValueError('The input of ITN should not be null.')
else:
self.text_in = text_in
output = {}
itn_result = self.forward(self.text_in)
output['text'] = itn_result
output = self.forward(self.text_in)
return output
def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:

View File

@@ -18,6 +18,7 @@ if TYPE_CHECKING:
from .face_detection_pipeline import FaceDetectionPipeline
from .face_image_generation_pipeline import FaceImageGenerationPipeline
from .face_recognition_pipeline import FaceRecognitionPipeline
from .face_recognition_ood_pipeline import FaceRecognitionOodPipeline
from .arc_face_recognition_pipeline import ArcFaceRecognitionPipeline
from .mask_face_recognition_pipeline import MaskFaceRecognitionPipeline
from .general_recognition_pipeline import GeneralRecognitionPipeline
@@ -101,6 +102,7 @@ else:
'face_detection_pipeline': ['FaceDetectionPipeline'],
'face_image_generation_pipeline': ['FaceImageGenerationPipeline'],
'face_recognition_pipeline': ['FaceRecognitionPipeline'],
'face_recognition_ood_pipeline': ['FaceRecognitionOodPipeline'],
'arc_face_recognition_pipeline': ['ArcFaceRecognitionPipeline'],
'mask_face_recognition_pipeline': ['MaskFaceRecognitionPipeline'],
'general_recognition_pipeline': ['GeneralRecognitionPipeline'],

View File

@@ -0,0 +1,73 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os.path as osp
from typing import Any, Dict
import cv2
import numpy as np
import PIL
import torch
from modelscope.metainfo import Pipelines
from modelscope.models import Model
from modelscope.models.cv.face_recognition.align_face import align_face
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.preprocessors import LoadImage
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger
from . import FaceProcessingBasePipeline
logger = get_logger()
@PIPELINES.register_module(
    Tasks.face_recognition_ood, module_name=Pipelines.face_recognition_ood)
class FaceRecognitionOodPipeline(FaceProcessingBasePipeline):

    def __init__(self, model: str, **kwargs):
        """
        use `model` to create a face recognition ood pipeline for prediction
        Args:
            model: model id on modelscope hub.
        Example:
        ```python
        >>> from modelscope.pipelines import pipeline
        >>> fr_ood = pipeline('face-recognition-ood', 'damo/cv_ir_face-recognition-ood_rts')
        >>> fr_ood("https://modelscope.oss-cn-beijing.aliyuncs.com/test/images/face_recognition_1.png")
        {'img_embedding': array([[ 0.02276129, -0.00761525, ..., 0.05735306]],
            dtype=float32), 'scores': [[0.7656678557395935]]}
        ```
        """
        # face recognition model: move to device and freeze in eval mode
        super().__init__(model=model, **kwargs)
        face_model = self.model
        face_model = face_model.to(self.device)
        face_model.eval()
        self.face_model = face_model
        logger.info('face recognition model loaded!')

    def preprocess(self, input: Input) -> Dict[str, Any]:
        """Detect/align the face (base class), then convert the aligned crop
        to a normalized CHW float32 RGB array in [-1, 1]."""
        result = super().preprocess(input)
        align_img = result['img']
        face_img = align_img[:, :, ::-1]  # to rgb
        face_img = np.transpose(face_img, axes=(2, 0, 1))  # HWC -> CHW
        face_img = (face_img / 255. - 0.5) / 0.5  # scale to [-1, 1]
        face_img = face_img.astype(np.float32)
        result['img'] = face_img
        return result

    def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run the model; return the L2-normalized embedding and exp(logvar)
        as the OOD score."""
        assert input['img'] is not None
        # NOTE(review): 'img' is assumed to be a torch.Tensor here (the
        # framework collates the numpy array from preprocess) -- confirm.
        img = input['img'].unsqueeze(0)
        output = self.face_model(img)
        emb = output[0].detach().cpu().numpy()
        emb /= np.sqrt(np.sum(emb**2, -1, keepdims=True))  # l2 norm
        scores = output[1].exp().detach().cpu().numpy().tolist()
        return {OutputKeys.IMG_EMBEDDING: emb, OutputKeys.SCORES: scores}

    def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Identity: forward() already produces the final output dict.
        return inputs

View File

@@ -288,6 +288,8 @@ class AstScaning(object):
for node in nodes:
if type(node).__name__ == 'Str':
result.append((node.s, None))
elif type(node).__name__ == 'Constant':
result.append((node.value, None))
else:
result.append(_get_attribute_item(node))
return result

View File

@@ -25,6 +25,7 @@ class CVTasks(object):
face_liveness = 'face-liveness'
card_detection = 'card-detection'
face_recognition = 'face-recognition'
face_recognition_ood = 'face-recognition-ood'
facial_expression_recognition = 'facial-expression-recognition'
facial_landmark_confidence = 'facial-landmark-confidence'
face_processing_base = 'face-processing-base'

View File

@@ -0,0 +1,44 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import unittest
import numpy as np
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class FaceRecognitionOodTest(unittest.TestCase, DemoCompatibilityCheck):
    """Smoke test for the face-recognition-ood pipeline: embeddings should
    be comparable by cosine similarity and each result carries an OOD score."""

    def setUp(self) -> None:
        self.task = Tasks.face_recognition_ood
        self.model_id = 'damo/cv_ir_face-recognition-ood_rts'

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_face_compare(self):
        img1 = 'data/test/images/face_recognition_1.png'
        img2 = 'data/test/images/face_recognition_2.png'
        face_recognition = pipeline(
            Tasks.face_recognition_ood, model=self.model_id)

        def run(img):
            # One embedding + scalar OOD score per image.
            result = face_recognition(img)
            return (result[OutputKeys.IMG_EMBEDDING],
                    result[OutputKeys.SCORES][0][0])

        emb1, score1 = run(img1)
        emb2, score2 = run(img2)
        sim = np.dot(emb1[0], emb2[0])
        print(f'Cos similarity={sim:.3f}, img1:{img1} img2:{img2}')
        print(f'OOD score: img1:{score1:.3f} img2:{score2:.3f}')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_demo_compatibility(self):
        self.compatibility_check()


if __name__ == '__main__':
    unittest.main()