ラズパイとAI(AutoML)で映像からオブジェクトを検出してレゴEV3のモーターを制御する | そう備忘録

ラズパイとAI(AutoML)で映像からオブジェクトを検出してレゴEV3のモーターを制御する

レゴEV3とラズパイカメラで画像認識

教育用レゴマインドストームEV3とラズパイ(Raspberry Pi 3 B+)とラズパイカメラを使ってカメラに写ったレゴのフィギュアを見分けてEV3のモーターを制御して動作を変えるプログラムを作った時の備忘録。

EV3でフィギアの判別

レゴのフィギュアの見分け方は以前の記事を参照して欲しいのだが、GoogleのCloud AutoML(Machine Learning)のオブジェクト検出を使っている。

複数のレゴのフィギュアの画像を読み込ませて学習させて、

  1. Woman(レゴクリエイターの女性)
  2. Olivia(レゴフレンズ サマーキャンピングからオリビア)
  3. Rapunzel(ディズニープリンセスからラプンツェル)
  4. Stephanie(レゴフレンズからステファニー)

の4種類のフィギュアを検出できるようにしている。

撮影したフィギュア

動作

想定している動作は以下の通り。

  1. EV3のタッチセンサーを押す
  2. ラズパイカメラに写っているフィギュアを機械学習で判別する
  3. 識別したフィギュア名をEV3のインテリジェントブロックのモニターに表示する
  4. もう一度タッチセンサーを押す
  5. Womanなら前進、Oliveなら後退、rapunzelなら左旋回、Stephanieなら右旋回する

レゴEV3の動き

構成

使用している主な機器の構成は以下の通り。

機器構成図
  • レゴマインドストームEV3で作成した本体にはラズパイ、ラズパイ用カメラ、ラズパイ用のバッテリー、機械学習用のCoral USB、ラズパイとEV3のインテリジェントブロックを接続するCosoleアダプタを搭載している
  • Coral USBは必須では無いが機械学習(カメラに写った画像の分類)を高速に行う為に使用している
  • ラズパイには機械学習のフレームワークとしてTensorFlow Lite、4種類のフィギュアをAutoMLで学習した学習済みモデルが搭載されている
  • ラズパイ上のPythonのプログラムはパソコンのブラウザのJupyterLabで編集する

必要な機器

教育用レゴマインドストームEV3

本体は教育用レゴマインドストームEV3の基本セットで作成している。

レゴマインドストームには様々なバージョンがあるので詳しくは以前の記事を参照して欲しい。

自分は日本の正規代理店で基本セット、拡張セットを購入したが並行輸入品で手に入れる事も可能だ。

ラズパイ

EV3制御用のラズパイ。

カメラを接続してGoogleの機械学習のフレームワークのTensorFlow LiteをインストールしてPyhtonのプログラムを動作させている。

バッテリー

Ankerのバッテリー。

ラズパイを動作させるのに使用している。

カメラ

ラズパイに接続するカメラ。

このカメラで撮影したフィギュアを検出する。

Consoleアダプター

ラズパイ(Linuxマシン)とレゴEV3との接続用のアダプタ。

ラズパイ側はUSB接続、EV3側はポート1に接続している。

ロボット ショップ テクノロジアで購入した。

EV3コンソールアダプター

Coral USB Accelerator

機械学習をEdgeで高速に実行するためのGoogleのアクセラレータ。

LinuxマシンにUSB接続して使用する。

機械学習自体はラズパイ単体でも動作は遅いが実行する事ができるので必須の機器ではない。

環境の構築

学習済みモデルを用意する

4種類のレゴフィギュアを学習した学習済みモデルを構築する。

モデルはGoogleのCloud AutoML Visionを使って自分で用意したフィギュアの写真を学習させた。

その時の詳細は以前の記事を参照して欲しい。

  • lego_figure_OD_dict.txt:ラベルファイル
  • lego_figure_OD_model.tflite:学習済みモデル
  • lego_figure_OD_tflite_metadata.json

のモデル(ファイル)を作成した。

本体の組み立て

