Source code for caer.video.stream
# _____ ______ _____
# / ____/ /\ | ____ | __ \
# | | / \ | |__ | |__) | Caer - Modern Computer Vision
# | | / /\ \ | __| | _ / Languages: Python, C, C++, Cuda
# | |___ / ____ \ | |____ | | \ \ http://github.com/jasmcaus/caer
# \_____\/_/ \_ \______ |_| \_\
# Licensed under the MIT License <http://opensource.org/licenses/MIT>
# SPDX-License-Identifier: MIT
# Copyright (c) 2020-2021 The Caer Authors <http://github.com/jasmcaus>
from threading import Thread
import time
import math
from queue import Queue
import cv2 as cv
import numpy as np
from .constants import (
FPS, FRAME_COUNT, FRAME_HEIGHT, FRAME_WIDTH
)
from ..annotations import Tuple, Optional
from ..coreten import Tensor
__all__ = [
"Stream"
]
#pylint:disable=no-member
# ret, jpeg = cv2.imencode(".jpg", image)
# return jpeg.tobytes()
# This class can handle both live as well as pre-existing videos.
[docs]class Stream:
r"""
This is an auxiliary class that enables Video Streaming for ``caer`` with minimalistic latency, and at the expense
of little to no additional computational requirements.
The basic idea behind it is to tracks and save the salient feature array for the given number of frames and then uses these anchor point to cancel out all perturbations relative to it for the incoming frames in the queue. This class relies heavily on **Threaded Queue mode** for error-free & ultra-fast frame handling.
Args:
source (int, str): Source path for the video. Uses an external camera device if ``source`` is an integer.
qsize (int): Default queue size for handling the video streams. Default: 128.
"""
def __init__(self, source = 0, qsize=128) -> None: # TODO: Add colorspace support
r"""
Source must either be an integer (0,1,2 etc) or a path to a video file
"""
self.live_video = False
if isinstance(source, int):
self.live_video = True
# raise ValueError("Expected a filepath. Got an integer. FileVideoStream is not for live feed. Use LiveVideoStream instead"ConnectionRefusedError())
if not isinstance(source, (int,str)):
raise ValueError(f"Expected either an integer or filepath. Got {type(source)}")
# initializing the video stream
self._video_stream = cv.VideoCapture(source)
self.kill_stream = False
self.width = int(self._video_stream.get(FRAME_WIDTH))
self.height = int(self._video_stream.get(FRAME_HEIGHT))
self.res = (self.width, self.height)
self.fps = math.ceil(self._video_stream.get(FPS))
self.frames = int(self._video_stream.get(FRAME_COUNT))
# initialize the queue to store frames
self._Q: Queue = Queue(maxsize=qsize)
# intialize thread
self._thread: Optional[Thread] = None
def start(self):
# start a thread to read frames from the video stream
self._thread = Thread(target=self._update, name="caer.video.Stream()", args=())
self._thread.daemon = True
self._thread.start()
return self
def _update(self):
while True:
if self.kill_stream:
break
# otherwise, ensure the queue has room in it
if not self._Q.full():
# read the next frame from the file
ret, frame = self._video_stream.read()
# If at the end of the video stream
if not ret:
self.release()
return
# add the frame to the queue
self._Q.put(frame)
else:
time.sleep(0.1) # Rest for 10ms if we have a full queue
self._video_stream.release()
def read(self) -> Tensor:
"""
Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
**Returns:** A n-dimensional numpy array.
"""
return self._Q.get()
def release(self) -> None:
"""
Safely terminates the thread, and release the Stream resources.
"""
self.kill_stream = True
# wait until stream resources are released
if self._thread is not None:
self._thread.join()
self._thread = None
self.frames = 0
self.fps = 0
# Gets frame count
def count_frames(self) -> int: # type: ignore[return]
"""
Returns the number of frames for the current video
Optional: use the `frames` attribute
"""
if not self.kill_stream and not self.live_video:
return self.frames
# if get_opencv_version() == "2":
# return int(self.stream.get(FRAME_COUNT_DEPR))
# else:
# return int(self.stream.get(FRAME_COUNT))
if self.live_video:
print("[WARNING] Frames cannot be computed on live streams")
return -1
# Gets FPS count
def get_fps(self) -> int: # type: ignore[return]
"""
Returns the fps (frames per second) value for the current video
Optional: use the `fps` attribute
"""
if not self.kill_stream:
return self.fps
# Get frame dimensions
def get_res(self) -> Tuple[int, int]:
return self.res
###########################################################################################
# Old implementation of stream.py when it used to inherit the FileStream() from filestream.py.
# Until 5 Dec 2020, this was the implementation.
###########################################################################################
# from .filestream import FileStream
# __all__ = [
# "Stream"
# ]
# # Using the FileStream class as it can handle both live as well as pre-existing videos
# class Stream(FileStream):
# """
# Stream() supports a diverse range of video streams which can handle/control video stream almost any IP/USB Cameras, multimedia video file format (tested upto 4k), any network stream URL such as *http(s), rtp, rstp, rtmp, mms, etc.*
# It provides a flexible, high-level multi-threaded wrapper around OpenCV's VideoCapture() for threaded, error-free and synchronized frame handling.
# """
# def __init__(self, source=0):
# # Initializing the stream from DefaultVideoStream
# self._video_stream = FileStream(source=source)
# self.width = self._video_stream.width
# self.height = self._video_stream.height
# self.res = (self.width, self.height)
# self.fps = self._video_stream.fps
# self.frames = self._video_stream.frames
# def start(self):
# # Begins the threaded video stream
# return self._video_stream.start()
# def update(self):
# self._video_stream.update()
# def read(self):
# """
# Extracts frames synchronously from monitored deque, while maintaining a fixed-length frame buffer in the memory, and blocks the thread if the deque is full.
# **Returns:** A n-dimensional numpy array.
# """
# return self._video_stream.read()
# def count_frames(self):
# return self._video_stream.count_frames()
# def release(self):
# """
# Safely terminates the thread, and release the VideoStream resources.
# """
# self._video_stream.release()
# # Get FPS
# def get_fps(self):
# return self._video_stream.get_fps()
# # Get frame dimensions
# def get_res(self):
# return self._video_stream.get_res()