import importlib
import signal
import tkinter.ttk as ttk
from tkinter import filedialog
from .app import App
from .base import Base
from .button import Button
from .modulesmanager import ModulesManager
from .popupmenu import PopupMenu
try:
import cv2
except ImportError:
cv2 = None
[docs]
class VideoView(Base):
    """Live video capture and display widget backed by OpenCV."""

    def __init__(self, device=0, zoom_level=3, auto_start=True):
        """Initialise capture state and install this view's SIGINT handler.

        Args:
            device: Value later handed to ``VideoCapture`` when capturing
                starts (defaults to ``0``).
            zoom_level: Divisor applied to the frame width and height before
                the frame is displayed.
            auto_start: Stored on the instance; it is not consulted inside
                this method.
        """
        super().__init__()

        # Capture configuration supplied by the caller.
        self.device = device
        self.zoom_level = zoom_level
        self.auto_start = auto_start

        # Runtime capture / recording state; nothing is opened yet.
        self.image = None
        self.capture = None
        self.videowriter = None
        self.abort = False

        # Companion widgets, left unset here and assigned from outside.
        self.startstop_behaviour_button = None
        self.save_behaviour_button = None
        self.stream_behaviour_button = None
        self.histogram_xyplot = None

        # Reference to the Tk image currently shown by the widget.
        self._displayed_tkimage = None

        # Route SIGINT through this view and remember whatever handler was
        # installed before, so it can still be invoked later.
        self.previous_handler = signal.signal(
            signal.SIGINT, self.signal_handler
        )

        # Identifiers of the pending Tk ``after`` callbacks, if any.
        self.next_scheduled_update = None
        self.next_scheduled_update_histogram = None
[docs]
def is_environment_valid(self):
    """Check that OpenCV and Pillow are importable and cache the modules.

    ``ModulesManager`` is asked to install/import ``opencv-python`` and
    ``Pillow`` when they are absent. The resulting modules are stored on the
    instance as ``cv2``, ``PIL``, ``PILImage`` and ``PILImageTk``.

    Returns:
        bool: ``True`` only when all four modules are available.
    """
    ModulesManager.install_and_import_modules_if_absent(
        {"opencv-python": "cv2", "Pillow": "PIL"}
    )
    self.cv2 = ModulesManager.imported.get("opencv-python", None)
    self.PIL = ModulesManager.imported.get("Pillow", None)

    # Always define the sub-module attributes so the final check cannot fail
    # with AttributeError when Pillow itself is missing.
    self.PILImage = None
    self.PILImageTk = None
    if self.PIL is not None:
        try:
            self.PILImage = importlib.import_module("PIL.Image")
            self.PILImageTk = importlib.import_module("PIL.ImageTk")
        except ImportError as err:
            # A Pillow build without these sub-modules makes the environment
            # invalid rather than crashing the check.
            print(err)

    return all(
        module is not None
        for module in (self.cv2, self.PIL, self.PILImage, self.PILImageTk)
    )
[docs]
def signal_handler(self, sig, frame):
    """Handle SIGINT: stop a running capture first, otherwise propagate.

    While capturing, only the ``abort`` flag is raised; the display loop is
    then responsible for stopping the capture and forwarding the signal.
    When idle, the handler that was installed before this one is invoked.

    Args:
        sig: Signal number received.
        frame: Current stack frame, forwarded unchanged to the previous
            handler.

    Raises:
        KeyboardInterrupt: When idle and the previous disposition was
            ``signal.SIG_DFL``.
    """
    print(f"Handling signal {sig} ({signal.Signals(sig).name}).")
    if sig != signal.SIGINT:
        return

    if self.is_running:
        self.abort = True
        return

    previous = self.previous_handler
    if callable(previous):
        previous(sig, frame)
    elif previous == signal.SIG_DFL:
        # ``signal.signal`` may return SIG_DFL, SIG_IGN or None, none of
        # which can be called. For SIG_DFL, raise KeyboardInterrupt the way
        # Python's own default SIGINT handler does; SIG_IGN / None mean
        # there is nothing to propagate to.
        raise KeyboardInterrupt
[docs]
@classmethod
def available_devices(cls):
    """Return a list of indices for available video capture devices.

    Device indices are probed from 0 upwards; probing stops at the first
    index from which no frame can be read. This relies on the module-level
    ``cv2`` import; if it is unavailable or the probe raises, the indices
    found so far (possibly none) are returned.
    """
    found = []
    try:
        index = 0
        while True:
            cap = cv2.VideoCapture(index)
            try:
                readable = cap.read()[0]
            finally:
                # Release every probed device, including the one whose read
                # failed and ends the scan.
                cap.release()
            if not readable:
                break
            found.append(index)
            index += 1
    except Exception:
        # Deliberately best-effort: a missing OpenCV or a backend error just
        # ends the probe with whatever was found.
        pass
    return found
