IoT機器で温湿度を測定してクラウドでグラフ化 プログラミング編(その5) | そう備忘録

IoT機器で温湿度を測定してクラウドでグラフ化 プログラミング編(その5)

温湿度のグラフ化

RaspberryPi Zeroと温湿度モジュール(DHT22)を使用して温湿度を指定した間隔(分)で測定してクラウド(AWS IoT Core)にデータをアップロードしてグラフ化するまでの5回目。

過去記事は以下の通り。

今回の記事の範囲は事前準備とラズパイゼロで動作するPythonのプログラミングとする。

システム概要図

システム概要図と今回の記事の範囲は以下の通り。

5回目プログラミングの記事の範囲

主な実現したい機能は以下の通り。

  • 指定した間隔(分)で温湿度を測定する
  • 指定した間隔(分)でデータをサーバーにアップロードする
  • アップロードしたデータは集計、グラフ表示する
  • 温湿度のしきい値(上限、下限)を超えた場合はアラートを発信する
  • しきい値は場所毎に異なる値を指定出来るようにする
  • 設定情報はGoogle Sheetで一括指定して設定値を変更したら各端末に速やかに反映させる
  • それぞれの端末は遠隔で再起動等の操作が出来るようにする
  • 測定端末は複数台設置するが基本的に室内とする(屋外には設置しない)
  • 100V電源を確保できる場所とする(バッテリー駆動も一応試してみる)
  • 初期コスト、ランニングコストともに出来るだけローコストで実現する

事前作業

Pythonのプログラム中で必要なモジュールを事前にインストールする。

gspread

Google Sheetにアクセスする為のモジュールgspreadをインストールしている。

インストール手順、Google Sheetの詳細については「IoT機器で温湿度を測定してクラウドでグラフ化 AWS IoT Core編(その2)」Google Sheetsへのアクセスを参照の事。

paho-mqtt

ラズパイゼロとAWS IoT Coreとの間はMQTT(Message Queueing Telemetry Transport )というIoTに適したプロトコルで通信を行う。

ラズパイ上で動作するMQTT用のモジュールは幾つかあるがpaho-mqttを選択している。

paho-mqttについての詳細はこちらの記事を参照

インストール

以下のコマンドでラズパイゼロにpaho-mqttモジュールをインストールする。

sudo pip3 install paho-mqtt

paho-mqtt Ver1.4.0がインストールされた。

paho-mqttのインストール

pandas

読み取った温湿度情報をAWS IoT CoreにPublishするまでの間、ローカルのディスクにCSV形式で保存しておく為にpandasを利用している。

本来pandasはデータ分析ライブラリなのでCSVを扱う為だけにインストールするのはオーバースペックだとは思う。

使いやすいので使ってしまっているがpandasを使う程でも無いので気になる方は必要に応じてプログラムを修正して欲しい。

sudo apt-get install python3-pandas

pandasのインストール

dht22

DHT22温湿度モジュール

温湿度センサーモジュールDHT22から温湿度情報を取得するPythonのモジュールを探したのだが良さげなのが見つからなかった。

仕方が無いのでDHT11用のPythonのモジュールがGithubに上がっていたのでそのソースコードをDHT22用に改造して使うことにした。

修正方法については以前の「ラズパイで温湿度を測定(DHT22)」の変更内容の記事を参照の事。

  • start signalの変更
  • 温度、湿度の計算方法の変更

の2箇所を変更している。

プログラム

ディレクトリ構造

プログラムは /opt/ 配下に temp-sensor というディレクトリを作成して格納した。

ubuntu の説明では /opt は主にサードパーティ製のプログラムや自作のプログラムを格納する場所とあったのでこのディレクトリにしている。

2021年04月29日 追記

今回は /opt/ 配下に格納したが、特定のユーザからしか起動しないプログラムなので /usr/local/ への格納でも良かったのかも知れない。

