OpenCV の実用的で便利なコード集-並列処理編2 (Python)

OpenCV
スポンサーリンク

スポンサーリンク

はじめに

前回の記事では、カメラの並列処理方法を実装しました。
これを使えば、AI を動かしながらでもリアルタイムで映像を確認することができます。

今回は、複数のカメラを使用して並列処理を実装してみようと思います。

前提条件

前提条件は以下の通りです。

  • Python == 3.9.13
  • opencv-python == 4.6.0
  • numpy == 1.23.4
  • Windows11

今回はノートパソコン備え付けのカメラと、ロジクールの C925e を使用していきます。

カメラキャプチャ管理クラスの作成

キャプチャオブジェクトを管理するクラスを作成します。

class captureClass():
    def __init__(self, cap_number):
        self.cap = cv2.VideoCapture(cap_number, cv2.CAP_DSHOW)

    def readFrame(self):
        ret, self.frame = self.cap.read()
        return ret

    def getFrame(self):
        return self.frame

    def capRelease(self):
        self.cap.release()

captureClass() は 呼び出されたときに __init__ の内容を実行します。
class 内で共有した変数は、self.変数名 とすることで共有できるようになります。

def 関数名(self): は、引数に self を追加してやることでクラス外からも呼び出すことができるようになります。

def readFrame(self):
    ret, self.frame = self.cap.read()
    return ret

こちらの関数は、画像を1枚読み込む関数です。
読み込めたかどうかを return ret として返します。

def getFrame(self):
    return self.frame

getFrame 関数は、readFrame() で self.frame を作成しているので、return で返すだけです。

def capRelease(self):
    self.cap.release()

キャプチャオブジェクトを開放します。

スレッド管理関数の作成

スレッド管理用の関数を作成します。
この関数では、captureClass 2つを管理します。今回はほぼ同時に画像を読み込んで2つのカメラの画像を結合し、表示します。


def threadCapture(cap_number1, cap_number2):
    cap_obj1 = captureClass(cap_number1)
    cap_obj2 = captureClass(cap_number2)
    count = 0
    while True:
        cap_obj1.readFrame()
        cap_obj2.readFrame()
        frame1 = cap_obj1.getFrame()
        frame2 = cap_obj2.getFrame()
        frame1 = cv2.resize(frame1, (320,240))
        frame2 = cv2.resize(frame2, (320,240))

        frame = cv2.vconcat([frame1, frame2])
        frame = cv2.putText(frame, "count: "+str(count), (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),1,cv2.LINE_8)

        cv2.imshow("frame", frame)
        count += 1
        
        lastkey = cv2.waitKey(1)
        if lastkey == ord("q"):
            cap.release()
            cv2.destroyAllWindows()
            break
        if lastkey == ord("c"):
            cv2.imwrite("frame.png", frame)

コードを説明していきます。

def threadCapture(cap_number1, cap_number2):
    cap_obj1 = captureClass(cap_number1)
    cap_obj2 = captureClass(cap_number2)

threadCapture() 関数は cap_number1, cap_number2 を引数に取ります。
この関数で、カメラオブジェクトを一括管理します。

while True:
    cap_obj1.readFrame()
    cap_obj2.readFrame()
    frame1 = cap_obj1.getFrame()
    frame2 = cap_obj2.getFrame()

cap_obj1.readFrame() は、captureClass の readFrame 関数を実行します。
frame1 = cap_obj1.getFrame() で、frame1 変数にself.frame を代入します。

frame1 = cv2.resize(frame1, (320,240))
frame2 = cv2.resize(frame2, (320,240))

frame = cv2.vconcat([frame1, frame2])
frame = cv2.putText(frame, "count: "+str(count), (30, 30),
                     cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),1,cv2.LINE_8)

cv2.resize() で、画像のサイズを半分にして cv2.vconcat() で縦に画像を連結します。
また、並列起動しているのが分かるように、cv2.putText() でフレーム数を記載します。

残りの説明は今までしてきましたので、割愛します。

メイン関数の作成

最後に、メインとなる関数を作成していきます。

if __name__ == "__main__":

    cap_number1 = 0
    cap_number2 = 1
    
    executor = ThreadPoolExecutor(max_workers=3)
    camera_future = executor.submit(threadCapture, cap_number1, cap_number2)

    while True:
        if camera_future.running() == False:
            print("camera shutdown")
            executor.shutdown()
            break
        else:
            time.sleep(5)
            print("5 seconds ...")

    print("program complete")

こちらは前回の記事で説明しましたので、省略します。

コードのまとめ

全体のコードを掲載しておきます。
thread_capture.py として話を進めていきます。

