ラズパイゼロで温湿度と気圧、空気の汚れを検出してグラフ化するIoT機器(後編) | そう備忘録

ラズパイゼロで温湿度と気圧、空気の汚れを検出してグラフ化するIoT機器(後編)

前回の続き

ラズパイゼロとセンサー(BME280とMQ-135)で温湿度、気圧、空気の汚れを検出する環境センサーをダイソーのケースや材料で作った時の備忘録の続き(後編)。

前回の記事では全体の構成、必要な部品、必要な工具、組み立てについて説明を行った。

今回はOSやラズパイの環境設定、AWSなどのクラウド環境の設定、プログラミングについての記事とする。

温湿度、気圧、空気の汚れを検知するセンサー

全体構成図

前回の記事でも紹介したがシステムの全体構成図を再掲する。

  • 各種測定値のしきい値はGoogle Sheetから読み込む
  • BME280及びMQ-135で温湿度・気圧、空気の汚れを測定する
  • 測定情報をAWS IoTへパブリッシュ(アップロード)する
  • 情報をDynamoDBに蓄えると同時にAmazon Elasticsearch Serviceに送信する
  • kibanaでグラフ化する

環境準備

ラズパイゼロのOSや必要なモジュール等の環境周りの設定を行う。

OSのインストール

自分はOSがプリインストールされているSDカードが入ったセット品を購入したが、もしまっさらなSDカードにOSのインストールから始めるのであれば「Raspberry Pi ImagerでOS(Raspbian)インストール」の記事を参照して欲しい。

パッケージの更新

ラズパイのメニューからXLTerminalを起動して下記のコマンドでパッケージを最新の状態にする。

sudo apt-get update
sudo apt-get upgrade

ホスト名の変更

ホスト名はデフォルトの名前から分かりやすい名前に変更をしておいた。

メニューから設定ー>RaspberryPiの設定、システムタブでホスト名を変更する。

sshとVNCを有効にする

プログラム開発中はsshとVNCを有効にしながら開発を行った。

プログラム自体はパソコンでコーディングをしてsshでラズパイにソースコードをコピーをしてVNCでラズパイのデスクトップを操作をしながらプログラムのデバッグを行った。

メニューから設定ー>RaspberryPiの設定、インターフェースタブを開いてsshとVNCを有効化した。

尚、VNCは本運用をする段階で無効化している。

sshは運用開始後も遠隔操作をしたい時があるので有効化ままにしてある。

I2CとSPIを有効にする

続いて同じインターフェイスタブでSPIとI2Cを有効にする。

sshとVNC、I2CとSPIを有効にする

空気の汚れを測定するセンサー(MQ-135)のアナログ出力はADコンバーターでデジタルに変換してSPI(Serial Peripheral Interface)でラズパイと通信をするので有効にする。

またI2C(Inter-Integrated Circuit)はシリアル通信のひとつでマスタ(今回はRaspberry Pi Zero)に対して複数のスレーブ(今回はBME280)が接続できる。

デフォルトユーザpiの変更

プログラム開発中はデフォルトユーザのpiのままで開発をしたが、本運用するのであれば、デフォルトのユーザpiはそのままにしておくとセキュリティ上問題があるので別ユーザ名に変更しておいた方が安全だ。

デフォルトユーザpiの変更方法はこちらの記事を参照して欲しい。

固定IPの割当

起動の度にDHCPから異なるIP Addressを割り当てられるとVNCやsshで接続する時に面倒なので無線LANのIP Addressを固定IPに変更した。

固定IPへの変更方法はこちらの記事を参照して欲しい。

Python3をデフォルトにする

PythonのデフォルトをPython3に変更している。

もしかしたら今の最新のRaspbianをインストールするとPythonのデフォルトはPython3なのかも知れないが、自分が購入したセット品のプリインストールされたOSのPythonのデフォルトが2.7だったので(Kernel version:4.19)Python3をデフォルトに変更した。

これをやっておかないとこの後でインストールするpaho-MQTTやGoogle Spreadsheet用のモジュールがPython2.7の環境にインストールされてしまって面倒なことになる。

変更方法はこちらの記事を参照して欲しい。

尚、Pythonのバージョンの確認方法は以下のコマンドで確認できる。

python --version

必要なモジュールのインストール

ラズパイゼロのプログラムで必要となるモジュールをインストールする。

paho-mqttのインストール

各種センサーの測定結果をMQTTプロトコルでAWS IoT Coreにパブリッシュ(アップロード)するのでPythonでMQTTプロトコルを扱うためのモジュールpaho-mqttをインストールする。

MQTT(Message Queueing Telemetry Transport )プロトコルはIBM社とEurotech社のメンバーにより考案された軽量、省電力なプロトコルでIoT製品のデータ通信に適している。

ネットワークが不安定な場所での利用も想定されており、HTTPプロトコルと比較しても1/10程度の通信量で済むので非力なマシンでも利用が可能だ。

以下のコマンドでpaho-mqttのモジュールをインストールする。

sudo pip3 install paho-mqtt

GoogleSheet関連のモジュールのインストール

端末毎の温湿度のしきい値はGoogle Sheetで指定する仕様とした。

ラズパイゼロからGoogle Sheetを読み込んでしきい値を取得して、しきい値を超えた温湿度が測定された場合はアラートを発信する仕様にしている。

設定情報をGoogle Sheetから読み取る為にGoogle Spreadsheet操作用のモジュールをインストールする。

sudo pip3 install gspread

続いてGoogle SheetへはOAuth2認証で接続するので認証用のモジュールをインストールする。

sudo pip3 install oauth2client

設定シートの仕様

設定シートの仕様は以下の通りとなっている。

横軸に設定項目、縦軸に端末になっており端末毎の個別の設定を可能にしている。

設定シートの仕様

シート名(ファイル名)

温湿度設定シート

この名前は後ほど使用するので控えておく

リンク情報

このシートのリンク情報

spreadsheets/d/~ 以降のシート固有の情報をプログラム中で使用するので控えておく

ClientID

端末を識別する端末ID

initial.jsonファイルのclient_idセクションの値と比較して同一の行のデータを対象とする

LocationID

測定場所を表すID

LocationName

測定場所場所の名称

Interval

温湿度、気圧などの環境情報を取得する間隔を分で指定する

UpLoadInterval

センサーで取得した温湿度、気圧、空気の汚れなどの環境情報をAWS IoT Coreにパブリッシュ(アップロード)する間隔を分で指定する

上記のIntervalより長い時間(分)を指定する

MinTemp

最低温度のしきい値

この値を下回った温度が測定された場合はアラートを発信する

尚、アラートの最小間隔はIntervalの値に依存する(UpLoadIntervalでは無い)

MaxTemp

最高温度のしきい値

この値を上回った温度が測定された場合はアラートを発信する

MinHumi

最低湿度のしきい値

この値を下回った湿度が測定された場合はアラートを発信する

MaxHumi

最高湿度のしきい値

この値を上回った湿度が測定された場合はアラートを発信する

AdjTemp

温度の補正用の値