@property
def is_running(self):
    """Tell whether a capture object is currently held by this view."""
    capture_handle = self.capture
    return capture_handle is not None
@property
def startstop_button_label(self):
    """Give the start/stop button text: "Stop" while running, else "Start"."""
    return "Stop" if self.is_running else "Start"
[docs]
def start_capturing(self):
    """Open the video device and begin reading frames.

    Does nothing when a capture is already active. If the device cannot be
    opened, or starting the display loop raises, the capture object is
    released and ``self.capture`` is reset to ``None`` so that
    ``is_running`` keeps reflecting reality. Errors are printed, not raised.
    """
    if self.is_running:
        return

    try:
        self.capture = self.cv2.VideoCapture(self.device)
        if self.capture.isOpened():
            self.update_display()
        else:
            # An unopened capture must not be kept: it would make the view
            # look "running" although no frames are being read.
            self.capture.release()
            self.capture = None
    except Exception as err:
        print(err)
        stale_capture = self.capture
        self.capture = None
        if stale_capture is not None:
            try:
                stale_capture.release()
            except Exception as release_err:
                print(release_err)
[docs]
def stop_capturing(self):
    """Cancel the pending display refresh and release the capture device.

    Has no effect when no capture is active.
    """
    if not self.is_running:
        return

    pending_update = self.next_scheduled_update
    if pending_update is not None:
        # Prevent the already-scheduled refresh from firing after release.
        App.app.root.after_cancel(pending_update)

    self.capture.release()
    self.capture = None
[docs]
def start_streaming(self, filepath):
    """Begin writing captured frames to a video file at the given path.

    The writer is created with the I420 fourcc, 20 frames per second and the
    frame size currently reported by the capture. If no capture is active
    the size is unknown, so a message is printed and no writer is created.
    A writer that is already open is released before being replaced.

    Args:
        filepath: Destination path handed to ``VideoWriter``.
    """
    # Use the OpenCV module resolved for this instance; the module-level
    # ``cv2`` name can be ``None`` when the import at load time failed.
    width = self.get_prop_id(self.cv2.CAP_PROP_FRAME_WIDTH)
    height = self.get_prop_id(self.cv2.CAP_PROP_FRAME_HEIGHT)
    if width is None or height is None:
        print("Cannot start streaming: no active capture.")
        return

    if self.videowriter is not None:
        # Do not leak a writer left open by an earlier call.
        self.videowriter.release()
        self.videowriter = None

    fourcc = self.cv2.VideoWriter_fourcc("I", "4", "2", "0")
    self.videowriter = self.cv2.VideoWriter(
        filepath, fourcc, 20.0, (int(width), int(height)), True
    )
[docs]
def stop_streaming(self):
    """Release the active video writer, if any, and forget it."""
    writer = self.videowriter
    if writer is None:
        return
    writer.release()
    self.videowriter = None
