RaspberryPi 3 Model B+でIoT監視カメラをつくる(その7 カメラのAI化) | そう備忘録

RaspberryPi 3 Model B+でIoT監視カメラをつくる(その7 カメラのAI化)

IoT監視カメラのAI化(機械学習)

RaspberryPi 3 Model B+とカメラ(Raspberry Pi Camera Module V2)とモーションセンサー(HC-SR501)で動きがあった時だけ録画するIoT監視カメラを作成したときの備忘録の7回目。

今回は6回目の記事のプログラムにAI機能(ちょっと大げさかも)を追加したのでその時の事を記事として残しておく。

ちなみに前回までのプログラムについてはこちらを参照。

モーションセンサーで動きがあった時だけ撮影する監視カメラ機能は順調に稼働をしていたのだが、風が強い日などはカメラに写った草などが揺れてモーションセンサーが作動してしまう事があった。

モーションセンサーの感度を下げても反応してしまうので「何か風景以外のモノが写った時だけ撮影したい」とは感じていた。

考えられる方法としては直前の画像と今の画像を比較して変化がしきい値以上の(大きかった)時だけ映像を残す方法だが、折角なのでGoogleの機械学習のフレームワークであるTensorFlowを使ってObject Detection(物体検出)を行い、物体が検出された時だけ映像を残してLINEで通知する仕組みにした。

宅急便のトラックを認識して撮影

宅急便のトラックを認識して撮影

画像の前後比較で要件は足りるし、リソースも恐らくそちらの方が消費しない所、あえて機械学習で物体検出をしているので無理矢理感は否めないが後々面白そうな拡張も出来そうなのであえて”IoT/AI監視カメラ”にしている。

仕事の場では「AIを使うことありきの仕事のやり方」は駄目なプロジェクトの代表なのだが趣味の電子工作なのでまぁ良しとした。

またLINE通知も今までは動画へのリンクだけだったが機械学習で検出したモノの写真も一緒に送信する事にした。

そうすればLINEの通知を見ただけで何がカメラに写ったかが分かるので動画を見る必要があるかどうかが直ぐに分かる。

尚、記事の最後に動画を載せているので興味がある方は見て欲しい。

過去関連記事

1回目はRaspberryPiとカメラ、モーションセンサーとの接続に関するこちらの記事を参照

2回目はGoogleDriveにアクセスする為のGoogle Developersの設定とLINEにメッセージを送信するためのLINE Notifyでのアクセストークンの発行に関する記事

3回目はPythonのプログラムの説明の記事

4回目はプログラムの起動環境、CLI起動とプログラムの自動起動に関する記事

5回目のケースの外箱を製作した時の記事はこちら

6回目のちょっとしたプログラムの修正の記事はこちら

全体構成図

IoT/AI監視カメラの全体構成図を載せておく。

モーションセンサーで動きを検知してカメラで撮影した映像をGoogleの機械学習(TensorFlowLite)でObject Detection(物体検出)を行い、モノ(車や人)が写っている時だけ映像をクラウド(GoogleDrive)にアップロードする。

またLINEで通知する際にモノが写っている写真(jpgファイル)を添付する様にした。

IoT/AI監視カメラ 今回の記事の範囲

ハードウェア

使用したハードウェアは以下の通り。

Raspberry Pi 3B+

尚、この時はRaspberry Pi 3B+ で作成したが今なら Raspberry Pi 4B で作成するのが良いと思う。

後述するプログラムはラズパイ4でも問題なく動作した。

カメラモジュール

モーションセンサー

追加インストール

機械学習の為にGoogeのTensorFlow関連のモジュールを追加インストールした。

Edge TPUランタイムのインストール

以下のコマンドでラズパイにEdge TPU(Tensor Processing Unit)ランタイム(スタンダード版)のインストールを行った。

尚、本来はCoral USB Acceleratorの様なEdge TPUユニットをラズパイに接続して機械学習を行うのが理想なのだろうが今のIoT監視カメラにCoral USB Acceleratorを追加すると外箱を作り直さないといけないのと、そこそこの値段がするので(1.5万円ぐらい)ラズパイ単体でTensor Flow Liteを動かすことにしている。

