From ffeeae82c60e90356d3c2d7b387cec3d1c321048 Mon Sep 17 00:00:00 2001
From: ly261666
Date: Mon, 9 Jan 2023 09:31:57 +0800
Subject: [PATCH] [to #42322933] remove dup contents

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11275055
---
 .../cv/arc_face_recognition_pipeline.py       |  2 +-
 .../cv/face_attribute_recognition_pipeline.py | 72 +----------------
 .../pipelines/cv/face_liveness_ir_pipeline.py |  2 +-
 .../cv/face_processing_base_pipeline.py       |  9 ++-
 .../pipelines/cv/face_recognition_pipeline.py | 79 ++-----------------
 .../facial_expression_recognition_pipeline.py | 72 +----------------
 .../cv/facial_landmark_confidence_pipeline.py |  3 +-
 .../cv/mask_face_recognition_pipeline.py      | 78 ++----------------
 8 files changed, 30 insertions(+), 287 deletions(-)

diff --git a/modelscope/pipelines/cv/arc_face_recognition_pipeline.py b/modelscope/pipelines/cv/arc_face_recognition_pipeline.py
index 241dd39f..72ebffc8 100644
--- a/modelscope/pipelines/cv/arc_face_recognition_pipeline.py
+++ b/modelscope/pipelines/cv/arc_face_recognition_pipeline.py
@@ -47,7 +47,7 @@ class ArcFaceRecognitionPipeline(FaceProcessingBasePipeline):
         logger.info('face recognition model loaded!')
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        result = super(ArcFaceRecognitionPipeline, self).preprocess(input)
+        result = super().preprocess(input)
         align_img = result['img']
         face_img = align_img[:, :, ::-1]  # to rgb
         face_img = np.transpose(face_img, axes=(2, 0, 1))
diff --git a/modelscope/pipelines/cv/face_attribute_recognition_pipeline.py b/modelscope/pipelines/cv/face_attribute_recognition_pipeline.py
index ddf3bc5d..f7645aa5 100644
--- a/modelscope/pipelines/cv/face_attribute_recognition_pipeline.py
+++ b/modelscope/pipelines/cv/face_attribute_recognition_pipeline.py
@@ -18,6 +18,7 @@ from modelscope.pipelines.builder import PIPELINES
 from modelscope.preprocessors import LoadImage
 from modelscope.utils.constant import ModelFile, Tasks
 from modelscope.utils.logger import get_logger
+from . import FaceProcessingBasePipeline
 
 logger = get_logger()
 
@@ -25,7 +26,7 @@ logger = get_logger()
 @PIPELINES.register_module(
     Tasks.face_attribute_recognition,
     module_name=Pipelines.face_attribute_recognition)
-class FaceAttributeRecognitionPipeline(Pipeline):
+class FaceAttributeRecognitionPipeline(FaceProcessingBasePipeline):
 
     def __init__(self, model: str, **kwargs):
         """
@@ -44,82 +45,15 @@ class FaceAttributeRecognitionPipeline(Pipeline):
         self.device = device
         logger.info('load model done')
 
-        # face detect pipeline
-        det_model_id = 'damo/cv_resnet50_face-detection_retinaface'
         male_list = ['Male', 'Female']
         age_list = [
             '0-2', '3-9', '10-19', '20-29', '30-39', '40-49', '50-59',
             '60-69', '70+'
         ]
         self.map_list = [male_list, age_list]
-        self.face_detection = pipeline(
-            Tasks.face_detection, model=det_model_id)
-
-    def _choose_face(self,
-                     det_result,
-                     min_face=10,
-                     top_face=1,
-                     center_face=False):
-        '''
-        choose face with maximum area
-        Args:
-            det_result: output of face detection pipeline
-            min_face: minimum size of valid face w/h
-            top_face: take faces with top max areas
-            center_face: choose the most centerd face from multi faces, only valid if top_face > 1
-        '''
-        bboxes = np.array(det_result[OutputKeys.BOXES])
-        landmarks = np.array(det_result[OutputKeys.KEYPOINTS])
-        if bboxes.shape[0] == 0:
-            logger.info('Warning: No face detected!')
-            return None
-        # face idx with enough size
-        face_idx = []
-        for i in range(bboxes.shape[0]):
-            box = bboxes[i]
-            if (box[2] - box[0]) >= min_face and (box[3] - box[1]) >= min_face:
-                face_idx += [i]
-        if len(face_idx) == 0:
-            logger.info(
-                f'Warning: Face size not enough, less than {min_face}x{min_face}!'
-            )
-            return None
-        bboxes = bboxes[face_idx]
-        landmarks = landmarks[face_idx]
-        # find max faces
-        boxes = np.array(bboxes)
-        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
-        sort_idx = np.argsort(area)[-top_face:]
-        # find center face
-        if top_face > 1 and center_face and bboxes.shape[0] > 1:
-            img_center = [img.shape[1] // 2, img.shape[0] // 2]
-            min_dist = float('inf')
-            sel_idx = -1
-            for _idx in sort_idx:
-                box = boxes[_idx]
-                dist = np.square(
-                    np.abs((box[0] + box[2]) / 2 - img_center[0])) + np.square(
-                        np.abs((box[1] + box[3]) / 2 - img_center[1]))
-                if dist < min_dist:
-                    min_dist = dist
-                    sel_idx = _idx
-            sort_idx = [sel_idx]
-        main_idx = sort_idx[-1]
-        return bboxes[main_idx], landmarks[main_idx]
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        img = LoadImage.convert_to_ndarray(input)
-        img = img[:, :, ::-1]
-        det_result = self.face_detection(img.copy())
-        rtn = self._choose_face(det_result)
-        face_img = None
-        if rtn is not None:
-            _, face_lmks = rtn
-            face_lmks = face_lmks.reshape(5, 2)
-            face_img, _ = align_face(img, (112, 112), face_lmks)
-            face_img = face_img.astype(np.float32)
-        result = {}
-        result['img'] = face_img
+        result = super().preprocess(input)
         return result
 
     def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/modelscope/pipelines/cv/face_liveness_ir_pipeline.py b/modelscope/pipelines/cv/face_liveness_ir_pipeline.py
index 4383439b..a54d4577 100644
--- a/modelscope/pipelines/cv/face_liveness_ir_pipeline.py
+++ b/modelscope/pipelines/cv/face_liveness_ir_pipeline.py
@@ -56,7 +56,7 @@ class FaceLivenessIrPipeline(FaceProcessingBasePipeline):
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        result = super(FaceLivenessIrPipeline, self).preprocess(input)
+        result = super().preprocess(input)
         orig_img = LoadImage.convert_to_ndarray(input)
         orig_img = orig_img[:, :, ::-1]
         img = super(FaceLivenessIrPipeline,
diff --git a/modelscope/pipelines/cv/face_processing_base_pipeline.py b/modelscope/pipelines/cv/face_processing_base_pipeline.py
index e37d9663..bb6f0397 100644
--- a/modelscope/pipelines/cv/face_processing_base_pipeline.py
+++ b/modelscope/pipelines/cv/face_processing_base_pipeline.py
@@ -40,7 +40,8 @@ class FaceProcessingBasePipeline(Pipeline):
                      det_result,
                      min_face=10,
                      top_face=1,
-                     center_face=False):
+                     center_face=False,
+                     img_shape=None):
         '''
         choose face with maximum area
         Args:
@@ -74,8 +75,8 @@ class FaceProcessingBasePipeline(Pipeline):
         area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
         sort_idx = np.argsort(area)[-top_face:]
         # find center face
-        if top_face > 1 and center_face and bboxes.shape[0] > 1:
-            img_center = [img.shape[1] // 2, img.shape[0] // 2]
+        if top_face > 1 and center_face and bboxes.shape[0] > 1 and img_shape:
+            img_center = [img_shape[1] // 2, img_shape[0] // 2]
             min_dist = float('inf')
             sel_idx = -1
             for _idx in sort_idx:
@@ -94,7 +95,7 @@ class FaceProcessingBasePipeline(Pipeline):
         img = LoadImage.convert_to_ndarray(input)
         img = img[:, :, ::-1]
         det_result = self.face_detection(img.copy())
-        rtn = self._choose_face(det_result)
+        rtn = self._choose_face(det_result, img_shape=img.shape)
         if rtn is not None:
             scores, bboxes, face_lmks = rtn
             face_lmks = face_lmks.reshape(5, 2)
diff --git a/modelscope/pipelines/cv/face_recognition_pipeline.py b/modelscope/pipelines/cv/face_recognition_pipeline.py
index 873e4a1f..4af5a04f 100644
--- a/modelscope/pipelines/cv/face_recognition_pipeline.py
+++ b/modelscope/pipelines/cv/face_recognition_pipeline.py
@@ -17,13 +17,14 @@ from modelscope.pipelines.builder import PIPELINES
 from modelscope.preprocessors import LoadImage
 from modelscope.utils.constant import ModelFile, Tasks
 from modelscope.utils.logger import get_logger
+from . import FaceProcessingBasePipeline
 
 logger = get_logger()
 
 
 @PIPELINES.register_module(
     Tasks.face_recognition, module_name=Pipelines.face_recognition)
-class FaceRecognitionPipeline(Pipeline):
+class FaceRecognitionPipeline(FaceProcessingBasePipeline):
 
     def __init__(self, model: str, **kwargs):
         """
@@ -46,78 +47,14 @@ class FaceRecognitionPipeline(Pipeline):
         face_model.eval()
         self.face_model = face_model
         logger.info('face recognition model loaded!')
-        # face detect pipeline
-        det_model_id = 'damo/cv_resnet_facedetection_scrfd10gkps'
-        self.face_detection = pipeline(
-            Tasks.face_detection, model=det_model_id)
-
-    def _choose_face(self,
-                     det_result,
-                     min_face=10,
-                     top_face=1,
-                     center_face=False):
-        '''
-        choose face with maximum area
-        Args:
-            det_result: output of face detection pipeline
-            min_face: minimum size of valid face w/h
-            top_face: take faces with top max areas
-            center_face: choose the most centerd face from multi faces, only valid if top_face > 1
-        '''
-        bboxes = np.array(det_result[OutputKeys.BOXES])
-        landmarks = np.array(det_result[OutputKeys.KEYPOINTS])
-        # scores = np.array(det_result[OutputKeys.SCORES])
-        if bboxes.shape[0] == 0:
-            logger.info('No face detected!')
-            return None
-        # face idx with enough size
-        face_idx = []
-        for i in range(bboxes.shape[0]):
-            box = bboxes[i]
-            if (box[2] - box[0]) >= min_face and (box[3] - box[1]) >= min_face:
-                face_idx += [i]
-        if len(face_idx) == 0:
-            logger.info(
-                f'Face size not enough, less than {min_face}x{min_face}!')
-            return None
-        bboxes = bboxes[face_idx]
-        landmarks = landmarks[face_idx]
-        # find max faces
-        boxes = np.array(bboxes)
-        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
-        sort_idx = np.argsort(area)[-top_face:]
-        # find center face
-        if top_face > 1 and center_face and bboxes.shape[0] > 1:
-            img_center = [img.shape[1] // 2, img.shape[0] // 2]
-            min_dist = float('inf')
-            sel_idx = -1
-            for _idx in sort_idx:
-                box = boxes[_idx]
-                dist = np.square(
-                    np.abs((box[0] + box[2]) / 2 - img_center[0])) + np.square(
-                        np.abs((box[1] + box[3]) / 2 - img_center[1]))
-                if dist < min_dist:
-                    min_dist = dist
-                    sel_idx = _idx
-            sort_idx = [sel_idx]
-        main_idx = sort_idx[-1]
-        return bboxes[main_idx], landmarks[main_idx]
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        img = LoadImage.convert_to_ndarray(input)
-        img = img[:, :, ::-1]
-        det_result = self.face_detection(img.copy())
-        rtn = self._choose_face(det_result)
-        face_img = None
-        if rtn is not None:
-            _, face_lmks = rtn
-            face_lmks = face_lmks.reshape(5, 2)
-            align_img, _ = align_face(img, (112, 112), face_lmks)
-            face_img = align_img[:, :, ::-1]  # to rgb
-            face_img = np.transpose(face_img, axes=(2, 0, 1))
-            face_img = (face_img / 255. - 0.5) / 0.5
-            face_img = face_img.astype(np.float32)
-        result = {}
+        result = super().preprocess(input)
+        align_img = result['img']
+        face_img = align_img[:, :, ::-1]  # to rgb
+        face_img = np.transpose(face_img, axes=(2, 0, 1))
+        face_img = (face_img / 255. - 0.5) / 0.5
+        face_img = face_img.astype(np.float32)
         result['img'] = face_img
         return result
 
diff --git a/modelscope/pipelines/cv/facial_expression_recognition_pipeline.py b/modelscope/pipelines/cv/facial_expression_recognition_pipeline.py
index 3c85ae62..d7e617f8 100644
--- a/modelscope/pipelines/cv/facial_expression_recognition_pipeline.py
+++ b/modelscope/pipelines/cv/facial_expression_recognition_pipeline.py
@@ -18,6 +18,7 @@ from modelscope.pipelines.builder import PIPELINES
 from modelscope.preprocessors import LoadImage
 from modelscope.utils.constant import ModelFile, Tasks
 from modelscope.utils.logger import get_logger
+from . import FaceProcessingBasePipeline
 
 logger = get_logger()
 
@@ -25,7 +26,7 @@ logger = get_logger()
 @PIPELINES.register_module(
     Tasks.facial_expression_recognition,
     module_name=Pipelines.facial_expression_recognition)
-class FacialExpressionRecognitionPipeline(Pipeline):
+class FacialExpressionRecognitionPipeline(FaceProcessingBasePipeline):
 
     def __init__(self, model: str, **kwargs):
         """
@@ -43,79 +44,12 @@ class FacialExpressionRecognitionPipeline(Pipeline):
         self.device = device
         logger.info('load model done')
 
-        # face detect pipeline
-        det_model_id = 'damo/cv_resnet_facedetection_scrfd10gkps'
         self.map_list = [
             'Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral'
         ]
-        self.face_detection = pipeline(
-            Tasks.face_detection, model=det_model_id)
-
-    def _choose_face(self,
-                     det_result,
-                     min_face=10,
-                     top_face=1,
-                     center_face=False):
-        '''
-        choose face with maximum area
-        Args:
-            det_result: output of face detection pipeline
-            min_face: minimum size of valid face w/h
-            top_face: take faces with top max areas
-            center_face: choose the most centerd face from multi faces, only valid if top_face > 1
-        '''
-        bboxes = np.array(det_result[OutputKeys.BOXES])
-        landmarks = np.array(det_result[OutputKeys.KEYPOINTS])
-        if bboxes.shape[0] == 0:
-            logger.info('Warning: No face detected!')
-            return None
-        # face idx with enough size
-        face_idx = []
-        for i in range(bboxes.shape[0]):
-            box = bboxes[i]
-            if (box[2] - box[0]) >= min_face and (box[3] - box[1]) >= min_face:
-                face_idx += [i]
-        if len(face_idx) == 0:
-            logger.info(
-                f'Warning: Face size not enough, less than {min_face}x{min_face}!'
-            )
-            return None
-        bboxes = bboxes[face_idx]
-        landmarks = landmarks[face_idx]
-        # find max faces
-        boxes = np.array(bboxes)
-        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
-        sort_idx = np.argsort(area)[-top_face:]
-        # find center face
-        if top_face > 1 and center_face and bboxes.shape[0] > 1:
-            img_center = [img.shape[1] // 2, img.shape[0] // 2]
-            min_dist = float('inf')
-            sel_idx = -1
-            for _idx in sort_idx:
-                box = boxes[_idx]
-                dist = np.square(
-                    np.abs((box[0] + box[2]) / 2 - img_center[0])) + np.square(
-                        np.abs((box[1] + box[3]) / 2 - img_center[1]))
-                if dist < min_dist:
-                    min_dist = dist
-                    sel_idx = _idx
-            sort_idx = [sel_idx]
-        main_idx = sort_idx[-1]
-        return bboxes[main_idx], landmarks[main_idx]
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        img = LoadImage.convert_to_ndarray(input)
-        img = img[:, :, ::-1]
-        det_result = self.face_detection(img.copy())
-        rtn = self._choose_face(det_result)
-        face_img = None
-        if rtn is not None:
-            _, face_lmks = rtn
-            face_lmks = face_lmks.reshape(5, 2)
-            face_img, _ = align_face(img, (112, 112), face_lmks)
-            face_img = face_img.astype(np.float32)
-        result = {}
-        result['img'] = face_img
+        result = super().preprocess(input)
         return result
 
     def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
diff --git a/modelscope/pipelines/cv/facial_landmark_confidence_pipeline.py b/modelscope/pipelines/cv/facial_landmark_confidence_pipeline.py
index 26e8e733..cab8310e 100644
--- a/modelscope/pipelines/cv/facial_landmark_confidence_pipeline.py
+++ b/modelscope/pipelines/cv/facial_landmark_confidence_pipeline.py
@@ -44,8 +44,7 @@ class FacialLandmarkConfidencePipeline(FaceProcessingBasePipeline):
 
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        result = super(FacialLandmarkConfidencePipeline,
-                       self).preprocess(input)
+        result = super().preprocess(input)
         img = LoadImage.convert_to_ndarray(input)
         img = img[:, :, ::-1]
         result['orig_img'] = img.astype(np.float32)
diff --git a/modelscope/pipelines/cv/mask_face_recognition_pipeline.py b/modelscope/pipelines/cv/mask_face_recognition_pipeline.py
index 2190b6d0..5a59c962 100644
--- a/modelscope/pipelines/cv/mask_face_recognition_pipeline.py
+++ b/modelscope/pipelines/cv/mask_face_recognition_pipeline.py
@@ -19,13 +19,14 @@ from modelscope.pipelines.builder import PIPELINES
 from modelscope.preprocessors import LoadImage
 from modelscope.utils.constant import ModelFile, Tasks
 from modelscope.utils.logger import get_logger
+from . import FaceProcessingBasePipeline
 
 logger = get_logger()
 
 
 @PIPELINES.register_module(
     Tasks.face_recognition, module_name=Pipelines.mask_face_recognition)
-class MaskFaceRecognitionPipeline(Pipeline):
+class MaskFaceRecognitionPipeline(FaceProcessingBasePipeline):
 
     def __init__(self, model: str, **kwargs):
         """
@@ -44,10 +45,6 @@ class MaskFaceRecognitionPipeline(Pipeline):
         face_model.eval()
         self.face_model = face_model
         logger.info('face recognition model loaded!')
-        # face detect pipeline
-        det_model_id = 'damo/cv_resnet50_face-detection_retinaface'
-        self.face_detection = pipeline(
-            Tasks.face_detection, model=det_model_id)
 
     def _prefix_revision(self, state_dict):
         new_state_dict = OrderedDict()
@@ -58,72 +55,13 @@ class MaskFaceRecognitionPipeline(Pipeline):
         state = new_state_dict
         return state
 
-    def _choose_face(self,
-                     det_result,
-                     min_face=10,
-                     top_face=1,
-                     center_face=False):
-        '''
-        choose face with maximum area
-        Args:
-            det_result: output of face detection pipeline
-            min_face: minimum size of valid face w/h
-            top_face: take faces with top max areas
-            center_face: choose the most centerd face from multi faces, only valid if top_face > 1
-        '''
-        bboxes = np.array(det_result[OutputKeys.BOXES])
-        landmarks = np.array(det_result[OutputKeys.KEYPOINTS])
-        if bboxes.shape[0] == 0:
-            logger.info('No face detected!')
-            return None
-        # face idx with enough size
-        face_idx = []
-        for i in range(bboxes.shape[0]):
-            box = bboxes[i]
-            if (box[2] - box[0]) >= min_face and (box[3] - box[1]) >= min_face:
-                face_idx += [i]
-        if len(face_idx) == 0:
-            logger.info(
-                f'Face size not enough, less than {min_face}x{min_face}!')
-            return None
-        bboxes = bboxes[face_idx]
-        landmarks = landmarks[face_idx]
-        # find max faces
-        boxes = np.array(bboxes)
-        area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
-        sort_idx = np.argsort(area)[-top_face:]
-        # find center face
-        if top_face > 1 and center_face and bboxes.shape[0] > 1:
-            img_center = [img.shape[1] // 2, img.shape[0] // 2]
-            min_dist = float('inf')
-            sel_idx = -1
-            for _idx in sort_idx:
-                box = boxes[_idx]
-                dist = np.square(
-                    np.abs((box[0] + box[2]) / 2 - img_center[0])) + np.square(
-                        np.abs((box[1] + box[3]) / 2 - img_center[1]))
-                if dist < min_dist:
-                    min_dist = dist
-                    sel_idx = _idx
-            sort_idx = [sel_idx]
-        main_idx = sort_idx[-1]
-        return bboxes[main_idx], landmarks[main_idx]
-
     def preprocess(self, input: Input) -> Dict[str, Any]:
-        img = LoadImage.convert_to_ndarray(input)
-        img = img[:, :, ::-1]
-        det_result = self.face_detection(img.copy())
-        rtn = self._choose_face(det_result)
-        face_img = None
-        if rtn is not None:
-            _, face_lmks = rtn
-            face_lmks = face_lmks.reshape(5, 2)
-            align_img, _ = align_face(img, (112, 112), face_lmks)
-            face_img = align_img[:, :, ::-1]  # to rgb
-            face_img = np.transpose(face_img, axes=(2, 0, 1))
-            face_img = (face_img / 255. - 0.5) / 0.5
-            face_img = face_img.astype(np.float32)
-        result = {}
+        result = super().preprocess(input)
+        align_img = result['img']
+        face_img = align_img[:, :, ::-1]  # to rgb
+        face_img = np.transpose(face_img, axes=(2, 0, 1))
+        face_img = (face_img / 255. - 0.5) / 0.5
+        face_img = face_img.astype(np.float32)
         result['img'] = face_img
         return result
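
Reviewer note (not part of the patch): besides deduplicating, this change fixes a latent bug. In the removed per-pipeline copies of _choose_face, the center_face branch read img.shape although img was never in scope inside the method, so calling it with top_face > 1 and center_face=True would have raised a NameError. The surviving base-class version takes the shape explicitly (_choose_face(det_result, img_shape=img.shape)) and guards the branch with "and img_shape". After the patch, detection, main-face selection, and 112x112 alignment live only in FaceProcessingBasePipeline.preprocess, and each subclass layers just its own tensor formatting on top. A minimal sketch of the resulting pattern, mirroring the normalization code the patch keeps in the recognition pipelines (the class name and the direct module import are illustrative, not part of the patch):

    from typing import Any, Dict

    import numpy as np

    # The patch imports the base class via the package
    # (from . import FaceProcessingBasePipeline); the direct module path
    # below is an assumption made for a self-contained example.
    from modelscope.pipelines.cv.face_processing_base_pipeline import \
        FaceProcessingBasePipeline


    class ExampleFaceEmbeddingPipeline(FaceProcessingBasePipeline):
        """Hypothetical subclass showing the post-refactor division of labor."""

        def preprocess(self, input) -> Dict[str, Any]:
            # Base class: detect faces, choose the main face (largest area,
            # with the img_shape-aware center tie-break) and return an
            # aligned 112x112 BGR crop under result['img'].
            result = super().preprocess(input)

            # Subclass: only model-specific tensor formatting remains.
            face_img = result['img'][:, :, ::-1]  # BGR -> RGB
            face_img = np.transpose(face_img, axes=(2, 0, 1))  # HWC -> CHW
            face_img = (face_img / 255. - 0.5) / 0.5  # scale to [-1, 1]
            result['img'] = face_img.astype(np.float32)
            return result

As the patch shows for FaceAttributeRecognitionPipeline and FacialExpressionRecognitionPipeline, a subclass that needs no extra formatting can keep a bare pass-through preprocess, or drop the override entirely in a follow-up.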