[to #42322933] Add ocr-detection-vlpt-pipeline to maas lib

Link: https://code.alibaba-inc.com/Ali-MaaS/MaaS-lib/codereview/11213036
This commit is contained in:
wjq264216
2022-12-28 06:26:15 +08:00
committed by yingda.chen
parent cb9f1bfb8d
commit e57424eaf0
6 changed files with 712 additions and 108 deletions

View File

@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3f24570355f178d2a8226112d1443d735837e59573545cfff12458dd791ae341
size 308158

View File

@@ -1,22 +1,25 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import math
import os.path as osp
from typing import Any, Dict
import cv2
import numpy as np
import tensorflow as tf
import torch
from modelscope.metainfo import Pipelines
from modelscope.outputs import OutputKeys
from modelscope.pipelines.base import Input, Pipeline
from modelscope.pipelines.builder import PIPELINES
from modelscope.pipelines.cv.ocr_utils.model_vlpt import VLPTModel
from modelscope.preprocessors import LoadImage
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.device import device_placement
from modelscope.utils.logger import get_logger
from .ocr_utils import (SegLinkDetector, cal_width, combine_segments_python,
decode_segments_links_python, nms_python,
rboxes_to_polygons)
polygons_from_bitmap, rboxes_to_polygons)
if tf.__version__ >= '2.0':
import tf_slim as slim
@@ -53,132 +56,188 @@ class OCRDetectionPipeline(Pipeline):
model: model id on modelscope hub.
"""
super().__init__(model=model, **kwargs)
tf.reset_default_graph()
model_path = osp.join(
osp.join(self.model, ModelFile.TF_CHECKPOINT_FOLDER),
'checkpoint-80000')
self._graph = tf.get_default_graph()
config = tf.ConfigProto(allow_soft_placement=True)
config.gpu_options.allow_growth = True
self._session = tf.Session(config=config)
if 'vlpt' in self.model:
model_path = osp.join(self.model, ModelFile.TORCH_MODEL_FILE)
logger.info(f'loading model from {model_path}')
with self._graph.as_default():
with device_placement(self.framework, self.device_name):
self.input_images = tf.placeholder(
tf.float32, shape=[1, 1024, 1024, 3], name='input_images')
self.output = {}
self.thresh = 0.3
self.image_short_side = 736
self.device = torch.device(
'cuda' if torch.cuda.is_available() else 'cpu')
self.infer_model = VLPTModel().to(self.device)
self.infer_model.eval()
checkpoint = torch.load(model_path, map_location=self.device)
if 'state_dict' in checkpoint:
self.infer_model.load_state_dict(checkpoint['state_dict'])
else:
self.infer_model.load_state_dict(checkpoint)
else:
tf.reset_default_graph()
model_path = osp.join(
osp.join(self.model, ModelFile.TF_CHECKPOINT_FOLDER),
'checkpoint-80000')
self._graph = tf.get_default_graph()
config = tf.ConfigProto(allow_soft_placement=True)
config.gpu_options.allow_growth = True
self._session = tf.Session(config=config)
with tf.variable_scope('', reuse=tf.AUTO_REUSE):
global_step = tf.get_variable(
'global_step', [],
initializer=tf.constant_initializer(0),
dtype=tf.int64,
trainable=False)
variable_averages = tf.train.ExponentialMovingAverage(
0.997, global_step)
with self._graph.as_default():
with device_placement(self.framework, self.device_name):
self.input_images = tf.placeholder(
tf.float32,
shape=[1, 1024, 1024, 3],
name='input_images')
self.output = {}
# detector
detector = SegLinkDetector()
all_maps = detector.build_model(
self.input_images, is_training=False)
with tf.variable_scope('', reuse=tf.AUTO_REUSE):
global_step = tf.get_variable(
'global_step', [],
initializer=tf.constant_initializer(0),
dtype=tf.int64,
trainable=False)
variable_averages = tf.train.ExponentialMovingAverage(
0.997, global_step)
# decode local predictions
all_nodes, all_links, all_reg = [], [], []
for i, maps in enumerate(all_maps):
cls_maps, lnk_maps, reg_maps = maps[0], maps[1], maps[
2]
reg_maps = tf.multiply(reg_maps, OFFSET_VARIANCE)
# detector
detector = SegLinkDetector()
all_maps = detector.build_model(
self.input_images, is_training=False)
cls_prob = tf.nn.softmax(tf.reshape(cls_maps, [-1, 2]))
# decode local predictions
all_nodes, all_links, all_reg = [], [], []
for i, maps in enumerate(all_maps):
cls_maps, lnk_maps, reg_maps = maps[0], maps[
1], maps[2]
reg_maps = tf.multiply(reg_maps, OFFSET_VARIANCE)
lnk_prob_pos = tf.nn.softmax(
tf.reshape(lnk_maps, [-1, 4])[:, :2])
lnk_prob_mut = tf.nn.softmax(
tf.reshape(lnk_maps, [-1, 4])[:, 2:])
lnk_prob = tf.concat([lnk_prob_pos, lnk_prob_mut],
axis=1)
cls_prob = tf.nn.softmax(
tf.reshape(cls_maps, [-1, 2]))
all_nodes.append(cls_prob)
all_links.append(lnk_prob)
all_reg.append(reg_maps)
lnk_prob_pos = tf.nn.softmax(
tf.reshape(lnk_maps, [-1, 4])[:, :2])
lnk_prob_mut = tf.nn.softmax(
tf.reshape(lnk_maps, [-1, 4])[:, 2:])
lnk_prob = tf.concat([lnk_prob_pos, lnk_prob_mut],
axis=1)
# decode segments and links
image_size = tf.shape(self.input_images)[1:3]
segments, group_indices, segment_counts, _ = decode_segments_links_python(
image_size,
all_nodes,
all_links,
all_reg,
anchor_sizes=list(detector.anchor_sizes))
all_nodes.append(cls_prob)
all_links.append(lnk_prob)
all_reg.append(reg_maps)
# combine segments
combined_rboxes, combined_counts = combine_segments_python(
segments, group_indices, segment_counts)
self.output['combined_rboxes'] = combined_rboxes
self.output['combined_counts'] = combined_counts
# decode segments and links
image_size = tf.shape(self.input_images)[1:3]
segments, group_indices, segment_counts, _ = decode_segments_links_python(
image_size,
all_nodes,
all_links,
all_reg,
anchor_sizes=list(detector.anchor_sizes))
with self._session.as_default() as sess:
logger.info(f'loading model from {model_path}')
# load model
model_loader = tf.train.Saver(
variable_averages.variables_to_restore())
model_loader.restore(sess, model_path)
# combine segments
combined_rboxes, combined_counts = combine_segments_python(
segments, group_indices, segment_counts)
self.output['combined_rboxes'] = combined_rboxes
self.output['combined_counts'] = combined_counts
with self._session.as_default() as sess:
logger.info(f'loading model from {model_path}')
# load model
model_loader = tf.train.Saver(
variable_averages.variables_to_restore())
model_loader.restore(sess, model_path)
def preprocess(self, input: Input) -> Dict[str, Any]:
img = LoadImage.convert_to_ndarray(input)
if 'vlpt' in self.model:
img = LoadImage.convert_to_ndarray(input)[:, :, ::-1]
h, w, c = img.shape
img_pad = np.zeros((max(h, w), max(h, w), 3), dtype=np.float32)
img_pad[:h, :w, :] = img
height, width, _ = img.shape
if height < width:
new_height = self.image_short_side
new_width = int(
math.ceil(new_height / height * width / 32) * 32)
else:
new_width = self.image_short_side
new_height = int(
math.ceil(new_width / width * height / 32) * 32)
resized_img = cv2.resize(img, (new_width, new_height))
resize_size = 1024
img_pad_resize = cv2.resize(img_pad, (resize_size, resize_size))
img_pad_resize = cv2.cvtColor(img_pad_resize, cv2.COLOR_RGB2BGR)
img_pad_resize = img_pad_resize - np.array([123.68, 116.78, 103.94],
dtype=np.float32)
resized_img = resized_img - np.array([123.68, 116.78, 103.94],
dtype=np.float32)
resized_img /= 255.
resized_img = torch.from_numpy(resized_img).permute(
2, 0, 1).float().unsqueeze(0)
with self._graph.as_default():
resize_size = tf.stack([resize_size, resize_size])
orig_size = tf.stack([max(h, w), max(h, w)])
self.output['orig_size'] = orig_size
self.output['resize_size'] = resize_size
result = {'img': resized_img, 'org_shape': [height, width]}
return result
else:
img = LoadImage.convert_to_ndarray(input)
result = {'img': np.expand_dims(img_pad_resize, axis=0)}
return result
h, w, c = img.shape
img_pad = np.zeros((max(h, w), max(h, w), 3), dtype=np.float32)
img_pad[:h, :w, :] = img
resize_size = 1024
img_pad_resize = cv2.resize(img_pad, (resize_size, resize_size))
img_pad_resize = cv2.cvtColor(img_pad_resize, cv2.COLOR_RGB2BGR)
img_pad_resize = img_pad_resize - np.array(
[123.68, 116.78, 103.94], dtype=np.float32)
with self._graph.as_default():
resize_size = tf.stack([resize_size, resize_size])
orig_size = tf.stack([max(h, w), max(h, w)])
self.output['orig_size'] = orig_size
self.output['resize_size'] = resize_size
result = {'img': np.expand_dims(img_pad_resize, axis=0)}
return result
def forward(self, input: Dict[str, Any]) -> Dict[str, Any]:
with self._graph.as_default():
with self._session.as_default():
feed_dict = {self.input_images: input['img']}
sess_outputs = self._session.run(
self.output, feed_dict=feed_dict)
return sess_outputs
if 'vlpt' in self.model:
pred = self.infer_model(input['img'])
return {'results': pred, 'org_shape': input['org_shape']}
else:
with self._graph.as_default():
with self._session.as_default():
feed_dict = {self.input_images: input['img']}
sess_outputs = self._session.run(
self.output, feed_dict=feed_dict)
return sess_outputs
def postprocess(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
rboxes = inputs['combined_rboxes'][0]
count = inputs['combined_counts'][0]
if count == 0 or count < rboxes.shape[0]:
raise Exception('modelscope error: No text detected')
rboxes = rboxes[:count, :]
if 'vlpt' in self.model:
pred = inputs['results'][0]
height, width = inputs['org_shape']
segmentation = pred > self.thresh
# convert rboxes to polygons and find its coordinates on the original image
orig_h, orig_w = inputs['orig_size']
resize_h, resize_w = inputs['resize_size']
polygons = rboxes_to_polygons(rboxes)
scale_y = float(orig_h) / float(resize_h)
scale_x = float(orig_w) / float(resize_w)
boxes, scores = polygons_from_bitmap(pred, segmentation, width,
height)
result = {OutputKeys.POLYGONS: np.array(boxes)}
return result
else:
rboxes = inputs['combined_rboxes'][0]
count = inputs['combined_counts'][0]
if count == 0 or count < rboxes.shape[0]:
raise Exception('modelscope error: No text detected')
rboxes = rboxes[:count, :]
# confine polygons inside image
polygons[:, ::2] = np.maximum(
0, np.minimum(polygons[:, ::2] * scale_x, orig_w - 1))
polygons[:, 1::2] = np.maximum(
0, np.minimum(polygons[:, 1::2] * scale_y, orig_h - 1))
polygons = np.round(polygons).astype(np.int32)
# convert rboxes to polygons and find its coordinates on the original image
orig_h, orig_w = inputs['orig_size']
resize_h, resize_w = inputs['resize_size']
polygons = rboxes_to_polygons(rboxes)
scale_y = float(orig_h) / float(resize_h)
scale_x = float(orig_w) / float(resize_w)
# nms
dt_n9 = [o + [cal_width(o)] for o in polygons.tolist()]
dt_nms = nms_python(dt_n9)
dt_polygons = np.array([o[:8] for o in dt_nms])
# confine polygons inside image
polygons[:, ::2] = np.maximum(
0, np.minimum(polygons[:, ::2] * scale_x, orig_w - 1))
polygons[:, 1::2] = np.maximum(
0, np.minimum(polygons[:, 1::2] * scale_y, orig_h - 1))
polygons = np.round(polygons).astype(np.int32)
result = {OutputKeys.POLYGONS: dt_polygons}
return result
# nms
dt_n9 = [o + [cal_width(o)] for o in polygons.tolist()]
dt_nms = nms_python(dt_n9)
dt_polygons = np.array([o[:8] for o in dt_nms])
result = {OutputKeys.POLYGONS: dt_polygons}
return result

View File

@@ -6,12 +6,15 @@ from modelscope.utils.import_utils import LazyImportModule
if TYPE_CHECKING:
from .model_resnet_mutex_v4_linewithchar import SegLinkDetector
from .ops import decode_segments_links_python, combine_segments_python
from .utils import rboxes_to_polygons, cal_width, nms_python
from .utils import rboxes_to_polygons, cal_width, nms_python, polygons_from_bitmap
else:
_import_structure = {
'model_resnet_mutex_v4_linewithchar': ['SegLinkDetector'],
'ops': ['decode_segments_links_python', 'combine_segments_python'],
'utils': ['rboxes_to_polygons', 'cal_width', 'nms_python']
'utils': [
'rboxes_to_polygons', 'cal_width', 'nms_python',
'polygons_from_bitmap'
]
}
import sys

View File

@@ -0,0 +1,431 @@
# ------------------------------------------------------------------------------
# Part of implementation is adopted from ViLT,
# made publicly available under the Apache License 2.0 at https://github.com/dandelin/ViLT.
# ------------------------------------------------------------------------------
import math
import os
import sys
import torch
import torch.nn as nn
BatchNorm2d = nn.BatchNorm2d
def constant_init(module, constant, bias=0):
    """Fill a module's weight with *constant* and its bias with *bias*.

    Args:
        module: torch module whose ``weight`` (and optional ``bias``)
            parameters are overwritten in place.
        constant: value written into ``module.weight``.
        bias: value written into ``module.bias`` (default 0).
    """
    nn.init.constant_(module.weight, constant)
    # hasattr alone is not enough: layers built with bias=False still
    # expose a ``bias`` attribute that is None, and nn.init.constant_
    # would raise on it — also require the bias to actually exist.
    if hasattr(module, 'bias') and module.bias is not None:
        nn.init.constant_(module.bias, bias)
def conv3x3(in_planes, out_planes, stride=1):
    """Return a 3x3 convolution: padding 1, configurable stride, no bias."""
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)
class BasicBlock(nn.Module):
    """ResNet basic block (two 3x3 convolutions) with an optional
    deformable convolution (DCN) replacing the second conv.

    Args:
        inplanes: number of input channels.
        planes: number of output channels.
        stride: stride of the first convolution.
        downsample: optional module applied to the identity branch so the
            residual addition matches shape.
        dcn: optional dict enabling deformable conv; recognized keys are
            'fallback_on_stride', 'modulated' and 'deformable_groups'.
    """
    # channel expansion factor of the block output (1 for BasicBlock)
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, dcn=None):
        super(BasicBlock, self).__init__()
        self.with_dcn = dcn is not None
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.with_modulated_dcn = False
        if self.with_dcn:
            fallback_on_stride = dcn.get('fallback_on_stride', False)
            self.with_modulated_dcn = dcn.get('modulated', False)
        # self.conv2 = conv3x3(planes, planes)
        if not self.with_dcn or fallback_on_stride:
            self.conv2 = nn.Conv2d(
                planes, planes, kernel_size=3, padding=1, bias=False)
        else:
            # DCN variant: a plain conv predicts per-position offsets
            # (plus modulation masks for the modulated variant) that are
            # consumed by the deformable conv2.
            deformable_groups = dcn.get('deformable_groups', 1)
            if not self.with_modulated_dcn:
                from assets.ops.dcn import DeformConv
                conv_op = DeformConv
                offset_channels = 18  # 2 offsets per position of a 3x3 kernel
            else:
                from assets.ops.dcn import ModulatedDeformConv
                conv_op = ModulatedDeformConv
                offset_channels = 27  # 18 offsets + 9 modulation masks
            self.conv2_offset = nn.Conv2d(
                planes,
                deformable_groups * offset_channels,
                kernel_size=3,
                padding=1)
            self.conv2 = conv_op(
                planes,
                planes,
                kernel_size=3,
                padding=1,
                deformable_groups=deformable_groups,
                bias=False)
        self.bn2 = BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """conv-bn-relu, (deformable) conv-bn, identity add, final relu."""
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        # out = self.conv2(out)
        if not self.with_dcn:
            out = self.conv2(out)
        elif self.with_modulated_dcn:
            # NOTE(review): the fixed :18 / -9: slices assume
            # deformable_groups == 1 — confirm if groups > 1 is ever used.
            offset_mask = self.conv2_offset(out)
            offset = offset_mask[:, :18, :, :]
            mask = offset_mask[:, -9:, :, :].sigmoid()
            out = self.conv2(out, offset, mask)
        else:
            offset = self.conv2_offset(out)
            out = self.conv2(out, offset)
        out = self.bn2(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
class Bottleneck(nn.Module):
    """ResNet bottleneck block (1x1 reduce, 3x3, 1x1 expand x4) with an
    optional deformable convolution (DCN) replacing the 3x3 conv.

    Args:
        inplanes: number of input channels.
        planes: bottleneck width; the block outputs ``planes * 4`` channels.
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the identity branch.
        dcn: optional dict enabling deformable conv; recognized keys are
            'fallback_on_stride', 'modulated' and 'deformable_groups'.
    """
    # channel expansion factor of the block output
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, dcn=None):
        super(Bottleneck, self).__init__()
        self.with_dcn = dcn is not None
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = BatchNorm2d(planes)
        fallback_on_stride = False
        self.with_modulated_dcn = False
        if self.with_dcn:
            fallback_on_stride = dcn.get('fallback_on_stride', False)
            self.with_modulated_dcn = dcn.get('modulated', False)
        if not self.with_dcn or fallback_on_stride:
            self.conv2 = nn.Conv2d(
                planes,
                planes,
                kernel_size=3,
                stride=stride,
                padding=1,
                bias=False)
        else:
            # DCN variant: a plain conv predicts per-position offsets
            # (plus masks for the modulated variant) consumed by conv2.
            deformable_groups = dcn.get('deformable_groups', 1)
            if not self.with_modulated_dcn:
                from assets.ops.dcn import DeformConv
                conv_op = DeformConv
                offset_channels = 18  # 2 offsets per position of a 3x3 kernel
            else:
                from assets.ops.dcn import ModulatedDeformConv
                conv_op = ModulatedDeformConv
                offset_channels = 27  # 18 offsets + 9 modulation masks
            self.conv2_offset = nn.Conv2d(
                planes,
                deformable_groups * offset_channels,
                kernel_size=3,
                padding=1)
            self.conv2 = conv_op(
                planes,
                planes,
                kernel_size=3,
                padding=1,
                stride=stride,
                deformable_groups=deformable_groups,
                bias=False)
        self.bn2 = BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, planes * 4, kernel_size=1, bias=False)
        self.bn3 = BatchNorm2d(planes * 4)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride
        self.dcn = dcn
        self.with_dcn = dcn is not None

    def forward(self, x):
        """1x1-bn-relu, (deformable) 3x3-bn-relu, 1x1-bn, add, relu."""
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        # out = self.conv2(out)
        if not self.with_dcn:
            out = self.conv2(out)
        elif self.with_modulated_dcn:
            # NOTE(review): the fixed :18 / -9: slices assume
            # deformable_groups == 1 — confirm if groups > 1 is ever used.
            offset_mask = self.conv2_offset(out)
            offset = offset_mask[:, :18, :, :]
            mask = offset_mask[:, -9:, :, :].sigmoid()
            out = self.conv2(out, offset, mask)
        else:
            offset = self.conv2_offset(out)
            out = self.conv2(out, offset)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out
class ResNet(nn.Module):
    """ResNet backbone that returns the feature maps of all four stages
    (the classifier head is disabled; only features are used).

    Args:
        block: residual block class (``BasicBlock`` or ``Bottleneck``).
        layers: number of blocks per stage, e.g. [3, 4, 6, 3] for ResNet-50.
        num_classes: kept for API compatibility; unused because the
            avgpool/fc head is commented out below.
        dcn: optional deformable-conv config forwarded to stages 2-4.
        stage_with_dcn: stored but not consulted — ``dcn`` is applied to
            layers 2-4 unconditionally and never to layer 1.
    """

    def __init__(self,
                 block,
                 layers,
                 num_classes=1000,
                 dcn=None,
                 stage_with_dcn=(False, False, False, False)):
        self.dcn = dcn
        self.stage_with_dcn = stage_with_dcn
        # running channel count consumed/updated by _make_layer
        self.inplanes = 64
        super(ResNet, self).__init__()
        # Stem: 7x7 stride-2 conv + 3x3 stride-2 max pool (overall 1/4).
        self.conv1 = nn.Conv2d(
            3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(
            block, 128, layers[1], stride=2, dcn=dcn)
        self.layer3 = self._make_layer(
            block, 256, layers[2], stride=2, dcn=dcn)
        self.layer4 = self._make_layer(
            block, 512, layers[3], stride=2, dcn=dcn)
        # self.avgpool = nn.AvgPool2d(7, stride=1)
        # self.fc = nn.Linear(512 * block.expansion, num_classes)
        # self.smooth = nn.Conv2d(2048, 256, kernel_size=1, stride=1, padding=1)
        # He-style init for convs; BN starts as identity (weight=1, bias=0).
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
        # Offset-predicting convs start at zero so DCN begins as a
        # regular convolution.
        if self.dcn is not None:
            for m in self.modules():
                if isinstance(m, Bottleneck) or isinstance(m, BasicBlock):
                    if hasattr(m, 'conv2_offset'):
                        constant_init(m.conv2_offset, 0)

    def _make_layer(self, block, planes, blocks, stride=1, dcn=None):
        """Build one ResNet stage of *blocks* residual blocks.

        A 1x1-conv + BN downsample branch is created when the first block
        changes resolution (stride != 1) or channel count.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.inplanes,
                    planes * block.expansion,
                    kernel_size=1,
                    stride=stride,
                    bias=False),
                BatchNorm2d(planes * block.expansion),
            )
        layers = []
        layers.append(
            block(self.inplanes, planes, stride, downsample, dcn=dcn))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes, dcn=dcn))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the four stage outputs (1/4, 1/8, 1/16, 1/32 scale)."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x2 = self.layer1(x)
        x3 = self.layer2(x2)
        x4 = self.layer3(x3)
        x5 = self.layer4(x4)
        return x2, x3, x4, x5