この為、最後の行のlibedgetpu1-stdのインストールは今回のプログラムをラズパイ単体で動かすだけであれば不要なのだが、今後Coral USB AcceleratorなどのEdge TPUを接続する事もありうるのでインストールしておいた。

echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install libedgetpu1-std

Edge TPUランタイムのインストール

TensorFlow Liteインタープリターのインストール

2021年2月 追記

以前はwhlファイルよりTensorFlow Liteのインタープリターをインストールしていたが、現在の手順は以下のコマンドでpycoralライブラリ(TensorFlow Liteを含む)をインストールする様に変更されていた。

sudo apt-get install python3-pycoral

※上記のコマンドでpycoraをインストールすれば以下のTensorFlow Liteインタープリターのインストールは不要

続いてこちらのページにアクセスして下の方にスクロールするとPythonのバージョン、プロセッサー別の一覧が出てくる。

ラズパイはARMプロセッサを搭載しているのでARM32の列のPython3.7の”tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl”を選択してダウンロードする。

TensorFlow Liteインタープリター

ダウンロードしたらWinSCP等を使ってラズパイにファイルを送信して以下のコマンドでインストールを行う。

pip3 install tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl

TensorFlow Liteインタープリターのインストール

学習済みモデルの取得

映像から物体を検出する為のTensorFlowのモデルが必要だが自分で1から写真を読み込ませて学習させてモデルを作成するのは手間と時間がかかってしまうので学習済みモデルを使う事にする。

以前の記事でCOCO(Common Objects in Context)のSSD(Single Shot Multibox Detector 高速リアルタイムの物体検出の手法)のモデルを使った。

その時の関連シェルを色々と調べた所、全モデルをダウンロードするシェルがgithubにあった

download_models.shシェルの中身を調べてこちらのファイルをダウンロードして解凍した。

解凍されたファイル類より、

  • coco_labels.txt:ラベルファイル
  • mobilenet_ssd_v2_coco_quant_postprocess.tflite:約90の物体の学習済みモデル

を使用する。

尚、mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tfliteの方が高速なのだがCoral USB Acceleratorが必要で今回はラズパイだけで物体検出をしたいので使用しない。

All Models

尚、注意書きに「製品品質のモデルでは無い、デモンストレーションのみを目的(意訳)」とあるのでそれは了承の上で使うことにする。

このモデルをベースにして追加学習をする事もできるので、精度が悪ければ追加学習をしようと思っていたのだが2週間程度使っているが今の所は十分な精度が出ている。

プログラム配下にコピー

プログラム配下にmodelsディレクトリを作成してコピーする。

cd /opt/security-camera
mkdir models

WinSCP等で上記2つのファイルをコピーする。

2020年5月2日 追記

下記のプログラムから更に修正して映像のコマ落ちなどに対応している。最新のプログラムはこちら

プログラムの修正

修正後のプログラムは以下の通り。

# -*- coding: utf-8 -*-
"""
Created on Sun Jul 28 09:32:20 2019
@author: Souichirou Kikuchi
2019/11/04 映像上部に日付・時刻を追加表示
2019/11/22 h264からmp4へファイル変換
2020/01/13 機械学習を使ってObjectが写っている時だけ通知
"""

from picamera import PiCamera
from time import sleep
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from concurrent.futures import ThreadPoolExecutor # スレッド処理
import RPi.GPIO as GPIO
import datetime as dt
import sys
sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
import os
import io
import re
import json
import requests # LINEメッセージ
import subprocess # MP4Boxコマンド実行の為
import numpy as np
from PIL import Image
from tflite_runtime.interpreter import Interpreter # TesorFlow

SC_CAMERA_PIN = 5 # ピンの名前を変数として定義
MAX_REC_TIME = 3600 # 最長録画時間(秒数)
SAVE_DIR = "./video/" # ファイル保存用ディレクトリ
INITIAL_FILE= "./cert/initial.json" # 初期設定ファイル
LINE_URL = "https://notify-api.line.me/api/notify" # LINEメッセージ用URL
DRIVE_LINK = "https://drive.google.com/open?id=" # LINEに表示するGoogleDriveへのリンク
INTERVAL = 0.5 # 監視間隔(秒)
AN_TEXT_SIZE = 24 # 録画画像上部に表示される注釈文字のサイズ
LABEL_FILE = './models/coco_labels.txt'
MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
THRESHOLD = 0.4

GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定
GPIO.setup(SC_CAMERA_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIOセットアップ

class  SecurityCameraClass: # セキュリティカメラのクラス
    def __init__(self):
        with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
            __jsn = json.load(f)
            self.folder_id = __jsn["folder_id"] # folder_idの読み込み
            self.token = __jsn["line_token"] # LINE用tokenの読み込み
            self.location = __jsn["location"] # 監視カメラ設置場所
        self.camera = PiCamera()
        self.camera.rotation = 270 # カメラを回転
        gauth = GoogleAuth() # GoogleDriveへの認証
        gauth.LocalWebserverAuth()
        self.drive = GoogleDrive(gauth)

    def StartRecording(self): # 録画開始
        __time_stamp_file =  "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
        __time_stamp_disp =  "{0:%Y/%m/%d %H:%M:%S}".format(dt.datetime.now()) # 日付時刻をセット(表示用)
        self.save_file_path = SAVE_DIR + "video" + __time_stamp_file + ".h264" # ディレクトリ、ファイル名をセット
        self.camera.annotate_text = self.location + " " + __time_stamp_disp # 映像上部に表示 
        self.camera.annotate_text_size = AN_TEXT_SIZE
        self.camera.start_recording(self.save_file_path) # 指定pathに録画
        return True 

    def StopRecording(self, objs): # 録画終了
        self.camera.stop_recording() # 録画終了
        __executor = ThreadPoolExecutor(max_workers=5) # 同時実行は5つまでスレッド実行
        __executor.submit(self.OnTheread(self.save_file_path, objs))
        return False 

    def OnTheread(self, sv_file, objs): # ファイルのGoogleDriveへのアップロードは時間がかかるので別スレッドで行う
        if len(objs) == 0:
            os.remove(sv_file) # 検出Objectがゼロならファイルを削除して終了
            pass
        __mp4_file_path = os.path.splitext(sv_file)[0] + '.mp4' # 拡張子をmp4にしたファイル名
        __file_name = os.path.basename(__mp4_file_path) # ファイル名部分を取り出し
        # h264形式からmp4に変換
        __res = subprocess.call("MP4Box -add " + sv_file + " " + __mp4_file_path, shell=True)
        if __res == 0: # 変換が正常終了ならファイルをアップロード
            __f = self.drive.CreateFile({"title": __file_name, # GoogleDrive 
                                  "mimeType": "video/mp4",
                                  "parents": [{"kind": "drive#fileLink", "id":self.folder_id}]})
            __f.SetContentFile(__mp4_file_path) # ファイル名指定
            __f.Upload() # アップロード
            os.remove(sv_file) # アップロード後にファイルは削除
            os.remove(__mp4_file_path) # アップロード後にファイルは削除
            sec_camera.LineMessage() # LINEにメッセージを送信

    def LineMessage(self): # LINEに録画検知しましたメッセージを送信する
        __headers = {"Authorization" : "Bearer " + self.token}
        __message = "録画検知しました " + DRIVE_LINK + self.folder_id
        payload = {"message" : __message}
        __files = {'imageFile': open(self.img_file_path, "rb")} # 画像ファイル
        requests.post(LINE_URL, headers=__headers, params=payload, files=__files)
        os.remove(self.img_file_path) # LINE後にjpgファイルは削除

    def CloseCamera(self): # カメラクローズ
        self.camera.close()

    def load_labels(self, path):
        # ラベルファイルをLoadして返す
        with open(path, 'r', encoding='utf-8') as f:
            __lines = f.readlines()
            labels = {}
            for row_number, content in enumerate(__lines):
                pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
                if len(pair) == 2 and pair[0].strip().isdigit():
                    labels[int(pair[0])] = pair[1].strip()
                else:
                    labels[row_number] = pair[0].strip()
        return labels

    def set_input_tensor(self, interpreter, image):
        __tensor_index = interpreter.get_input_details()[0]['index']
        input_tensor = interpreter.tensor(__tensor_index)()[0]
        input_tensor[:, :] = image
    
    def get_output_tensor(self, interpreter, index):
        __output_details = interpreter.get_output_details()[index]
        tensor = np.squeeze(interpreter.get_tensor(__output_details['index']))
        return tensor

    def detect_objects(self, interpreter, labels, new_size, threshold, objs):
        # picameraの映像をキャプチャーしてTensorFlowでObjectDetection(物体検出)
        __stream = io.BytesIO()
        self.camera.capture(__stream, format='jpeg') # picameraの映像をjpegでキャプチャーする
        __stream.seek(0)
        __image = Image.open(__stream)
        __img_resize = __image.resize(new_size) # interpreterから読み込んだモデルのサイズにリサイズする
        self.set_input_tensor(interpreter, __img_resize)
        interpreter.invoke() # TensorFlowの呼び出し
        __classes = self.get_output_tensor(interpreter, 1) # クラス
        __scores = self.get_output_tensor(interpreter, 2) # 評価スコア
        __count = int(self.get_output_tensor(interpreter, 3)) # 評価数
        results = []
        for i in range(__count): #scoreがthreshold(デフォルトで0.4)以上の物だけをフィルタリングしてresultsで返す
            if (__scores[i] >= threshold and # 検知スコアが40%以上で
               int(__classes[i]) != 14): # benchで無ければ(何も無い時はbenchが検知される)
                __result = labels[__classes[i]] # labelsの名称をセット
                results.append(__result)  # 検知ラベルを追加
                if len(objs) == 0: # 初回に認識したObjectをjpgで保存する(後でLINEで送付するため)
                    __time_stamp_file =  "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
                    self.img_file_path = SAVE_DIR + 'temp' + __time_stamp_file + '.jpg' # ディレクトリ、ファイル名をセット
                    __image.save(self.img_file_path, quality=25) # ファイルに保存しておく
        return results

#main
try:
    if __name__ == "__main__":
        os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        sec_camera = SecurityCameraClass()
        labels = sec_camera.load_labels(LABEL_FILE) # ラベル
        interpreter = Interpreter(MODEL_FILE) # TensorFlowモデル
        interpreter.allocate_tensors()
        _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
        new_size = (input_width, input_height) # モデルのサイズを読み込み
        __objs = [] # 映像から検出するオブジェクト
        rec = False # 録画中フラグ OFF
        start_detect = dt.datetime.now() # 検知開始時刻
        while True:
            rec_time = dt.timedelta(seconds=0)
            if GPIO.input(SC_CAMERA_PIN) == GPIO.HIGH: # 検知
                if rec == False: # 録画 OFFなら
                    __objs = []
                    start_detect = dt.datetime.now() # ビデオスタート時刻
                    rec = sec_camera.StartRecording() # 録画開始
                rec_time  = dt.datetime.now() - start_detect # 録画時間を計算
                if  rec_time.total_seconds() >= MAX_REC_TIME: # 録画最大時間を超えた時
                    rec = sec_camera.StopRecording(list(set(__objs))) # 録画終了
                    start_detect = dt.datetime.now() # ビデオスタート時刻
                    rec = sec_camera.StartRecording() # 録画開始
            else: # 未検知
                if rec == True: # 録画 ON なら
                    rec = sec_camera.StopRecording(list(set(__objs))) # 録画終了
                    start_detect = dt.datetime.now() # ビデオスタート時刻リセット
            if rec == True: # 録画中ならpicameの映像を元にObject Detection
                __results = sec_camera.detect_objects(interpreter, labels, new_size, THRESHOLD, __objs)
                __objs.extend(__results) # 検出されたオブジェクトを蓄える
            sleep(INTERVAL)
except KeyboardInterrupt:
    pass
GPIO.cleanup()
sec_camera.CloseCamera()

簡単に主な変更点ついて説明する。

Pathを明示的に指定

プログラムを実行した所、tflite_runtimeがmodule not foundになってしまった。

モジュールがインストールされているPathを環境変数で指定しても良いのだが、プログラム中で明示的に指定した(18行目)。

sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定

必要なモジュール

配列の数値計算を行うnumpyとカメラの映像を処理する為にPIL(Python Imaging Library)モジュール、そしてTelsorFlowのモジュールをインポートする(25行目から)。

import numpy as np
from PIL import Image
from tflite_runtime.interpreter import Interpreter # TesorFlow

ラベルとモデル

ラベルファイルとモデルファイルのPathを指定している。

ラベルはこのモデルが検出可能な物体の一覧が載っているテキストファイル。

90の物体名が羅列されている。

またTHRESHOLDは物体が検出された時のスコア(認識率)のしきい値。

Object Detectionでは検出された物体の”確からしさ”が%で検出される。

例)98%の確率で”車”と認識される