BME280モジュールで計測された値にこの数値を乗算して測定結果を補正するための数値

機器の個体差による測定誤差を補正する為の機能だが、不要ではあれば1を設定すれば良い

例えば0.9が設定されている場合は、

測定値:21.5(℃)

21.5×0.9=19.35→19.4(小数点以下2桁目を四捨五入)となる

AdjHumi

湿度の補正用の値

計算方法は温度と同様

Alert

温湿度がしきい値超えた場合にアラートを発信するかどうかのフラグ

  • 1:発信する
  • 0:発信しない

通常は1(発信する)に設定しておくが例えばIntervalを5分に設定した場合、しきい値を超えると超えている間は5分毎にメール(やSMS)が飛んでくる

実際の現場での対応まで時間が必要で、一旦アラートの送信を停止したい時に0(発信)しないに設定する為の項目

M列

コメント列(プログラム中では使用しない)

Google Sheetへのアクセス設定

シートを作成したらGoogle Cloud Platformで認証情報(サービスアカウント)を作成してGoogle Sheetへ共有設定を行う。

この設定によりサービアカウンでのGoogle Sheetへのアクセスが可能になる。

詳細は「RaspberryPi 3 Model B+からGoogleスプレッドシートへアクセスする方法」の記事を確認して欲しい。

主な手順は以下の通り。

  1. Google Cloud Platformにアクセス
  2. プロジェクトを作成してAPIを有効化する
  3. 認証情報
  4. サービスアカウントを作成する
  5. 秘密鍵ファイルを作成する
  6. Googleスプレッドシートを作成して共有設定する

pandasのインストール

プログラム中では読み取った温湿度、気圧等の環境情報をAWS IoT CoreにパブリッシュするまでローカルのディスクにCSVで保存している。

その際にpandasだとデータの絞り込みや扱いが簡単なのでこのモジュールを以下のコマンドでインストールしている。

sudo apt-get install python3-pandas

pip3でインストールすると依存関係で問題がでるとの記事を見かけたのでapt-get installでインストールした。

python-smbusのインストール

温湿度、気圧センサー(BME280)とI2Cでやり取りをするに辺りpythonのsmbusライブラリーをインストールする。

下記のコマンドでpython-smbus及びsmbus2をインストールする。

尚、smbus2はsmbusを拡張するために1から作成したライブラリー。

詳しくはこちらを参照してほしい。

sudo apt install -y python-smbus
sudo pip install smbus2

AWS関連

プログラム作成の前に温湿度、気圧などの情報をAWS IoT Coreにパブリッシュした後、データベースに蓄えたり、しきい値を超えた場合のアラートを発信したり、kibanaでグラフ化する環境を整える。

以下の操作はAWSコンソールにログインをして操作をする必要があり、有料のサービスもあるので注意が必要だ。

尚、今回はクラウド上でデータの蓄積やアラート、グラフ化を実現しているがクラウドを使用せずにLINEやTwitterにラズパイから直接メッセージやグラフを送信する方法も可能だと思う。

AWS環境を使用しない方法については別記事で紹介したいと思う。

AWS IoT Coreの設定

ラズパイゼロからパブリッシュされた情報はAWS IoT Coreでサブスクライブ(受信)されるので受け取り側のAWS IoT Coreの設定を行う。

詳細はこちらの以前の記事を参照して欲しい。

リンク先の記事の、

  1. AWS IoT Coreの設定
  2. モノの作成
  3. 証明書の作成
  4. ポリシーの作成※
  5. ポリシーのアタッチ
  6. エンドポイントを控えておく

までを実施する。

※リンク先の記事ではリソースARNは下記の様に設定している。

リソースARN:arn:aws:iot:ap-northeast-1:XXXXXXXXX:client/SKRP*

上記はClient名の先頭4文字がSKRPの端末のみをアクセス可能にする設定だが今回のプログラムのClient IDは”temperature0001″にしているので、リソースARNは以下の様に変更する。

リソースARN:arn:aws:iot:ap-northeast-1:XXXXXXXXX:client/temperature*

Amazon SNSの設定

続いてアラート時のメッセージ送信の為のAmazon SNS(Simple Notification Service)の設定を行う。

同記事のリンク先の「Amazon SNSの設定」以降を参考にして、

  1. Amazon SNSの設定
  2. トピックの作成
  3. サブスクリプションの作成

までを実施する。

ACTの設定(アラート)

続いてアラート情報をAWS IoT Coreでサブスクライブした際に上記で設定したAmazon SNSのトピックにメッセージを送信するようにルールの設定を行う。

詳細は同記事リンク先の「IoT CoreのACT」を参考にして、

  1. IoT CoreのACT

を行い、アラートトピックを受信した時に上記で作成したAmazon SNSにデータをプッシュ通知として送信する。

DynamoDBの設定

続いてパブリッシュされた温湿度、気圧情報をデータベースに格納する。

データベースは料金の安いDynamoDBを選択している。

詳細はこちらの以前の記事を参照して欲しい。

  1. DynamoDBへデータを保存する
  2. DynamoDBの設定
  3. テーブルの作成
  4. セカンダリインデックスやキャパシティなど
  5. キャパシティ
  6. 保管時の暗号化

までを実施する。

ACTの設定(DynamoDB)

続いて温湿度、気圧情報をAWS IoT Coreでサブスクライブした際に上記で設定したDynamoDBにデータを送信するためのルールの設定を行う。

同記事のリンク先の「IoT Core ACT」以降を参考にして、

  1. IoT Core ACT
  2. ルールの作成
  3. 名前と説明
  4. ルールクエリステートメント
  5. DynamoDBにメッセージ送信
  6. アクションの設定
  7. ロールの作成
  8. アクションの追加
  9. ルールの作成
  10. ルールの有効化

までを実施する。

Elasticsearch Service

kibanaでデータをグラフ化するに辺りAmazon Elasticsearch Serviceを立ち上げる。

詳細はリンク先の「Amazon Elasticsearch Service」以降の記事を参照して欲しい。

  1. Amazon Elasticsearch Service
  2. Elasticsearch Serviceとは
  3. ドメインの作成
  4. デプロイタイプの選択
  5. ドメインの設定
  6. アクセスとセキュリティ
  7. 確認画面

までを実施する。

ACTの設定(Elasticsearch)

続いて温湿度、気圧情報をAWS IoT Coreでサブスクライブした際に上記で設定したAmazon Elasticsearch Serviceにデータを送信するためのルールの設定を行う。

同記事のリンク先の「IoT Core ACTルール」以降を参考にして、

  1. IoT Core ACTルール
  2. ルールの作成
  3. 名前と説明
  4. ルールクエリステートメント
  5. アクションの追加
  6. アクションの設定
  7. ルールの作成
  8. ルールの有効化

を実施する。

kibanaについて

Amazon Elasticsearch Serviceに格納されているデータをグラフ化するツール、kibanaの設定はプログラムを作成して実行後に行う。

プログラムを実行してAWS IoT Coreー>Amazon Elasticsearch Serviceにデータが連携された後でkibanaのインデックスの設定が可能になるのでAWS側の設定は一旦ここで終了とする。

