ラズパイとAI(AutoML)で映像からオブジェクトを検出してレゴEV3のモーターを制御する
Contents
レゴEV3とラズパイカメラで画像認識
教育用レゴマインドストームEV3とラズパイ(Raspberry Pi 3 B+)とラズパイカメラを使ってカメラに写ったレゴのフィギュアを見分けてEV3のモーターを制御して動作を変えるプログラムを作った時の備忘録。
レゴのフィギュアの見分け方は以前の記事を参照して欲しいのだが、GoogleのCloud AutoML(Machine Learning)のオブジェクト検出を使っている。
複数のレゴのフィギュアの画像を読み込ませて学習させて、
- Woman(レゴクリエイターの女性)
- Olivia(レゴフレンズ サマーキャンピングからオリビア)
- Rapunzel(ディズニープリンセスからラプンツェル)
- Stephanie(レゴフレンズからステファニー)
の4種類のフィギュアを検出できるようにしている。
動作
想定している動作は以下の通り。
- EV3のタッチセンサーを押す
- ラズパイカメラに写っているフィギュアを機械学習で判別する
- 識別したフィギュア名をEV3のインテリジェントブロックのモニターに表示する
- もう一度タッチセンサーを押す
- Womanなら前進、Oliveなら後退、rapunzelなら左旋回、Stephanieなら右旋回する
構成
使用している主な機器の構成は以下の通り。
- レゴマインドストームEV3で作成した本体にはラズパイ、ラズパイ用カメラ、ラズパイ用のバッテリー、機械学習用のCoral USB、ラズパイとEV3のインテリジェントブロックを接続するCosoleアダプタを搭載している
- Coral USBは必須では無いが機械学習(カメラに写った画像の分類)を高速に行う為に使用している
- ラズパイには機械学習のフレームワークとしてTensorFlow Lite、4種類のフィギュアをAutoMLで学習した学習済みモデルが搭載されている
- ラズパイ上のPythonのプログラムはパソコンのブラウザのJupyterLabで編集する
必要な機器
教育用レゴマインドストームEV3
本体は教育用レゴマインドストームEV3の基本セットで作成している。
レゴマインドストームには様々なバージョンがあるので詳しくは以前の記事を参照して欲しい。
自分は日本の正規代理店で基本セット、拡張セットを購入したが並行輸入品で手に入れる事も可能だ。
ラズパイ
EV3制御用のラズパイ。
カメラを接続してGoogleの機械学習のフレームワークのTensorFlow LiteをインストールしてPyhtonのプログラムを動作させている。
バッテリー
Ankerのバッテリー。
ラズパイを動作させるのに使用している。
カメラ
ラズパイに接続するカメラ。
このカメラで撮影したフィギュアを検出する。
Consoleアダプター
ラズパイ(Linuxマシン)とレゴEV3との接続用のアダプタ。
ラズパイ側はUSB接続、EV3側はポート1に接続している。
ロボット ショップ テクノロジアで購入した。
Coral USB Accelerator
機械学習をEdgeで高速に実行するためのGoogleのアクセラレータ。
LinuxマシンにUSB接続して使用する。
機械学習自体はラズパイ単体でも動作は遅いが実行する事ができるので必須の機器ではない。
環境の構築
学習済みモデルを用意する
4種類のレゴフィギュアを学習した学習済みモデルを構築する。
モデルはGoogleのCloud AutoML Visionを使って自分で用意したフィギュアの写真を学習させた。
その時の詳細は以前の記事を参照して欲しい。
- lego_figure_OD_dict.txt:ラベルファイル
- lego_figure_OD_model.tflite:学習済みモデル
- lego_figure_OD_tflite_metadata.json
のモデル(ファイル)を作成した。
本体の組み立て
本体は以前の「レゴEV3のディープラーニングによるライントレース」の記事で使用した本体をそのまま流用している。
上記の記事の内容をざっと説明すると、
- レゴ・マインドストームEV3とラズパイでディープラーニングによるライントレースができる
- Preferred Networks、株式会社アフレル、山梨大学が共同で開発
- 機械学習のフレームワークはChainer(今回はTensorFlowを使用した)
の内容になっている。
こちらのページから「学習する」をクリックしてIDとパスワードを登録すると(無料)教材がダウンロード可能になる。
その教材(PDF)の「付録 EV3組立図」の通りに組み立てれば良いのだが、
- 駆動はLモーターの2輪車(後輪は金属ボール)
- タッチセンサー
- ラズパイとバッテリーを積めるスペース
- 前方にカメラ
が満たされていればどのような形でも大丈夫である。
環境構築
環境も「レゴEV3のディープラーニングによるライントレース」の記事で使用した環境をそのまま流用した。
先程の教材(PDF)の「環境構築ガイド」の、
- Raspberry Pi に LinuxOS をインストール
- Raspberry Pi と PC の ssh 設定
- Raspberry Pi の環境構築
- Raspberry Pi Camera 導入と設定
- EV3 の実行環境構築(toppers 環境の EV3RT)
- JupyterLab 起動確認
- Appendix
を参考に構築した。
テキストの内容は転載禁止なのでこちらには載せないので自分でダウンロードして確認してみてほしい。
PythonでのEV3の制御
尚、上記の手順を行うとPythonでのEV3制御用のモジュールがインストールされてタッチセンサーの制御や簡単なステアリング操作、インテリジェントブロックのディスプレイへの文字の表示などがPythonで可能になる。
上記記事のリリース時は無かったのだがその後にEV3用のPython(Micro Python)がリリースされた。
こちら(Micro Python)を使用すれば、より細かいEV3の制御を行えると思うのだが今回は簡単な動作しかしないのでインストールしていない。
いずれ、より複雑なEV3の制御が必要なフィギュアの追跡(フィギュアをさ探して追跡する)とかもやりたいので試してみようと思っている。
TensorFlowのインストール
上記の手順でインストールされる機械学習のフレームワークはPFN社のChainerだが、AutoML VisionでTensorFlow Liteのモデルを用意したのでラズパイ上でTensorFlowの環境を構築する。
詳細な手順はリンク先の記事を参照して欲しいのだが、
- Edge TPUランタイムのインストール
- ラズパイとCoral USB Acceleratorを接続
- 最大クロック周波数で動作させる為のランタイムをインストール
- TensorFlow Liteインタープリターのインストール
を行った。
プログラム
ディレクトリ構成
ホーム配下のディレクトリ構成は以下の通り。
├─chainer-ev3
│ │
│ ├─workspace
│ │ DeepChick.ipynb
│ │ │
│ │ ├──models
│ │ │ lego_figure_OD_dict.txt
│ │ │ lego_figure_OD_model.tflite
│ │ │ lego_figure_OD_tflite_metadata.json
│ │ │
DeepChick.ipynb | プログラム本体 ソースコードは後述 |
lego_figure_OD_dict.txt
| フィギュアを学習したモデルのラベルファイル 下記を含む3つのファイルの詳細はこちらの記事を参照 |
lego_figure_OD_model.tflite | フィギュアを学習したモデル |
lego_figure_OD_tflite_metadata.json | フィギュアを学習したメタデータファイル |
プログラムソースコード
ソースコードは以下の通り。
# -*- coding: utf-8 -*-
"""
Created on Sun Mar 8 11:18:55 2020
@author: Souichirou Kikuchi
"""
from time import sleep
import os
import io
import sys
sys.path.append('/home/pi/.local/lib/python3.7/site-packages') # Pathを明示的に指定
from concurrent.futures import ThreadPoolExecutor # スレッド処理
from concurrent.futures import ProcessPoolExecutor # プロセス処理
from picamera import PiCamera
from PIL import Image # Python Image Library
from lib.ev3 import EV3
from tflite_runtime.interpreter import Interpreter, load_delegate
import numpy as np
MODEL = './models/lego_figure_OD_model.tflite'
# 読み込むカメラの解像度
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
THRESHOLD = 0.4 # 最低予測スコア
class EyeCameraClass: # カメラのクラス
def __init__(self): # コンストラクタ
self.camera = PiCamera(resolution=(CAMERA_WIDTH, CAMERA_HEIGHT), framerate=30)
self.camera.start_preview()
def detect_pi_camera(self): # カメラ撮影は別スレッドで実行する
print('camera theread start')
executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread")
executor.submit(self.camera_theread)
print('camera theread end')
def camera_theread(self): # PiCameraのストリームからjpegを取り出して物体検出
_, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
stream = io.BytesIO()
for _ in self.camera.capture_continuous(stream, format='jpeg', use_video_port=True):
stream.seek(0)
image = Image.open(stream).convert('RGB').resize((input_width, input_height), Image.ANTIALIAS)
self.objs = self.detect_objects(interpreter, image, THRESHOLD)
stream.seek(0)
stream.truncate()
def set_input_tensor(self, interpreter, image):
tensor_index = interpreter.get_input_details()[0]['index']
input_tensor = interpreter.tensor(tensor_index)()[0]
input_tensor[:, :] = image
def get_objs(self): # objsを返す関数(先頭のclass_idを返す)
if len(self.objs) == 0: # Objectが見つからない時
res = 99
else:
res = self.objs[0]['class_id']
return res
def get_output_tensor(self, interpreter, index):
# indexで指定されたtensorを出力する
# 配列の順番はlego_figure_OD_tflite_metadata.jsonのoutputTensorsに記述されている
output_details = interpreter.get_output_details()[index]
tensor = np.squeeze(interpreter.get_tensor(output_details['index']))
return tensor
def detect_objects(self, interpreter, image, threshold):
# threshold以上の予測スコアの検出結果を複数返す
self.set_input_tensor(interpreter, image)
interpreter.invoke()
# 検出結果明細を(予測スコア以下も含めて)全て出力する
boxes = self.get_output_tensor(interpreter, 0)
classes = self.get_output_tensor(interpreter, 1)
scores = self.get_output_tensor(interpreter, 2)
count = int(self.get_output_tensor(interpreter, 3))
results = []
for i in range(count):
if scores[i] >= threshold: # 予測スコア以上のモノだけをresultに追加する
result = {
'bounding_box': boxes[i],
'class_id': classes[i],
'score': scores[i]
}
results.append(result)
return results
def close_pi_camera(self):
self.camera.stop_preview()
self.camera.close()
#main
try:
if __name__ == "__main__":
os.chdir(os.path.dirname(os.path.abspath("__file__"))) # カレントディレクトリをプログラムのあるディレクトリに移動する
# センサーとモーターの通信ポートの定義.
touch_port = EV3.PORT_2
lmotor_port = EV3.PORT_B
rmotor_port = EV3.PORT_C
# センサーとモーターの初期設定.
ev3 = EV3()
ev3.motor_config(lmotor_port, EV3.LARGE_MOTOR)
ev3.motor_config(rmotor_port, EV3.LARGE_MOTOR)
ev3.sensor_config(touch_port, EV3.TOUCH_SENSOR)
# Tensorインタープリター
interpreter = Interpreter(MODEL, experimental_delegates=[load_delegate('libedgetpu.so.1.0')])
interpreter.allocate_tensors()
eye_camara = EyeCameraClass() # カメラコンストラクタ
res = eye_camara.detect_pi_camera() # 物体検出
while True:
print("追いかけるフィギュアを認識します。タッチセンサーを押してください")
while not ev3.touch_sensor_is_pressed(touch_port): # タッチセンサーが押されて離されるまで待つ
pass
while ev3.touch_sensor_is_pressed(touch_port):
pass
target_class = eye_camara.get_objs() # 追いかけるターゲットのクラスIDを取得する
if target_class == 0: # class_idによって振り分け
ev3.lcd_draw_string('Woman', 1)
speed = 10 # 前進
steer = 0
elif target_class == 1:
ev3.lcd_draw_string('Olivia', 1)
speed = -10 # 後退
steer = 0
elif target_class == 2:
ev3.lcd_draw_string('Rapunzel', 1)
speed = 10 # 左
steer = -50
elif target_class == 3:
ev3.lcd_draw_string('Stephanie', 1)
speed = 10 # 右
steer = 50
elif target_class == 99:
ev3.lcd_draw_string('not found', 1)
speed = 0 # 停止
steer = 0
print("もう一度タッチセンサーを押すとWoman:前進、Olivia:後退、Rapunzel:左、Stephanie:右に動きます")
while not ev3.touch_sensor_is_pressed(touch_port): # タッチセンサーが押されて離されるまで待つ
pass
while ev3.touch_sensor_is_pressed(touch_port):
pass
ev3.motor_steer(lmotor_port, rmotor_port, speed, steer)
sleep(1)
speed = 0 # 停止
steer = 0
ev3.motor_steer(lmotor_port, rmotor_port, speed, steer)
except KeyboardInterrupt:
pass
finally:
ev3.close()
eye_camara.close_pi_camera()
print('program end')
ポイント
ソースコード中にコメントを入れているので詳しい説明は省略するがポイントだけ解説をしておく。
カメラは別スレッド
32行目からのdetect_pi_camera関数内にてPiカメラの撮影はmainの処理とは別スレッド(非同期)で行うことでステアリング操作とは別に常にカメラに写ったオブジェクトを非同期で検知するようしている。
つまりタッチセンサーを押していない時やステアリングの操作中もオブジェクトの検出を同時並行で実行している。
def detect_pi_camera(self): # カメラ撮影は別スレッドで実行する
print('camera theread start')
executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread")
executor.submit(self.camera_theread)
print('camera theread end')
ストリームからオブジェクト検出
38行目からのcamera_thereadにてPiカメラのストリームからjpegを取り出して detect_objectsメソッドでjpeg画像に写っているオブジェクトの検出を行う。
def camera_theread(self): # PiCameraのストリームからjpegを取り出して物体検出
_, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
stream = io.BytesIO()
for _ in self.camera.capture_continuous(stream, format='jpeg', use_video_port=True):
stream.seek(0)
image = Image.open(stream).convert('RGB').resize((input_width, input_height), Image.ANTIALIAS)
self.objs = self.detect_objects(interpreter, image, THRESHOLD)
stream.seek(0)
stream.truncate()
ディスプレイへの表示とステアリング
117行目のget_objs()メソッドで検出されたオブジェクトの先頭(一番スコアが高かったオブジェクト)を取り出して、オブジェクトのClassIDによって処理の振り分けを行う。
以降の処理でレゴのインテリジェントブロックのディスプレイにオブジェクトの名称を表示してステアリング操作を行っている。
実行結果
動画
上記のプログラムの動いている様子。
一度目のタッチセンサーでフィギュアの種類を認識して、2度目のタッチセンサーでEV3のステアリングを操作している様を確認してみてほしい。
ブログの内容を動画にしている。
以上で今回の記事は終了とする。
この記事が何処かで誰かの役に立つことを願っている。
尚、当記事中の商品へのリンクはAmazonアソシエイトへのリンクが含まれています。Amazonのアソシエイトとして、当メディアは適格販売により収入を得ていますのでご了承ください。
とても面白い記事なのでコメントしてしまいました。なんとか参考にしたいと思います
面白い記事とのコメントありがとうございます。
書いた甲斐がありました。
是非、参考にして試してみて下さい。