本体は以前の「レゴEV3のディープラーニングによるライントレース」の記事で使用した本体をそのまま流用している。

上記の記事の内容をざっと説明すると、

  • レゴ・マインドストームEV3とラズパイでディープラーニングによるライントレースができる
  • Preferred Networks、株式会社アフレル、山梨大学が共同で開発
  • 機械学習のフレームワークはChainer(今回はTensorFlowを使用した)

の内容になっている。

こちらのページから「学習する」をクリックしてIDとパスワードを登録すると(無料)教材がダウンロード可能になる。

学習する

その教材(PDF)の「付録 EV3組立図」の通りに組み立てれば良いのだが、

  • 駆動はLモーターの2輪車(後輪は金属ボール)
  • タッチセンサー
  • ラズパイとバッテリーを積めるスペース
  • 前方にカメラ

が満たされていればどのような形でも大丈夫である。

環境構築

環境も「レゴEV3のディープラーニングによるライントレース」の記事で使用した環境をそのまま流用した。

先程の教材(PDF)の「環境構築ガイド」の、

  • Raspberry Pi に LinuxOS をインストール
  • Raspberry Pi と PC の ssh 設定
  • Raspberry Pi の環境構築
  • Raspberry Pi Camera 導入と設定
  • EV3 の実行環境構築(toppers 環境の EV3RT)
  • JupyterLab 起動確認
  • Appendix

を参考に構築した。

テキストの内容は転載禁止なのでこちらには載せないので自分でダウンロードして確認してみてほしい。

PythonでのEV3の制御

尚、上記の手順を行うとPythonでのEV3制御用のモジュールがインストールされてタッチセンサーの制御や簡単なステアリング操作、インテリジェントブロックのディスプレイへの文字の表示などがPythonで可能になる。

上記記事のリリース時は無かったのだがその後にEV3用のPython(Micro Python)がリリースされた。

こちら(Micro Python)を使用すれば、より細かいEV3の制御を行えると思うのだが今回は簡単な動作しかしないのでインストールしていない。

いずれ、より複雑なEV3の制御が必要なフィギュアの追跡(フィギュアをさ探して追跡する)とかもやりたいので試してみようと思っている。

TensorFlowのインストール

上記の手順でインストールされる機械学習のフレームワークはPFN社のChainerだが、AutoML VisionでTensorFlow Liteのモデルを用意したのでラズパイ上でTensorFlowの環境を構築する。

詳細な手順はリンク先の記事を参照して欲しいのだが、

  1. Edge TPUランタイムのインストール
  2. ラズパイとCoral USB Acceleratorを接続
  3. 最大クロック周波数で動作させる為のランタイムをインストール
  4. TensorFlow Liteインタープリターのインストール

を行った。

プログラム

ディレクトリ構成

ホーム配下のディレクトリ構成は以下の通り。

├─chainer-ev3
│  │      
│  ├─workspace
│  │ DeepChick.ipynb
│  │  │
│  │  ├──models
│  │  │  lego_figure_OD_dict.txt  
│  │  │  lego_figure_OD_model.tflite  
│  │  │  lego_figure_OD_tflite_metadata.json  
│  │  │

DeepChick.ipynb

プログラム本体

ソースコードは後述

lego_figure_OD_dict.txt

 

フィギュアを学習したモデルのラベルファイル

下記を含む3つのファイルの詳細はこちらの記事を参照

lego_figure_OD_model.tflite

フィギュアを学習したモデル

lego_figure_OD_tflite_metadata.json

フィギュアを学習したメタデータファイル

プログラムソースコード

ソースコードは以下の通り。

# -*- coding: utf-8 -*-
"""
Created on Sun Mar  8 11:18:55 2020
@author: Souichirou Kikuchi
"""

from time import sleep
import os
import io
import sys
sys.path.append('/home/pi/.local/lib/python3.7/site-packages') # Pathを明示的に指定
from concurrent.futures import ThreadPoolExecutor # スレッド処理
from concurrent.futures import ProcessPoolExecutor # プロセス処理
from picamera import PiCamera
from PIL import Image # Python Image Library
from lib.ev3 import EV3
from tflite_runtime.interpreter import Interpreter, load_delegate
import numpy as np