├─opt
│  │      
│  ├─temp-sensor
│  │  │  temp-sensor-DHT22.py
│  │  │  initial.json
│  │  │
│  │  ├──backup
│  │  │    temp_humi_YYYY-MM-DD-HH-mm-SS.csv
│  │  │        ・
│  │  │        ・
│  │  │
│  │  ├──cert
│  │  │    XXXXXXXXXX-certificate.pem.crt
│  │  │    XXXXXXXXXX-private.pem.key
│  │  │    AmazonRootCAXXXX.pem
│  │  │    GoogleSheetKeyFileXXXXXXX.json
│  │  │
│  │  ├──csv
│  │  │    temp_humi.csv
│  │  │
│  │  ├──DHT22_Python
│  │  │    __init__.py
│  │  │    dht22.py
│  │  │
│  │  └──log
│  │       error.log
│  │

temp-sensor

temp-sensor-DHT22.py

プログラム本体

initial.json

初期設定ファイル

{
	"client_id": "SKRPZ0001",
	"awsport": 8883,
	"awscert": "./cert/XXXXXXXXXX-certificate.pem.crt",
	"awskey": "./cert/XXXXXXXXXX-private.pem.key",
	"awsroot_ca": "./cert/AmazonRootCAXXXX.pem",
	"awsend_point": "XXXXXXXXXXX-ats.iot.ap-northeast-1.amazonaws.com",
	"gskey_name": "./cert/GoogleSheetKeyFileXXXXXXX.json",
	"gssheet_name": "温湿度設定シート",
	"gssheet_link": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
}

client_id

端末を識別する為のID

以前の記事のIoT Coreでのポリシーの設定にて先頭4文字が”SKRP”の端末のみ接続可能にしているので注意が必要

awsport

AWS IoT Coreに接続するためのポート番号

awscert

以前の記事で発行したモノの証明書を格納したPathとファイル名を指定している

awskey

以前の記事で発行したIoT Coreに接続するためのプライベートキーのPathとファイル名を指定している

awsroot_ca

以前の記事で発行したAmazon Root CAのPathとファイル名を指定している

awsend_point

AWS IoT Coreのエンドポイント

以前にIoT Coreの設定を行った時に控えておいたエンドポイントを指定する

gskey_name

Googleシートにアクセスする為のJSON形式の秘密鍵のPathとファイル名を指定している

gssheet_name

設定用のGoogle Sheetsのシート名

以前の記事で控えておいた値

gssheet_link

設定用Google Sheetsのリンク

以前の記事で控えておいた値

backup

測定した温湿度の情報をIoT CoreにPublishする前に一時的にローカルのディスクに保存している。

Publishした後はCSVファイルはディスクから削除するが一定期間バックアップの意味でこのディレクトリにファイルを保存している。

ファイル名はtemp_humi_YYYY-MM-DD-HH-mm-SS.csvの形式で保存されてていて、プログラム中の定数(MAX_NUMBER_OF_FILE)を超えたファイルは古い順から削除している。

cert

認証情報が保存されているディレクトリ。

XXXXXXXXXX-certificate.pem.crt

以前の記事で発行したモノの証明書ファイル

XXXXXXXXXX-private.pem.key

以前の記事で発行したIoT Coreに接続するためのプライベートキーファイル

AmazonRootCAXXXX.pem

以前の記事で発行したAmazon Root CA

GoogleSheetKeyFileXXXXXXX.json

Googleシートにアクセスする為のJSON形式の秘密鍵のファイル

csv

温湿度情報をIoT CoreにPublishする前にローカルディスクに一時的保存する為のディレクトリ。

ファイル名はtemp_humi.csvで保存している。

DHT22_Python

温湿度モジュールDHT22から情報を取得するためのPythonのモジュール。

__init__.py

モジュールimportする時の初期化処理

特に何も記述していない

dht22.py

dht11.pyをdht22用に改造したモジュール

修正内容はこちらの記事を参照

log

各種ログを保存する為のディレクトリ。

Pythonのプログラム中で何らかの例外が発生した時にerror.logのファイル名でエラーメッセージ等を保存している。

プログラム本体

プログラム本体のソースコードは以下の通り。

