ラズパイカメラでモーションセンサーを使わずに動きがあった時だけ撮影する
Contents
ラズパイカメラ
Raspberry Pi 4B と カメラモジュールで通行者を撮像して画像を保存するカメラを作成した時の備忘録。
24時間365日、常に撮影をするのでは無く、カメラの前で何らかの動きがあった時だけ撮像をする仕様にしたいと思っていた。
また今回のプログラムは動画の撮影は行わず、JPG画像をGoogle Drive に保存する仕様とする。
以前に監視カメラを作成した時は人やモノの動きを感知するためにモーションセンサー(HC-SR501)を使った。
モーションセンサーを使って動画を保存するパターンは以前の記事を参照して欲しい。
今回はモーションセンサーを使用せずにカメラだけで “何らかの動きを” を検知したいと思った。
具体的にはカメラの映像の直近数十フレームの平均と最新のフレームとの差異が一定数を超えたときに何らかの動きがあったと判断する。
具体的な Python のロジックは後述する。
全体構成図
システム全体の構成図と仕様は以下の通り。
- Raspberry Pi と PiCamera で動きがあった時だけ撮像して画像を保存する。
- 保存する画像は全体画像と画像中に人の顔が検知された時は顔の画像を保存する
- 顔の検知は OpenCV のカスケード分類器を使用する
- 保存した画像を Google Drive にアップロードする
ハードウェア
以下のハードウェアを用意した。
ラズパイ
以前に購入した Raspberry Pi 4B。
本体以外に SDカード、ケース、電源ケーブル、HDMIケーブル、ヒートシンクなど、一通り揃っているセットを購入している。
ラズパイカメラ
ラズパイ用のカメラモジュール。
ラズパイ本体と Camera Serial Interface(CSI-2)で接続するタイプを購入している。
事前準備
カメラの設定
まずはラズパイ本体と PiCamera の接続を行う。
接続方法については以前のこちらの記事を参照して欲しい。
続いて “Raspberry Pi の設定” にてカメラを有効化した後、試し撮りを行いカメラが正常に動作することを確認する。
操作方法については以前のこちらの記事を参照して欲しい。
OpenCV(cv2)のインストール
撮像した画像に人の顔が写っているかどうかの判定に OpenCV(cv2)のカスケード分類器を使用する。
ラズパイOSに OpenCV をインストールする手順については以前のこちらの記事を参照して欲しい。
尚、この記事では人の顔の検出に OpenCV のカスケード分類器を使用しているがその他の方法では、
- HOG特徴量(特徴量検出)とSVM(support vector machine)によるパターン認識
- Deep Learning による Object Detection(物体検知)
等の方法が考えられる。
1 は実際に試したのだが判定に時間がかかってしまったので、より高速なカスケード分類器を採用することにした。
2 の方法については以前に監視カメラのAI化(COCOモデルを使ったSSDでのリアルタイムの物体検出)にて試したので興味があればその時の記事を参照して欲しい。
実は Deep Learning による Object Detection の方がカスケード分類器よりも精度が良いと感じた。
今回は簡単に実装できるカスケード分類器でプログラムを作成したが精度が求められる要件であれば Deep Learning による顔認証の方を採用すると思う。
カスケード分類器のダウンロード
こちらのページにアクセスしてカスケード分類器をダウンロードする。
自分はラズパイのブラウザから直接アクセスをしてダウンロードを行ったがパソコン等でダウンロードした後、解凍してファイルのみをラズパイにコピーしても良い。
「Code」ー>「Download ZIP」を選択する。
ファイルをダウンロードするとラズパイではデフォルトでは Downloads ディレクトリにファイルが保存されるので LXTerminal から以下のコマンドで解凍する。
cd Downloads
unzip opencv-master.zip
zip ファイルは不要なので削除する。
rm opencv-master.zip
/Downloads/opencv-master/data/haarcascades/ ディレクトリに人の顔(正面)を判別する分類器は複数存在した。
- haarcascade_frontalface_default.xml
- haarcascade_frontalface_alt.xml
- haarcascade_frontalface_alt2.xml
一通り試してみたが、3.の haarcascade_frontalface_alt2.xml がラズパイ+PiCamera の環境では精度が良かった様に思えたので、こちらを使うことした。
後述のカスケード分類器ファイルを保存するディレクトリ(haarcascades)にコピーする。
Google Driveへのアクセス
Google Drive にアクセスする為に Google Cloud Platform にてプロジェクトにてサービスアカウントを作成する。
こちらの記事を参照して、
- プロジェクトの作成
- APIの有効化
- 認証情報の作成(サービスアカウント)
- 秘密鍵の作成
を行う。
秘密鍵を JSON 形式で作成したら後述のディレクトリ構造の cert ディレクトリ配下に保存する。
また画像をアップロードするフォルダーのフォルダーIDをプログラム中で使用するので保存しておく。
ディレクトリ構造
ディレクトリ構造は以下の通り。
├─/home/pi ・・・ ホームディレクトリ
│ │
│ ├─.local
│ │ │
│ │ ├──motion-caputure
│ │ │ │ motion-capture.py ・・・ プログラム
│ │ │ │ initial-sou.json ・・・ 初期設定ファイル
│ │ │ │
│ │ │ ├──cert
│ │ │ │ xxxxxx.json ・・・ Google Driveへアクセスする為の秘密鍵
│ │ │ │
│ │ │ │─haarcascades
│ │ │ │ haarcascade_frontalface_alt2.xml ・・・ カスケード分類器ファイル
│ │ │ │
│ │ │ │─images ・・・ 画像を一時的に保存するディレクトリ
│ │ │ │
│ │ │ │─log ・・・ エラー情報を保存するディレクトリ
│ │ │ │
プログラム
ソースコード
Python のソースコードは以下の通り。
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 11 08:42:11 2021
・通行者等の動きをカメラで感知する
・人の場合、顔写真をキャプチャーする(顔を認識出来なかった時は全体画像)
・Google Driveに画像をアップロード
@author: Souichirou Kikuchi
"""
from picamera import PiCamera
import picamera.array
from time import sleep
from datetime import datetime as dt
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2 import service_account
import os
import sys # ErrorLog出力用
import json
import cv2
import numpy as np
import csv
INITIAL_FILE= './initial-sou.json' # 初期設定ファイル
RESOLUTION = (720, 480) # 1920×1080 30fps、1280×720 60fps、720×480 60fps
class PasserbyCameraClass: # 通行者を撮影するカメラのクラス
def __init__(self):
try:
print('start Program')
with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
__jsn = json.load(f)
self.location_name = __jsn['location_name'] # 設置場所
self.cascade_file = __jsn['cascade_file'] # 顔認識用カスケードファイル
self.gskey_name = __jsn['gskey_name'] # Google Driveへのアクセスキー
self.folder_id = __jsn['folder_id'] # 画像upload用フォルダーID
self.images_dir = __jsn['images_dir'] # 検出画像を一時的に保存するディレクトリ
self.error_log = __jsn['error_log'] # エラーログ格納ディレクトリ
self.save_obj_file = self.images_dir + 'object_'+ '{0:%Y-%m-%d %H:%M:%S.%f}'.format(dt.now()) + '.jpg'
self.camera = PiCamera()
self.camera.rotation = 270 # 回転
self.camera.resolution = RESOLUTION # 解像度
sleep(2) # カメラが起動するまで待機する
self.buffer = list() # bufferを空にする
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
def close_camera(self): # カメラクローズ
self.camera.close()
print('stop camera and Program')
def detect_movement(self): # 動きを検知する
try:
self.stream = picamera.array.PiRGBArray(self.camera, size=RESOLUTION) # RGBキャプチャから3次元RGB配列を生成
self.camera.capture(self.stream, 'bgr', use_video_port=True) # use_video_port False:カメラのイメージポートを使用(高画質、低速)、True:(中画質、高速)
self.gray = cv2.cvtColor(src=self.stream.array, code=cv2.COLOR_BGR2GRAY) # グレースケールに変換
base = self.ave_image(self.gray) # 直近のフレームの平均
rb_rate = self.get_comp_hist(self.gray, base) # 比較
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
finally:
return rb_rate
def ave_image(self, img): # 直近SAVE_FRAME数の平均
try:
SAVE_FRAME = 10 # 保存するフレーム数
base_image = np.zeros_like(img, dtype='int32') # Zero埋め
self.buffer.append(img)
if len(self.buffer) > SAVE_FRAME: # 直近SAVE_FRAMEだけ保存
self.buffer.pop(0) # 古いのを削除
i = 0
for tempImg in self.buffer:
base_image += tempImg # 過去のフレームを重ねて画素ごとの合計値をとる
i += 1
base = np.uint8(base_image // i) # 合計画素の平均をとることで静止画の映像を作る
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
finally:
return base
def get_comp_hist(self, targetimg, base_img): # 直近フレームとの比較
try:
hist1 = cv2.calcHist(images=[targetimg], channels=[0], mask=None, histSize=[256], ranges=[0, 256]) # (イメージ画,チャンネル,マスク,ビン数(全画素値の場合は256),画素値の範囲(0-255)
hist2 = cv2.calcHist(images=[base_img], channels=[0], mask=None, histSize=[256], ranges=[0, 256]) # (イメージ画,チャンネル,マスク,ビン数(全画素値の場合は256),画素値の範囲(0-255)
hist_rst = cv2.compareHist(H1=hist1, H2=hist2, method=cv2.HISTCMP_CORREL)
comp_rate = np.uint8(hist_rst * 100)
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
finally:
return comp_rate
def detect_passerby(self): # 通行人の顔を検出する
try:
face_detect = 0 # 0:顔未検出、1:顔検出
cascade = cv2.CascadeClassifier(self.cascade_file) # カスケードファイルで顔の位置を見つける
face_list = cascade.detectMultiScale(self.gray, minNeighbors=3, minSize=(150, 150)) # minNeighbors:信頼度、minSizeより小さいものは無視される
if len(face_list) > 0:
for (x, y, w, h) in face_list: # 顔を抽出
face = self.stream.array[y:y+h, x:x+w] # 顔だけを抜き出す
save_file = self.images_dir + 'face_'+ '{0:%Y-%m-%d %H-%M-%S.%f}'.format(dt.now()) + '.jpg'
cv2.imwrite(save_file, face) # ファイル保存
if os.path.isfile(self.save_obj_file): # 顔未検出の画像が存在するなら
os.remove(self.save_obj_file) # 削除する
self.save_obj_file = self.images_dir + 'object_'+ '{0:%Y-%m-%d %H-%M-%S.%f}'.format(dt.now()) + '.jpg'
cv2.imwrite(self.save_obj_file, self.stream.array) # 顔が検出された画像を保存
face_detect = 1 # 顔検出
if self.obj_save == False: # 全体画像を保存していなければ
self.obj_save = True
self.save_obj_file = self.images_dir + 'object_'+ '{0:%Y-%m-%d %H-%M-%S.%f}'.format(dt.now()) + '.jpg'
cv2.imwrite(self.save_obj_file, self.stream.array) # 顔未検出の画像を保存
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
finally:
return face_detect
def upload_data(self): # 画像などをアップロードする
try:
scope = ['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive'] # スプレッドシートとドライブに対するフルアクセス権限
credentials = service_account.Credentials.from_service_account_file(filename=self.gskey_name, scopes=scope)
service = build('drive', 'v3', credentials=credentials) # Drive書き込み
for file in os.listdir(self.images_dir): # ローカルにある画像ファイルを全てGoogle Driveへ移動
query = "name = '{}' and '{}' in parents and trashed=false".format(file, self.folder_id)
res = service.files().list(q=query).execute()
if len(res['files']) == 0: # 存在チェック
media_body = MediaFileUpload(self.images_dir + file, mimetype='image/jpeg', resumable=True)
service.files().create( # ファイル作成
body={'name': file, 'mimeType': 'image/jpeg', 'parents':[self.folder_id]},
media_body=media_body,
).execute()
os.remove(self.images_dir + file) # ローカルを削除する
except: # 例外時
ex, ms, tb = sys.exc_info()
self.put_error_log(type(ms)) # エラーログ処理
def put_error_log(self, message): # エラーログファイルを出力する
if (os.path.isfile(self.error_log)): # ファイルが存在しているとき
f = open(self.error_log, 'a') # 追記モードで読み込み
else: # ファイルが無ければヘッダーを作成
f = open(self.error_log, 'w') # 書き込みモード
writer = csv.writer(f)
writer.writerow(['{0:%Y-%m-%d %H:%M:%S.%f}'.format(dt.now()), message])
#main
try:
if __name__ == '__main__':
os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
pass_camara = PasserbyCameraClass()
motion_flg = 0 # 0:動きがない、1:動きがある
face_flg = 0 # 0:顔未検知、1:顔検知
pass_camara.obj_save = False # 全体画像の保存
while True:
rb_rate = pass_camara.detect_movement() # カメラの映像から動きを検知する
if rb_rate < 90 or rb_rate > 100: # 動きがあった時だけ
if face_flg == 0: # 顔が未検知の時
face_flg = pass_camara.detect_passerby() # 通行人を検出する 顔が検出されたらface_flgが1になる
motion_flg = 1 # 動きがあった
else: # 動きがない
if motion_flg == 1: # 前回動きがあったが今は動きが無い
pass_camara.upload_data() # 画像などの情報のアップロード
motion_flg = 0 # 動きが無くなればリセット
face_flg = 0 # 0:顔検知リセット
pass_camara.obj_save = False # 全体画像の保存
except KeyboardInterrupt:
pass
finally:
pass_camara.close_camera() # カメラの後処理
初期設定ファイル
上記ソースコードの33行目で読み込んでいる初期設定ファイル。
{
"location_name": "カメラ設置場所",
"cascade_file": "./haarcascades/haarcascade_frontalface_alt2.xml",
"gskey_name": "./cert/XXXXXXXXX.json",
"folder_id": "XXXXXXXXXX",
"images_dir": "./images/",
"error_log": "./log/error.log"
}
location_name | カメラ設置場所 |
cascade_file | カスケード分類器のファイルPATHを指定する |
gskey_name | Google Drive にアクセスする為の秘密鍵のファイルPATH |
folder_id | 先程保存した、Google Drive のフォルダーID |
images_dir | 画像ファイルを一時的に保存するディレクトリ |
error_log | エラーログを出力するディレクトリ |
補足説明
細かい部分についてはソースコード中のコメントを確認して欲しい。
__init__
クラスのインスタンス生成時に呼ばれるコンストラクタ。
初期設定ファイルの読み込みや PiCamera の生成を行っている。
camera.rotatio でカメラを回転させているが、ラズパイの設置方向によっては調整が必要だ。
detect_movement
カメラの映像から動きがあったかどうかを判断する。
グレースケールに変換した後、直近の数フレームの平均と最新のフレームとの比較を行い、違いを数値で取得する。
ave_image
直近の数フレーム(SAVE_FRAMEで指定した10フレーム)の平均を計算する。
get_comp_hist
直近の数フレームの平均と最新のフレームとを比較して数値で返す。
detect_passerby
カスケード分類器を使って画像中に人の顔があるかどうかを判定する。
minSize=(150, 150) で指定したサイズより小さい場合は無視する。
ある程度大きな値を設定しないと、誤検知が増えるのだが、大きくし過ぎるとかなりカメラの近くまで近寄らないと顔と認識をしないので設置現場での調整も必要だ。
1枚の画像に複数の顔が認識された場合は、face_list に配列で保存されるので、それぞれ顔だけの jpg ファイルを作成して保存すると同時に、全体の画像も保存する。
upload_data
Google Drive に認証してローカルにある特定ディレクトリのファイルを全てアップロードする。
アップロードが終わったらローカルのファイルは削除する。
put_error_log
何らかの例外(エラー)が発生した時にログを記録する。
メソッド(関数)で何らかの例外処理があった際に呼ばれる。
#main
メイン処理。
detect_movement メソッドで動きを検知する。
rb_rat で戻り値を受け取るのだが、動きが少ない時は90~100前後の戻り値が返る。
動きがあった時だけ、更に人の顔が検知できるかどうか、detect_passerby 関数を呼んで判定する。
また、”動きを検知している状態” から、”動きが検知出来なくなった状態” に遷移した時に、upload_data関数を使ってロケールの保存した画像をクラウドにアップロードする。
終わりに
以上でカメラだけで動きがあったことを検知するプログラムの記事は終了とする。
カメラの映像だけで動きを検知するロジックはモーションセンサーが誤作動しやすい場所や、あえてモーションセンサーを設置したく無い場合には有効だと思う。
ただモーションセンサーを使えば、カメラに写っていない時から反応して動画の録画を開始する事もできる。
利用シーンによって使い分けるのが良いと思う。
この記事が何処かで誰かの役に立つことを願っている。
尚、当記事中の商品へのリンクはAmazonアソシエイトへのリンクが含まれています。Amazonのアソシエイトとして、当メディアは適格販売により収入を得ていますのでご了承ください。
最近のコメント