MODEL = './models/lego_figure_OD_model.tflite'

# 読み込むカメラの解像度
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
THRESHOLD = 0.4 # 最低予測スコア

class  EyeCameraClass: # カメラのクラス
    def __init__(self): # コンストラクタ
        self.camera = PiCamera(resolution=(CAMERA_WIDTH, CAMERA_HEIGHT), framerate=30)
        self.camera.start_preview()

    def detect_pi_camera(self): # カメラ撮影は別スレッドで実行する
        print('camera theread start')
        executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread")
        executor.submit(self.camera_theread)
        print('camera theread end')
 
    def camera_theread(self): # PiCameraのストリームからjpegを取り出して物体検出
        _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
        stream = io.BytesIO()
        for _ in self.camera.capture_continuous(stream, format='jpeg', use_video_port=True):
            stream.seek(0)
            image = Image.open(stream).convert('RGB').resize((input_width, input_height), Image.ANTIALIAS)
            self.objs = self.detect_objects(interpreter, image, THRESHOLD)
            stream.seek(0)
            stream.truncate()
    
    def set_input_tensor(self, interpreter, image):
        tensor_index = interpreter.get_input_details()[0]['index']
        input_tensor = interpreter.tensor(tensor_index)()[0]
        input_tensor[:, :] = image

    def get_objs(self): # objsを返す関数(先頭のclass_idを返す)
        if len(self.objs) == 0: # Objectが見つからない時
            res = 99
        else:
            res = self.objs[0]['class_id']
        return res

    def get_output_tensor(self, interpreter, index):
        # indexで指定されたtensorを出力する
        # 配列の順番はlego_figure_OD_tflite_metadata.jsonのoutputTensorsに記述されている
        output_details = interpreter.get_output_details()[index]
        tensor = np.squeeze(interpreter.get_tensor(output_details['index']))
        return tensor

    def detect_objects(self, interpreter, image, threshold):
        # threshold以上の予測スコアの検出結果を複数返す
        self.set_input_tensor(interpreter, image)
        interpreter.invoke()
        # 検出結果明細を(予測スコア以下も含めて)全て出力する
        boxes = self.get_output_tensor(interpreter, 0)
        classes = self.get_output_tensor(interpreter, 1)
        scores = self.get_output_tensor(interpreter, 2)
        count = int(self.get_output_tensor(interpreter, 3))
        results = []
        for i in range(count):
            if scores[i] >= threshold: # 予測スコア以上のモノだけをresultに追加する
                result = {
                        'bounding_box': boxes[i],
                        'class_id': classes[i],
                        'score': scores[i]
                        }
                results.append(result)
        return results

    def close_pi_camera(self):
        self.camera.stop_preview()
        self.camera.close()