0.4と指定しているので40%以上の確率で何らかの物体と認識された時だけ「物体が検出された」として扱う(37行目)

LABEL_FILE = './models/coco_labels.txt'
MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
THRESHOLD = 0.4

未検出の時の判断

物体が未検出の時の判断。

録画時に検出された物体をobjsに蓄える。

objsがある時だけ後続のmp4ファイルへの変換やLINEへのメッセージの送信を行う(73行目)。

        if len(objs) == 0:
            os.remove(sv_file) # 検出Objectがゼロならファイルを削除して終了
            pass

LINEで検出画像の送信

モノが写った時の画像をLINEの通知と一緒に送信する(94行目)。

        __files = {'imageFile': open(self.img_file_path, "rb")} # 画像ファイル

Google Driveへのリンクと同時に検出された物体の画像が送られてくる。

LINEからの検出画像

ラベルファイルの読み込みなど

前述のラベルファイルを読み込みとTensorFlowインタープリターの準備。

以前の「ラズパイカメラの映像をObject Detection(物体検出)する」の記事のCoral USB Acceleratorのサンプルプログラムのロジックを流用している(101行目)。

    def load_labels(self, path):
        # ラベルファイルをLoadして返す
        with open(path, 'r', encoding='utf-8') as f:
            __lines = f.readlines()
            labels = {}
            for row_number, content in enumerate(__lines):
                pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
                if len(pair) == 2 and pair[0].strip().isdigit():
                    labels[int(pair[0])] = pair[1].strip()
                else:
                    labels[row_number] = pair[0].strip()
        return labels
 
    def set_input_tensor(self, interpreter, image):
        __tensor_index = interpreter.get_input_details()[0]['index']
        input_tensor = interpreter.tensor(__tensor_index)()[0]
        input_tensor[:, :] = image
    
    def get_output_tensor(self, interpreter, index):
        __output_details = interpreter.get_output_details()[index]
        tensor = np.squeeze(interpreter.get_tensor(__output_details['index']))
        return tensor