[docs]
def update_display(self, readonly_frame=None):
    """Read the next frame, update the display widget, and schedule the next refresh.

    Args:
        readonly_frame: Optional frame to display. When omitted and a capture
            is active, the next frame is read from the capture. When omitted
            and nothing is capturing, there is nothing to draw and the call
            only processes a pending abort request.

    The frame is also written to the active video writer, if any. While a
    capture is active the method reschedules itself every 20 ms. A pending
    abort request (set by the SIGINT handler) stops the capture and is
    forwarded to the previously installed signal handler.
    """
    ret = True
    if readonly_frame is None and self.is_running:
        ret, readonly_frame = self.capture.read()

    # Without the ``is not None`` guard, a call with no frame while nothing
    # is capturing would fail on ``None.copy()``.
    if ret and readonly_frame is not None:
        # The OpenCV documentation is clear: the returned frame from read()
        # is read-only and must be copied to be used.
        # https://docs.opencv.org/3.4/d8/dfe/classcv_1_1VideoCapture.html#a473055e77dd7faa4d26d686226b292c1
        # Without this copy, the program crashes in a few seconds.
        frame = readonly_frame.copy()
        if self.videowriter is not None:
            self.videowriter.write(frame)
        if len(frame.shape) == 3 and frame.shape[2] == 3:
            # OpenCV delivers BGR; the PIL image below expects RGB.
            frame = self.cv2.cvtColor(frame, self.cv2.COLOR_BGR2RGB)

        img = self.PILImage.fromarray(frame)
        # Clamp the divisor and the resulting size to at least 1 so that a
        # zoom level of 0 (or a tiny frame) cannot raise.
        zoom = max(1, int(self.zoom_level))
        self.image = img.resize(
            (max(1, img.width // zoom), max(1, img.height // zoom)),
            self.PILImage.NEAREST,
        )

        photo = self.PILImageTk.PhotoImage(image=self.image)
        # Keep a reference on the instance; the original code notes this as
        # the workaround for a `PhotoImage` bug.
        self._displayed_tkimage = photo
        self.widget.configure(image=photo)

        # Start the histogram refresh loop once a first image exists.
        if self.next_scheduled_update_histogram is None:
            self.update_histogram()

    if self.is_running:
        # Only keep polling while a capture is open; a one-off call with an
        # explicit frame must not start a loop that has nothing to read.
        self.next_scheduled_update = App.app.root.after(
            20, self.update_display
        )

    if self.abort:
        # Clear the request first so a later restart is not aborted again.
        self.abort = False
        self.stop_capturing()
        if callable(self.previous_handler):
            self.previous_handler(signal.SIGINT, 0)
[docs]
def update_histogram(self):
    """Recompute and redraw the per-channel histogram from the current image.

    Nothing is plotted when no histogram plot is attached or no image has
    been captured yet. Each channel's histogram is decimated by a factor of
    8 before being appended to the plot's ``y`` series. The method always
    reschedules itself to run again after 300 ms.
    """
    plot = self.histogram_xyplot
    image = self.image
    if plot is not None and image is not None:
        plot.clear_plot()
        values = image.histogram()
        # Split the concatenated histogram by the image's real band count
        # instead of assuming three channels, so greyscale or RGBA images
        # are sliced on the correct boundaries.
        channel_count = max(1, len(image.getbands()))
        bins_per_channel = len(values) // channel_count
        decimate = 8
        plot.x = list(range(bins_per_channel // decimate))
        for channel in range(channel_count):
            start = channel * bins_per_channel
            plot.y.append(values[start : start + bins_per_channel : decimate])
        plot.update_plot()

    self.next_scheduled_update_histogram = App.app.root.after(
        300, self.update_histogram
    )
[docs]
def camera_selection_changed(self, index):
    """Restart capturing on the camera identified by *index*.

    Args:
        index: New value for ``self.device``.
    """
    # The order matters: stop using the old device, switch, then reopen.
    self.stop_capturing()
    self.device = index
    self.start_capturing()
[docs]
def prop_ids(self):
    """Print all major OpenCV capture properties to stdout.

    One line is printed per property, in a fixed order, with the value
    reported by the active capture. Nothing is printed when no capture is
    active.
    """
    capture = self.capture
    if capture is None:
        # No device to query; mirrors ``get_prop_id`` which also tolerates
        # the idle state instead of failing on ``None.get``.
        return

    cv2_module = self.cv2
    # (output template, name of the OpenCV constant to query)
    report = (
        ("CV_CAP_PROP_FRAME_WIDTH: '{}'", "CAP_PROP_FRAME_WIDTH"),
        ("CV_CAP_PROP_FRAME_HEIGHT : '{}'", "CAP_PROP_FRAME_HEIGHT"),
        ("CAP_PROP_FPS : '{}'", "CAP_PROP_FPS"),
        ("CAP_PROP_POS_MSEC : '{}'", "CAP_PROP_POS_MSEC"),
        ("CAP_PROP_FRAME_COUNT : '{}'", "CAP_PROP_FRAME_COUNT"),
        ("CAP_PROP_BRIGHTNESS : '{}'", "CAP_PROP_BRIGHTNESS"),
        ("CAP_PROP_CONTRAST : '{}'", "CAP_PROP_CONTRAST"),
        ("CAP_PROP_SATURATION : '{}'", "CAP_PROP_SATURATION"),
        ("CAP_PROP_HUE : '{}'", "CAP_PROP_HUE"),
        ("CAP_PROP_GAIN : '{}'", "CAP_PROP_GAIN"),
        ("CAP_PROP_CONVERT_RGB : '{}'", "CAP_PROP_CONVERT_RGB"),
    )
    for template, constant_name in report:
        prop_id = getattr(cv2_module, constant_name)
        print(template.format(capture.get(prop_id)))
[docs]
def get_prop_id(self, prop_id):
    """Look up one OpenCV capture property on the active capture.

    Typical *prop_id* values are the OpenCV constants CAP_PROP_FRAME_WIDTH,
    CAP_PROP_FRAME_HEIGHT, CAP_PROP_FPS, CAP_PROP_FOURCC and
    CAP_PROP_CONVERT_RGB.

    Returns:
        Whatever the capture's ``get`` reports for *prop_id*, or ``None``
        when no capture is active.
    """
    active_capture = self.capture
    if active_capture is None:
        return None
    return active_capture.get(prop_id)