#main
try:
    if __name__ == "__main__":
        os.chdir(os.path.dirname(os.path.abspath("__file__"))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        # センサーとモーターの通信ポートの定義.
        touch_port = EV3.PORT_2
        lmotor_port = EV3.PORT_B
        rmotor_port = EV3.PORT_C
        # センサーとモーターの初期設定.
        ev3 = EV3()
        ev3.motor_config(lmotor_port, EV3.LARGE_MOTOR)
        ev3.motor_config(rmotor_port, EV3.LARGE_MOTOR)
        ev3.sensor_config(touch_port, EV3.TOUCH_SENSOR)

        # Tensorインタープリター
        interpreter = Interpreter(MODEL, experimental_delegates=[load_delegate('libedgetpu.so.1.0')]) 
        interpreter.allocate_tensors()
        eye_camara = EyeCameraClass() # カメラコンストラクタ
        res = eye_camara.detect_pi_camera() # 物体検出
        while True:

            print("追いかけるフィギュアを認識します。タッチセンサーを押してください")
            while not ev3.touch_sensor_is_pressed(touch_port): # タッチセンサーが押されて離されるまで待つ
                pass
            while ev3.touch_sensor_is_pressed(touch_port):
                pass
            target_class = eye_camara.get_objs() # 追いかけるターゲットのクラスIDを取得する
            if target_class == 0: # class_idによって振り分け
                ev3.lcd_draw_string('Woman', 1)
                speed = 10 # 前進
                steer = 0
            elif target_class == 1:
                ev3.lcd_draw_string('Olivia', 1)
                speed = -10 # 後退
                steer = 0
            elif target_class == 2:
                ev3.lcd_draw_string('Rapunzel', 1)
                speed = 10 # 左
                steer = -50
            elif target_class == 3:
                ev3.lcd_draw_string('Stephanie', 1)
                speed = 10 # 右
                steer = 50
            elif target_class == 99:
                ev3.lcd_draw_string('not found', 1)
                speed = 0 # 停止
                steer = 0
            print("もう一度タッチセンサーを押すとWoman:前進、Olivia:後退、Rapunzel:左、Stephanie:右に動きます")
            while not ev3.touch_sensor_is_pressed(touch_port): # タッチセンサーが押されて離されるまで待つ
                pass
            while ev3.touch_sensor_is_pressed(touch_port):
                pass
            ev3.motor_steer(lmotor_port, rmotor_port, speed, steer)
            sleep(1)
            speed = 0 # 停止
            steer = 0
            ev3.motor_steer(lmotor_port, rmotor_port, speed, steer)
except KeyboardInterrupt:
    pass
finally:
    ev3.close()
    eye_camara.close_pi_camera()
    print('program end')

ポイント

ソースコード中にコメントを入れているので詳しい説明は省略するがポイントだけ解説をしておく。

カメラは別スレッド

32行目からのdetect_pi_camera関数内にてPiカメラの撮影はmainの処理とは別スレッド(非同期)で行うことでステアリング操作とは別に常にカメラに写ったオブジェクトを非同期で検知するようしている。

つまりタッチセンサーを押していない時やステアリングの操作中もオブジェクトの検出を同時並行で実行している。

    def detect_pi_camera(self): # カメラ撮影は別スレッドで実行する
        print('camera theread start')
        executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread")
        executor.submit(self.camera_theread)
        print('camera theread end')

ストリームからオブジェクト検出

38行目からのcamera_thereadにてPiカメラのストリームからjpegを取り出して detect_objectsメソッドでjpeg画像に写っているオブジェクトの検出を行う。

    def camera_theread(self): # PiCameraのストリームからjpegを取り出して物体検出
        _, input_height, input_width, _ = interpreter.get_input_details()[0]['shape']
        stream = io.BytesIO()
        for _ in self.camera.capture_continuous(stream, format='jpeg', use_video_port=True):
            stream.seek(0)
            image = Image.open(stream).convert('RGB').resize((input_width, input_height), Image.ANTIALIAS)
            self.objs = self.detect_objects(interpreter, image, THRESHOLD)
            stream.seek(0)
            stream.truncate()

ディスプレイへの表示とステアリング

117行目のget_objs()メソッドで検出されたオブジェクトの先頭(一番スコアが高かったオブジェクト)を取り出して、オブジェクトのClassIDによって処理の振り分けを行う。

以降の処理でレゴのインテリジェントブロックのディスプレイにオブジェクトの名称を表示してステアリング操作を行っている。

実行結果

実行結果

動画

上記のプログラムの動いている様子。

一度目のタッチセンサーでフィギュアの種類を認識して、2度目のタッチセンサーでEV3のステアリングを操作している様を確認してみてほしい。

ブログの内容を動画にしている。

以上で今回の記事は終了とする。

最後に

この記事が何処かで誰かの役に立つことを願っている。

尚、当記事中の商品へのリンクはAmazonアソシエイトへのリンクが含まれています。Amazonのアソシエイトとして、当メディアは適格販売により収入を得ていますのでご了承ください。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

おすすめ

2件のフィードバック

  1. 匿名 より:

    とても面白い記事なのでコメントしてしまいました。なんとか参考にしたいと思います

    • souichirou より:

      面白い記事とのコメントありがとうございます。
      書いた甲斐がありました。
      是非、参考にして試してみて下さい。

souichirou へ返信する コメントをキャンセル

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。