プログラム

ディレクトリ構造

プログラムは /opt/配下に env-sensor というディレクトリを作成して格納した。

ubuntu の説明では /opt は主にサードパーティ製のプログラムや自作のプログラムを格納する場所とあったのでこのディレクトリにしている。

2021年04月29日 追記

今回は /opt/ 配下に格納したが、特定のユーザからしか起動しないプログラムなので /usr/local/ への格納でも良かったのかも知れない。

├─opt
│  │      
│  ├─env-sensor
│  │  │  env-sensor.py
│  │  │  initial.json
│  │  │
│  │  ├──backup
│  │  │    temp_humi_YYYY-MM-DD-HH-mm-SS.csv
│  │  │        ・
│  │  │        ・
│  │  │
│  │  ├──cert
│  │  │    XXXXXXXXXX-certificate.pem.crt
│  │  │    XXXXXXXXXX-private.pem.key
│  │  │    AmazonRootCAXXXX.pem
│  │  │    GoogleSheetKeyFileXXXXXXX.json
│  │  │
│  │  ├──csv
│  │  │    temp_humi.csv
│  │  │
│  │  └──log
│  │       error.log
│  │

env-sensor

env-sensor.py

プログラム本体

initial.json

初期設定ファイル

{
	"client_id": "temperature0001",
	"awsport": 8883,
	"awscert": "./cert/XXXXXXXXXX-certificate.pem.crt",
	"awskey": "./cert/XXXXXXXXXX-private.pem.key",
	"awsroot_ca": "./cert/AmazonRootCAXXXX.pem",
	"awsend_point": "XXXXXXXXXXX-ats.iot.ap-northeast-1.amazonaws.com",
	"gskey_name": "./cert/GoogleSheetKeyFileXXXXXXX.json",
	"gssheet_name": "温湿度設定シート",
	"gssheet_link": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
}

client_id

端末を識別する為のID

awsport

AWS IoT Coreに接続するためのポート番号

awscert

前述の「AWS IoT Coreの設定」で発行したモノの証明書を格納したPathとファイル名を指定している

awskey

前述の「AWS IoT Coreの設定」で発行したIoT Coreに接続するためのプライベートキーのPathとファイル名を指定している

awsroot_ca

前述の「AWS IoT Coreの設定」で発行したAmazon Root CAのPathとファイル名を指定している

awsend_point

AWS IoT Coreのエンドポイント

前述の「AWS IoT Coreの設定」で控えておいたエンドポイントを指定する

gskey_name

前述の「Google Sheetへのアクセス設定」で取得したGoogleシートにアクセスする為のJSON形式の秘密鍵のPathとファイル名を指定している

gssheet_name

前述の「設定シートの仕様」で控えておいたシート名(ファイル名)

gssheet_link

前述の「設定シートの仕様」で控えておいたリンク情報

アラート発生時にこのリンク情報を送信メッセージ内に埋め込んでシートの編集を容易にするために使用する

backup

測定した温湿度、気圧の情報をAWS IoT Coreにパブリッシュする前に一時的にローカルのディスクに保存している。

パブリッシュした後はCSVファイルはディスクから削除するが一定期間バックアップの意味合いでこのディレクトリにファイルを保存している。

万が一クラウド上のデータが失われてもこのローカルディスクにバックアップされたファイルを見れば一定期間の情報は保存されている。

ファイル名はtemp_humi_YYYY-MM-DD-HH-mm-SS.csvの形式で保存されていて、プログラム中の定数(MAX_NUMBER_OF_FILE)を超えたファイルは古い順から削除している。

cert

認証情報が保存されているディレクトリ。

XXXXXXXXXX-certificate.pem.crt

前述の「AWS IoT Coreの設定」で発行したモノの証明書ファイル

XXXXXXXXXX-private.pem.key

前述の「AWS IoT Coreの設定」で発行したプライベートキーファイル

AmazonRootCAXXXX.pem

前述の「AWS IoT Coreの設定」で発行したAmazon Root CA

GoogleSheetKeyFileXXXXXXX.json

前述の「Google Sheetへのアクセス設定」で保存したGoogleシートにアクセスする為のJSON形式の秘密鍵のファイル

csv

温湿度、気圧情報をIoT Coreにパブリッシュする前にローカルディスクに一時的保存するファイルを格納するディレクトリ。

temp_humi.csvのファイル名で保存している。

log

各種ログを保存する為のディレクトリ。

Pythonのプログラム中で何らかの例外が発生した時にerror.logのファイル名でエラーメッセージ等を保存している。

プログラム本体

プログラムは以下の通り。

# -*- coding: utf-8 -*-
"""
Created on Thu Sep 17 10:39:46 2020
@author: Souichirou Kikuchi
RaspberryPi Zero
・ Google Sheetで指定された間隔で温湿度・気圧を取得
・ 同じく指定された間隔でデータをAWS IoT Coreに情報をPublish
・ しきい値を超えている場合はアラートを発信(Amazon SNSでメール配信)
・ 温度、湿度、気圧センサーはBME280で測定する
・ ベンゼン、アルコール、煙などはMQ-135で測定する

"""

from smbus2 import SMBus
import datetime as dt # 日付時刻
from time import sleep
import RPi.GPIO as GPIO # GPIO
import ssl # SSL
import os # OSコマンド
import sys # ErrorLog出力用
import json # JSON形式のファイルの扱い
import gspread # Google SpreadSheet
from oauth2client.service_account import ServiceAccountCredentials # GoogleSheet認証用
import pandas as pd # 温湿度CSV保存
import paho.mqtt.client as mqtt # MQTT
import spidev # ADコンバーター用

TEMP_CSV  = './csv/temp_humi.csv' # アップロードするまで温湿度をcsvファイルに保存しておく
TOPIC1    = 'dt' # データを表す
TOPIC2    = '/TempAndHumi' # アプリケーション名
TOPIC3    = '/myhome' # 場所
TOPIC5    = '/temp-humi' # 温湿度 TOPIC4はclient_idがセットされる
ARTOPIC1  = 'alert' # アラート用のTOPIC
MQ135_GPIO = 17 # MQ-135センサーデジタル接続GPIO

