RaspberryPi 3 Model B+でIoT監視カメラをつくる(その9 物体検知時のスコアとボックスの表示) | そう備忘録

RaspberryPi 3 Model B+でIoT監視カメラをつくる(その9 物体検知時のスコアとボックスの表示)

AI/IoT 監視カメラのプログラム修正

RaspberryPi 3 Model B+とカメラ(Raspberry Pi Camera Module V2)とモーションセンサー(HC-SR501)で動きがあった時だけ録画するAI/IoT監視カメラを作成したときの備忘録の9回目。

前回は 検知した Objects 名を LINE のメッセージに追加表示したり動画のコマ落ちの対応を行った。

今回は以下のプログラム修正を行ったので記事として残しておく。

  1. 検知した物体の検知スコア(下記画像の0.79の部分)を LINE に表示するようにした
  2. Deep Learning での誤検知(葉っぱの影を人と誤検知など)を減らす工夫をした
  3. 物体検知した場所を赤枠(バウンディングボックス)で囲む様に修正

を行った。

バウンディングボックスの追加

全体構成図

以前の記事にも書いたが IoT/AI 監視カメラの全体構成図と主な機能は以下の通り。

  1. モーションセンサーで動きを検知する
  2. 動きがあった時だけ録画する
  3. 映像から機械学習で物体検出を行い何が写ったのかを判別する
  4. 物体が検出された映像をLocal Diskに保存する
  5. クラウドに動画ファイルをアップロードする
  6. LINEで検出があった事を通知すると同時に検知した物体名、画像を添付する

尚、過去記事については以下のリンク先を参照して欲しい。

  1. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その1ハードウェア関連)
  2. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その2GoogleDriveとLINEの設定)
  3. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その3プログラミング)
  4. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その4プログラムの自動起動設定)
  5. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その5 外箱の製作)
  6. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その6 プログラムの修正)
  7. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その7 カメラのAI化)
  8. RaspberryPi 3 Model B+でIoT監視カメラをつくる(その8 映像のコマ落ちを修正他)

ハードウェア

使用したハードウェアは以下の通り。

Raspberry Pi 3B+

尚、ラズパイは Raspberry Pi 3B+ で作成したが今なら Raspberry Pi 4B が良いと思う。

プログラムはラズパイ4でも問題なく動作した。

カメラモジュール

モーションセンサー

プログラム

修正後のプログラムは以下の通り。

ソースコード

# -*- coding: utf-8 -*-
"""
Created on Sun Jul 28 09:32:20 2019
@author: Souichirou Kikuchi
2019/11/04 映像上部に日付・時刻を追加表示
2019/11/22 h264からmp4へファイル変換
2020/01/13 機械学習を使ってObjectが写っている時だけ通知
2020/04/30 検出したObjectをLINEに表示、ObjectDetectionを最初の一回のみ、
           INTERVAL 0.5 -> 0.2
2021/06/18 変数のスペルミスを修正
2021/07/28 物体検知時のスコアを表示する

"""