# -*- coding: utf-8 -*-
"""
Created on Sun Jul 26 09:42:05 2020

@author: Souichirou Kikuchi
RaspberryPi Zero
・ Google Sheetで指定された間隔で温湿度を取得
・ 同じく指定された間隔でデータをAWS IoT Coreに情報をPublish
・ しきい値を超えている場合はアラートを発信(Amazon SNSでメール配信)
・ 温度、湿度センサーはDHT22を使用(補正あり)

"""

import datetime as dt # 日付時刻
from time import sleep
import RPi.GPIO as GPIO # GPIO
import ssl # SSL
import os # OSコマンド
import sys # ErrorLog出力用
import json # JSON形式のファイルの扱い
from DHT22_Python import dht22 # 温湿度センサー(カスタマイズ)
import gspread # Google SpreadSheet
from oauth2client.service_account import ServiceAccountCredentials # GoogleSheet認証用
import pandas as pd # 温湿度CSV保存
import paho.mqtt.client as mqtt # MQTT

TEMP_CSV  = './csv/temp_humi.csv' # アップロードするまで温湿度をcsvファイルに保存しておく
TOPIC1    = 'dt' # データを表す
TOPIC2    = '/TempAndHumi' # アプリケーション名
TOPIC3    = '/XXX' # 場所を指定する
TOPIC5    = '/temp-humi' # 温湿度 TOPIC4はclient_idがセットされる
ARTOPIC1  = 'alert' # アラート用のTOPIC