def put_error_log(message): # エラーログファイルを出力する
    ERROR_LOG_FILE  = './log/error.log' # エラーログ

    if (os.path.isfile(ERROR_LOG_FILE)): # ファイルが存在しているとき
        __df = pd.read_csv(ERROR_LOG_FILE, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
    else: # ファイルが無ければヘッダーを作成
        __df = pd.DataFrame(columns=['ErrorData', 'ErrorMessage'])
    __errorlog = pd.Series( ['{0:%Y-%m-%d %H:%M:%S.%f}'.format(dt.datetime.now()), message], index=__df.columns)
    __df = __df.append(__errorlog, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
    __df.to_csv(ERROR_LOG_FILE, index=False) # Log書き込み indexは書き込まない


class Bme280Class: # 温湿度気圧センサー(BME280)クラス

    def __init__(self, bus_number, i2c_address): # コンストラクタ(初期処理)
        try:
            self.bus = SMBus(bus_number)
            self.dig_temp = []
            self.dig_pres = []
            self.dig_humi = []
            self.t_fine = 0.0
            __osrs_t = 1 #Temperature oversampling x 1
            __osrs_p = 1 #Pressure oversampling x 1
            __osrs_h = 1 #Humidity oversampling x 1
            __mode = 3 #Normal mode
            __t_sb = 5 #Tstandby 1000ms
            __filter = 0 #Filter off
            __spi3w_en = 0 #3-wire SPI Disable
           
            __ctrl_meas_reg = (__osrs_t << 5) | (__osrs_p << 2) | __mode
            __config_reg    = (__t_sb << 5) | (__filter << 2) | __spi3w_en
            __ctrl_hum_reg  = __osrs_h
            self.write_reg(i2c_address, 0xF2, __ctrl_hum_reg)
            self.write_reg(i2c_address, 0xF4, __ctrl_meas_reg)
            self.write_reg(i2c_address, 0xF5, __config_reg)
            self.get_calib_param(i2c_address)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def get_calib_param(self, i2c_address):
        try:
            __calib = []
            for i in range (0x88, 0x88+24):
                __calib.append(self.bus.read_byte_data(i2c_address, i))
            __calib.append(self.bus.read_byte_data(i2c_address, 0xA1))
            for i in range (0xE1, 0xE1+7):
                __calib.append(self.bus.read_byte_data(i2c_address, i))
            self.dig_temp.append((__calib[1] << 8) | __calib[0])
            self.dig_temp.append((__calib[3] << 8) | __calib[2])
            self.dig_temp.append((__calib[5] << 8) | __calib[4])
            self.dig_pres.append((__calib[7] << 8) | __calib[6])
            self.dig_pres.append((__calib[9] << 8) | __calib[8])
            self.dig_pres.append((__calib[11]<< 8) | __calib[10])
            self.dig_pres.append((__calib[13]<< 8) | __calib[12])
            self.dig_pres.append((__calib[15]<< 8) | __calib[14])
            self.dig_pres.append((__calib[17]<< 8) | __calib[16])
            self.dig_pres.append((__calib[19]<< 8) | __calib[18])
            self.dig_pres.append((__calib[21]<< 8) | __calib[20])
            self.dig_pres.append((__calib[23]<< 8) | __calib[22])
            self.dig_humi.append( __calib[24] )
            self.dig_humi.append((__calib[26]<< 8) | __calib[25])
            self.dig_humi.append( __calib[27] )
            self.dig_humi.append((__calib[28]<< 4) | (0x0F & __calib[29]))
            self.dig_humi.append((__calib[30]<< 4) | ((__calib[29] >> 4) & 0x0F))
            self.dig_humi.append( __calib[31] )
            for i in range(1, 2):
                if self.dig_temp[i] & 0x8000:
                    self.dig_temp[i] = (-self.dig_temp[i] ^ 0xFFFF) + 1
            for i in range(1, 8):
                if self.dig_pres[i] & 0x8000:
                    self.dig_pres[i] = (-self.dig_pres[i] ^ 0xFFFF) + 1
            for i in range(0, 6):
                if self.dig_humi[i] & 0x8000:
                    (-self.dig_humi[i] ^ 0xFFFF) + 1
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def read_data(self, i2c_address):
        try:
            __data = []
            for i in range (0xF7, 0xF7+8):
                __data.append(self.bus.read_byte_data(i2c_address, i))
            __pres_raw = (__data[0] << 12) | (__data[1] << 4) | (__data[2] >> 4)
            __temp_raw = (__data[3] << 12) | (__data[4] << 4) | (__data[5] >> 4)
            __humi_raw = (__data[6] << 8)  |  __data[7]
            __temp = self.compensate_temp(__temp_raw) # 温度を計算
            __pres = self.compensate_pres(__pres_raw) # 気圧を計算
            __humi = self.compensate_humi(__humi_raw) # 湿度を計算
            return __temp, __pres, __humi
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def write_reg(self, i2c_address, reg_address, data):
        try:
            self.bus.write_byte_data(i2c_address, reg_address, data)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def compensate_pres(self, adc_pres):
        try:
            __pressure = 0.0
            __v1 = (self.t_fine / 2.0) - 64000.0
            __v2 = (((__v1 / 4.0) * (__v1 / 4.0)) / 2048) * self.dig_pres[5]
            __v2 = __v2 + ((__v1 * self.dig_pres[4]) * 2.0)
            __v2 = (__v2 / 4.0) + (self.dig_pres[3] * 65536.0)
            __v1 = (((self.dig_pres[2] * (((__v1 / 4.0) * (__v1 / 4.0)) / 8192)) / 8)  + ((self.dig_pres[1] * __v1) / 2.0)) / 262144
            __v1 = ((32768 + __v1) * self.dig_pres[0]) / 32768
    
            if __v1 == 0:
                return 0
            __pressure = ((1048576 - adc_pres) - (__v2 / 4096)) * 3125
            if __pressure < 0x80000000:
                __pressure = (__pressure * 2.0) / __v1
            else:
                __pressure = (__pressure / __v1) * 2
            __v1 = (self.dig_pres[8] * (((__pressure / 8.0) * (__pressure / 8.0)) / 8192.0)) / 4096
            __v2 = ((__pressure / 4.0) * self.dig_pres[7]) / 8192.0
            __pressure = __pressure + ((__v1 + __v2 + self.dig_pres[6]) / 16.0)
            return __pressure / 100
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
        
    def compensate_temp(self, adc_temp):
        try:
            __v1 = (adc_temp / 16384.0 - self.dig_temp[0] / 1024.0) * self.dig_temp[1]
            __v2 = (adc_temp / 131072.0 - self.dig_temp[0] / 8192.0) * (adc_temp / 131072.0 - self.dig_temp[0] / 8192.0) * self.dig_temp[2]
            self.t_fine = __v1 + __v2
            __temperature = self.t_fine / 5120.0
            return __temperature
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
    
    def compensate_humi(self, adc_humi):
        try:
            __var_h = self.t_fine - 76800.0
            if __var_h != 0:
                __var_h = (adc_humi - (self.dig_humi[3] * 64.0 + self.dig_humi[4]/16384.0 * __var_h)) * (self.dig_humi[1] / 65536.0 * (1.0 + self.dig_humi[5] / 67108864.0 * __var_h * (1.0 + self.dig_humi[2] / 67108864.0 * __var_h)))
            else:
                return 0
            __var_h = __var_h * (1.0 - self.dig_humi[0] * __var_h / 524288.0)
            if __var_h > 100.0:
                __var_h = 100.0
            elif __var_h < 0.0:
                __var_h = 0.0
            return __var_h
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理


class MQ135Class: # MQ-135センサークラス

    def __init__(self): # コンストラクタ
        try:
            self.spi = spidev.SpiDev()
            self.spi.open(0, 0) # 0:SPI0、0:CE0
            self.spi.max_speed_hz = 1000000 # 1MHz
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def read_mq135_data(self, sample_interval, sample_times): # MQ-135からデジタル値とアナログ値を読み取る
        try:
            __digital = 0
            if GPIO.input(MQ135_GPIO) == GPIO.LOW: # ベンゼン、アルコール、煙が検出された
                __digital = 1
    
            CHN = 0 # ADコンバーター接続チャンネル
            __analog = 0.0
            for i in range(sample_times): # 複数回読み込んで平均を算出して誤差を少なくする
                __analog += self.get_adc_data(CHN) # ADコンバーターからの値を合算する
                sleep(sample_interval / 1000) # 間隔を空ける
            __analog = __analog/sample_times # 平均を算出する(数値が高いほど空気が汚れている)
            return __digital, __analog
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def get_adc_data(self, channel): # ADコンバーターから0~4095の値で電圧を取得する
        try:
            dout = self.spi.xfer2([((0b1000+channel)>>2)+0b100,((0b1000+channel)&0b0011)<<6,0]) # Din(RasPi→MCP3208)を指定
            bit12 = ((dout[1]&0b1111) << 8) + dout[2] # Dout(MCP3208→RasPi)から12ビットを取り出す
            return float(bit12) # 0~4095
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def __del__(self): # デストラクタ
        self.spi.close()
  
class EnvSensorClass: # 環境センサークラス
    BME280_BUS_NUMBER = 1
    BME280_I2C_ADDRESS = 0x76 # BME280のI2Cのアドレス

    def __init__(self): # コンストラクタ
        INITIAL_FILE= './initial.json' # 初期設定ファイル

        try:
            GPIO.setwarnings(False) # GPIO.cleanup()をしなかった時のメッセージを非表示にする
            GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定
            GPIO.setup(MQ135_GPIO, GPIO.IN)
            with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
                __jsn = json.load(f)
                self.client_id = __jsn['client_id'] # 端末ID
                __awsport = __jsn['awsport'] # AWS接続用ポート番号
                __awscert = __jsn['awscert'] # AWS認証用
                __awskey = __jsn['awskey'] # AWSプライベートキー
                __awsroot_ca = __jsn['awsroot_ca'] # AmazonRootCA
                __awsend_point = __jsn['awsend_point'] # AWSエンドポイント
                self.gskey_name = __jsn['gskey_name'] # 設定用のGoogleSheet
                self.gssheet_name = __jsn['gssheet_name'] # GoogleSheetのName
                self.gssheet_link = __jsn['gssheet_link'] # 温度設定シートへのリンク
                # self.token = __jsn['line_token'] # LINE用tokenの読み込み(LINEにメッセージを送信する時には使用する)
            # 初期値の設定
            self.client = mqtt.Client(self.client_id, protocol=mqtt.MQTTv311) #MQTT初期化
            self.client.tls_set(ca_certs=__awsroot_ca, # TLS通信のセット
                                certfile=__awscert,
                                keyfile=__awskey,
                                cert_reqs=ssl.CERT_REQUIRED,
                                tls_version=ssl.PROTOCOL_TLSv1_2,
                                ciphers=None)
            self.client.connect(__awsend_point, port=__awsport, keepalive=60) #AWS IoT coreに接続
            self.client.loop_start() # ループスタート
            self.bme280 = Bme280Class(self.BME280_BUS_NUMBER, self.BME280_I2C_ADDRESS) # BME280クラスのインスタンス作成
            self.mq135 = MQ135Class() # MQ135クラスのインスタンス作成
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def get_setting_info(self): # GoogleSheetよりclient_id毎の設定情報を読み込む
        try:
            __scope = ['https://spreadsheets.google.com/feeds','https://www.googleapis.com/auth/drive']
            __credentials = ServiceAccountCredentials.from_json_keyfile_name(self.gskey_name, __scope)
            __gc = gspread.authorize(__credentials)
            __wks = __gc.open(self.gssheet_name).sheet1 # 該当シートをオープン
            __records = __wks.get_all_values() # __recordsに全ての値を保存する
            __row_count = len(__records) # 行数を取得
            self.location_id = ''
            for i in range(1, __row_count): # 最下行まで繰り返す
                if __records[i][0] == self.client_id: # ClientIDが一致する行の値を取り出す
                    self.location_id = __records[i][1] # ロケーションID
                    self.location_name = __records[i][2] # ロケーション名
                    __interval = __records[i][3] # 温湿度取得間隔(分)
                    __upLoad_interval = __records[i][4] # データアップロード間隔(分)
                    self.min_temp = __records[i][5] # 最低温度(しきい値)
                    self.max_temp = __records[i][6] # 最高温度(しきい値)
                    self.min_humi = __records[i][7] # 最低湿度(しきい値)
                    self.max_humi = __records[i][8] # 最高湿度(しきい値)
                    self.adj_temp = __records[i][9] # 温度調整値
                    self.adj_humi = __records[i][10] # 湿度調整値
                    self.alert = __records[i][11] # アラート送信するかどうか(1:発信、0:停止)
            if self.location_id == '':
                ms = 'Google温湿度シートが取得できませんでした'
                put_error_log(ms) # エラーログ処理
            return __interval, __upLoad_interval
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def __del__(self): # デストラクタ
        GPIO.cleanup()
        self.client.disconnect() # 停止時にAWS IoT Coreからdisconnect
        del self.bme280
        del self.mq135


    def get_environment(self): # 温湿度・気圧、空気の汚れなど環境情報を取得
        WAIT = 5 # 1回で取得できなかった時待機する秒数
        READ_SAMPLE_INTERVAL = 5 # データ取得時の間隔(ミリ秒)
        READ_SAMPLE_TIMES = 50 # データ取得時の繰り返し回数

        try:
            __retry_count = 0
            while True:
                __retry_count += 1
                rtn, temp, pres, humi = self.env_value() # 温湿度を取得
                if rtn == 0 or __retry_count > 5: # 更に最大5回リトライする
                    break
                sleep(WAIT) # WAIT分待機する
            if rtn == 0: # 温湿度が取得できたら補正する(機器によって個体差があるので)
                temp = '{:.1f}'.format(temp * float(self.adj_temp)) # 補正係数を乗算する
                humi = '{:.1f}'.format(humi * float(self.adj_humi))
                pres = '{:.0f}'.format(pres)
            air_digital, air_analog = self.mq135.read_mq135_data(READ_SAMPLE_INTERVAL, READ_SAMPLE_TIMES) # MQ135から空気のデジタル値、アナログ値を読み取る
            return rtn, temp, pres, humi, air_digital, air_analog
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def env_value(self): # 温湿度を取得
        MAX_RETRY = 5 # 最大リトライ回数
        RETRY_TIME = 4 # リトライまでの待機秒数
        try:
            __retry_count = 0
            while True: # MAX_RETRY回まで繰り返す
                __retry_count += 1
                __temp, __pres, __humi = self.bme280.read_data(self.BME280_I2C_ADDRESS) # 温湿度、気圧を取得
                if __temp is not None: # 取得できたら温度と湿度を返す
                    return 0, float(__temp), float(__pres), float(__humi) # returnの時にwhileを抜ける
                elif __retry_count >= MAX_RETRY:
                    return 1, 99.9, 99.9, 99.9 # 取得できなかった時は1と99.9を返す
                sleep(RETRY_TIME)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def warning_check(self, temperature, humidity, air_digital): # 温度と湿度がしきい値を超えていないかと空気の汚れをチェックする
        try:
            LOW_TEMP  = '低温度注意!'
            HIGH_TEMP = '高温度注意!'
            LOW_HUMI  = '低湿度注意!'
            HIGH_HUMI = '高湿度注意!'
            AIR_CHECK = '空気の汚染注意!'
            __temp_check = 'OK'
            if float(temperature) < float(self.min_temp):
                __temp_check = LOW_TEMP
            elif float(temperature) > float(self.max_temp):
                __temp_check = HIGH_TEMP
            __humi_check = 'OK'
            if float(humidity) < float(self.min_humi):
                __humi_check = LOW_HUMI
            elif float(humidity) > float(self.max_humi):
                __humi_check = HIGH_HUMI
            __air_check = 'OK'
            if air_digital == 1: # 空気の汚れ
                __air_check = AIR_CHECK
            if (__temp_check != 'OK' or __humi_check != 'OK' or __air_check != 'OK') and self.alert == '1': # アラートが発生していて発信フラグがオンなら
                self.alert_publish(temperature, humidity, __temp_check, __humi_check, __air_check) # アラートをPublish
            return __temp_check, __humi_check        
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
            
    def alert_publish(self, temperature, humidity, temp_check, humi_check, air_check): # しきい値を超えているのでアラートをPublishする
        DRIVE_LINK = 'https://drive.google.com/open?id=' # 温度設定シート(GoogleSheet)へのリンク
        try:
            __email = '' # メッセージを組み立てる
            if temp_check != 'OK':
                __email = __email + temp_check + '\n'
            if humi_check != 'OK':
                __email = __email + humi_check + '\n'
            if air_check != 'OK':
                __email = __email + air_check + '\n'
            __email = __email + '\nが発生しました。\n場所は' + self.location_name + '(' + self.location_id + ')です。' \
                    + '\n検出された温度は' +  temperature + '度\n湿度は' + humidity + '%です。\n端末番号は' + self.client_id + 'です。' \
                    + '\n温湿度設定シートを確認してください\n' + DRIVE_LINK + self.gssheet_link
            __message = {'default':'',
                        'email':__email}
            self.client.publish(ARTOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTにAlert送信(Publish)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def save_temp_csv(self, rtn, now, temperature, humidity, temp_check, humi_check, pres, air_digital, air_analog): # アップロードするまでは温湿度をローカルディスクにCSV保存
        try:
            if (os.path.isfile(TEMP_CSV)): # ファイルが存在しているとき
                df = pd.read_csv(TEMP_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            else: # ファイルが無ければヘッダーを作成
                df = pd.DataFrame(columns=['client_id',
                                           'location_id',
                                           'location_name',
                                           'temp_date_time',
                                           'status',
                                           'temperature',
                                           'temp_check',
                                           'min_temp',
                                           'max_temp',
                                           'humidity',
                                           'humi_check',
                                           'min_humi',
                                           'max_humi',
                                           'pressure',
                                           'air_digital',
                                           'air_analog'])
            __now_tmp = pd.Series( [self.client_id,
                                    self.location_id,
                                    self.location_name,
                                    '{0:%Y-%m-%d}T{1:%H:%M:%S}'.format(now,now),
                                    rtn,
                                    temperature,
                                    temp_check,
                                    self.min_temp,
                                    self.max_temp,
                                    humidity,
                                    humi_check,
                                    self.min_humi,
                                    self.max_humi,
                                    pres,
                                    air_digital,
                                    air_analog], index=df.columns)
            df = df.append(__now_tmp, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
            df.to_csv(TEMP_CSV, index=False) # CSV書き込み indexは書き込まない
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
       
    def mqtt_publish(self): # MQTTでAWS IoT Coreにpublish ファイルはバックアップ
        BACK_CSV  = './backup/temp_humi_' # csvファイル保存用ファイル
        try:
            if (os.path.isfile(TEMP_CSV)): # ファイルが存在しているとき
                df = pd.read_csv(TEMP_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            for i in range(df.shape[0]):
                __message = {'tempkeyindex':df.iat[i, 3]+self.client_id, # インデックス(YYYY-MM-DD+CliendID)
                           'clientid':str(df.iat[i, 0]), # 端末ID
                           'locationid':str(df.iat[i, 1]), # ロケーションID
                           'locationname':str(df.iat[i, 2]), # ロケーション名
                           'tempdatetime':df.iat[i, 3], # 日付時刻(YYYY-MM-DD HH:MM:SS)
                           'status':str(df.iat[i, 4]), # status(0:温湿度が正常に取得された、1:取得されなかった)
                           'temperature':float(df.iat[i, 5]), # 温度
                           'tempcheck':str(df.iat[i, 6]), # 温度しきい値チェック結果
                           'min_temp':float(df.iat[i, 7]), # 最低温度しきい値
                           'max_temp':float(df.iat[i, 8]), # 最高温度しきい値
                           'humidity':float(df.iat[i, 9]), # 湿度
                           'humicheck':str(df.iat[i, 10]), # 湿度しきい値チェック結果
                           'min_humi':float(df.iat[i, 11]), # 最低湿度しきい値
                           'max_humi':float(df.iat[i, 12]), # 最高湿度しきい値
                           'pressure':float(df.iat[i, 13]), # 気圧
                           'air_digital':str(df.iat[i, 14]), # 空気の汚れ 0:クリーン、1:汚れている
                           'air_analog':float(df.iat[i, 15])} # 空気の汚れアナログ値(0~4095)
                self.client.publish(TOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTに送信(Publish)
            # ファイルをバックアップ    
            __time_stamp   =  '{0:%Y-%m-%d-%H-%M-%S}'.format(dt.datetime.now()) # 日付時刻をセット
            __backup_file = BACK_CSV + __time_stamp + '.csv' # ディレクトリ、ファイル名をセット
            os.rename(TEMP_CSV, __backup_file) # タイムスタンプ付きのファイルにファイル名変更(バックアップ)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
            
    def backup_maint(self): # 保存件数を超えたバックアップファイルを古い順から削除する
        MAX_NUMBER_OF_FILE = 1000 # 最大保存ファイル数
        BACK_DIR  = './backup/' # Backup用ディレクトリ
        try:
            __files = os.listdir(BACK_DIR)  # ディレクトリ内のファイルリストを取得
            if len(__files) > MAX_NUMBER_OF_FILE: # MAX_NUMBER_OF_FILE件を超える場合
                __files.sort()  # ファイルリストを昇順に並び替え
                __del_count = len(__files) - MAX_NUMBER_OF_FILE # MAX_NUMBER_OF_FILE件を超える件数を計算
                for i in range(__del_count): # 古い順から1000件を超えているファイルを削除する
                    os.remove(BACK_DIR + __files[i])  # 削除            
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理


#main
MAIN_INTERVAL = 5 # mainループの待機秒

try:
    if __name__ == '__main__':
        print('--- start program ---')
        os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        env = EnvSensorClass() # 温湿度センサークラス、インスタンスの作成
        interval, upLoad_interval = env.get_setting_info() # 設定Googleシートの読み込み
        last_Minute = last_Minute_up = last_Minute_bk = '{0:%Y-%m-%d %H:%M}'.format(dt.datetime.now()) # 前回の分をセット
        while True:
            now = dt.datetime.now() # 現在時刻を取得
            mod_minute_int = now.minute % int(interval) # 分が温湿度取得間隔(分)の倍数かをチェック
            mod_minute_up = now.minute % int(upLoad_interval) # 分がデータアップロード間隔(分)の倍数かをチェック
            # 温湿度を測定してしきい値を超えているかをチェックする
            if (mod_minute_int == 0 and # 分が倍数で
                last_Minute != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                rtn, temp, pres, humi, air_digital, air_analog = env.get_environment() # 温湿度気圧などの環境情報を取得
                if rtn == 0: # 正常に取得できた時
                    temp_check, humi_check = env.warning_check(temp, humi, air_digital) # しきい値を超えているかをチェック
                else:
                    temp_check = humi_check = 'NC'
                env.save_temp_csv(rtn, now, temp, humi, temp_check, humi_check, pres, air_digital, air_analog) # 温湿度としきい値のチェック結果をローカルディスクにCSV保存
                last_Minute = '{0:%Y-%m-%d %H:%M}'.format(now) # 同一分で複数回実行されれないように時刻をセット
            # ローカルに蓄えた温湿度情報をPublsih
            if (mod_minute_up == 0 and # 分が倍数で
                last_Minute_up != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                env.mqtt_publish() # AWS IoT Coreにpublish
                last_Minute_up = '{0:%Y-%m-%d %H:%M}'.format(now) # 時刻をセット
                interval, upLoad_interval = env.get_setting_info() # 設定Googleシートの読み込み(Publish時に再読み込み)
            # 毎晩MAX_NUMBER_OF_FILE件を超えるファイルは古い順から削除
            if (now.hour == 0 and now.minute == 0 and # 毎日0時0分
                last_Minute_bk != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                env.backup_maint() # 過去データの削除
                last_Minute_bk = '{0:%Y-%m-%d %H:%M}'.format(now) # 同一分で複数回実行されれないように時刻をセット
            sleep(MAIN_INTERVAL)
except KeyboardInterrupt:
    pass
finally:
    del env
    print('--- end program ---')

補足説明

それぞれの関数の概要について補足説明。

ソース中にコメントを入れているので基本的にはそちらを参照して欲しい。

put_error_log

それぞれの関数で何らかの例外が発生した時に後でエラー原因を調べるためエラーログファイルを出力する為の共通関数。

Bme280Class

温湿度、気圧センサー(BME280)のクラス。

このクラスの説明については以前の記事を参照して欲しい。

MQ135Class

空気の汚れを検知するセンサー(MQ-135)のクラス。

デジタル値およびアナログ値をADコンバーターを介してSPI(Serial Peripheral Interface)で受け取っている。

デジタル値は空気が汚れている/いないの2つの値しか検出できない。

一方、アナログ値は電圧を0~4095の値に変換して(空気が汚れている程、値が高い)アナログ数値で空気の汚れを検知する。

read_mq135_data

MQ-135からデジタル値で検出した空気の汚れとアナログ値をADコンバーター経由で検出している。

尚、アナログ値は複数回取得(5ミリ秒毎に50回取得)して平均値を算出する事で検出誤差を少なくしている。

get_adc_data

ADコンバーター(MCP3208)に信号を送って返ってきた値(0~4095)を取得している。

ADコンバーターからの値の取得の詳細は以前の記事を参照して欲しい。

EnvSensorClass

温湿度、気圧センサー(BME280)と空気の汚れを検知するセンサー(MQ-135)の両方を制御するクラス。

初期処理で初期設定ファイルより各種キー情報の取得とAWS IoT CoreへパブリッシュするMQTTプロトコルの初期化などを行っている。

get_setting_info

Google Sheetより温湿度のしきい値などが設定されたシートを読み込み、上から順番に該当行を検索する。

該当行が見つかったらしきい値を取得する。

get_environment

温湿度、気圧、空気の汚れを取得している。

エラーの際は複数回リトライして、取得できた場合には補正係数を温湿度に乗算している。

warning_check

取得した温湿度がGoogle Sheetで設定されたしきい値を超えていないかのチェックを行っている。

alert_publish

しきい値を超えていた際にアラートトピックをパブリッシュする。

またその際にAmazon SNSで送信するメール本文も組み立てる。

save_temp_csv

温湿度、気圧等の情報をローカルディスクにCSVファイルとして保存する。

尚、このCSVファイルはクラウドにパブリッシュ後も一定期間保存している。

mqtt_publish

AWS IoT CoreにMQTTプロトコルでパブリッシュ(送信)する。

ローカルに保存されているCSVファイルを元にメッセージをJSON形式で組み立てて保存行数だけパブリッシュする。

またパブリッシュしたCSVファイルはファイル名の一部をタイムスタンプに置き換えて別ディレクトリに保存する。

backup_maint

ローカルディスクに保存しているCSVファイルからMAX_NUMBER_OF_FILE(最大ファイル保存数)を超えた分のファイルを古い順に削除する。

ローカルディスクを圧迫しないようにする為の関数。

main

whileでループをしながらMAIN_INTERVAL(5秒)間隔で現在時刻をチェックして以下の処理を行う関数を呼び出している。

  1. Google Sheetで設定されている間隔(分)で環境情報(温湿度、気圧など)をチェックする
  2. Google Sheetに設定されている間隔(分)でAWS IoT Coreに環境情報をパブリッシュ(送信)する
  3. ローカルディスクに保存されている保存用のCSVファイルが一定数以上になったら古い順に削除する

以上でプログラムに関する説明は終了。

kibanaの設定

続いてkibanaの設定を行うのだがkibanaの設定を行う前にプログラムを動作させてRaspberryPi→AWS IoT Core→Amazon Elasticsearch Serviceへとデータを連携させた上でkibanaの設定を行った方がやりやすい。

大まかな作業手順は下記の通り。

  1. 基本的な設定を行う
  2. インデックスパターンを作成する
  3. visualizationを作成する
  4. ダッシュボードに上記で作成したvisualizationを貼り付ける

kibanaは非常に高機能なダッシュボードで素晴らしいと思うのだが英語の記事は多い一方、日本語の記事が少ないと感じる。

直感的な部品の配置とプロパティの設定で簡単に実現できる事と複雑な設定やちょっとしたコーディングをすれば実現できる事があって、後者の方が日本語の記事が少なくてちょっと手間取った。

分からないなりに試行錯誤をしながら試したのでもしかして間違っている部分があれば指摘をして貰えるとありがたい。

kibanaの起動

kibanaのリンクをクリックしてkibanaを起動する。

kibanaの起動

設定

最初に歯車アイコンをクリックして各種設定を行う。

設定

“Advanced Setting”を選択してDate formatを”YYYY-MM-DD HH:mm:ss”に変更してSAVEする。

Date format

インデックスパターンの作成

ここではElasticsearch Serviceに連携されてくる複数のインデックスファイルを一つにまとめて扱う為のパターンを定義する。

設定から”Index Patterns”、「Create Index pattern」ボタンをクリックする。

インデックスの作成

index pattern欄に”temp-app-*”と入力すると既に連携されている対象のデータが下に表示される。

前述のACTの設定(Elasticsearch)の索引の設定でtemp-app-YYYYMMDD(日付毎に異なるファイル)の形式でインデックスを作成するように設定している。

パターンの日付部分にワイルドカードのアスタリスクを指定しているので検索結果に対象のデータが表示される。

データの表示を確認したら「Next step」ボタンをクリックする。

インデックスパターンの作成

“Time Filter Field name”でtempdatetimeを選択して「Create index pattern」ボタンをクリックする。

インデックスの作成

インデックスの項目一覧が表示される。

インデックスの項目一覧

visualization

続いて表示する項目別にvisualization(表示項目)の作成を行う。

温度、湿度を一つのvisualizationに表示して、もう一つのvisualizationに気圧と空気の汚れを表示することにする。

温湿度

温湿度を表示するvisualizationを作成する。

新規作成

左のメニューからvisualizationアイコンをクリックして「Create new visualization」ボタンをクリックする。

Create new visualization

グラフを表示したいのでLineを選択する。

Lineを選択する

先ほど作成したインデックスパターン、”temp-app-*”を選択する。

インデックスパターンの選択
Y軸

Dataタブを選択してY-axis(Y軸)をクリックする。

DataのY軸を設定する
  • Aggregation:Top Hit(先頭行)
  • Field:temperature(温度)
  • Aggregate with:Max
  • Size:1
  • Sort on:temperature
  • Order:Descending(降順)
  • Custom label:温度

を設定する。

Y軸項目の設定

湿度も同じvisualizationに表示したいのでAddをクリックしてもう一つY-axis(Y軸)を追加する。

Y軸をもう一つ追加する
  • Aggregation:Top Hit(先頭行)
  • Field:humidity(湿度)
  • Aggregate with:Max
  • Size:1
  • Sort on:humidity
  • Order:Descending(降順)
  • Custom label:湿度

を設定する。

湿度のY軸

途中で▶を押すと右側にグラフがプレビュー表示される。

プレビュー表示

一旦、保存したい時は左上のSAVEボタンを押す。

Title欄に”test_temp_humi”と名前をつけて「Confirm Save」ボタンで保存する。

名前を付けて保存
X軸

続いてX軸の設定を行う。

X軸は日付時刻を表示したいのでBukets、AddでX-axisをクリックする。

X軸の追加
  • Aggregation:Date Histogram
  • Field:tempdatetime(日付項目はこれだけなので自動的にセットされる)
  • Minimum interval:Auto
  • Custom label:日付時刻
X軸の項目の設定

Add、Split seriesをクリックする。

Split

範囲を直近一週間にしたいので、

  • Sub aggregation:Date Range
  • Field:tempdatetime
  • From:now-7d/d+15h(過去一週間分)
  • To:now+9h(世界標準時間+9時間で日本時間に変換している)

を設定する。

しかしこの後のダッシュボードでも日付時刻の範囲を設定できるのでvisualizationで無理に設定しなくても良いのかも知れない。

split

凡例の先頭に日付時刻が表示されてしまうので目のアイコンをクリックして非表示にする。

splitを非表示にする

Metrics & Axesタブ

指標と軸に関する設定を行うMetrics & Axesタブを選択する。

前半部分のMetrics欄は温度指標ともに変更点は無し。

下の方のY-Axes(Y軸)のTitleを”温湿度”に変更する。

タイトルの変更

X-AxisのRotateをAngledに変更すると日付時刻が斜めに表示される。

X軸の日付時刻の表示を斜めにする

Panel Settingsタブ

続いてPanel Settingsタブを選択する。

Grid(縦横の格子線)を表示したいのでGridでShow X-axis linesをオンにしてY-axis linesでLeftAxis-1を選択する。

グリッドを表示する

以上で温湿度のvisualizationの設定は終了。

気圧、空気の汚れ

気圧、空気の汚れをグラフで表示するvisualizationを作成する。

尚、気圧は1,000前後の数値、空気の汚れは0~4095の数値で表されるので温度と湿度とは違うグラフにしている。

visualizationの作成方法については温湿度とほぼ同じなので省略する。

気圧、空気の汚れのグラフ

Dashboard

ダッシュボードを作成して先程作成したvisualizationを配置する。

左のメニューからDashboardアイコンをクリックして、「Create new dashboard」ボタンをクリックする。

Create New Dashboard

上のメニューから”Add”で項目を追加する。

Addで項目の追加

検索欄でvisualizationのTitleの一部を入力して検索するとtest_temp_humiのlineが呼び出されるので選択する。

パネルを検索

クリックするとダッシュボードに追加される。

ダッシュボードに配置される

パネルのタイトルにLineのTitleが表示されているので非表示にしたい。

パネルの右上の歯車アイコン、Customize Panelsを選択してShow panel titleをオフにして「Save」ボタンをクリックする。

パネルのタイトルを非表示にする

ダッシュボードを途中で保存したい時は左上の”Save”をクリックしてTitleに名前をつけて(test_temp_humi_dashboardとした)「Confirm Save」ボタンをクリックする。

ダッシュボードの保存

同様の手順で気圧、空気の汚れを表示するパネルを貼り付けてプログラムをしばらく動作させた。

下記の画面ショットは直近1時間のグラフを表示している。

直近1時間

直近24時間のグラフ。

直近24時間

kibanaの設定はここまでとする。

プログラムの自動起動

デフォルトのラズパイゼロのOSはデスクトップ(GUI)環境なのだがこのプログラムを動作させるだけであればデスクトップである必要は無い。

使用するリソースの少ないCLI(Command line interface)に変更をしておいた方が安定して動作する可能性が上がる。

また電源を入れた時に自動的にプログラムが動作して、万が一異常終了してしまっても再び自動で起動するような設定にしておきたい。

以前のこちらの記事を参照して、

  • ラズパイゼロをデスクトップからCLIに変更
  • プログラムの自動起動

の設定を行う。

以上で今回の記事は終了とする。

最後に

この記事が何処かで誰かの役に立つことを願っている。

尚、当記事中の商品へのリンクはAmazonアソシエイトへのリンクが含まれています。Amazonのアソシエイトとして、当メディアは適格販売により収入を得ていますのでご了承ください。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

おすすめ

質問やコメントや励ましの言葉などを残す

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。