Commit 1110eb3e authored by Thomas Müller

coral-demo: add new package



Add recipe and source files for camera demo with webserver.
Signed-off-by: Thomas Mueller <thomas.mueller@kontron.com>
parent fbfc5af1
SRC_URI = "file://streaming.py \
           file://common.py \
           file://obj_detect.py \
           file://templates/index.html \
           "

LICENSE = "CLOSED"

RDEPENDS_${PN} = "python3-numpy python3-flask python3-opencv python3-psutil python3-werkzeug-tests"

do_install () {
    install -d ${D}/opt/edgetpu/coral-demo
    install -d ${D}/opt/edgetpu/coral-demo/templates
    install -m 0755 ${WORKDIR}/streaming.py ${D}/opt/edgetpu/coral-demo
    install -m 0755 ${WORKDIR}/common.py ${D}/opt/edgetpu/coral-demo
    install -m 0755 ${WORKDIR}/obj_detect.py ${D}/opt/edgetpu/coral-demo
    install -m 0755 ${WORKDIR}/templates/index.html ${D}/opt/edgetpu/coral-demo/templates
}

FILES_${PN} += " \
    /opt/edgetpu/coral-demo/*.py \
    /opt/edgetpu/coral-demo/templates/*.html \
    "
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utilities."""
import numpy as np
from PIL import Image
import tflite_runtime.interpreter as tflite
EDGETPU_SHARED_LIB = 'libedgetpu.so.1'
def make_interpreter(model_file, delegate=True):
model_file, *device = model_file.split('@')
if (delegate == True):
return tflite.Interpreter(
model_path=model_file,
experimental_delegates=[
tflite.load_delegate(EDGETPU_SHARED_LIB,
{'device': device[0]} if device else {})
])
else:
return tflite.Interpreter(model_path=model_file)
def set_input(interpreter, image, resample=Image.NEAREST):
"""Copies data to input tensor."""
image = image.resize((input_image_size(interpreter)[0:2]), resample)
input_tensor(interpreter)[:, :] = image
def input_image_size(interpreter):
"""Returns input image size as (width, height, channels) tuple."""
_, height, width, channels = interpreter.get_input_details()[0]['shape']
return width, height, channels
def input_tensor(interpreter):
"""Returns input tensor view as numpy array of shape (height, width, 3)."""
tensor_index = interpreter.get_input_details()[0]['index']
return interpreter.tensor(tensor_index)()[0]
def output_tensor(interpreter, i):
"""Returns dequantized output tensor if quantized before."""
output_details = interpreter.get_output_details()[i]
output_data = np.squeeze(interpreter.tensor(output_details['index'])())
if 'quantization' not in output_details:
return output_data
scale, zero_point = output_details['quantization']
if scale == 0:
return output_data - zero_point
return scale * (output_data - zero_point)
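
For reference, a minimal, untested sketch of how these helpers compose for a single-image detection pass outside of the demo; the model and image file names below are placeholders, not files from this commit:

# Hypothetical stand-alone use of the helpers above (paths are placeholders).
import common
from PIL import Image

interpreter = common.make_interpreter('model_edgetpu.tflite', delegate=True)
interpreter.allocate_tensors()

img = Image.open('test.jpg').convert('RGB')
common.set_input(interpreter, img)   # resizes to model input size and fills the input tensor
interpreter.invoke()

boxes = common.output_tensor(interpreter, 0)      # normalized [ymin, xmin, ymax, xmax] per box
class_ids = common.output_tensor(interpreter, 1)
scores = common.output_tensor(interpreter, 2)
count = int(common.output_tensor(interpreter, 3))
print('detections:', count)
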
import common
import collections
import threading
import queue
import re
import cv2
import time
from PIL import Image
import numpy as np
import psutil
class obj_detect():
    def __init__(self, camera_idx, top_k, threshold,
                 delegate_model, default_model, default_labels):
        self.camera_idx = camera_idx
        self.top_k = top_k
        self.threshold = threshold
        self.delegate_model = delegate_model
        self.default_model = default_model
        self.labels = self.load_labels(default_labels)

    # Shared state used by the grabber, inference and streaming threads.
    inputQueue = queue.Queue(maxsize=2)
    outputQueue = queue.PriorityQueue(maxsize=4)
    stop_event = threading.Event()
    delegate = False
    outputFrame = None
    lock = threading.Lock()
    tpu_inference_time = 1
    frame_period = 0.15
    frame_id = 0
    frame_times = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    cpu_times = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    tpu_times = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    Object = collections.namedtuple('Object', ['id', 'score', 'bbox'])
    cap = None
    def switch_delegate(self):
        # Toggle between CPU and Edge TPU inference. The stop_event selects
        # which worker type is active; the frame period is relaxed for CPU.
        if self.delegate:
            self.delegate = False
            self.reset_stop_event()
            self.frame_period = 0.15
        else:
            self.delegate = True
            self.stop_threads()
            self.frame_period = 0.025

    def make_interpreters(self, delegate_model, default_model, default_labels):
        self.delegate_interpreter = common.make_interpreter(delegate_model, True)
        self.default_interpreter = common.make_interpreter(default_model, False)
        self.delegate_interpreter.allocate_tensors()
        self.default_interpreter.allocate_tensors()
        self.labels = self.load_labels(default_labels)
    def append_framerate_to_image(self, cv2_im, framerate, tpu, cpu):
        cv2_im = cv2.rectangle(cv2_im, (400, 10), (630, 100), (255, 255, 255), -1)
        cv2_im = cv2.putText(cv2_im, framerate, (450, 30),
                             cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
        cv2_im = cv2.putText(cv2_im, tpu, (450, 60),
                             cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
        cv2_im = cv2.putText(cv2_im, cpu, (450, 90),
                             cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
        return cv2_im

    def append_objs_to_img(self, cv2_im, objs):
        height, width, channels = cv2_im.shape
        for obj in objs:
            x0, y0, x1, y1 = list(obj.bbox)
            x0, y0, x1, y1 = int(x0 * width), int(y0 * height), int(x1 * width), int(y1 * height)
            percent = int(100 * obj.score)
            label = '{}% {}'.format(percent, self.labels.get(obj.id, obj.id))
            cv2_im = cv2.rectangle(cv2_im, (x0, y0), (x1, y1), (0, 255, 0), 2)
            cv2_im = cv2.putText(cv2_im, label, (x0, y0 + 30),
                                 cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0), 2)
        return cv2_im
    def load_labels(self, path):
        p = re.compile(r'\s*(\d+)(.+)')
        with open(path, 'r', encoding='utf-8') as f:
            lines = (p.match(line).groups() for line in f.readlines())
            return {int(num): text.strip() for num, text in lines}

    class BBox(collections.namedtuple('BBox', ['xmin', 'ymin', 'xmax', 'ymax'])):
        """Bounding box.

        Represents a rectangle whose sides are either vertical or horizontal,
        parallel to the x or y axis.
        """
        __slots__ = ()
    def get_output(self, interpreter, image_scale=1.0):
        """Returns list of detected objects."""
        boxes = common.output_tensor(interpreter, 0)
        class_ids = common.output_tensor(interpreter, 1)
        scores = common.output_tensor(interpreter, 2)
        count = int(common.output_tensor(interpreter, 3))

        def make(i):
            ymin, xmin, ymax, xmax = boxes[i]
            return self.Object(
                id=int(class_ids[i]),
                score=scores[i],
                bbox=self.BBox(xmin=np.maximum(0.0, xmin),
                               ymin=np.maximum(0.0, ymin),
                               xmax=np.minimum(1.0, xmax),
                               ymax=np.minimum(1.0, ymax)))

        return [make(i) for i in range(self.top_k) if scores[i] >= self.threshold]
    def run(self):
        t = threading.Thread(target=self.process_frame, args=[
            self.camera_idx, self.delegate_interpreter, self.default_interpreter])
        t.daemon = True
        t.start()

    def run_frame_grabber(self):
        t = threading.Thread(target=self.grab_frames)
        t.daemon = True
        t.start()

    def run_inference_thread(self):
        t = threading.Thread(target=self.inference_worker)
        t.daemon = True
        t.start()

    def run_tpu_inference_thread(self):
        t = threading.Thread(target=self.tpu_inference_worker)
        t.daemon = True
        t.start()

    def open_camera(self, camera_idx):
        self.cap = cv2.VideoCapture(camera_idx)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

    def close_camera(self):
        self.cap.release()
    def grab_frames(self):
        # Read frames from the camera into the input queue, dropping the
        # oldest frame when the queue is full.
        while self.cap.isOpened():
            ret, cv2_im = self.cap.read()
            if not ret:
                break
            if self.inputQueue.full():
                discard = self.inputQueue.get()
            self.inputQueue.put((self.frame_id, cv2_im))
            self.frame_id += 1
            time.sleep(self.frame_period)
    def generate_webstream_frame(self):
        encodedImage = None
        start_frame = 0
        # loop over frames from the output stream
        i = 0
        tpu_string = '0'
        cpu_string = '0'
        framerate_string = '0'
        while True:
            if self.outputQueue.full():
                # moving averages over the last 16 frames for frame time,
                # CPU load and TPU inference time
                end_frame = time.time()
                frame_time = end_frame - start_frame
                self.frame_times.pop()
                self.frame_times.insert(0, frame_time)
                frame_time = sum(self.frame_times) / 16
                start_frame = time.time()
                frame_id, cv2_im = self.outputQueue.get()
                cpu = psutil.cpu_percent(interval=None, percpu=False)
                self.cpu_times.pop()
                self.cpu_times.insert(0, cpu)
                cpu = sum(self.cpu_times) / 16
                if self.delegate:
                    with self.lock:
                        tpu_time = self.tpu_inference_time
                    self.tpu_times.pop()
                    self.tpu_times.insert(0, tpu_time)
                    tpu_time = sum(self.tpu_times) / 16
                    tpu = 100 * tpu_time / frame_time
                    i += 1
                    if i == 4:
                        cpu_string = 'cpu: {0:2.1f} %'.format(cpu)
                        framerate_string = '{0:2.1f} fps'.format(1 / frame_time)
                        tpu_string = 'tpu: {0:2.1f} %'.format(tpu)
                        i = 0
                else:
                    cpu_string = 'cpu: {0:2.1f} %'.format(cpu)
                    framerate_string = '{0:2.1f} fps'.format(1 / frame_time)
                    tpu_string = 'tpu: {0:2.1f} %'.format(0)
                cv2_im = self.append_framerate_to_image(cv2_im, framerate_string,
                                                        tpu_string, cpu_string)
                # encode the frame in JPEG format
                (flag, encodedImage) = cv2.imencode(".jpg", cv2_im)
                # ensure the frame was successfully encoded
                if not flag:
                    continue
                # yield the output frame in the byte format
                yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
                       bytearray(encodedImage) + b'\r\n')
            time.sleep(self.frame_period)
    def inference_worker(self):
        # CPU inference worker; active while the stop_event is not set.
        default_interpreter = common.make_interpreter(self.default_model, False)
        default_interpreter.allocate_tensors()
        while True:
            if not self.stop_event.is_set():
                if self.inputQueue.empty():
                    time.sleep(0.01)
                    continue
                frame_id, cv2_im = self.inputQueue.get()
                cv2_im_rgb = cv2.cvtColor(cv2_im, cv2.COLOR_BGR2RGB)
                pil_im = Image.fromarray(cv2_im_rgb)
                common.set_input(default_interpreter, pil_im)
                start_inference = time.time()
                default_interpreter.invoke()
                end_inference = time.time()
                inference_time = end_inference - start_inference
                objs = self.get_output(default_interpreter)
                cv2_im = self.append_objs_to_img(cv2_im, objs)
                self.outputQueue.put((frame_id, cv2_im))
            else:
                time.sleep(0.15)
    def tpu_inference_worker(self):
        # Edge TPU inference worker; active while the stop_event is set.
        delegate_interpreter = common.make_interpreter(self.delegate_model, True)
        delegate_interpreter.allocate_tensors()
        end_inference = 1
        while True:
            if self.stop_event.is_set():
                if self.inputQueue.empty():
                    time.sleep(0.01)
                    continue
                frame_id, cv2_im = self.inputQueue.get()
                cv2_im_rgb = cv2.cvtColor(cv2_im, cv2.COLOR_BGR2RGB)
                pil_im = Image.fromarray(cv2_im_rgb)
                common.set_input(delegate_interpreter, pil_im)
                start_inference = time.time()
                delegate_interpreter.invoke()
                end_inference = time.time()
                inference_time = end_inference - start_inference
                with self.lock:
                    self.tpu_inference_time = inference_time
                objs = self.get_output(delegate_interpreter)
                cv2_im = self.append_objs_to_img(cv2_im, objs)
                self.outputQueue.put((frame_id, cv2_im))
            else:
                time.sleep(0.15)

    def stop_threads(self):
        self.stop_event.set()

    def reset_stop_event(self):
        self.stop_event.clear()
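
As a rough illustration (not part of the commit), the class can also be exercised without the web server; the model and label paths below are placeholders, and the CPU worker runs because delegate defaults to False:

# Hypothetical smoke test for the CPU pipeline (paths are placeholders).
import time
import cv2
import obj_detect

detector = obj_detect.obj_detect(camera_idx=0, top_k=3, threshold=0.1,
                                 delegate_model='model_edgetpu.tflite',
                                 default_model='model.tflite',
                                 default_labels='coco_labels.txt')
detector.open_camera(0)
detector.run_frame_grabber()
detector.run_inference_thread()

# wait for one annotated frame and save it
while detector.outputQueue.empty():
    time.sleep(0.1)
frame_id, annotated = detector.outputQueue.get()
cv2.imwrite('annotated.jpg', annotated)
detector.close_camera()
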
import argparse
import obj_detect
from flask import request
from flask import Response
from flask import Flask
from flask import render_template
import time
# initialize a flask object
app = Flask(__name__)
global detector
global delegate_flag
@app.route("/")
def index():
# return the rendered template
return render_template("index.html")
@app.route('/button')
def button():
detector.switch_delegate()
return render_template("index.html")
@app.route("/video_feed")
def video_feed():
# return the response generated along with the specific media
# type (mime type)
return Response(detector.generate_webstream_frame(),
mimetype = "multipart/x-mixed-replace; boundary=frame")
def main():
    global delegate_flag
    global detector
    delegate_flag = False
    default_model_dir = 'all_models'
    default_model = '../examples-camera/all_models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
    delegate_model = '../examples-camera/all_models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite'
    default_labels = '../examples-camera/all_models/coco_labels.txt'
    parser = argparse.ArgumentParser()
    parser.add_argument('--top_k', type=int, default=3,
                        help='number of categories with highest score to display')
    parser.add_argument('--camera_idx', type=int, default=0,
                        help='index of which video source to use')
    parser.add_argument('--threshold', type=float, default=0.1,
                        help='classifier score threshold')
    args = parser.parse_args()
    print('Loading {} with {} labels.'.format(default_model, default_labels))
    detector = obj_detect.obj_detect(args.camera_idx, args.top_k, args.threshold,
                                     delegate_model, default_model, default_labels)
    detector.open_camera(args.camera_idx)
    detector.run_frame_grabber()
    # three CPU inference workers plus one Edge TPU worker
    detector.run_inference_thread()
    detector.run_inference_thread()
    detector.run_inference_thread()
    detector.run_tpu_inference_thread()
    # start server app
    app.run(host='0.0.0.0', port=8000, debug=True,
            threaded=True, use_reloader=False)


if __name__ == "__main__":
    main()
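
On the target, the demo would typically be started from the install directory (e.g. python3 /opt/edgetpu/coral-demo/streaming.py), assuming the MobileNet SSD models and coco_labels.txt are reachable at the relative ../examples-camera/all_models/ paths used in main(); the MJPEG stream and the CPU/TPU switch button are then served on port 8000.
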
<html>
  <head>
    <title>Object Detection</title>
  </head>
  <body>
    <h1>Object Detection</h1>
    <img src="{{ url_for('video_feed') }}">
    <form action="button">
      <button type="submit">Switch CPU/TPU!</button>
    </form>
  </body>
</html>