add yolopv2 model cv_yolopv2_image_driving_perception

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11548442
This commit is contained in:
maojialiang.mjl
2023-02-09 10:31:38 +00:00
committed by wenmeng.zwm
parent 5d489f6cdf
commit 3cb3e61ff7
13 changed files with 653 additions and 3 deletions

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:1f6b6b4abfcc2fc9042c4e51c2e5f530ff84b345cd3176b11e8317143c5a7e0f
size 91130

View File

@@ -23,6 +23,7 @@ class Models(object):
classification_model = 'ClassificationModel'
easyrobust_model = 'EasyRobustModel'
bnext = 'bnext'
yolopv2 = 'yolopv2'
nafnet = 'nafnet'
csrnet = 'csrnet'
cascade_mask_rcnn_swin = 'cascade_mask_rcnn_swin'
@@ -252,6 +253,7 @@ class Pipelines(object):
nextvit_small_daily_image_classification = 'nextvit-small_image-classification_Dailylife-labels'
convnext_base_image_classification_garbage = 'convnext-base_image-classification_garbage'
bnext_small_image_classification = 'bnext-small_image-classification_ImageNet-labels'
yolopv2_image_driving_percetion_bdd100k = 'yolopv2_image-driving-percetion_bdd100k'
common_image_classification = 'common-image-classification'
image_color_enhance = 'csrnet-image-color-enhance'
virtual_try_on = 'virtual-try-on'
@@ -587,6 +589,9 @@ DEFAULT_MODEL_FOR_PIPELINE = {
Tasks.image_segmentation:
(Pipelines.image_instance_segmentation,
'damo/cv_swin-b_image-instance-segmentation_coco'),
Tasks.image_driving_perception:
(Pipelines.yolopv2_image_driving_percetion_bdd100k,
'damo/cv_yolopv2_image-driving-perception_bdd100k'),
Tasks.image_depth_estimation:
(Pipelines.image_depth_estimation,
'damo/cv_newcrfs_image-depth-estimation_indoor'),
@@ -665,9 +670,9 @@ DEFAULT_MODEL_FOR_PIPELINE = {
Tasks.face_emotion: (Pipelines.face_emotion, 'damo/cv_face-emotion'),
Tasks.product_segmentation: (Pipelines.product_segmentation,
'damo/cv_F3Net_product-segmentation'),
Tasks.referring_video_object_segmentation:
(Pipelines.referring_video_object_segmentation,
'damo/cv_swin-t_referring_video-object-segmentation'),
Tasks.referring_video_object_segmentation: (
Pipelines.referring_video_object_segmentation,
'damo/cv_swin-t_referring_video-object-segmentation'),
Tasks.video_summarization: (Pipelines.video_summarization,
'damo/cv_googlenet_pgl-video-summarization'),
Tasks.image_skychange: (Pipelines.image_skychange,
@@ -812,6 +817,7 @@ class Preprocessors(object):
image_classification_mmcv_preprocessor = 'image-classification-mmcv-preprocessor'
image_color_enhance_preprocessor = 'image-color-enhance-preprocessor'
image_instance_segmentation_preprocessor = 'image-instance-segmentation-preprocessor'
image_driving_perception_preprocessor = 'image-driving-perception-preprocessor'
image_portrait_enhancement_preprocessor = 'image-portrait-enhancement-preprocessor'
image_quality_assessment_mos_preprocessor = 'image-quality_assessment-mos-preprocessor'
video_summarization_preprocessor = 'video-summarization-preprocessor'

View File

@@ -0,0 +1,31 @@
# Copyright 2021-2022 The Alibaba Fundamental Vision Team Authors. All rights reserved.
from typing import TYPE_CHECKING
from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
    # Static type checkers see the real imports.
    from .image_driving_percetion_model import YOLOPv2
    from .preprocessor import ImageDrivingPerceptionPreprocessor
    from .utils import (scale_coords, non_max_suppression,
                        split_for_trace_model, driving_area_mask,
                        lane_line_mask)
else:
    # At runtime, replace this module with a LazyImportModule so that the
    # heavy submodule imports (torch, cv2, ...) are deferred until the
    # corresponding attribute is first accessed.
    _import_structure = {
        'image_driving_percetion_model': ['YOLOPv2'],
        'preprocessor': ['ImageDrivingPerceptionPreprocessor'],
        'utils': [
            'scale_coords', 'non_max_suppression', 'split_for_trace_model',
            'driving_area_mask', 'lane_line_mask'
        ],
    }

    import sys

    sys.modules[__name__] = LazyImportModule(
        __name__,
        globals()['__file__'],
        _import_structure,
        module_spec=__spec__,
        extra_objects={},
    )

View File

@@ -0,0 +1,60 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from typing import Any, Dict
import cv2
import numpy as np
import torch
from modelscope.metainfo import Models
from modelscope.models.base.base_torch_model import TorchModel
from modelscope.models.builder import MODELS
from modelscope.outputs import OutputKeys
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger
logger = get_logger()
__all__ = ['YOLOPv2']
@MODELS.register_module(
    Tasks.image_driving_perception, module_name=Models.yolopv2)
class YOLOPv2(TorchModel):
    """YOLOPv2 panoptic driving-perception model.

    YOLOPv2 uses the E-ELAN backbone first adopted in YOLOv7, with
    SPP+FPN+PAN as neck and head. For more information, please refer to
    https://arxiv.org/pdf/2208.11434.pdf
    """

    def __init__(self, model_dir: str, *args, **kwargs):
        """
        Args:
            model_dir (str): directory containing the TorchScript
                checkpoint (``ModelFile.TORCH_MODEL_FILE``).
        """
        super().__init__(model_dir, *args, **kwargs)
        self.model_dir = model_dir
        self._load_pretrained_checkpoint()

    def forward(self, data):
        """Run inference on a preprocessed batch.

        Args:
            data: dict with key 'img', a float tensor of shape (B, 3, H, W).

        Returns:
            dict with the raw detection-head outputs ('pred',
            'anchor_grid'), drivable-area and lane-line segmentation
            outputs, and 'img_hw' (the input spatial size, used later to
            rescale boxes).
        """
        img = data['img']
        with torch.no_grad():
            # The traced model returns ([pred, anchor_grid], seg, ll).
            [pred, anchor_grid], seg, ll = self.model(img)
        return {
            'img_hw': img.shape[2:],
            'pred': pred,
            'anchor_grid': anchor_grid,
            'driving_area_mask': seg,
            'lane_line_mask': ll,
        }

    def postprocess(self, inputs: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """No model-specific postprocessing; defer to the base class."""
        return super().postprocess(inputs, **kwargs)

    def _load_pretrained_checkpoint(self):
        """Load the TorchScript checkpoint from ``self.model_dir`` onto CPU.

        Raises:
            FileNotFoundError: if the checkpoint file is missing.
        """
        model_path = os.path.join(self.model_dir, ModelFile.TORCH_MODEL_FILE)
        logger.info(model_path)
        if os.path.exists(model_path):
            self.model = torch.jit.load(model_path, 'cpu')
            self.model = self.model.eval()
        else:
            logger.error('[checkModelPath]: model path does not exist! '
                         'model path: ' + model_path)
            # FileNotFoundError is a subclass of Exception, so existing
            # callers catching Exception keep working.
            raise FileNotFoundError(
                '[checkModelPath]: model path does not exist: ' + model_path)

View File

@@ -0,0 +1,120 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import Any, Dict, Union
import cv2
import numpy as np
import torch
from modelscope.metainfo import Preprocessors
from modelscope.preprocessors import Preprocessor
from modelscope.preprocessors.builder import PREPROCESSORS
from modelscope.preprocessors.image import LoadImage
from modelscope.utils.constant import Fields, ModeKeys
from modelscope.utils.type_assert import type_assert
@PREPROCESSORS.register_module(
    Fields.cv, module_name=Preprocessors.image_driving_perception_preprocessor)
class ImageDrivingPerceptionPreprocessor(Preprocessor):
    """Preprocessor for the YOLOPv2 image-driving-perception model.

    Loads an image, normalizes its channel layout to RGB, resizes it,
    letterboxes it to the model input size and converts it into a
    normalized float tensor of shape (1, 3, H, W).
    """

    def __init__(self, mode: str = ModeKeys.INFERENCE, *args, **kwargs):
        """
        Args:
            mode: The mode for the preprocessor.
        """
        super().__init__(mode, *args, **kwargs)

    def _check_image(self, input_img):
        """Normalize the channel layout to a 3-channel image.

        Grayscale inputs (2-D or single-channel) are replicated across 3
        channels; RGBA inputs are alpha-blended down to RGB.
        """
        whole_temp_shape = input_img.shape
        if len(whole_temp_shape) == 2:
            input_img = np.stack([input_img, input_img, input_img], axis=2)
        elif whole_temp_shape[2] == 1:
            input_img = np.concatenate([input_img, input_img, input_img],
                                       axis=2)
        elif whole_temp_shape[2] == 4:
            # Blend RGB with the 0-255 alpha channel scaled to [0, 1].
            input_img = input_img[:, :, 0:3] * 1.0 \
                * input_img[:, :, 3:4] * 1.0 / 255.0
        return input_img

    def _letterbox(self,
                   img,
                   new_shape=(640, 640),
                   color=(114, 114, 114),
                   auto=True,
                   scaleFill=False,
                   scaleup=True,
                   stride=32):
        """Resize and pad `img` to `new_shape` while keeping aspect ratio.

        Args:
            img: HxWxC image.
            new_shape: target (height, width), or a single int for square.
            color: padding color.
            auto: pad only up to the next multiple of `stride`
                (minimum rectangle) instead of the full target size.
            scaleFill: stretch to `new_shape` with no padding at all.
            scaleup: allow upscaling (disable for better test mAP).
            stride: model stride the padded shape must be a multiple of.

        Returns:
            Tuple of (padded image, (w_ratio, h_ratio), (dw, dh))
            where (dw, dh) is the per-side padding.
        """
        shape = img.shape[:2]  # current shape [height, width]
        if isinstance(new_shape, int):
            new_shape = (new_shape, new_shape)

        # Scale ratio (new / old)
        r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
        if not scaleup:  # only scale down, do not scale up
            r = min(r, 1.0)

        # Compute padding
        ratio = r, r  # width, height ratios
        new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
        dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[
            1]  # wh padding
        if auto:  # minimum rectangle
            dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
        elif scaleFill:  # stretch
            dw, dh = 0.0, 0.0
            new_unpad = (new_shape[1], new_shape[0])
            ratio = new_shape[1] / shape[1], new_shape[0] / shape[
                0]  # width, height ratios

        dw /= 2  # divide padding into 2 sides
        dh /= 2

        if shape[::-1] != new_unpad:  # resize
            img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
        top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
        left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
        img = cv2.copyMakeBorder(
            img, top, bottom, left, right, cv2.BORDER_CONSTANT,
            value=color)  # add border
        return img, ratio, (dw, dh)

    @type_assert(object, object)
    def __call__(
            self, data: str, output_shape=(1280, 720), new_shape=(640, 640)
    ) -> Dict[str, Any]:
        """Process the raw input data.

        Args:
            data (str): image path (anything LoadImage accepts).
            output_shape: (width, height) the image is resized to before
                letterboxing; matches the fixed (720, 1280) size the
                downstream visualization expects.
            new_shape: letterbox target (height, width) fed to the model.

        Returns:
            Dict[str, Any]: the preprocessed data
            {
                "img": normalized float tensor of shape (1, 3, 640, 640)
            }

        Raises:
            ValueError: if the loaded image is None.
        """
        img = LoadImage.convert_to_ndarray(data)
        if img is not None:
            img = self._check_image(img)
        else:
            raise ValueError('img is None')
        img = cv2.resize(img, output_shape, interpolation=cv2.INTER_LINEAR)
        img = self._letterbox(img, new_shape)[0]

        img = img.transpose(2, 0, 1)  # HWC -> CHW (3x640x640)
        img = np.ascontiguousarray(img)

        img = torch.from_numpy(img)
        img = img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)  # add batch dimension
        return {
            'img': img,
        }

View File

@@ -0,0 +1,208 @@
# Part of the implementation is borrowed and modified from internet,
# publicly available at https://github.com/CAIC-AD/YOLOPv2
import time
import numpy as np
import torch
from torchvision.ops import nms
def _make_grid(nx=20, ny=20):
yv, xv = torch.meshgrid(
[torch.arange(ny), torch.arange(nx)], indexing='ij')
return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float()
def split_for_trace_model(pred=None, anchor_grid=None):
    """Decode raw multi-level detection-head outputs into one tensor.

    For each of the 3 feature levels (strides 8/16/32), reshapes the raw
    output to (bs, 3, ny, nx, 85), applies sigmoid and decodes xy/wh with
    the level's grid and anchors, then concatenates all levels into a
    single (bs, n_boxes, 85) tensor.
    """
    strides = [8, 16, 32]
    decoded = []
    for level, stride in enumerate(strides):
        bs, _, ny, nx = pred[level].shape
        pred[level] = pred[level].view(bs, 3, 85, ny,
                                       nx).permute(0, 1, 3, 4, 2).contiguous()
        y = pred[level].sigmoid()
        grid = _make_grid(nx, ny).to(pred[level].device)
        # xy: grid-relative offset -> absolute pixel coordinates
        y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + grid) * stride
        # wh: squared sigmoid scaled by this level's anchors
        y[..., 2:4] = (y[..., 2:4] * 2)**2 * anchor_grid[level]
        decoded.append(y.view(bs, -1, 85))
    return torch.cat(decoded, 1)
def scale_coords(img1_shape,
                 coords,
                 img0_shape=(720, 1280, 3),
                 ratio_pad=None):
    """Rescale xyxy `coords` in place from `img1_shape` to `img0_shape`.

    Args:
        img1_shape: (height, width) of the letterboxed model input.
        coords: Tensor[N, 4+] whose first 4 columns are xyxy boxes.
        img0_shape: (height, width, ...) of the original image.
        ratio_pad: optional ((gain, ...), (pad_x, pad_y)) to use instead
            of recomputing them from the two shapes.

    Returns:
        The same tensor, rescaled and clipped to the image bounds.
    """
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0],
                   img1_shape[1] / img0_shape[1])  # gain = old / new
        pad_x = (img1_shape[1] - img0_shape[1] * gain) / 2
        pad_y = (img1_shape[0] - img0_shape[0] * gain) / 2
    else:
        gain = ratio_pad[0][0]
        pad_x, pad_y = ratio_pad[1]

    coords[:, [0, 2]] -= pad_x  # undo x padding
    coords[:, [1, 3]] -= pad_y  # undo y padding
    coords[:, :4] /= gain
    clip_coords(coords, img0_shape)
    return coords
