RaspberryPi 3 Model B+でIoT監視カメラをつくる(その7 カメラのAI化)
Contents
IoT監視カメラのAI化(機械学習)
RaspberryPi 3 Model B+とカメラ(Raspberry Pi Camera Module V2)とモーションセンサー(HC-SR501)で動きがあった時だけ録画するIoT監視カメラを作成したときの備忘録の7回目。
今回は6回目の記事のプログラムにAI機能(ちょっと大げさかも)を追加したのでその時の事を記事として残しておく。
ちなみに前回までのプログラムについてはこちらを参照。
モーションセンサーで動きがあった時だけ撮影する監視カメラ機能は順調に稼働をしていたのだが、風が強い日などはカメラに写った草などが揺れてモーションセンサーが作動してしまう事があった。
モーションセンサーの感度を下げても反応してしまうので「何か風景以外のモノが写った時だけ撮影したい」とは感じていた。
考えられる方法としては直前の画像と今の画像を比較して変化がしきい値以上の(大きかった)時だけ映像を残す方法だが、折角なのでGoogleの機械学習のフレームワークであるTensorFlowを使ってObject Detection(物体検出)を行い、物体が検出された時だけ映像を残してLINEで通知する仕組みにした。
画像の前後比較で要件は足りるし、リソースも恐らくそちらの方が消費しない所、あえて機械学習で物体検出をしているので無理矢理感は否めないが後々面白そうな拡張も出来そうなのであえて”IoT/AI監視カメラ”にしている。
仕事の場では「AIを使うことありきの仕事のやり方」は駄目なプロジェクトの代表なのだが趣味の電子工作なのでまぁ良しとした。
またLINE通知も今までは動画へのリンクだけだったが機械学習で検出したモノの写真も一緒に送信する事にした。
そうすればLINEの通知を見ただけで何がカメラに写ったかが分かるので動画を見る必要があるかどうかが直ぐに分かる。
尚、記事の最後に動画を載せているので興味がある方は見て欲しい。
過去関連記事
1回目はRaspberryPiとカメラ、モーションセンサーとの接続に関するこちらの記事を参照。
2回目はGoogleDriveにアクセスする為のGoogle Developersの設定とLINEにメッセージを送信するためのLINE Notifyでのアクセストークンの発行に関する記事。
3回目はPythonのプログラムの説明の記事。
4回目はプログラムの起動環境、CLI起動とプログラムの自動起動に関する記事。
5回目のケースの外箱を製作した時の記事はこちら。
全体構成図
IoT/AI監視カメラの全体構成図を載せておく。
モーションセンサーで動きを検知してカメラで撮影した映像をGoogleの機械学習(TensorFlowLite)でObject Detection(物体検出)を行い、モノ(車や人)が写っている時だけ映像をクラウド(GoogleDrive)にアップロードする。
またLINEで通知する際にモノが写っている写真(jpgファイル)を添付する様にした。
ハードウェア
使用したハードウェアは以下の通り。
Raspberry Pi 3B+
尚、この時はRaspberry Pi 3B+ で作成したが今なら Raspberry Pi 4B で作成するのが良いと思う。
後述するプログラムはラズパイ4でも問題なく動作した。
カメラモジュール
モーションセンサー
追加インストール
機械学習の為にGoogeのTensorFlow関連のモジュールを追加インストールした。
Edge TPUランタイムのインストール
以下のコマンドでラズパイにEdge TPU(Tensor Processing Unit)ランタイム(スタンダード版)のインストールを行った。
尚、本来はCoral USB Acceleratorの様なEdge TPUユニットをラズパイに接続して機械学習を行うのが理想なのだろうが今のIoT監視カメラにCoral USB Acceleratorを追加すると外箱を作り直さないといけないのと、そこそこの値段がするので(1.5万円ぐらい)ラズパイ単体でTensor Flow Liteを動かすことにしている。
この為、最後の行のlibedgetpu1-stdのインストールは今回のプログラムをラズパイ単体で動かすだけであれば不要なのだが、今後Coral USB AcceleratorなどのEdge TPUを接続する事もありうるのでインストールしておいた。
echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install libedgetpu1-std
TensorFlow Liteインタープリターのインストール
以前はwhlファイルよりTensorFlow Liteのインタープリターをインストールしていたが、現在の手順は以下のコマンドでpycoralライブラリ(TensorFlow Liteを含む)をインストールする様に変更されていた。
sudo apt-get install python3-pycoral
※上記のコマンドでpycoraをインストールすれば以下のTensorFlow Liteインタープリターのインストールは不要
続いてこちらのページにアクセスして下の方にスクロールするとPythonのバージョン、プロセッサー別の一覧が出てくる。
ラズパイはARMプロセッサを搭載しているのでARM32の列のPython3.7の”tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl”を選択してダウンロードする。
ダウンロードしたらWinSCP等を使ってラズパイにファイルを送信して以下のコマンドでインストールを行う。
pip3 install tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl
学習済みモデルの取得
映像から物体を検出する為のTensorFlowのモデルが必要だが自分で1から写真を読み込ませて学習させてモデルを作成するのは手間と時間がかかってしまうので学習済みモデルを使う事にする。
以前の記事でCOCO(Common Objects in Context)のSSD(Single Shot Multibox Detector 高速リアルタイムの物体検出の手法)のモデルを使った。
その時の関連シェルを色々と調べた所、全モデルをダウンロードするシェルがgithubにあった。
download_models.shシェルの中身を調べてこちらのファイルをダウンロードして解凍した。
解凍されたファイル類より、
- coco_labels.txt:ラベルファイル
- mobilenet_ssd_v2_coco_quant_postprocess.tflite:約90の物体の学習済みモデル
を使用する。
尚、mobilenet_ssd_v2_coco_quant_postprocess_edgetpu.tfliteの方が高速なのだがCoral USB Acceleratorが必要で今回はラズパイだけで物体検出をしたいので使用しない。
尚、注意書きに「製品品質のモデルでは無い、デモンストレーションのみを目的(意訳)」とあるのでそれは了承の上で使うことにする。
このモデルをベースにして追加学習をする事もできるので、精度が悪ければ追加学習をしようと思っていたのだが2週間程度使っているが今の所は十分な精度が出ている。
プログラム配下にコピー
プログラム配下にmodelsディレクトリを作成してコピーする。
cd /opt/security-camera
mkdir models
WinSCP等で上記2つのファイルをコピーする。
下記のプログラムから更に修正して映像のコマ落ちなどに対応している。最新のプログラムはこちら
プログラムの修正
修正後のプログラムは以下の通り。
# -*- coding: utf-8 -*-
"""
Created on Sun Jul 28 09:32:20 2019
@author: Souichirou Kikuchi
2019/11/04 映像上部に日付・時刻を追加表示
2019/11/22 h264からmp4へファイル変換
2020/01/13 機械学習を使ってObjectが写っている時だけ通知
"""
from picamera import PiCamera
from time import sleep
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from concurrent.futures import ThreadPoolExecutor # スレッド処理
import RPi.GPIO as GPIO
import datetime as dt
import sys
sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
import os
import io
import re
import json
import requests # LINEメッセージ
import subprocess # MP4Boxコマンド実行の為
import numpy as np
from PIL import Image
from tflite_runtime.interpreter import Interpreter # TesorFlow
SC_CAMERA_PIN = 5 # ピンの名前を変数として定義
MAX_REC_TIME = 3600 # 最長録画時間(秒数)
SAVE_DIR = "./video/" # ファイル保存用ディレクトリ
INITIAL_FILE= "./cert/initial.json" # 初期設定ファイル
LINE_URL = "https://notify-api.line.me/api/notify" # LINEメッセージ用URL
DRIVE_LINK = "https://drive.google.com/open?id=" # LINEに表示するGoogleDriveへのリンク
INTERVAL = 0.5 # 監視間隔(秒)
AN_TEXT_SIZE = 24 # 録画画像上部に表示される注釈文字のサイズ
LABEL_FILE = './models/coco_labels.txt'
MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
THRESHOLD = 0.4
GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定
GPIO.setup(SC_CAMERA_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIOセットアップ
class SecurityCameraClass: # セキュリティカメラのクラス
def __init__(self):
with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
__jsn = json.load(f)
self.folder_id = __jsn["folder_id"] # folder_idの読み込み
self.token = __jsn["line_token"] # LINE用tokenの読み込み
self.location = __jsn["location"] # 監視カメラ設置場所
self.camera = PiCamera()
self.camera.rotation = 270 # カメラを回転
gauth = GoogleAuth() # GoogleDriveへの認証
gauth.LocalWebserverAuth()
self.drive = GoogleDrive(gauth)
def StartRecording(self): # 録画開始
__time_stamp_file = "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
__time_stamp_disp = "{0:%Y/%m/%d %H:%M:%S}".format(dt.datetime.now()) # 日付時刻をセット(表示用)
self.save_file_path = SAVE_DIR + "video" + __time_stamp_file + ".h264" # ディレクトリ、ファイル名をセット
self.camera.annotate_text = self.location + " " + __time_stamp_disp # 映像上部に表示
self.camera.annotate_text_size = AN_TEXT_SIZE
self.camera.start_recording(self.save_file_path) # 指定pathに録画
return True
def StopRecording(self, objs): # 録画終了
self.camera.stop_recording() # 録画終了
__executor = ThreadPoolExecutor(max_workers=5) # 同時実行は5つまでスレッド実行
__executor.submit(self.OnTheread(self.save_file_path, objs))
return False
def OnTheread(self, sv_file, objs): # ファイルのGoogleDriveへのアップロードは時間がかかるので別スレッドで行う
if len(objs) == 0:
os.remove(sv_file) # 検出Objectがゼロならファイルを削除して終了
pass
__mp4_file_path = os.path.splitext(sv_file)[0] + '.mp4' # 拡張子をmp4にしたファイル名
__file_name = os.path.basename(__mp4_file_path) # ファイル名部分を取り出し
# h264形式からmp4に変換
__res = subprocess.call("MP4Box -add " + sv_file + " " + __mp4_file_path, shell=True)
if __res == 0: # 変換が正常終了ならファイルをアップロード
__f = self.drive.CreateFile({"title": __file_name, # GoogleDrive
"mimeType": "video/mp4",
"parents": [{"kind": "drive#fileLink", "id":self.folder_id}]})
__f.SetContentFile(__mp4_file_path) # ファイル名指定
__f.Upload() # アップロード
os.remove(sv_file) # アップロード後にファイルは削除
os.remove(__mp4_file_path) # アップロード後にファイルは削除
sec_camera.LineMessage() # LINEにメッセージを送信
def LineMessage(self): # LINEに録画検知しましたメッセージを送信する
__headers = {"Authorization" : "Bearer " + self.token}
__message = "録画検知しました " + DRIVE_LINK + self.folder_id
payload = {"message" : __message}
__files = {'imageFile': open(self.img_file_path, "rb")} # 画像ファイル
requests.post(LINE_URL, headers=__headers, params=payload, files=__files)
os.remove(self.img_file_path) # LINE後にjpgファイルは削除
def CloseCamera(self): # カメラクローズ
self.camera.close()
def load_labels(self, path):
# ラベルファイルをLoadして返す
with open(path, 'r', encoding='utf-8') as f:
__lines = f.readlines()
labels = {}
for row_number, content in enumerate(__lines):
pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
if len(pair) == 2 and pair[0].strip().isdigit():
labels[int(pair[0])] = pair[1].strip()
else:
labels[row_number] = pair[0].strip()
return labels
def set_input_tensor(self, interpreter, image):
__tensor_index = interpreter.get_input_details()[0]['index']
input_tensor = interpreter.tensor(__tensor_index)()[0]
input_tensor[:, :] = image
def get_output_tensor(self, interpreter, index):
__output_details = interpreter.get_output_details()[index]
tensor = np.squeeze(interpreter.get_tensor(__output_details['index']))
return tensor
def detect_objects(self, interpreter, labels, new_size, threshold, objs):
# picameraの映像をキャプチャーしてTensorFlowでObjectDetection(物体検出)
__stream = io.BytesIO()
self.camera.capture(__stream, format='jpeg') # picameraの映像をjpegでキャプチャーする
__stream.seek(0)
__image = Image.open(__stream)
__img_resize = __image.resize(new_size) # interpreterから読み込んだモデルのサイズにリサイズする
self.set_input_tensor(interpreter, __img_resize)
interpreter.invoke() # TensorFlowの呼び出し
__classes = self.get_output_tensor(interpreter, 1) # クラス
__scores = self.get_output_tensor(interpreter, 2) # 評価スコア
__count = int(self.get_output_tensor(interpreter, 3)) # 評価数
results = []
for i in range(__count): #scoreがthreshold(デフォルトで0.4)以上の物だけをフィルタリングしてresultsで返す
if (__scores[i] >= threshold and # 検知スコアが40%以上で
int(__classes[i]) != 14): # benchで無ければ(何も無い時はbenchが検知される)
__result = labels[__classes[i]] # labelsの名称をセット
results.append(__result) # 検知ラベルを追加
if len(objs) == 0: # 初回に認識したObjectをjpgで保存する(後でLINEで送付するため)
__time_stamp_file = "{0:%Y-%m-%d-%H-%M-%S}".format(dt.datetime.now()) # 日付時刻をセット
self.img_file_path = SAVE_DIR + 'temp' + __time_stamp_file + '.jpg' # ディレクトリ、ファイル名をセット
__image.save(self.img_file_path, quality=25) # ファイルに保存しておく
return results
#main
try:
if __name__ == "__main__":
os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
sec_camera = SecurityCameraClass()
labels = sec_camera.load_labels(LABEL_FILE) # ラベル
interpreter = Interpreter(MODEL_FILE) # TensorFlowモデル
interpreter.allocate_tensors()
_, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
new_size = (input_width, input_height) # モデルのサイズを読み込み
__objs = [] # 映像から検出するオブジェクト
rec = False # 録画中フラグ OFF
start_detect = dt.datetime.now() # 検知開始時刻
while True:
rec_time = dt.timedelta(seconds=0)
if GPIO.input(SC_CAMERA_PIN) == GPIO.HIGH: # 検知
if rec == False: # 録画 OFFなら
__objs = []
start_detect = dt.datetime.now() # ビデオスタート時刻
rec = sec_camera.StartRecording() # 録画開始
rec_time = dt.datetime.now() - start_detect # 録画時間を計算
if rec_time.total_seconds() >= MAX_REC_TIME: # 録画最大時間を超えた時
rec = sec_camera.StopRecording(list(set(__objs))) # 録画終了
start_detect = dt.datetime.now() # ビデオスタート時刻
rec = sec_camera.StartRecording() # 録画開始
else: # 未検知
if rec == True: # 録画 ON なら
rec = sec_camera.StopRecording(list(set(__objs))) # 録画終了
start_detect = dt.datetime.now() # ビデオスタート時刻リセット
if rec == True: # 録画中ならpicameの映像を元にObject Detection
__results = sec_camera.detect_objects(interpreter, labels, new_size, THRESHOLD, __objs)
__objs.extend(__results) # 検出されたオブジェクトを蓄える
sleep(INTERVAL)
except KeyboardInterrupt:
pass
GPIO.cleanup()
sec_camera.CloseCamera()
簡単に主な変更点ついて説明する。
Pathを明示的に指定
プログラムを実行した所、tflite_runtimeがmodule not foundになってしまった。
モジュールがインストールされているPathを環境変数で指定しても良いのだが、プログラム中で明示的に指定した(18行目)。
sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
必要なモジュール
配列の数値計算を行うnumpyとカメラの映像を処理する為にPIL(Python Imaging Library)モジュール、そしてTelsorFlowのモジュールをインポートする(25行目から)。
import numpy as np
from PIL import Image
from tflite_runtime.interpreter import Interpreter # TesorFlow
ラベルとモデル
ラベルファイルとモデルファイルのPathを指定している。
ラベルはこのモデルが検出可能な物体の一覧が載っているテキストファイル。
90の物体名が羅列されている。
またTHRESHOLDは物体が検出された時のスコア(認識率)のしきい値。
Object Detectionでは検出された物体の”確からしさ”が%で検出される。
例)98%の確率で”車”と認識される
0.4と指定しているので40%以上の確率で何らかの物体と認識された時だけ「物体が検出された」として扱う(37行目)
LABEL_FILE = './models/coco_labels.txt'
MODEL_FILE = './models/mobilenet_ssd_v2_coco_quant_postprocess.tflite'
THRESHOLD = 0.4
未検出の時の判断
物体が未検出の時の判断。
録画時に検出された物体をobjsに蓄える。
objsがある時だけ後続のmp4ファイルへの変換やLINEへのメッセージの送信を行う(73行目)。
if len(objs) == 0:
os.remove(sv_file) # 検出Objectがゼロならファイルを削除して終了
pass
LINEで検出画像の送信
モノが写った時の画像をLINEの通知と一緒に送信する(94行目)。
__files = {'imageFile': open(self.img_file_path, "rb")} # 画像ファイル
Google Driveへのリンクと同時に検出された物体の画像が送られてくる。
ラベルファイルの読み込みなど
前述のラベルファイルを読み込みとTensorFlowインタープリターの準備。
以前の「ラズパイカメラの映像をObject Detection(物体検出)する」の記事のCoral USB Acceleratorのサンプルプログラムのロジックを流用している(101行目)。
def load_labels(self, path):
# ラベルファイルをLoadして返す
with open(path, 'r', encoding='utf-8') as f:
__lines = f.readlines()
labels = {}
for row_number, content in enumerate(__lines):
pair = re.split(r'[:\s]+', content.strip(), maxsplit=1)
if len(pair) == 2 and pair[0].strip().isdigit():
labels[int(pair[0])] = pair[1].strip()
else:
labels[row_number] = pair[0].strip()
return labels
def set_input_tensor(self, interpreter, image):
__tensor_index = interpreter.get_input_details()[0]['index']
input_tensor = interpreter.tensor(__tensor_index)()[0]
input_tensor[:, :] = image
def get_output_tensor(self, interpreter, index):
__output_details = interpreter.get_output_details()[index]
tensor = np.squeeze(interpreter.get_tensor(__output_details['index']))
return tensor
物体検出
物体検出のメソッド。
piCameraの映像をキャプチャーしてset_input_tensorを呼び出す。
get_output_tensorで
- クラス:検出した物体をラベルの数値で表したもの
- 評価スコア:物体の認識率
- 評価数:このモデルは1枚の写真から最大20個の物体を検出する
124行目
def detect_objects(self, interpreter, labels, new_size, threshold, objs):
ベンチ
通常時にデッキがベンチと判定される事があるので”ベンチ”は対象外にしている(139行目)。
int(__classes[i]) != 14): # benchで無ければ(何も無い時はbenchが検知される)
物体検出
録画中であれば物体検出を呼び出して検出されたオブジェクトを蓄える(177行目)。
if rec == True: # 録画中ならpicameの映像を元にObject Detection
__results = sec_camera.detect_objects(interpreter, labels, new_size, THRESHOLD, __objs)
__objs.extend(__results) # 検出されたオブジェクトを蓄える
今後の課題
カメラでの録画と同時での物体検出はラズパイにとってパワー不足と思われ、映像がカクついてしまっている。
Coral USB AcceleratorなどのEdgeTPUでTensorFlowを動かせばその問題は解決するのだろうが、元々無理にAIを適用したこともあるので「監視カメラにCoral USB Acceleratorをつけるのもなぁ」の気持ちもある。
まぁ監視カメラとしての役には立つのでとりあえずそのままにしているが、レコーディングを別スレッドにする等、何らかの方法でカクつきはもう少し抑えたいと思っている。
動画
プログラムの変更内容を動画で説明している。
最近のコメント