Replies: 6 comments 2 replies
-
How is the ONNX model run? What is the command?
-
import argparse
import logging
import math
import os
import platform
import time
import traceback
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import cv2
import numpy as np
from onnxruntime import (
    GraphOptimizationLevel,
    InferenceSession,
    SessionOptions,
    get_available_providers,
    get_device,
)

try:
    from .logger import get_logger
except ImportError:
    # Fallback so the snippet also runs standalone, outside the original package.
    def get_logger(name: str) -> logging.Logger:
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(name)
class EP(Enum):
CUDA_EP = "CUDAExecutionProvider"
CPU_EP = "CPUExecutionProvider"
DIRECTML_EP = "DmlExecutionProvider"
class OrtInferSession:
def __init__(self, config: Dict[str, Any]):
self.logger = get_logger("OrtInferSession")
model_path = config.get("model_path", None)
self._verify_model(model_path)
self.cfg_use_cuda = config.get("use_cuda", None)
self.cfg_use_dml = config.get("use_dml", None)
self.had_providers: List[str] = get_available_providers()
EP_list = self._get_ep_list()
sess_opt = self._init_sess_opts(config)
self.session = InferenceSession(
model_path,
sess_options=sess_opt,
providers=EP_list,
)
self._verify_providers()
@staticmethod
def _init_sess_opts(config: Dict[str, Any]) -> SessionOptions:
sess_opt = SessionOptions()
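        # Severity 4 logs only fatal errors; disabling the CPU memory arena
        # lowers peak memory at some potential cost in allocation speed.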
sess_opt.log_severity_level = 4
sess_opt.enable_cpu_mem_arena = False
sess_opt.graph_optimization_level = GraphOptimizationLevel.ORT_ENABLE_ALL
cpu_nums = os.cpu_count()
intra_op_num_threads = config.get("intra_op_num_threads", -1)
if intra_op_num_threads != -1 and 1 <= intra_op_num_threads <= cpu_nums:
sess_opt.intra_op_num_threads = intra_op_num_threads
inter_op_num_threads = config.get("inter_op_num_threads", -1)
if inter_op_num_threads != -1 and 1 <= inter_op_num_threads <= cpu_nums:
sess_opt.inter_op_num_threads = inter_op_num_threads
return sess_opt
def _get_ep_list(self) -> List[Tuple[str, Dict[str, Any]]]:
cpu_provider_opts = {
"arena_extend_strategy": "kSameAsRequested",
}
EP_list = [(EP.CPU_EP.value, cpu_provider_opts)]
cuda_provider_opts = {
"device_id": 0,
"arena_extend_strategy": "kNextPowerOfTwo",
"cudnn_conv_algo_search": "EXHAUSTIVE",
"do_copy_in_default_stream": True,
}
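        # EXHAUSTIVE cudnn_conv_algo_search benchmarks every convolution algorithm
        # the first time each new input shape is seen, so warm-up runs (and
        # variable-shape batches) can be far slower than steady state.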
self.use_cuda = self._check_cuda()
if self.use_cuda:
EP_list.insert(0, (EP.CUDA_EP.value, cuda_provider_opts))
self.use_directml = self._check_dml()
if self.use_directml:
self.logger.info(
"Windows 10 or above detected, try to use DirectML as primary provider"
)
directml_options = (
cuda_provider_opts if self.use_cuda else cpu_provider_opts
)
EP_list.insert(0, (EP.DIRECTML_EP.value, directml_options))
return EP_list
def _check_cuda(self) -> bool:
if not self.cfg_use_cuda:
return False
cur_device = get_device()
if cur_device == "GPU" and EP.CUDA_EP.value in self.had_providers:
return True
self.logger.warning(
"%s is not in available providers (%s). Use %s inference by default.",
EP.CUDA_EP.value,
self.had_providers,
self.had_providers[0],
)
self.logger.info("!!!Recommend to use rapidocr_paddle for inference on GPU.")
self.logger.info(
"(For reference only) If you want to use GPU acceleration, you must do:"
)
self.logger.info(
"First, uninstall all onnxruntime pakcages in current environment."
)
self.logger.info(
"Second, install onnxruntime-gpu by `pip install onnxruntime-gpu`."
)
self.logger.info(
"\tNote the onnxruntime-gpu version must match your cuda and cudnn version."
)
self.logger.info(
"\tYou can refer this link: https://onnxruntime.ai/docs/execution-providers/CUDA-EP.html"
)
self.logger.info(
"Third, ensure %s is in available providers list. e.g. ['CUDAExecutionProvider', 'CPUExecutionProvider']",
EP.CUDA_EP.value,
)
return False
def _check_dml(self) -> bool:
if not self.cfg_use_dml:
return False
cur_os = platform.system()
if cur_os != "Windows":
self.logger.warning(
"DirectML is only supported in Windows OS. The current OS is %s. Use %s inference by default.",
cur_os,
self.had_providers[0],
)
return False
cur_window_version = int(platform.release().split(".")[0])
if cur_window_version < 10:
self.logger.warning(
"DirectML is only supported in Windows 10 and above OS. The current Windows version is %s. Use %s inference by default.",
cur_window_version,
self.had_providers[0],
)
return False
if EP.DIRECTML_EP.value in self.had_providers:
return True
self.logger.warning(
"%s is not in available providers (%s). Use %s inference by default.",
EP.DIRECTML_EP.value,
self.had_providers,
self.had_providers[0],
)
self.logger.info("If you want to use DirectML acceleration, you must do:")
self.logger.info(
"First, uninstall all onnxruntime pakcages in current environment."
)
self.logger.info(
"Second, install onnxruntime-directml by `pip install onnxruntime-directml`"
)
self.logger.info(
"Third, ensure %s is in available providers list. e.g. ['DmlExecutionProvider', 'CPUExecutionProvider']",
EP.DIRECTML_EP.value,
)
return False
def _verify_providers(self):
session_providers = self.session.get_providers()
first_provider = session_providers[0]
if self.use_cuda and first_provider != EP.CUDA_EP.value:
self.logger.warning(
"%s is not avaiable for current env, the inference part is automatically shifted to be executed under %s.",
EP.CUDA_EP.value,
first_provider,
)
if self.use_directml and first_provider != EP.DIRECTML_EP.value:
self.logger.warning(
"%s is not available for current env, the inference part is automatically shifted to be executed under %s.",
EP.DIRECTML_EP.value,
first_provider,
)
def __call__(self, input_content: np.ndarray) -> np.ndarray:
input_dict = dict(zip(self.get_input_names(), [input_content]))
try:
return self.session.run(self.get_output_names(), input_dict)
except Exception as e:
error_info = traceback.format_exc()
raise ONNXRuntimeError(error_info) from e
def get_input_names(self) -> List[str]:
return [v.name for v in self.session.get_inputs()]
def get_output_names(self) -> List[str]:
return [v.name for v in self.session.get_outputs()]
def get_character_list(self, key: str = "character") -> List[str]:
meta_dict = self.session.get_modelmeta().custom_metadata_map
return meta_dict[key].splitlines()
def have_key(self, key: str = "character") -> bool:
meta_dict = self.session.get_modelmeta().custom_metadata_map
if key in meta_dict.keys():
return True
return False
@staticmethod
def _verify_model(model_path: Union[str, Path, None]):
if model_path is None:
raise ValueError("model_path is None!")
model_path = Path(model_path)
if not model_path.exists():
            raise FileNotFoundError(f"{model_path} does not exist.")
if not model_path.is_file():
raise FileExistsError(f"{model_path} is not a file.")
class ONNXRuntimeError(Exception):
pass
class CTCLabelDecode:
def __init__(
self,
character: Optional[List[str]] = None,
character_path: Union[str, Path, None] = None,
):
self.character = self.get_character(character, character_path)
self.dict = {char: i for i, char in enumerate(self.character)}
def __call__(
self, preds: np.ndarray, return_word_box: bool = False, **kwargs
) -> List[Tuple[str, float]]:
preds_idx = preds.argmax(axis=2)
preds_prob = preds.max(axis=2)
text = self.decode(
preds_idx, preds_prob, return_word_box, is_remove_duplicate=True
)
if return_word_box:
for rec_idx, rec in enumerate(text):
wh_ratio = kwargs["wh_ratio_list"][rec_idx]
max_wh_ratio = kwargs["max_wh_ratio"]
rec[2][0] = rec[2][0] * (wh_ratio / max_wh_ratio)
return text
def get_character(
self,
character: Optional[List[str]] = None,
character_path: Union[str, Path, None] = None,
) -> List[str]:
if character is None and character_path is None:
raise ValueError("character must not be None")
character_list = None
if character:
character_list = character
if character_path:
character_list = self.read_character_file(character_path)
if character_list is None:
raise ValueError("character must not be None")
character_list = self.insert_special_char(
character_list, " ", len(character_list)
)
character_list = self.insert_special_char(character_list, "blank", 0)
return character_list
@staticmethod
def read_character_file(character_path: Union[str, Path]) -> List[str]:
character_list = []
with open(character_path, "rb") as f:
lines = f.readlines()
for line in lines:
line = line.decode("utf-8").strip("\n").strip("\r\n")
character_list.append(line)
return character_list
@staticmethod
def insert_special_char(
character_list: List[str], special_char: str, loc: int = -1
) -> List[str]:
character_list.insert(loc, special_char)
return character_list
def decode(
self,
text_index: np.ndarray,
text_prob: Optional[np.ndarray] = None,
return_word_box: bool = False,
is_remove_duplicate: bool = False,
) -> List[Tuple[str, float]]:
"""convert text-index into text-label."""
result_list = []
ignored_tokens = self.get_ignored_tokens()
batch_size = len(text_index)
for batch_idx in range(batch_size):
selection = np.ones(len(text_index[batch_idx]), dtype=bool)
if is_remove_duplicate:
selection[1:] = text_index[batch_idx][1:] != text_index[batch_idx][:-1]
for ignored_token in ignored_tokens:
selection &= text_index[batch_idx] != ignored_token
if text_prob is not None:
conf_list = np.array(text_prob[batch_idx][selection]).tolist()
else:
conf_list = [1] * len(selection)
if len(conf_list) == 0:
conf_list = [0]
char_list = [
self.character[text_id] for text_id in text_index[batch_idx][selection]
]
text = "".join(char_list)
if return_word_box:
word_list, word_col_list, state_list = self.get_word_info(
text, selection
)
result_list.append(
(
text,
np.mean(conf_list).tolist(),
[
len(text_index[batch_idx]),
word_list,
word_col_list,
state_list,
conf_list,
],
)
)
else:
result_list.append((text, np.mean(conf_list).tolist()))
return result_list
@staticmethod
def get_word_info(
text: str, selection: np.ndarray
) -> Tuple[List[List[str]], List[List[int]], List[str]]:
"""
Group the decoded characters and record the corresponding decoded positions.
from https://github.com/PaddlePaddle/PaddleOCR/blob/fbba2178d7093f1dffca65a5b963ec277f1a6125/ppocr/postprocess/rec_postprocess.py#L70
Args:
text: the decoded text
selection: the bool array that identifies which columns of features are decoded as non-separated characters
Returns:
word_list: list of the grouped words
word_col_list: list of decoding positions corresponding to each character in the grouped word
state_list: list of marker to identify the type of grouping words, including two types of grouping words:
                - 'cn': continuous Chinese characters (e.g., 你好啊)
                - 'en&num': continuous English characters (e.g., hello), numbers (e.g., 123, 1.123), or a mix of them connected by '-' (e.g., VGG-16)
"""
state = None
word_content = []
word_col_content = []
word_list = []
word_col_list = []
state_list = []
valid_col = np.where(selection)[0]
col_width = np.zeros(valid_col.shape)
if len(valid_col) > 0:
col_width[1:] = valid_col[1:] - valid_col[:-1]
col_width[0] = min(
3 if "\u4e00" <= text[0] <= "\u9fff" else 2, int(valid_col[0])
)
for c_i, char in enumerate(text):
if "\u4e00" <= char <= "\u9fff":
c_state = "cn"
else:
c_state = "en&num"
if state is None:
state = c_state
if state != c_state or col_width[c_i] > 4:
if len(word_content) != 0:
word_list.append(word_content)
word_col_list.append(word_col_content)
state_list.append(state)
word_content = []
word_col_content = []
state = c_state
word_content.append(char)
word_col_content.append(int(valid_col[c_i]))
if len(word_content) != 0:
word_list.append(word_content)
word_col_list.append(word_col_content)
state_list.append(state)
return word_list, word_col_list, state_list
@staticmethod
def get_ignored_tokens() -> List[int]:
return [0]
class TextRecognizer:
def __init__(self, config: Dict[str, Any]):
self.session = OrtInferSession(config)
character = None
if self.session.have_key():
character = self.session.get_character_list()
character_path = config.get("rec_keys_path", None)
self.postprocess_op = CTCLabelDecode(
character=character, character_path=character_path
)
self.rec_batch_num = config["rec_batch_num"]
self.rec_image_shape = config["rec_img_shape"]
def __call__(
self,
img_list: Union[np.ndarray, List[np.ndarray]],
return_word_box: bool = False,
) -> Tuple[List[Tuple[str, float]], float]:
if isinstance(img_list, np.ndarray):
img_list = [img_list]
# Calculate the aspect ratio of all text bars
width_list = [img.shape[1] / float(img.shape[0]) for img in img_list]
# Sorting can speed up the recognition process
indices = np.argsort(np.array(width_list))
img_num = len(img_list)
rec_res = [("", 0.0)] * img_num
batch_num = self.rec_batch_num
elapse = 0
for beg_img_no in range(0, img_num, batch_num):
end_img_no = min(img_num, beg_img_no + batch_num)
# Parameter Alignment for PaddleOCR
imgC, imgH, imgW = self.rec_image_shape[:3]
max_wh_ratio = imgW / imgH
wh_ratio_list = []
for ino in range(beg_img_no, end_img_no):
h, w = img_list[indices[ino]].shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
wh_ratio_list.append(wh_ratio)
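            # max_wh_ratio fixes the padded width for this batch (see resize_norm_img),
            # so the ONNX input width changes from batch to batch; with the CUDA EP a
            # new shape can retrigger cuDNN algorithm search, a common cause of slow
            # GPU inference.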
norm_img_batch = []
for ino in range(beg_img_no, end_img_no):
norm_img = self.resize_norm_img(img_list[indices[ino]], max_wh_ratio)
norm_img_batch.append(norm_img[np.newaxis, :])
norm_img_batch = np.concatenate(norm_img_batch).astype(np.float32)
starttime = time.time()
preds = self.session(norm_img_batch)[0]
print("run time info: ",time.time() - starttime)
starttime2 = time.time()
rec_result = self.postprocess_op(
preds,
return_word_box,
wh_ratio_list=wh_ratio_list,
max_wh_ratio=max_wh_ratio,
)
print("post precess:",time.time() - starttime2)
for rno, one_res in enumerate(rec_result):
rec_res[indices[beg_img_no + rno]] = one_res
elapse += time.time() - starttime
return rec_res, elapse
def resize_norm_img(self, img: np.ndarray, max_wh_ratio: float) -> np.ndarray:
img_channel, img_height, img_width = self.rec_image_shape
assert img_channel == img.shape[2]
img_width = int(img_height * max_wh_ratio)
h, w = img.shape[:2]
ratio = w / float(h)
if math.ceil(img_height * ratio) > img_width:
resized_w = img_width
else:
resized_w = int(math.ceil(img_height * ratio))
resized_image = cv2.resize(img, (resized_w, img_height))
resized_image = resized_image.astype("float32")
resized_image = resized_image.transpose((2, 0, 1)) / 255
resized_image -= 0.5
resized_image /= 0.5
padding_im = np.zeros((img_channel, img_height, img_width), dtype=np.float32)
padding_im[:, :, 0:resized_w] = resized_image
return padding_im
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--image_path", type=str, help="image_dir|image_path")
parser.add_argument("--config_path", type=str, default="config.yaml")
args = parser.parse_args()
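    # read_yaml is not defined in this snippet; it presumably comes from the
    # package's own utilities and loads the YAML config into a dict.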
config = read_yaml(args.config_path)
text_recognizer = TextRecognizer(config)
img = cv2.imread(args.image_path)
rec_res, predict_time = text_recognizer(img)
print(f"rec result: {rec_res}\t cost: {predict_time}s") 转了onnx模型后 用onnxruntime-gpu 加载跑的,可以测试一下?批量跑真慢 如果识别batch 6348*1325 的输入就特别慢,药将近10s,而且同样的代码A6000 onnxruntime-gpu==1.17比2060 onnxruntime-gpu==1.13 还慢 |
-
Is it running on the CPU?
The log shows: [log screenshot not preserved]
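A quick way to check (an illustrative sketch, not from the original reply): if both the onnxruntime and onnxruntime-gpu packages are installed in the same environment, the CPU-only build can shadow the GPU one, which matches the fallback warnings in the code above.

import onnxruntime as ort

print(ort.get_device())               # "GPU" only if the onnxruntime-gpu build is active
print(ort.get_available_providers())  # CUDAExecutionProvider should appear in this list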
-
Hello, I'm seeing the same issue here: onnxruntime==1.14.1 with CUDA 11.6, and the versions all match CUDA, but OCR inference ends up running on the CPU and is very slow, while GPU memory is still being occupied.
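One way to see where the time goes (an illustrative sketch; text_rec.onnx is a placeholder): create the session with verbose logging so ONNX Runtime reports which nodes are assigned to the CUDA EP and which fall back to the CPU. GPU memory in use together with CPU-like speed often means the CUDA context was created but many nodes, or the copies between them, run on the CPU.

import onnxruntime as ort

so = ort.SessionOptions()
so.log_severity_level = 0  # VERBOSE: prints node-to-provider assignments at session build
sess = ort.InferenceSession(
    "text_rec.onnx",
    sess_options=so,
    providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
)
print(sess.get_providers())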
-
Hi, did you ever solve this problem? I'm running into it now as well.
-
Which version of paddle2onnx are you using?
-
🔎 Search before asking
🐛 Bug (problem description)
The official v5 recognition model converted to ONNX runs inference very slowly even though the GPU shows as in use, and the detection model is just as slow. I'm certain the GPU is being used; without the GPU the CPU is actually faster, but even then a single image takes about 0.6s.
paddle2onnx --model_dir /PP-OCRv5_server_rec_infer \
    --model_filename inference.json \
    --params_filename inference.pdiparams \
    --save_file /text_rec.onnx \
    --opset_version 11 \
    --enable_onnx_checker True
🏃‍♂️ Environment
CUDA 11.6, onnxruntime-gpu 1.13.1
🌰 Minimal Reproducible Example
paddle2onnx --model_dir /PP-OCRv5_server_rec_infer \
    --model_filename inference.json \
    --params_filename inference.pdiparams \
    --save_file /text_rec.onnx \
    --opset_version 11 \
    --enable_onnx_checker True
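As a sanity check on the export (a sketch using the onnx package; the path mirrors the command above): validate the graph and inspect the input shape, since the recognition model is expected to have a dynamic width dimension, and dynamic shapes interact badly with EXHAUSTIVE cuDNN algorithm search as noted earlier.

import onnx

m = onnx.load("/text_rec.onnx")
onnx.checker.check_model(m)  # structural validation of the exported graph
for i in m.graph.input:
    dims = [d.dim_param or d.dim_value for d in i.type.tensor_type.shape.dim]
    print(i.name, dims)  # expect something like ['N', 3, 48, 'W'] (dynamic batch/width)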