import cv2
import time
from concurrent.futures import ThreadPoolExecutor
import traceback

class captureClass():
    def __init__(self, cap_number):
        self.cap = cv2.VideoCapture(cap_number, cv2.CAP_DSHOW)

    def readFrame(self):
        ret, self.frame = self.cap.read()
        return ret

    def getFrame(self):
        return self.frame

    def capRelease(self):
        self.cap.release()

def threadCapture(cap_number1, cap_number2):
    cap_obj1 = captureClass(cap_number1)
    cap_obj2 = captureClass(cap_number2)
    count = 0
    while True:
        cap_obj1.readFrame()
        cap_obj2.readFrame()
        frame1 = cap_obj1.getFrame()
        frame2 = cap_obj2.getFrame()
        frame1 = cv2.resize(frame1, (320,240))
        frame2 = cv2.resize(frame2, (320,240))

        frame = cv2.vconcat([frame1, frame2])
        frame = cv2.putText(frame, "count: "+str(count), (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),1,cv2.LINE_8)

        cv2.imshow("frame", frame)
        count += 1
        
        lastkey = cv2.waitKey(1)
        if lastkey == ord("q"):
            cap.release()
            cv2.destroyAllWindows()
            break
        if lastkey == ord("c"):
            cv2.imwrite("frame.png", frame)


if __name__ == "__main__":

    cap_number1 = 0
    cap_number2 = 1
    
    executor = ThreadPoolExecutor(max_workers=3)
    camera_future = executor.submit(threadCapture, cap_number1, cap_number2)

    while True:
        if camera_future.running() == False:
            print("camera shutdown")
            executor.shutdown()
            break
        else:
            time.sleep(5)
            print("5 seconds ...")

    print("program complete")

実際に動かしてみる

いよいよ、並列処理プログラムを動かしていきます。

python3 thread_capture.py

上記を実行すると、以下のように画像を取得できます。

例えば、両面同時に撮影する必要がある場合や、側面から撮影する必要がある場合は非常に重宝します。

今回説明した方法を使用すると、PCの電流値の許す限り、カメラ映像を同時に取得することができます。

メインスレッドで画像を取得

メインスレッドで画像を取得する場合、ビデオキャプチャクラスをメインスレッドで作成し、ThreadPoolExecutor に渡してやるだけです。

import cv2
import time
from concurrent.futures import ThreadPoolExecutor
import traceback

class captureClass():
    def __init__(self, cap_number):
        self.cap = cv2.VideoCapture(cap_number, cv2.CAP_DSHOW)

    def readFrame(self):
        ret, self.frame = self.cap.read()
        return ret

    def getFrame(self):
        return self.frame

    def capRelease(self):
        self.cap.release()

def threadCapture(cap_obj1, cap_obj2):

    while True:
        cap_obj1.readFrame()
        cap_obj2.readFrame()
        frame1 = cap_obj1.getFrame()
        frame2 = cap_obj2.getFrame()
        frame1 = cv2.resize(frame1, (320,240))
        frame2 = cv2.resize(frame2, (320,240))

        frame = cv2.vconcat([frame1, frame2])
        frame = cv2.putText(frame, "count: "+str(count), (30, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255),1,cv2.LINE_8)

        cv2.imshow("frame", frame)
        count += 1
        
        lastkey = cv2.waitKey(1)
        if lastkey == ord("q"):
            cap.release()
            cv2.destroyAllWindows()
            break

if __name__ == "__main__":

    cap_number1 = 0
    cap_number2 = 1
    cap_obj1 = captureClass(cap_number1)
    cap_obj2 = captureClass(cap_number2)
    
    executor = ThreadPoolExecutor(max_workers=3)
    camera_future = executor.submit(threadCapture, cap_obj1 , cap_obj2)

    while True:
        if camera_future.running() == False:
            print("camera shutdown")
            executor.shutdown()
            break
        else:
            frame1 = cap_obj1.getFrame()
            frame2 = cap_obj2.getFrame()
            print(frame1.shape, frame2.shape)

    print("program complete")

どちらかのスレッドで cv2.imshow() を実行していると、もう片方のスレッドで実行することができません。

おわりに

今回は複数のカメラを同時に映しながら、time.sleep() で疑似的に重い処理を実行しました。
非常に簡単に同時撮影や複数カメラの処理ができることが分かったと思います。
Go言語 だともっと簡単にやれてしまいますが…。

次回は、cv2.dnn について説明できればと思います。
この関数を使用すると onnx モデルを (PyTorchも?) 推論することができるので、非常に便利な関数です。

コメント

タイトルとURLをコピーしました