def clip_coords(boxes, img_shape):
    """Clamp xyxy `boxes` in place to the (height, width) image bounds."""
    height, width = img_shape[0], img_shape[1]
    for col, limit in ((0, width), (1, height), (2, width), (3, height)):
        boxes[:, col].clamp_(0, limit)
def xywh2xyxy(x):
    """Convert nx4 boxes from [cx, cy, w, h] to [x1, y1, x2, y2].

    Accepts a torch Tensor or a numpy array; returns the same type.
    xy1 is the top-left corner, xy2 the bottom-right.
    """
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    half_w = x[:, 2] / 2
    half_h = x[:, 3] / 2
    y[:, 0] = x[:, 0] - half_w  # top-left x
    y[:, 1] = x[:, 1] - half_h  # top-left y
    y[:, 2] = x[:, 0] + half_w  # bottom-right x
    y[:, 3] = x[:, 1] + half_h  # bottom-right y
    return y
def non_max_suppression(prediction,
                        conf_thres=0.3,
                        iou_thres=0.45,
                        classes=None,
                        agnostic=False,
                        multi_label=False,
                        labels=()):
    """Runs Non-Maximum Suppression (NMS) on inference results

    Args:
        prediction: (bs, n_boxes, 85) decoded detections laid out as
            [cx, cy, w, h, obj_conf, class scores...].
        conf_thres: objectness / final-confidence threshold.
        iou_thres: IoU threshold used by torchvision's NMS.
        classes: optional list of class ids to keep.
        agnostic: if True, run NMS across all classes jointly.
        multi_label: allow multiple labels per box.
        labels: optional per-image apriori labels for autolabelling.

    Returns:
        list of detections, on (n,6) tensor per image [xyxy, conf, cls]
    """
    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Settings
    max_wh = 4096  # (pixels) minimum and maximum box width and height
    max_det = 300  # maximum number of detections per image
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 10.0  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [torch.zeros(
        (0, 6), device=prediction.device)] * prediction.shape[0]
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints: keep only candidate rows above obj threshold.
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lbs = labels[xi]
            v = torch.zeros((len(lbs), nc + 5), device=x.device)
            v[:, :4] = lbs[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(lbs)), lbs[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1)
        else:  # best class only
            conf, j = x[:, 5:].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float()),
                          1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(
                descending=True)[:max_nms]]  # sort by confidence

        # Batched NMS: offset boxes per class so NMS never merges
        # detections of different classes (unless agnostic).
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:,
                                        4]  # boxes (offset by class), scores
        i = nms(boxes, scores, iou_thres)  # NMS
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        if merge and (1 < n and n < 3E3):
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(
                1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output
def box_iou(box1, box2):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Args:
        box1 (Tensor[N, 4])
        box2 (Tensor[M, 4])
    Returns:
        iou (Tensor[N, M]): the NxM matrix containing the pairwise
            IoU values for every element in boxes1 and boxes2
    """

    def _area(b):
        # b: [*, 4] xyxy boxes -> per-box area
        return (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])

    area1 = _area(box1)
    area2 = _area(box2)
    # Broadcast to the NxM pairwise intersection rectangle.
    top_left = torch.max(box1[:, None, :2], box2[:, :2])
    bottom_right = torch.min(box1[:, None, 2:], box2[:, 2:])
    inter = (bottom_right - top_left).clamp(0).prod(2)
    union = area1[:, None] + area2 - inter
    return inter / union  # iou = inter / (area1 + area2 - inter)
def driving_area_mask(seg=None):
    """Convert drivable-area logits to a binary H x W numpy mask.

    Crops rows 12:372 (strips the letterbox padding), upsamples 2x with
    bilinear interpolation, then takes the per-pixel argmax over the
    channel dimension.
    """
    cropped = seg[:, :, 12:372, :]
    upsampled = torch.nn.functional.interpolate(
        cropped, scale_factor=2, mode='bilinear')
    _, mask = torch.max(upsampled, 1)
    return mask.int().squeeze().cpu().numpy()
def lane_line_mask(ll=None):
    """Convert lane-line probabilities to a binary H x W numpy mask.

    Crops rows 12:372 (strips the letterbox padding), upsamples 2x with
    bilinear interpolation, then rounds each probability to 0 or 1.
    """
    cropped = ll[:, :, 12:372, :]
    upsampled = torch.nn.functional.interpolate(
        cropped, scale_factor=2, mode='bilinear')
    mask = torch.round(upsampled).squeeze(1)
    return mask.int().squeeze().cpu().numpy()

View File

@@ -147,6 +147,21 @@ TASK_OUTPUTS = {
Tasks.card_detection:
[OutputKeys.SCORES, OutputKeys.BOXES, OutputKeys.KEYPOINTS],
# image driving perception result for single sample
# {
# "boxes": [
# [x1, y1, x2, y2],
# [x1, y1, x2, y2],
# [x1, y1, x2, y2],
# [x1, y1, x2, y2],
# ],
# "masks": [
# [np.array], # with fixed shape (h=720, w=1280) containing only 0, 1
# [np.array], # with fixed shape (h=720, w=1280) containing only 0, 1
# ]
# }
Tasks.image_driving_perception: [OutputKeys.BOXES, OutputKeys.MASKS],
# facial expression recognition result for single sample
# {
# "scores": [0.9]

View File

@@ -86,6 +86,8 @@ TASK_INPUTS = {
InputType.IMAGE,
Tasks.image_fewshot_detection:
InputType.IMAGE,
Tasks.image_driving_perception:
InputType.IMAGE,
Tasks.vision_efficient_tuning:
InputType.IMAGE,

View File

@@ -81,6 +81,7 @@ if TYPE_CHECKING:
from .vision_middleware_pipeline import VisionMiddlewarePipeline
from .video_frame_interpolation_pipeline import VideoFrameInterpolationPipeline
from .image_skychange_pipeline import ImageSkychangePipeline
from .image_driving_perception_pipeline import ImageDrivingPerceptionPipeline
from .vop_retrieval_pipeline import VopRetrievalPipeline
from .video_object_segmentation_pipeline import VideoObjectSegmentationPipeline
from .image_matching_pipeline import ImageMatchingPipeline
@@ -211,6 +212,9 @@ else:
'VideoFrameInterpolationPipeline'
],
'image_skychange_pipeline': ['ImageSkychangePipeline'],
'image_driving_perception_pipeline': [
'ImageDrivingPerceptionPipeline'
],
'vop_retrieval_pipeline': ['VopRetrievalPipeline'],
'video_object_segmentation_pipeline': [
'VideoObjectSegmentationPipeline'

View File

@@ -0,0 +1,100 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os.path as osp
from typing import Any, Dict
import cv2
import numpy as np
from modelscope.metainfo import Pipelines
from modelscope.models.cv.image_driving_perception import (
ImageDrivingPerceptionPreprocessor, driving_area_mask, lane_line_mask,
non_max_suppression, scale_coords, split_for_trace_model)
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.preprocessors import LoadImage
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger
logger = get_logger()
@PIPELINES.register_module(
    Tasks.image_driving_perception,
    module_name=Pipelines.yolopv2_image_driving_percetion_bdd100k)
class ImageDrivingPerceptionPipeline(Pipeline):
    """Image Driving Perception Pipeline. Given an image, the pipeline
    detects cars, and segments both lane lines and drivable areas.

    Example:

    ```python
    >>> from modelscope.pipelines import pipeline
    >>> image_driving_perception_pipeline = pipeline(Tasks.image_driving_perception,
        model='damo/cv_yolopv2_image-driving-perception_bdd100k')
    >>> image_driving_perception_pipeline(img_path)
    {
        'boxes': [
            tensor([[1.0000e+00, 2.8600e+02, 4.0700e+02, 6.2600e+02],
            [8.8200e+02, 2.9600e+02, 1.0910e+03, 4.4700e+02],
            [3.7200e+02, 2.7500e+02, 5.2100e+02, 3.5500e+02],
            ...,
            [7.8600e+02, 2.8100e+02, 8.0400e+02, 3.0800e+02],
            [5.7000e+02, 2.8000e+02, 5.9400e+02, 3.0000e+02],
            [7.0500e+02, 2.7800e+02, 7.2100e+02, 2.9000e+02]])
        ],
        'masks': [
            array([[0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   ...,
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0]], dtype=int32),
            array([[0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   ...,
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0],
                   [0, 0, 0, ..., 0, 0, 0]], dtype=int32)
        ]
    }
    ```
    """

    def __init__(self, model: str, **kwargs):
        """
        Use `model` and `preprocessor` to create an image driving
        perception pipeline for prediction.

        Args:
            model: model id on the ModelScope hub, or a Model instance /
                local model directory.
        """
        super().__init__(model=model, auto_collate=True, **kwargs)
        # Fall back to the default preprocessor when none was supplied.
        if self.preprocessor is None:
            self.preprocessor = ImageDrivingPerceptionPreprocessor()
        logger.info('load model done')

    def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Run the model on the preprocessed input dict."""
        return self.model(input)

    def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Decode raw model outputs into boxes and segmentation masks.

        Returns:
            dict with OutputKeys.BOXES (one tensor of xyxy boxes per image
            that had detections) and OutputKeys.MASKS
            ([drivable-area mask, lane-line mask]).
        """
        results_dict = {
            OutputKeys.BOXES: [],
            OutputKeys.MASKS: [],
        }
        # Decode the multi-level head outputs into (bs, n, 85).
        pred = split_for_trace_model(inputs['pred'], inputs['anchor_grid'])

        # Apply NMS
        pred = non_max_suppression(pred)

        da_seg_mask = driving_area_mask(inputs['driving_area_mask'])
        ll_seg_mask = lane_line_mask(inputs['lane_line_mask'])

        for det in pred:  # detections per image
            if len(det):
                # Rescale boxes from img_size to (720, 1280)
                det[:, :4] = scale_coords(inputs['img_hw'], det[:, :4]).round()
                # NOTE(review): BOXES stays empty when there are no
                # detections; downstream consumers should handle that.
                results_dict[OutputKeys.BOXES].append(det[:, :4])
        results_dict[OutputKeys.MASKS].append(da_seg_mask)
        results_dict[OutputKeys.MASKS].append(ll_seg_mask)
        return results_dict

View File

@@ -50,6 +50,7 @@ class CVTasks(object):
image_segmentation = 'image-segmentation'
semantic_segmentation = 'semantic-segmentation'
image_driving_perception = 'image-driving-perception'
image_depth_estimation = 'image-depth-estimation'
indoor_layout_estimation = 'indoor-layout-estimation'
video_depth_estimation = 'video-depth-estimation'

View File

@@ -494,6 +494,38 @@ def show_video_depth_estimation_result(depths, video_save_path):
out.release()
def show_image_driving_perception_result(img,
                                         results,
                                         out_file='result.jpg',
                                         if_draw=(1, 1, 1)):
    """Render driving-perception results onto `img` and optionally save it.

    Args:
        img: image of fixed shape (720, 1280, 3); modified in place.
        results: pipeline output dict containing OutputKeys.BOXES and
            OutputKeys.MASKS ([drivable-area mask, lane-line mask]).
        out_file: path to write the visualization to, or None to skip.
        if_draw: three truthy flags enabling boxes / drivable area /
            lane lines respectively. Default is an immutable tuple
            (a mutable list default would be shared across calls).

    Returns:
        The image with the requested overlays drawn.
    """
    assert img.shape == (720, 1280,
                         3), 'input image shape need fix to (720, 1280, 3)'
    boxes_list = results.get(OutputKeys.BOXES)
    # Guard against an empty BOXES list (no detections) instead of
    # unconditionally indexing [0].
    if if_draw[0] and boxes_list:
        for x in boxes_list[0]:
            c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
            cv2.rectangle(
                img, c1, c2, [255, 255, 0], thickness=2, lineType=cv2.LINE_AA)
    result = results.get(OutputKeys.MASKS)
    color_area = np.zeros((result[0].shape[0], result[0].shape[1], 3),
                          dtype=np.uint8)
    if if_draw[1]:
        color_area[result[0] == 1] = [0, 255, 0]  # drivable area in green
    if if_draw[2]:
        color_area[result[1] == 1] = [255, 0, 0]  # lane lines in red
    color_seg = color_area

    # Blend the colored overlay onto pixels where any mask fired.
    color_mask = np.mean(color_seg, 2)
    msk_idx = color_mask != 0
    img[msk_idx] = img[msk_idx] * 0.5 + color_seg[msk_idx] * 0.5
    if out_file is not None:
        cv2.imwrite(out_file, img[:, :, ::-1])  # RGB -> BGR for OpenCV
    return img
def masks_visualization(masks, palette):
vis_masks = []
for f in range(masks.shape[0]):

View File

@@ -0,0 +1,68 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
import os.path as osp
import unittest
import cv2
import modelscope
from modelscope.hub.snapshot_download import snapshot_download
from modelscope.models import Model
from modelscope.models.cv.image_driving_perception import YOLOPv2
from modelscope.outputs import OutputKeys
from modelscope.pipelines import pipeline
from modelscope.pipelines.base import Pipeline
from modelscope.pipelines.cv import ImageDrivingPerceptionPipeline
from modelscope.preprocessors.image import LoadImage
from modelscope.utils.constant import Tasks
from modelscope.utils.cv.image_utils import \
show_image_driving_perception_result
from modelscope.utils.demo_utils import DemoCompatibilityCheck
from modelscope.utils.test_utils import test_level
class ImageDrivingPerceptionTest(unittest.TestCase, DemoCompatibilityCheck):
    """Integration tests for the YOLOPv2 image-driving-perception pipeline."""

    def setUp(self) -> None:
        # Hub model id and a sample image used by all test cases.
        self.model_id = 'damo/cv_yolopv2_image-driving-perception_bdd100k'
        self.img_path = 'data/test/images/image_driving_perception.jpg'

    def pipeline_inference(self, pipeline: Pipeline, img_path: str):
        """Run `pipeline` on `img_path` and render results to result.jpg."""
        result = pipeline(img_path)
        img = LoadImage.convert_to_ndarray(img_path)
        # The visualizer asserts a fixed (720, 1280, 3) input image.
        img = cv2.resize(img, (1280, 720), interpolation=cv2.INTER_LINEAR)
        show_image_driving_perception_result(
            img, result, out_file='result.jpg', if_draw=[1, 1, 1])
        print(f'Output written to {osp.abspath("result.jpg")}')

    @unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
    def test_run_with_model_name(self):
        # Pipeline constructed directly from the hub model id.
        image_driving_perception_pipeline = pipeline(
            Tasks.image_driving_perception, model=self.model_id)
        self.pipeline_inference(image_driving_perception_pipeline,
                                self.img_path)

    @unittest.skipUnless(test_level() >= 1, 'skip test in current test level')
    def test_run_with_model_from_modelhub(self):
        # Pipeline constructed from a pre-loaded Model instance.
        model = Model.from_pretrained(self.model_id)
        image_driving_perception_pipeline = pipeline(
            task=Tasks.image_driving_perception, model=model)
        self.pipeline_inference(image_driving_perception_pipeline,
                                self.img_path)

    @unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
    def test_run_by_direct_model_download(self):
        # Model and pipeline constructed manually from a local snapshot.
        cache_path = snapshot_download(self.model_id)
        model = YOLOPv2(cache_path)
        image_driving_perception_pipeline = ImageDrivingPerceptionPipeline(
            model, preprocessor=None)
        self.pipeline_inference(image_driving_perception_pipeline,
                                self.img_path)

    @unittest.skip('demo compatibility test is only enabled on a needed-basis')
    def test_demo_compatibility(self):
        self.compatibility_check()


if __name__ == '__main__':
    unittest.main()