class EnvSensorClass: # 温湿度センサークラス
    def __init__(self): # コンストラクタ
        INITIAL_FILE= './initial.json' # 初期設定ファイル
        try:
            with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
                __jsn = json.load(f)
                self.client_id = __jsn['client_id'] # 端末ID
                __awsport = __jsn['awsport'] # AWS接続用ポート番号
                __awscert = __jsn['awscert'] # AWS認証用
                __awskey = __jsn['awskey'] # AWSプライベートキー
                __awsroot_ca = __jsn['awsroot_ca'] # AmazonRootCA
                __awsend_point = __jsn['awsend_point'] # AWSエンドポイント
                self.gskey_name = __jsn['gskey_name'] # 設定用のGoogleSheet
                self.gssheet_name = __jsn['gssheet_name'] # GoogleSheetのName
                self.gssheet_link = __jsn['gssheet_link'] # 温度設定シートへのリンク
            # 初期値の設定
            self.client = mqtt.Client(self.client_id, protocol=mqtt.MQTTv311) #MQTT初期化
            self.client.tls_set(ca_certs=__awsroot_ca, # TLS通信のセット
                                certfile=__awscert,
                                keyfile=__awskey,
                                cert_reqs=ssl.CERT_REQUIRED,
                                tls_version=ssl.PROTOCOL_TLSv1_2,
                                ciphers=None)
            self.client.connect(__awsend_point, port=__awsport, keepalive=60) #AWS IoT coreに接続
            self.client.loop_start() # ループスタート
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def get_setting_info(self): # GoogleSheetよりclient_id毎の設定情報を読み込む
        try:
            __scope = ['https://spreadsheets.google.com/feeds','https://www.googleapis.com/auth/drive']
            __credentials = ServiceAccountCredentials.from_json_keyfile_name(self.gskey_name, __scope)
            __gc = gspread.authorize(__credentials)
            __wks = __gc.open(self.gssheet_name).sheet1 # 該当シートをオープン
            __records = __wks.get_all_values() # __recordsに全ての値を保存する
            __row_count = len(__records) # 行数を取得
            self.location_id = ''
            for i in range(1, __row_count): # 最下行まで繰り返す
                if __records[i][0] == self.client_id: # ClientIDが一致する行の値を取り出す
                    self.location_id = __records[i][1] # ロケーションID
                    self.location_name = __records[i][2] # ロケーション名
                    __interval = __records[i][3] # 温湿度取得間隔(分)
                    __upLoad_interval = __records[i][4] # データアップロード間隔(分)
                    self.min_temp = __records[i][5] # 最低温度(しきい値)
                    self.max_temp = __records[i][6] # 最高温度(しきい値)
                    self.min_humi = __records[i][7] # 最低湿度(しきい値)
                    self.max_humi = __records[i][8] # 最高湿度(しきい値)
                    self.adj_temp = __records[i][9] # 温度調整値
                    self.adj_humi = __records[i][10] # 湿度調整値
                    self.alert = __records[i][11] # アラート送信するかどうか(1:発信、0:停止)
            if self.location_id == '':
                ms = 'Google温湿度シートが取得できませんでした'
                self.put_error_log(ms) # エラーログ処理
            return __interval, __upLoad_interval
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def __del__(self):
        self.client.disconnect() # 停止時にAWS IoT Coreからdisconnect

    def get_temp(self): # 温湿度を取得
        WAIT = 5 # 1回で取得できなかった時待機する秒数
        try:
            __retry_count = 0
            while True:
                __retry_count += 1
                rtn, temp, humi = env.temp_value() # 温湿度を取得
                if rtn == 0 or __retry_count > 5: # 更に最大5回リトライする(1回で取得できないことが多いので)
                    break
                sleep(WAIT) # WAIT分待機する
            if rtn == 0: # 温湿度が取得できたら補正する(機器によって個体差があるので)
                temp = '{:.1f}'.format(temp * float(self.adj_temp))
                humi = '{:.1f}'.format(humi * float(self.adj_humi))
            return rtn, temp, humi
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def temp_value(self): # 温湿度を取得
        TEMP_SENSOR_PIN = 4 # 温湿度センサーのピンの番号
        MAX_RETRY = 10 # dht22から温湿度が取得できなかった時の最大リトライ回数
        RETRY_TIME = 4 # dht22から値が取得できなかった時のリトライまでの待機秒数
        try:
            __instance = dht22.DHT22(pin=TEMP_SENSOR_PIN) # DHT22はDHT11を改造して自作
            __retry_count = 0
            while True: # MAX_RETRY回まで繰り返す
                __retry_count += 1
                __result = __instance.read()
                if __result.is_valid(): # 取得できたら温度と湿度を返す
                    return 0, float(__result.temperature), float(__result.humidity) # returnの時にwhileを抜けている
                elif __retry_count >= MAX_RETRY:
                    return 1, 99.9, 99.9 # 取得できなかった時は1と温湿度99.9を返す
                sleep(RETRY_TIME)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def warning_check(self, temperature, humidity): # 温度と湿度がしきい値を超えていないかをチェックする
        try:
            LOW_TEMP  = '低温度注意!'
            HIGH_TEMP = '高温度注意!'
            LOW_HUMI  = '低湿度注意!'
            HIGH_HUMI = '高湿度注意!'
            __temp_check = 'OK'
            if float(temperature) < float(self.min_temp):
                __temp_check = LOW_TEMP
            elif float(temperature) > float(self.max_temp):
                __temp_check = HIGH_TEMP
            __humi_check = 'OK'
            if float(humidity) < float(self.min_humi):
                __humi_check = LOW_HUMI
            elif float(humidity) > float(self.max_humi):
                __humi_check = HIGH_HUMI
            if (__temp_check != 'OK' or __humi_check != 'OK') and self.alert == '1': # アラートが発生していて発信フラグがオンなら
                self.alert_publish(temperature, humidity, __temp_check, __humi_check) # アラートをPublish
            return __temp_check, __humi_check        
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
            
    def alert_publish(self, temperature, humidity, temp_check, humi_check): # しきい値を超えているのでアラートをPublishする
        DRIVE_LINK = 'https://drive.google.com/open?id=' # 温度設定シート(GoogleSheet)へのリンク
        try:
            __email = '' # メッセージを組み立てる
            if temp_check != 'OK':
                __email = __email + temp_check
            if humi_check != 'OK':
                __email = __email + '\n' + humi_check
            __email = __email + '\nが発生しました。\n場所は' + self.location_name + '(' + self.location_id + ')です。' \
                    + '\n検出された温度は' +  temperature + '度\n湿度は' + humidity + '%です。\n端末番号は' + self.client_id + 'です。' \
                    + '\n温湿度設定シートを確認してください\n' + DRIVE_LINK + self.gssheet_link
            __message = {'default':'',
                        'email':__email}
            self.client.publish(ARTOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTにAlert送信(Publish)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def save_temp_csv(self, rtn, now, temperature, humidity, temp_check, humi_check): # アップロードするまでは温湿度をローカルディスクにCSV保存
        try:
            if (os.path.isfile(TEMP_CSV)): # ファイルが存在しているとき
                df = pd.read_csv(TEMP_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            else: # ファイルが無ければヘッダーを作成
                df = pd.DataFrame(columns=['client_id', 'location_id', 'location_name', 'temp_date_time', 'status', 'temperature', 'temp_check', 'min_temp', 'max_temp', 'humidity', 'humi_check', 'min_humi', 'max_humi'])
            __now_tmp = pd.Series( [ self.client_id, self.location_id, self.location_name, '{0:%Y-%m-%d}T{1:%H:%M:%S}'.format(now,now), rtn,  temperature, temp_check, self.min_temp, self.max_temp, humidity, humi_check, self.min_humi, self.max_humi], index=df.columns)
            df = df.append(__now_tmp, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
            df.to_csv(TEMP_CSV, index=False) # CSV書き込み indexは書き込まない
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
       
    def mqtt_publish(self): # MQTTでAWS IoT Coreにpublish ファイルはバックアップ
        BACK_CSV  = './backup/temp_humi_' # csvファイル保存用ファイル
        try:
            if (os.path.isfile(TEMP_CSV)): # ファイルが存在しているとき
                df = pd.read_csv(TEMP_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            for i in range(df.shape[0]):
                __message = {'tempkeyindex':df.iat[i, 3]+self.client_id, # インデックス(YYYY-MM-DD+CliendID)
                           'clientid':str(df.iat[i, 0]), # 端末ID
                           'locationid':str(df.iat[i, 1]), # ロケーションID
                           'locationname':str(df.iat[i, 2]), # ロケーション名
                           'tempdatetime':df.iat[i, 3], # 日付時刻(YYYY-MM-DD HH:MM:SS)
                           'status':str(df.iat[i, 4]), # status(0:温湿度が正常に取得された、1:取得されなかった)
                           'temperature':float(df.iat[i, 5]), # 温度
                           'tempcheck':str(df.iat[i, 6]), # 温度しきい値チェック結果
                           'min_temp':float(df.iat[i, 7]), # 最低温度しきい値
                           'max_temp':float(df.iat[i, 8]), # 最高温度しきい値
                           'humidity':float(df.iat[i, 9]), # 湿度
                           'humicheck':str(df.iat[i, 10]), # 湿度しきい値チェック結果
                           'min_humi':float(df.iat[i, 11]), # 最低湿度しきい値
                           'max_humi':float(df.iat[i, 12])} # 最高湿度しきい値
                self.client.publish(TOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTに送信(Publish)
            # ファイルをバックアップ    
            __time_stamp   =  '{0:%Y-%m-%d-%H-%M-%S}'.format(dt.datetime.now()) # 日付時刻をセット
            __backup_file = BACK_CSV + __time_stamp + '.csv' # ディレクトリ、ファイル名をセット
            os.rename(TEMP_CSV, __backup_file) # タイムスタンプ付きのファイルにファイル名変更(バックアップ)
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理
            
    def backup_maint(self): # 保存件数を超えたバックアップファイルを古い順から削除する
        MAX_NUMBER_OF_FILE = 1000 # 最大保存ファイル数
        BACK_DIR  = './backup/' # Backup用ディレクトリ
        try:
            __files = os.listdir(BACK_DIR)  # ディレクトリ内のファイルリストを取得
            if len(__files) > MAX_NUMBER_OF_FILE: # MAX_NUMBER_OF_FILE件を超える場合
                __files.sort()  # ファイルリストを昇順に並び替え
                __del_count = len(__files) - MAX_NUMBER_OF_FILE # MAX_NUMBER_OF_FILE件を超える件数を計算
                for i in range(__del_count): # 古い順から1000件を超えているファイルを削除する
                    os.remove(BACK_DIR + __files[i])  # 削除            
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(str(type(ms)) + ' at ' + sys._getframe().f_code.co_name) # エラーログ処理

    def put_error_log(self, message): # エラーログファイルを出力する
        ERROR_LOG_FILE  = './log/error.log' # エラーログ
        if (os.path.isfile(ERROR_LOG_FILE)): # ファイルが存在しているとき
            __df = pd.read_csv(ERROR_LOG_FILE, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
        else: # ファイルが無ければヘッダーを作成
            __df = pd.DataFrame(columns=['ErrorData', 'ErrorMessage'])
        __errorlog = pd.Series( ['{0:%Y-%m-%d %H:%M:%S.%f}'.format(dt.datetime.now()), message], index=__df.columns)
        __df = __df.append(__errorlog, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
        __df.to_csv(ERROR_LOG_FILE, index=False) # Log書き込み indexは書き込まない
    
GPIO.setwarnings(False) # GPIO.cleanup()をしなかった時のメッセージを非表示にする
GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定

#main
MAIN_INTERVAL = 5 # mainループの待機秒
try:
    if __name__ == '__main__':
        os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        env = EnvSensorClass() # 温湿度センサークラス、インスタンスの作成
        interval, upLoad_interval = env.get_setting_info() # 設定Googleシートの読み込み
        last_Minute = last_Minute_up = last_Minute_bk = '{0:%Y-%m-%d %H:%M}'.format(dt.datetime.now()) # 前回の分をセット
        while True:
            now = dt.datetime.now() # 現在時刻を取得
            mod_minute_int = now.minute % int(interval) # 分が温湿度取得間隔(分)の倍数かをチェック
            mod_minute_up = now.minute % int(upLoad_interval) # 分がデータアップロード間隔(分)の倍数かをチェック
            # 温湿度を測定してしきい値を超えているかをチェックする
            if (mod_minute_int == 0 and # 分が倍数で
                last_Minute != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                rtn, temp, humi = env.get_temp() # 温湿度を取得
                if rtn == 0: # 正常に取得できた時
                    temp_check, humi_check = env.warning_check(temp, humi) # しきい値を超えているかをチェック
                else:
                    temp_check = humi_check = 'NC'
                env.save_temp_csv(rtn, now, temp, humi, temp_check, humi_check) # 温湿度としきい値のチェック結果をローカルディスクにCSV保存
                last_Minute = '{0:%Y-%m-%d %H:%M}'.format(now) # 同一分で複数回実行されれないように時刻をセット
            # ローカルに蓄えた温湿度情報をPublsih
            if (mod_minute_up == 0 and # 分が倍数で
                last_Minute_up != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                env.mqtt_publish() # AWS IoT Coreにpublish
                last_Minute_up = '{0:%Y-%m-%d %H:%M}'.format(now) # 時刻をセット
                interval, upLoad_interval = env.get_setting_info() # 設定Googleシートの読み込み(Publish時に再読み込み)
            # 毎晩MAX_NUMBER_OF_FILE件を超えるファイルは古い順から削除
            if (now.hour == 0 and now.minute == 0 and # 毎日0時0分
                last_Minute_bk != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみ
                env.backup_maint() # 過去データの削除
                last_Minute_bk = '{0:%Y-%m-%d %H:%M}'.format(now) # 同一分で複数回実行されれないように時刻をセット
            sleep(MAIN_INTERVAL)
except KeyboardInterrupt:
    pass
finally:
    env.__del__()
    GPIO.cleanup()

ソースコード中にコメントを入れているのでおおよそのロジックは分かると思うが何箇所か補足をしておく。

TOPIC3

TPOICSはIoT CoreにPublishするメッセージを階層構造で表している。

今回のプログラムでは、”データの種類/アプリケーション名/場所(地域)/端末名/温湿度を表す”の階層構造にしておりTOPICの設計はTopicデザインホワイトペーパ(Designing MQTT Topics for AWS IoT Core)を参考にすると良い。

尚、30行目のTOPIC3のXXXには地名を入れている。

main

最初にプログラムの下の方にあるmainルーチンの補足説明。

MAIN_INTERVAL(5秒)間隔でWhileループを繰り返して、

  1. 温湿度設定シートに設定された間隔で温湿度の取得&CSV保存
  2. 温湿度設定シートに設定された間隔でAWS IoT CoreにPublish
  3. 指定件数を超えたバックアップファイルの削除

を行っている。

1.の間隔を仮に5分とすると2.の間隔は5分の倍数を指定する事を前提としている。

__initi__

EnvSensorClassのコンストラクタ(初期処理)を行っている。

EnvSensorClassを作成したタイミング自動で呼ばれて、JSONファイルから各種設定値を読み込むと同時にmqttプロトコルでAWS IoT Coreに接続している。

get_setting_info

Google Sheetsの温湿度設定シートを読み込むための処理。

プログラム起動時及びAWS IoT CoreにPublishしたタイミングの2箇所で呼ばれている。

__del__

デストラクタ。

EnvSensorClassを削除したタイミング自動で呼ばれる。

get_temp

実際に温湿度を取得する関数temp_valueを呼ぶ回数を制御している。

dht22モジュールはタイミングの関係なのか、1度ではデータを取得できない事が何回かあったので数秒を置いて何回かリトライする仕様にしている。

尚、取得できた情報は温湿度設定シートのAdjTemp、AdjHumi(温度、湿度の補正用パラメータ)に基づいて補正している。

DHT22の測定結果が正しいのかを判定する為に他のシチズン等の複数の温湿度計と比較した際に温度はほぼ同じ値が取得できたのに対して湿度がDHT22の方が若干高めに取得される傾向があった。

その為、設定シートの補正用パラメータに基づいて補正するようにしている。

temp_value

dht22.pyを使って温湿度モジュールからデータを取得している。

こちらも一回では取得できない事があったので何回かリトライする仕様にしている。

つまりtemp_valueで4秒間隔で最大10回繰り返して更にget_tempで5秒間隔で最大5回繰り返しているので合計最大50回繰り返すことになる。

※1回でデータが取得できれば繰り返しは行わない

50回繰り返してもデータが取得できない場合は温度、湿度に99.9をセットしている。

warning_check

測定した温湿度がしきい値(上限、下限)を超えた時にアラートを発信する為のチェックを行う。

しきい値を超えている場合はPublishの間隔指定(分)に関わらず直ぐにアラートトピックのPublishを行う。

AWS IoT Coreではアラートトピックを受信したらAmazon SNS(Simple Notification Service)を使って担当者にメール(またはSMS)が発信される。

alert_publish

アラート用のメッセージ本文を組み立ててアラートトピックをPublishする。

アラートトピックはデータトピックと同構造だがTOPIC1(1階層目)が”dt”ではなく”alert”になっている所だけが異なる。

save_temp_csv

IoT CoreにPublishする前に一時的にローカルディスクに保存する温湿度情報のCSVファイルを作成する。

4項目目のtemp_date_timeの日付時刻のフォーマットは'{0:%Y-%m-%d}T{1:%H:%M:%S}’としており、日付と時刻の間に”T”を入れている。

“T”を入れないとIoT Core経由でAWS Elasticsearch Serviceに連携された際にElasticsearch Service上でstring(文字列)と認識されてしまい扱いづらく、date(日付)タイプとして認識して欲しいのでこのフォーマットにしている。

mqtt_publish

保存されているCSVファイルを元にIoT CoreにMQTTプロトコルでPublishしている。

backup_maint

backupディレクトリに保存されているファイルを古い順に削除している。

ファイルの最大保存件数はMAX_NUMBER_OF_FILE定数で指定している。

put_error_log

プログラム中にエラーが発生した際にこのメソッドが呼ばれてlogディレクトリにエラー情報を書き込む。

後からエラーの理由を調査する時に利用する。

続く

以上でプログラミング編の記事は終了とする。

次回はAmazon Elasticsearch Serviceに蓄えられたデータをグラフィカルに表示するオープンソースのツールのkibanaの設定に関する記事としたい。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

おすすめ

質問やコメントや励ましの言葉などを残す

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。