class SegDetector(nn.Module):
    """DB-style segmentation head: fuses a four-level feature pyramid and
    predicts a per-pixel text-probability map; an optional adaptive
    threshold branch is built for training.

    Args:
        in_channels: channel counts of the four backbone features, from
            highest to lowest resolution.
        inner_channels: width of the fused pyramid.
        k: steepness of the differentiable-binarization step function.
        bias: Whether conv layers have bias or not.
        adaptive: Whether to use adaptive threshold training or not.
        smooth: If true, use bilinear instead of deconv.
        serial: If true, thresh prediction will combine segmentation result
            as input.
    """

    def __init__(self,
                 in_channels=[64, 128, 256, 512],
                 inner_channels=256,
                 k=10,
                 bias=False,
                 adaptive=False,
                 smooth=False,
                 serial=False,
                 *args,
                 **kwargs):
        super(SegDetector, self).__init__()
        self.k = k
        self.serial = serial
        # Top-down pathway (FPN-style): upsample the coarser level and add
        # the lateral 1x1 projection of the finer one.
        self.up5 = nn.Upsample(scale_factor=2, mode='nearest')
        self.up4 = nn.Upsample(scale_factor=2, mode='nearest')
        self.up3 = nn.Upsample(scale_factor=2, mode='nearest')
        self.in5 = nn.Conv2d(in_channels[-1], inner_channels, 1, bias=bias)
        self.in4 = nn.Conv2d(in_channels[-2], inner_channels, 1, bias=bias)
        self.in3 = nn.Conv2d(in_channels[-3], inner_channels, 1, bias=bias)
        self.in2 = nn.Conv2d(in_channels[-4], inner_channels, 1, bias=bias)
        # Output heads: reduce each level to inner_channels // 4 and bring
        # it to 1/4 input resolution so the four maps can be concatenated.
        self.out5 = nn.Sequential(
            nn.Conv2d(
                inner_channels, inner_channels // 4, 3, padding=1, bias=bias),
            nn.Upsample(scale_factor=8, mode='nearest'))
        self.out4 = nn.Sequential(
            nn.Conv2d(
                inner_channels, inner_channels // 4, 3, padding=1, bias=bias),
            nn.Upsample(scale_factor=4, mode='nearest'))
        self.out3 = nn.Sequential(
            nn.Conv2d(
                inner_channels, inner_channels // 4, 3, padding=1, bias=bias),
            nn.Upsample(scale_factor=2, mode='nearest'))
        self.out2 = nn.Conv2d(
            inner_channels, inner_channels // 4, 3, padding=1, bias=bias)
        # Prediction head: two stride-2 deconvs restore full resolution,
        # final sigmoid yields per-pixel probabilities.
        self.binarize = nn.Sequential(
            nn.Conv2d(
                inner_channels, inner_channels // 4, 3, padding=1, bias=bias),
            BatchNorm2d(inner_channels // 4), nn.ReLU(inplace=True),
            nn.ConvTranspose2d(inner_channels // 4, inner_channels // 4, 2, 2),
            BatchNorm2d(inner_channels // 4), nn.ReLU(inplace=True),
            nn.ConvTranspose2d(inner_channels // 4, 1, 2, 2), nn.Sigmoid())
        self.binarize.apply(self.weights_init)
        self.adaptive = adaptive
        if adaptive:
            self.thresh = self._init_thresh(
                inner_channels, serial=serial, smooth=smooth, bias=bias)
            self.thresh.apply(self.weights_init)
        self.in5.apply(self.weights_init)
        self.in4.apply(self.weights_init)
        self.in3.apply(self.weights_init)
        self.in2.apply(self.weights_init)
        self.out5.apply(self.weights_init)
        self.out4.apply(self.weights_init)
        self.out3.apply(self.weights_init)
        self.out2.apply(self.weights_init)

    def weights_init(self, m):
        """Kaiming init for conv layers; near-identity init for BN."""
        classname = m.__class__.__name__
        if classname.find('Conv') != -1:
            nn.init.kaiming_normal_(m.weight.data)
        elif classname.find('BatchNorm') != -1:
            m.weight.data.fill_(1.)
            m.bias.data.fill_(1e-4)

    def _init_thresh(self,
                     inner_channels,
                     serial=False,
                     smooth=False,
                     bias=False):
        """Build the adaptive-threshold branch (conv + two x2 upsamples).

        When *serial*, one extra input channel carries the segmentation
        prediction.
        """
        in_channels = inner_channels
        if serial:
            in_channels += 1
        self.thresh = nn.Sequential(
            nn.Conv2d(
                in_channels, inner_channels // 4, 3, padding=1, bias=bias),
            BatchNorm2d(inner_channels // 4), nn.ReLU(inplace=True),
            self._init_upsample(
                inner_channels // 4,
                inner_channels // 4,
                smooth=smooth,
                bias=bias), BatchNorm2d(inner_channels // 4),
            nn.ReLU(inplace=True),
            self._init_upsample(
                inner_channels // 4, 1, smooth=smooth, bias=bias),
            nn.Sigmoid())
        return self.thresh

    def _init_upsample(self,
                       in_channels,
                       out_channels,
                       smooth=False,
                       bias=False):
        """Return a x2 upsampling module: nearest-upsample + conv when
        *smooth*, otherwise a stride-2 transposed convolution."""
        if smooth:
            inter_out_channels = out_channels
            if out_channels == 1:
                inter_out_channels = in_channels
            module_list = [
                nn.Upsample(scale_factor=2, mode='nearest'),
                nn.Conv2d(in_channels, inter_out_channels, 3, 1, 1, bias=bias)
            ]
            if out_channels == 1:
                module_list.append(
                    nn.Conv2d(
                        in_channels,
                        out_channels,
                        kernel_size=1,
                        stride=1,
                        padding=1,
                        bias=True))
            # BUG FIX: nn.Sequential takes modules as positional arguments;
            # passing the list object itself raised a TypeError whenever the
            # smooth path was taken.
            return nn.Sequential(*module_list)
        else:
            return nn.ConvTranspose2d(in_channels, out_channels, 2, 2)

    def forward(self, features, gt=None, masks=None, training=False):
        """Fuse the pyramid and return the probability map.

        Args:
            features: tuple (c2, c3, c4, c5) of backbone feature maps at
                1/4, 1/8, 1/16 and 1/32 of the input resolution.
            gt, masks, training: accepted for interface compatibility;
                unused in this inference-only forward.

        Returns:
            Tensor of shape (N, 1, H, W) with values in [0, 1].
        """
        c2, c3, c4, c5 = features
        in5 = self.in5(c5)
        in4 = self.in4(c4)
        in3 = self.in3(c3)
        in2 = self.in2(c2)
        out4 = self.up5(in5) + in4  # 1/16
        out3 = self.up4(out4) + in3  # 1/8
        out2 = self.up3(out3) + in2  # 1/4
        p5 = self.out5(in5)
        p4 = self.out4(out4)
        p3 = self.out3(out3)
        p2 = self.out2(out2)
        fuse = torch.cat((p5, p4, p3, p2), 1)
        # this is the pred module, not binarization module;
        # We do not correct the name due to the trained model.
        binary = self.binarize(fuse)
        return binary

    def step_function(self, x, y):
        """Differentiable binarization: sigmoid(k * (x - y))."""
        return torch.reciprocal(1 + torch.exp(-self.k * (x - y)))
class VLPTModel(nn.Module):
    """Text detector: ResNet-50 backbone feeding a SegDetector head."""

    def __init__(self, *args, **kwargs):
        super(VLPTModel, self).__init__()
        # ResNet-50 layer layout; the head consumes its four stage outputs.
        self.backbone = ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)
        self.decoder = SegDetector(
            in_channels=[256, 512, 1024, 2048], adaptive=True, k=50, **kwargs)

    def forward(self, x):
        """Return the per-pixel text probability map for image batch *x*."""
        pyramid = self.backbone(x)
        return self.decoder(pyramid)

View File

@@ -1,6 +1,8 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import cv2
import numpy as np
import pyclipper
from shapely.geometry import Polygon
def rboxes_to_polygons(rboxes):
@@ -107,3 +109,102 @@ def point_line_dist(px, py, x1, y1, x2, y2):
div = np.sqrt(dx * dx + dy * dy) + eps
dist = np.abs(px * dy - py * dx + x2 * y1 - y2 * x1) / div
return dist
# Part of the implementation is borrowed and modified from DB,
# publicly available at https://github.com/MhLiao/DB.
def polygons_from_bitmap(pred,
                         _bitmap,
                         dest_width,
                         dest_height,
                         box_thresh=0.7,
                         unclip_ratio=2.0,
                         max_candidates=100,
                         min_size=3):
    """Extract text polygons from a binarized probability map.

    Args:
        pred: tensor of shape (1, H, W) with per-pixel text probabilities.
        _bitmap: single map with shape (1, H, W), whose values are
            binarized as {0, 1}.
        dest_width: target image width (int or 0-d tensor) the polygons
            are rescaled to.
        dest_height: target image height.
        box_thresh: minimum mean score inside a contour for it to be kept.
        unclip_ratio: outward expansion ratio applied to each polygon.
        max_candidates: at most this many contours are considered.
        min_size: polygons whose min-area-rect short side is below
            ``min_size + 2`` are dropped.

    Returns:
        (boxes, scores): polygons as nested [x, y] lists in destination
        coordinates, and their mean scores.
    """
    assert _bitmap.size(0) == 1
    bitmap = _bitmap.cpu().numpy()[0]
    pred = pred.cpu().detach().numpy()[0]
    height, width = bitmap.shape
    boxes = []
    scores = []
    contours, _ = cv2.findContours((bitmap * 255).astype(np.uint8),
                                   cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    for contour in contours[:max_candidates]:
        # Simplify the contour to ~1% of its perimeter.
        epsilon = 0.01 * cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, epsilon, True)
        points = approx.reshape((-1, 2))
        if points.shape[0] < 4:
            continue
        score = box_score_fast(pred, points.reshape(-1, 2))
        if box_thresh > score:
            continue
        # points.shape[0] >= 4 is guaranteed here, so unclip directly
        # (the original `if points.shape[0] > 2` guard was always true).
        box = unclip(points, unclip_ratio=unclip_ratio)
        if len(box) > 1:
            # Expansion split the polygon into multiple parts; drop it.
            continue
        box = box.reshape(-1, 2)
        _, sside = get_mini_boxes(box.reshape((-1, 1, 2)))
        if sside < min_size + 2:
            continue
        if not isinstance(dest_width, int):
            dest_width = dest_width.item()
            dest_height = dest_height.item()
        # Map bitmap coordinates back to the destination image size.
        box[:, 0] = np.clip(
            np.round(box[:, 0] / width * dest_width), 0, dest_width)
        box[:, 1] = np.clip(
            np.round(box[:, 1] / height * dest_height), 0, dest_height)
        boxes.append(box.tolist())
        scores.append(score)
    return boxes, scores
def box_score_fast(bitmap, _box):
    """Mean value of *bitmap* inside the polygon *_box*.

    Args:
        bitmap: 2-D score map of shape (H, W).
        _box: (N, 2) array of polygon vertices in (x, y) order; the input
            is not modified (a copy is taken).

    Returns:
        Mean of the score map over the polygon's filled mask.
    """
    h, w = bitmap.shape[:2]
    box = _box.copy()
    # BUG FIX: np.int was deprecated in NumPy 1.20 and removed in 1.24,
    # which made these lines raise AttributeError; use np.int32 instead.
    xmin = np.clip(np.floor(box[:, 0].min()).astype(np.int32), 0, w - 1)
    xmax = np.clip(np.ceil(box[:, 0].max()).astype(np.int32), 0, w - 1)
    ymin = np.clip(np.floor(box[:, 1].min()).astype(np.int32), 0, h - 1)
    ymax = np.clip(np.ceil(box[:, 1].max()).astype(np.int32), 0, h - 1)
    # Rasterize into a tight local mask to keep fillPoly cheap.
    mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
    box[:, 0] = box[:, 0] - xmin
    box[:, 1] = box[:, 1] - ymin
    cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)
    return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0]
def unclip(box, unclip_ratio=1.5):
    """Expand *box* outward by ``area / perimeter * unclip_ratio`` pixels
    using a rounded polygon offset; returns the expanded path(s) as an
    ndarray."""
    polygon = Polygon(box)
    distance = polygon.area * unclip_ratio / polygon.length
    clipper = pyclipper.PyclipperOffset()
    clipper.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
    return np.array(clipper.Execute(distance))
def get_mini_boxes(contour):
    """Return the minimum-area rectangle around *contour* as four corner
    points ordered top-left, top-right, bottom-right, bottom-left, plus
    the rectangle's shorter side length."""
    bounding_box = cv2.minAreaRect(contour)
    # Sort corners by x: the first two are the left pair, the last two
    # the right pair; within each pair the smaller y is the upper corner.
    points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
    if points[1][1] > points[0][1]:
        top_left, bottom_left = points[0], points[1]
    else:
        top_left, bottom_left = points[1], points[0]
    if points[3][1] > points[2][1]:
        top_right, bottom_right = points[2], points[3]
    else:
        top_right, bottom_right = points[3], points[2]
    box = [top_left, top_right, bottom_right, bottom_left]
    return box, min(bounding_box[1])

View File

@@ -12,7 +12,9 @@ class OCRDetectionTest(unittest.TestCase, DemoCompatibilityCheck):
def setUp(self) -> None:
self.model_id = 'damo/cv_resnet18_ocr-detection-line-level_damo'
self.model_id_vlpt = 'damo/cv_resnet50_ocr-detection-vlpt'
self.test_image = 'data/test/images/ocr_detection.jpg'
self.test_image_vlpt = 'data/test/images/ocr_detection_vlpt.jpg'
self.task = Tasks.ocr_detection
def pipeline_inference(self, pipeline: Pipeline, input_location: str):
@@ -25,6 +27,11 @@ class OCRDetectionTest(unittest.TestCase, DemoCompatibilityCheck):
ocr_detection = pipeline(Tasks.ocr_detection, model=self.model_id)
self.pipeline_inference(ocr_detection, self.test_image)
@unittest.skipUnless(test_level() >= 0, 'skip test in current test level')
def test_run_with_vlpt_with_model_from_modelhub(self):
ocr_detection = pipeline(Tasks.ocr_detection, model=self.model_id_vlpt)
self.pipeline_inference(ocr_detection, self.test_image_vlpt)
@unittest.skipUnless(test_level() >= 2, 'skip test in current test level')
def test_run_modelhub_default_model(self):
ocr_detection = pipeline(Tasks.ocr_detection)