from picamera import PiCamera
from time import sleep
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from concurrent.futures import ThreadPoolExecutor # スレッド処理
import RPi.GPIO as GPIO
import datetime as dt
import sys
sys.path.append('/home/ログイン名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
import os
import io
import re
import json
import requests # LINEメッセージ
import subprocess # MP4Boxコマンド実行の為
import numpy as np
from PIL import Image, ImageDraw
from tflite_runtime.interpreter import Interpreter # TesorFlow
#from tflite_runtime.interpreter import Interpreter, load_delegate

SC_CAMERA_PIN = 5 # ピンの名前を変数として定義
MAX_REC_TIME = 3600 # 最長録画時間(秒数)
SAVE_DIR = "./video/" # ファイル保存用ディレクトリ
INITIAL_FILE= "./cert/initial.json" # 初期設定ファイル
LINE_URL = "https://notify-api.line.me/api/notify" # LINEメッセージ用URL
DRIVE_LINK = "https://drive.google.com/open?id=" # LINEに表示するGoogleDriveへのリンク
INTERVAL = 0.2 # 監視間隔(秒)
AN_TEXT_SIZE = 24 # 録画画像上部に表示される注釈文字のサイズ
LABEL_FILE = './models/coco_labels.txt'
MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
#MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tflite'
THRESHOLD = 0.4 # 物体検知の時のしきい値
CAMERA_WIDTH = 640 # 読み込むカメラの解像度(1920x1080、1280x720、640x480)
CAMERA_HEIGHT = 480

GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定
GPIO.setup(SC_CAMERA_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIOセットアップ

class  SecurityCameraClass: # セキュリティカメラのクラス
    def __init__(self):
        with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
            __jsn = json.load(f)
            self.folder_id = __jsn["folder_id"] # folder_idの読み込み
            self.token = __jsn["line_token"] # LINE用tokenの読み込み
            self.location = __jsn["location"] # 監視カメラ設置場所
        self.camera = PiCamera(resolution=(CAMERA_WIDTH, CAMERA_HEIGHT), framerate=30) # PiCamear
        self.camera.rotation = 270 # カメラを回転
        gauth = GoogleAuth() # GoogleDriveへの認証
        gauth.LocalWebserverAuth()
        self.drive = GoogleDrive(gauth)

    def start_recordings(self): # 録画開始
        __time_stamp_file =  "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
        __time_stamp_disp =  "{0:%Y/%m/%d %H:%M:%S}".format(dt.datetime.now()) # 日付時刻をセット(表示用)
        self.save_file_path = SAVE_DIR + "video" + __time_stamp_file + ".h264" # ディレクトリ、ファイル名をセット
        self.camera.annotate_text = self.location + " " + __time_stamp_disp # 映像上部に表示 
        self.camera.annotate_text_size = AN_TEXT_SIZE
        self.camera.start_recording(self.save_file_path) # 指定pathに録画
        return True 

    def stop_recordings(self, objs): # 録画終了
        self.camera.stop_recording() # 録画終了
        __executor = ThreadPoolExecutor(max_workers=5) # 同時実行は5つまでスレッド実行
        __executor.submit(self.on_theread(self.save_file_path, objs))
        return False 

    def on_theread(self, sv_file, objs): # ファイルのGoogleDriveへのアップロードは時間がかかるので別スレッドで行う
        if len(objs) == 0:
            os.remove(sv_file) # 検出Objectがゼロならファイルを削除して終了
            pass
        __mp4_file_path = os.path.splitext(sv_file)[0] + '.mp4' # 拡張子をmp4にしたファイル名
        __file_name = os.path.basename(__mp4_file_path) # ファイル名部分を取り出し
        # h264形式からmp4に変換
        __res = subprocess.call("MP4Box -add " + sv_file + " " + __mp4_file_path, shell=True)
        if __res == 0: # 変換が正常終了ならファイルをアップロード
            __f = self.drive.CreateFile({"title": __file_name, # GoogleDrive 
                                  "mimeType": "video/mp4",
                                  "parents": [{"kind": "drive#fileLink", "id":self.folder_id}]})
            __f.SetContentFile(__mp4_file_path) # ファイル名指定
            __f.Upload() # アップロード
            os.remove(sv_file) # アップロード後にファイルは削除
            os.remove(__mp4_file_path) # アップロード後にファイルは削除
            sec_camera.line_message(objs) # LINEにメッセージを送信

    def line_message(self, objs): # LINEに録画検知しましたメッセージを送信する
        __headers = {"Authorization" : "Bearer " + self.token}
        __message = objs[0] + " を検知しました " + DRIVE_LINK + self.folder_id
        payload = {"message" : __message}
        __files = {'imageFile': open(self.img_file_path, "rb")} # 画像ファイル
        requests.post(LINE_URL, headers=__headers, params=payload, files=__files)
        os.remove(self.img_file_path) # LINE後にjpgファイルは削除

    def close_camera(self): # カメラクローズ
        self.camera.close()

    def load_labels(self, path):
        # ラベルファイルをLoadして返す
        with open(path, 'r', encoding='utf-8') as f:
            __lines = f.readlines()
            labels = {}
            for row_number, content in enumerate(__lines):
                pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
                if len(pair) == 2 and pair[0].strip().isdigit():
                    labels[int(pair[0])] = pair[1].strip()
                else:
                    labels[row_number] = pair[0].strip()
        return labels

    def set_input_tensor(self, interpreter, image):
        __tensor_index = interpreter.get_input_details()[0]['index']
        input_tensor = interpreter.tensor(__tensor_index)()[0]
        input_tensor[:, :] = image
    
    def get_output_tensor(self, interpreter, index):
        __output_details = interpreter.get_output_details()[index]
        tensor = np.squeeze(interpreter.get_tensor(__output_details['index']))
        return tensor

    def detect_objects(self, interpreter, labels, new_size, threshold, objs):
        # picameraの映像をキャプチャーしてTensorFlowでObjectDetection(物体検出)
        __stream = io.BytesIO()
        self.camera.capture(__stream, format='jpeg') # picameraの映像をjpegでキャプチャーする
        __stream.seek(0)
        __image = Image.open(__stream)
        __img_resize = __image.resize(new_size) # interpreterから読み込んだモデルのサイズにリサイズする
        self.set_input_tensor(interpreter, __img_resize)
        interpreter.invoke() # TensorFlowの呼び出し
        __boxes = self.get_output_tensor(interpreter, 0) # バウンディングボックス
        __classes = self.get_output_tensor(interpreter, 1) # クラス
        __scores = self.get_output_tensor(interpreter, 2) # 評価スコア
        __count = int(self.get_output_tensor(interpreter, 3)) # 評価数
        __results = []
        for i in range(__count): #scoreがthreshold(デフォルトで0.4)以上の物だけをフィルタリングしてresultsで返す
            if (__scores[i] >= threshold and # 検知スコアが40%以上で
               int(__classes[i]) != 14): # benchで無ければ(何も無い時にbenchが検知される事がある)
                __results.append(labels[__classes[i]]+'(' + str(round(__scores[i], 2)) + ')')  # 検知ラベルを追加
                ymin, xmin, ymax, xmax = __boxes[i] # オブジェクト検知位置(バウンディングボックス)
                xmin = int(xmin * CAMERA_WIDTH) # bounding_boxは全体の長さに対して%で返すのでWIDTH、HEIGHTを掛ける
                xmax = int(xmax * CAMERA_WIDTH)
                ymin = int(ymin * CAMERA_HEIGHT)
                ymax = int(ymax * CAMERA_HEIGHT)
                obj_rate = ((xmax-xmin)*(ymax-ymin)) / (CAMERA_WIDTH * CAMERA_HEIGHT) # スクリーンに占める物体の割合
                if (obj_rate >= 0.01) and (obj_rate <= 0.8): # 1%以上80%以下
                    draw = ImageDraw.Draw(__image)
                    draw.rectangle([xmin, ymin, xmax, ymax], fill=None, outline=(255, 0, 0), width=3) # 赤枠を描く
    
                    if len(objs) == 0: # 初回に認識したObjectをjpgで保存する(後でLINEで送付するため)
                        __time_stamp_file =  "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
                        self.img_file_path = SAVE_DIR + 'temp' + __time_stamp_file + '.jpg' # ディレクトリ、ファイル名をセット
                        __image.save(self.img_file_path, quality=25) # ファイルに保存しておく
                    objs.extend(__results) # 検出されたオブジェクトを蓄える

#main
try:
    if __name__ == "__main__":
        os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        sec_camera = SecurityCameraClass()
        labels = sec_camera.load_labels(LABEL_FILE) # ラベル
        interpreter = Interpreter(MODEL_FILE) # TensorFlowモデル
#        interpreter = Interpreter(MODEL_FILE, experimental_delegates=[load_delegate('libedgetpu.so.1.0')]) 
        interpreter.allocate_tensors()
        _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
        new_size = (input_width, input_height) # モデルのサイズを読み込み
        __objs = [] # 映像から検出するオブジェクト
        rec = False # 録画中フラグ OFF
        start_detect = dt.datetime.now() # 検知開始時刻
        while True:
            rec_time = dt.timedelta(seconds=0)
            if GPIO.input(SC_CAMERA_PIN) == GPIO.HIGH: # 検知
                if rec == False: # 録画 OFFなら
                    __objs = []
                    start_detect = dt.datetime.now() # ビデオスタート時刻
                    rec = sec_camera.start_recordings() # 録画開始
                rec_time  = dt.datetime.now() - start_detect # 録画時間を計算
                if  rec_time.total_seconds() >= MAX_REC_TIME: # 録画最大時間を超えた時
                    rec = sec_camera.stop_recordings(list(set(__objs))) # 録画終了
                    start_detect = dt.datetime.now() # ビデオスタート時刻
                    rec = sec_camera.start_recordings() # 録画開始
            else: # 未検知
                if rec == True: # 録画 ON なら
                    rec = sec_camera.stop_recordings(list(set(__objs))) # 録画終了
                    start_detect = dt.datetime.now() # ビデオスタート時刻リセット
            if rec == True and len(__objs) == 0: # 録画中でObject未検出ならpicameの映像を元にObject Detection
                sec_camera.detect_objects(interpreter, labels, new_size, THRESHOLD, __objs)
            sleep(INTERVAL)
except KeyboardInterrupt:
    pass
GPIO.cleanup()
sec_camera.close_camera()

変更点

主な変更点は以下の通り。

カメラの解像度を指定

47、48行目で PiCamera 解像度の幅、高さを定義して 60 行目でPiCameraで読み込む時に明示的に解像度を指定する。

CAMERA_WIDTH = 640 # 読み込むカメラの解像度(1920x1080、1280x720、640x480)
CAMERA_HEIGHT = 480
   ・
   ・
   ・
self.camera = PiCamera(resolution=(CAMERA_WIDTH, CAMERA_HEIGHT), framerate=30) # PiCamear

未指定でもデフォルトの解像度(640✕480)が適用されるのだが、この後の処理で検知した物体を画像上で赤い枠(バウンディングボックス)で囲む処理の際にカメラの解像度が必要になるので明示的に指定している。

バウンディングボックスを描画

Deep Learning で物体を検知した後、検知した画像を LINE に送信する際に検知した物体の位置を赤枠(バウンディングボックス)で囲む。

142行目にて検知したバウンディングボックスの配列を取得する。

複数物体を検知した場合は配列で複数の位置が返される。

__boxes = self.get_output_tensor(interpreter, 0) # バウンディングボックス

151行目以降で座標を取得する。

ymin、xmin は左上のXY座標、ymax、xmaxは右下のXY座標を%(パーセント)で表しているので、画像の解像度を乗算すれば画像上のピクセル位置が算出される。

XY座標の算出
ymin, xmin, ymax, xmax = __boxes[i] # オブジェクト検知位置(バウンディングボックス)
xmin = int(xmin * CAMERA_WIDTH) # bounding_boxは全体の長さに対して%で返すのでWIDTH、HEIGHTを掛ける
xmax = int(xmax * CAMERA_WIDTH)
ymin = int(ymin * CAMERA_HEIGHT)
ymax = int(ymax * CAMERA_HEIGHT)
   ・
   ・
   ・
draw = ImageDraw.Draw(__image)
draw.rectangle([xmin, ymin, xmax, ymax], fill=None, outline=(255, 0, 0), width=3) # 赤枠を描く

尚、赤枠は rectangle で描画している。

  • xmin,ymin,xmax,ymax:描画位置を Pixel で指定している
  • fill:塗りつぶし無し
  • outline:カラーをRGBで指定している。Red のみ 255 なので赤枠
  • width:線の太さ

誤検知対策

今までの誤検知は主に以下の2パターンがあった。

  1. 植物の葉の一部を人(Person)と誤検知(非常に小さい領域)
  2. 画像全体をテレビと誤検知(非常に大きい領域)

よって物体を検知した領域の面積が大き過ぎたり小さすぎたり場合は検知対象から外すことにした。

156行目の以下のロジックにて物体の面積が 1%~80% の時だけを対象にしている。

obj_rate = ((xmax-xmin)*(ymax-ymin)) / (CAMERA_WIDTH * CAMERA_HEIGHT) # スクリーンに占める物体の割合
if (obj_rate >= 0.01) and (obj_rate <= 0.8): # 1%以上80%以下

検知スコアをLINEに送信

TensorFlow Lite の Object Detection で物体検知を行うと検知結果のスコア※が返される。

※数値が高いほど信頼性が髙い

150行目の下記のロジックにてスコアをLINEのテキストに追加表示するように変更した。

__results.append(labels[__classes[i]]+'(' + str(round(__scores[i], 2)) + ')')  # 検知ラベルを追加
検知スコアの表示

終わりに

今回の修正で誤検知はほぼ無くなった。

今までは風で草が揺れた時にモーションセンセーが反応してしまい、たまたま写った葉っぱが人の形に似ていると、検知してしまう事があった。

物体検知時のしきい値(THRESHOLD)を 0.4 にしていたので、0.6 に上げてみたのだが今度は正しく認識すべき物体(人やバイク)を見落とす事があった。

誤検知のパターンは検知した物体が画像に対して大き過ぎ、小さ過ぎがほとんどだったので今回の修正がうまく作用した。

また画像にあまり手を加えたくなかったのでバウンディングボックスは描画していなかったのだが、描いてみると検知した場所が一目瞭然なので分かりやすくなったと思う。

以上で今回の記事は終了とする。

最後に

この記事が何処かで誰かの役に立つことを願っている。

尚、当記事中の商品へのリンクはAmazonアソシエイトへのリンクが含まれています。Amazonのアソシエイトとして、当メディアは適格販売により収入を得ていますのでご了承ください。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

質問やコメントや励ましの言葉などを残す

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。