物体検出

物体検出のメソッド。

piCameraの映像をキャプチャーしてset_input_tensorを呼び出す。

get_output_tensorで

  • クラス:検出した物体をラベルの数値で表したもの
  • 評価スコア:物体の認識率
  • 評価数:このモデルは1枚の写真から最大20個の物体を検出する

124行目

    def detect_objects(self, interpreter, labels, new_size, threshold, objs):

ベンチ

通常時にデッキがベンチと判定される事があるので”ベンチ”は対象外にしている(139行目)。

               int(__classes[i]) != 14): # benchで無ければ(何も無い時はbenchが検知される)

物体検出

録画中であれば物体検出を呼び出して検出されたオブジェクトを蓄える(177行目)。

            if rec == True: # 録画中ならpicameの映像を元にObject Detection
                __results = sec_camera.detect_objects(interpreter, labels, new_size, THRESHOLD, __objs)
                __objs.extend(__results) # 検出されたオブジェクトを蓄える

今後の課題

カメラでの録画と同時での物体検出はラズパイにとってパワー不足と思われ、映像がカクついてしまっている。

Coral USB AcceleratorなどのEdgeTPUでTensorFlowを動かせばその問題は解決するのだろうが、元々無理にAIを適用したこともあるので「監視カメラにCoral USB Acceleratorをつけるのもなぁ」の気持ちもある。

まぁ監視カメラとしての役には立つのでとりあえずそのままにしているが、レコーディングを別スレッドにする等、何らかの方法でカクつきはもう少し抑えたいと思っている。

動画

プログラムの変更内容を動画で説明している。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

おすすめ

質問やコメントや励ましの言葉などを残す

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。