You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

103 lines
3.4 KiB

import numpy as np
import time
import onnxruntime as ort
import json
import os
import logging
import ast
import cv2.dnn
# Environment variables
MODEL_PATH = os.environ.get("MODEL_PATH", "model.onnx")
INPUT_IMAGE_PATH = os.environ.get("IMAGE_PATH", "input.jpg")
# Onnxruntime providers
PROVIDERS = ast.literal_eval(os.environ.get("ONNXRUNTIME_PROVIDERS", '["CUDAExecutionProvider"]'))
CLASSES = {
0: "SpeedLimit",
1: "DangerAhead"
}
colors = np.random.uniform(0, 255, size=(len(CLASSES), 3))
MIN_CONF_THRESHOLD = float(os.environ.get("MIN_CONF_THRESHOLD", 0.8))
def preprocess(original_image):
# original_image: np.ndarray = cv2.imread(image_path)
[height, width, _] = original_image.shape
# Prepare a square image for inference
length = max((height, width))
image = np.zeros((length, length, 3), np.uint8)
image[0:height, 0:width] = original_image
# Calculate scale factor
scale = length / 640
# Preprocess the image and prepare blob for model
blob = cv2.dnn.blobFromImage(image, scalefactor=1 / 255, size=(640, 640), swapRB=True)
return blob, scale, original_image
def postprocess(response):
outputs = np.array([cv2.transpose(response[0])])
rows = outputs.shape[1]
boxes = []
scores = []
class_ids = []
# Iterate through output to collect bounding boxes, confidence scores, and class IDs
for i in range(rows):
classes_scores = outputs[0][i][4:]
(minScore, maxScore, minClassLoc, (x, maxClassIndex)) = cv2.minMaxLoc(classes_scores)
if maxScore >= 0.25:
box = [
outputs[0][i][0] - (0.5 * outputs[0][i][2]), outputs[0][i][1] - (0.5 * outputs[0][i][3]),
outputs[0][i][2], outputs[0][i][3]]
boxes.append(box)
scores.append(maxScore)
class_ids.append(maxClassIndex)
detections = []
result_boxes = cv2.dnn.NMSBoxes(boxes, scores, 0.25, 0.45, 0.5)
# Iterate through NMS results to draw bounding boxes and labels
for i in range(len(result_boxes)):
index = result_boxes[i]
box = boxes[index]
if scores[index] > MIN_CONF_THRESHOLD:
detection = {
'class_id': class_ids[index],
'class_name': CLASSES[class_ids[index]],
'confidence': f"{scores[index]:.2f}",
'box': [f"{c:.2f}" for c in box]}
detections.append(detection)
return detections
if __name__ == "__main__":
# Logger
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
ort_sess = ort.InferenceSession(MODEL_PATH, providers=PROVIDERS)
nparr = np.fromfile(INPUT_IMAGE_PATH, np.uint8)
nparr = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
preprocessed, scale, original_image = preprocess(nparr)
outputs = ort_sess.run(None, {'images': preprocessed})
detections = postprocess(outputs[0])
logger.info(f"Processed image {INPUT_IMAGE_PATH}")
logger.info(json.dumps(detections))
logger.info(f"Now starting mass inference...")
start = time.time()
count = 0
while True:
now = time.time()
if now - start > 1:
logger.info(f"Performed {count} inferences in {now-start:.2f}s")
start = time.time()
count = 0
outputs = ort_sess.run(None, {'images': preprocessed})
count+=1