ラズパイZEROとモーションセンサーで見守りシステムをつくる(その4 プログラミング) | そう備忘録

ラズパイZEROとモーションセンサーで見守りシステムをつくる(その4 プログラミング)

前回からのつづき

ラズパイとモーションセンサーで実家用の見守りシステムを構築した時の4回目。

モーションセンサーで検知した件数をLINEを通して見守り元に通知する。

また各種設定値をGoogleスプレッドシートから読み込んで通知時間などを変更することができる。

  • 初回の「全体構成とハードウェア編」はこちら
  • 2回目の「事前設定編」はこちら
  • 3回目の「AWS IoT CoreとDynamoDBの設定」はこちら

今回はPythonのプログラミングの記事とする。

尚、記事の最後で動画でプログラムについて説明をしているの参考にして欲しい。

第4回の記事の範囲

プログラムのファイル・ディレクトリ構成

まずはプログラム周りのファイル・ディレクトリ構成。

├─opt
│  │      
│  ├─WatchOver
│  │  │  WatchOver.py
│  │  │  initial.json
│  │  │
│  │  ├──cert
│  │  │    XXXXXXXXXX-certificate.pem.crt
│  │  │    XXXXXXXXXX-private.pem.key
│  │  │    AmazonRootCA1WatchOver.pem
│  │  │    watchoverproject-2019-XXXXXXXXXXXX.json
│  │  │
│  │  ├──csv
│  │  │    count_data.csv
│  │  │
│  │  └──log
│  │       error.log
│  │

/opt/WatchOverディレクトリ

プログラムをこのディレクトリに格納した。

ubuntuの説明で「主にサードパーティー製アプリのインストール先」で「自分で作成したアプリをインストールしてもOK」とあったのでここにディレクトリを作成してインストールすることにした。

2021年4月29日 追記

今回は /opt/ 配下に格納したが、特定のユーザからしか起動しないプログラムなので /usr/local/ への格納でも良かったのかも知れない。

WatchOver.py

プログラム本体

プログラム内容については後述

initial.json

初期設定ファイル

{
	"client_id": "RTRPZ0003",
	"awsport": 8883,
	"awscert": "./cert/XXXXXXXXXX-certificate.pem.crt",
	"awskey": "./cert/XXXXXXXXXX-private.pem.key",
	"awsroot_ca": "./cert/AmazonRootCA1WatchOver.pem",
	"awsend_point": "XXXXXXXXXXXXXX-ats.iot.ap-northeast-1.amazonaws.com",
	"gskey_name": "./cert/watchoverproject-2019-XXXXXXXXXXXX.json",
	"gssheet_name": "WatchOverSetting",
	"line_token":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
}

client_id

端末を一意に識別するClinet IDを指定する

awsport

AWS IoT Coreに接続時のポート番号

awscert

IoT Coreに接続するためのモノの証明書の保存先のPath

証明書等の取得方法は過去記事を参照

awskey

IoT Coreに接続するためのプライベートキーの保存先のPath

awsroot_ca

ルート CA 証明書の保存先のPath

awsend_point

AWSのエンドポイント

エンドポイントの取得方法は過去記事を参照

gskey_name

Googleスプレッドシートのサービスアカウントキーの保存先のPath

サービスアカウントキーの取得方法は過去記事を参照

gssheet_name

Googleスプレッドシートのシート名

line_token

LINE Notifyで発行されたアクセストークン

アクセストークンの取得方法は過去記事を参照

cert ディレクトリ

証明書類を格納するディレクトリ

XXXXXXXXXX-certificate.pem.crt

AWS IoT Coreで発行したモノの証明書

取得方法は過去記事を参照

XXXXXXXXXX-private.pem.key

AWS IoT Coreで発行したプライベートキー

取得方法は過去記事を参照

AmazonRootCA1WatchOver.pem

AWS IoT Coreで発行したAmazonルート CA

取得方法は過去記事を参照

watchoverproject-2019-XXXXXXXXXXXX.json

Googleスプレッドシートのサービスアカウントキー

所得方法は過去記事を参照

csvディレクトリ

下記のCSVデータを保存するディレクトリ

count_data.csv

過去30日間の10分間隔での検知件数のCSV

プログラム中で一時的に保存する

logディレクトリ

ログファイルを格納するディレクトリ

error.log

何らかの原因でプログラムが異常終了した時にエラー原因を書き出すCSVファイル

プログラム

続いてPythonのプログラム全体。

# -*- coding: utf-8 -*-
"""
Created on Thu Jan 02 10:13:04 2020

@author: Souichirou Kikuchi
"""

import paho.mqtt.client as mqtt # MQTT
import ssl # IoT coreにTLS1.2で接続
import json
import datetime as dt
from time import sleep
import sys # ErrorLog出力用
sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
import RPi.GPIO as GPIO
import gspread # Google SpreadSheet
from oauth2client.service_account import ServiceAccountCredentials # GoogleSheet認証用
import os # ファイルの存在チェック等
import pandas as pd # 検知件数保存CSV作成に使用する
import requests # LINEメッセージ

INITIAL_FILE= './initial.json' # 初期設定ファイル
TOPIC1 = 'dt' # AWS Publish時のTOPICS 1~5で構成される。データを表す
TOPIC2 = '/WatchOver' # アプリケーション名
TOPIC3 = '/own' # context
TOPIC5 = '/count' # カウント TOPIC4はself.client_idがセットされる
MOTION_PIN = 23 # モーションセンサーのGPIOピン番号
PUBLISH_INTERVAL = 10 # この分数毎にpublishを繰り返す
CHECK_INTERVAL = 0.5 # この秒数毎にモーションセンサーをチェックする
COUNT_CSV  = './csv/count_data.csv' # 検知件数はCSVで30日分保存する
ERROR_LOG  = './log/error.log' # エラーログ
LINE_URL = 'https://notify-api.line.me/api/notify' # LINEメッセージ用URL

class WOClass: # Watch Over(見守り) Class
    def __init__(self): # コンストラクタ
        with open(INITIAL_FILE) as f: # 初期設定ファイルの読み込み
            jsn = json.load(f)
            self.client_id = jsn['client_id'] # 端末ID
            __awsport = jsn['awsport'] # AWS接続用ポート番号
            __awscert = jsn['awscert'] # AWS認証用
            __awskey = jsn['awskey'] # AWSプライベートキー
            __awsroot_ca = jsn['awsroot_ca'] # AmazonRootCA
            __awsend_point = jsn['awsend_point'] # AWSエンドポイント
            self.gskey_name = jsn['gskey_name'] # 設定用のGoogleSheet
            self.gssheet_name = jsn['gssheet_name'] # GoogleSheetのName
            self.token = jsn['line_token'] # LINE用tokenの読み込み
        # 初期値の設定
        self.client = mqtt.Client(self.client_id, protocol=mqtt.MQTTv311) #MQTT初期化
        self.client.tls_set(ca_certs=__awsroot_ca, # TLS通信のセット
                            certfile=__awscert,
                            keyfile=__awskey,
                            cert_reqs=ssl.CERT_REQUIRED,
                            tls_version=ssl.PROTOCOL_TLSv1_2,
                            ciphers=None)
        self.client.connect(__awsend_point, port=__awsport, keepalive=60) #AWS IoTに接続
        self.client.loop_start() # ループスタート

    def __del__(self):
        self.client.disconnect() # 停止時にdisconnect

    def count_publish(self, from_now, now, find_count): # AWS IoT CoreにPublishすると当時にCSVに保存する
        # メッセージを組み立ててAWS IoT CoreにPublish
        try:
            __message = {'ClientID':self.client_id, # Client ID
                         'CountTimeFrom':'{0:%Y-%m-%d %H:%M:%S}'.format(from_now), # カウント開始日付時刻
                         'CountTimeTo':'{0:%Y-%m-%d %H:%M:%S}'.format(now), # カウント終了日付時刻
                         'Count':find_count} # 検知件数
            self.client.publish(TOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTに送信(Publish)
            # Publishした検知件数をCSVに書き込み
            if (os.path.isfile(COUNT_CSV)): # ファイルが存在しているとき
                __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            else: # ファイルが無ければヘッダーを作成
                __df = pd.DataFrame(columns=['CountTimeFrom', 'CountTimeTo', 'Count'])
            __count = pd.Series( ['{0:%Y-%m-%d %H:%M:%S}'.format(from_now), '{0:%Y-%m-%d %H:%M:%S}'.format(now), find_count], index=__df.columns)
            __df = __df.append(__count, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
            __df.to_csv(COUNT_CSV, index=False) # CSV書き込み indexは書き込まない
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(type(ms)) # エラーログ処理

    def get_rule(self): # GoogleSheetよりルール(各種条件)を取得する
        results = []
        try:
            __scope = ['https://spreadsheets.google.com/feeds','https://www.googleapis.com/auth/drive']
            __credentials = ServiceAccountCredentials.from_json_keyfile_name(self.gskey_name, __scope)
            __gc = gspread.authorize(__credentials)
            __wks = __gc.open(self.gssheet_name).sheet1 # 該当シートをオープン
            __records = __wks.get_all_values() # __recordsに全ての値を保存する
            __row_count = len(__records) # 行数を取得
            self.exp_flg = 0
            for i in range(1, __row_count): # 最下行まで繰り返す
                if __records[i][0]  == self.client_id: # ClientIDが一致する行の値を取り出す
                    result = {
                            'Action': __records[i][1], # アクション
                            'Parameter': __records[i][2] # パラメータ
                            }
                    results.append(result)
                    if (__records[i][1] == 'EXP' and # EXPで当日が指定されていたらアラートフラグをオン
                        __records[i][2] == '{0:%Y-%m-%d}'.format(dt.datetime.now())):
                        self.exp_flg = 1
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(type(ms)) # エラーログ処理
        finally:
            return results

    def put_report(self): # 直近24時間のレポートを1時間毎に集計してLINEに送付する
        try:
            if (os.path.isfile(COUNT_CSV)): # ファイルが存在しているとき
                __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
                __df['Hour'] = __df['CountTimeFrom'].apply(lambda x: x[0:13]) # YYYY-MM-DD HHまでを抽出 
                __df_grouped = __df.groupby(['Hour'], as_index=False)['Count'].sum() # Hourをindexにしない
                yesterday = (dt.datetime.now() + dt.timedelta(hours=-24)).strftime('%Y-%m-%d %H') # 24時間前
                __df_grouped = __df_grouped[__df_grouped['Hour'] >= yesterday] # 24時間以内のみを抽出
                # LINE用のメッセージを組み立てる
                __headers = {'Authorization' : 'Bearer ' + self.token}
                __message = '\n24時間以内の検知件数一覧\n'
                for i in range(__df_grouped.shape[0]):
                    __message = __message + __df_grouped.iat[i, 0] + '時台は ' + str(__df_grouped.iat[i, 1]) + '件\n'
                __payload = {'message' : __message}
                requests.post(LINE_URL, headers=__headers, params=__payload) # LINEメッセージを送信
                # CSVから30日以内のデータのみを残す
                delete_day = (dt.datetime.now() + dt.timedelta(days=-30)).strftime('%Y-%m-%d %H') # 30日前
                __df = __df[__df['Hour'] >= delete_day] # 30日以内を抽出
                __df = __df.drop(columns='Hour') # Hour列を削除
                __df.to_csv(COUNT_CSV, index=False) # 保存する
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(type(ms)) # エラーログ処理

    def put_alert(self, now, from_time, to_time): # From Toの範囲内の時刻で検知件数がゼロの時アラートを送信する
        try:
            if self.exp_flg == 0: # EXPで当日が指定されていたらアラートは出力しない
                if (os.path.isfile(COUNT_CSV)): # ファイルが存在しているとき
                    __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
                    __df['Day'] = __df['CountTimeFrom'].apply(lambda x: x[0:10]) # YYYY-MM-DDまでを抽出 
                    __df['FromHour'] = __df['CountTimeFrom'].apply(lambda x: x[11:16]) # FromのHH:MMを抽出 
                    __df['ToHour'] = __df['CountTimeTo'].apply(lambda x: x[11:16]) # ToのHH:MMを抽出 
                    __df = __df[__df['Day'] >= '{0:%Y-%m-%d}'.format(now)] # 今日のデータのみ
                    __df = __df[__df['FromHour'] >= from_time] # FromとToが範囲内
                    __df = __df[__df['ToHour'] <= to_time]
                    __df_grouped = __df.groupby(['Day'], as_index=False)['Count'].sum() # 日付でグルーピング(基本的に1件のみになる)
                    __total = 0
                    for i in range(__df_grouped.shape[0]):
                        __total = __total + __df_grouped.iat[i, 1]
                    if __total == 0: # 検知件数がゼロ
                        # LINE用のメッセージを組み立てる
                        __headers = {'Authorization' : 'Bearer ' + self.token}
                        __message = '\n***注意!!検知件数がゼロ***\n' + from_time + ' ~ ' + to_time + ' の間で検知件数がゼロでした'
                        __payload = {'message' : __message}
                        requests.post(LINE_URL, headers=__headers, params=__payload) # LINEメッセージを送信
        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(type(ms)) # エラーログ処理

    def put_error_log(self, message): # エラーログファイルを出力する
        if (os.path.isfile(ERROR_LOG)): # ファイルが存在しているとき
            __df = pd.read_csv(ERROR_LOG, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
        else: # ファイルが無ければヘッダーを作成
            __df = pd.DataFrame(columns=['ErrorData', 'ErrorMessage'])
        __errorlog = pd.Series( ['{0:%Y-%m-%d %H:%M:%S.%f}'.format(dt.datetime.now()), message], index=__df.columns)
        __df = __df.append(__errorlog, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
        __df.to_csv(ERROR_LOG, index=False) # Log書き込み indexは書き込まない

GPIO.setmode(GPIO.BCM) # ピンをGPIOの番号で指定
GPIO.setup(MOTION_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIOセットアップ

#main
try:
    if __name__ == '__main__':
        os.chdir(os.path.dirname(os.path.abspath(__file__))) # カレントディレクトリをプログラムのあるディレクトリに移動する
        find_flg = find_count = 0 # 検知フラグ、回数の初期化
        from_now = dt.datetime.now() # 現在時刻を取得(カウント開始時刻)
        last_Minute = Rep_last_Minute = '{0:%Y-%m-%d %H:%M}'.format(dt.datetime.now()) # 前回の分をセット(INTERVAL毎の処理の為)
        Alrt_last_Minute = '{0:%H:%M}'.format(dt.datetime.now()) # アラート時間(前回分)
        watch_over = WOClass() # WatchOverインスタンスの作成
        rule = watch_over.get_rule() # GoogleSheetよりルールを取得する
        while True:
            if GPIO.input(MOTION_PIN) == GPIO.HIGH: # 検知
                if find_flg == 0: # 前回未検知の時のみ
                    find_count += 1 # 検知回数のカウントアップ
                find_flg = 1
            else:
                find_flg = 0
            now = dt.datetime.now() # 現在時刻を取得
            mod_minute = now.minute % PUBLISH_INTERVAL # 分がPUBLISH_INTERVALの倍数かをチェック
            if (mod_minute == 0 and # 分が倍数で
                last_Minute != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみAWS IoT CoreにPublish
                watch_over.count_publish(from_now, now, find_count) # publish
                from_now = now # Publishした時刻を前回の時刻として保存
                last_Minute = '{0:%Y-%m-%d %H:%M}'.format(now) # 時刻をセット
                find_count = 0 # カウンターをリセット
                rule = watch_over.get_rule() # Publish毎にGoogleSheetよりルールを取得する
            for obj in rule: # ルールを検索する
                if obj['Action'] == 'RPT': # 直近24時間レポート
                    if (obj['Parameter'] == '{0:%H:%M}'.format(now) and # 時刻が一致
                        Rep_last_Minute != '{0:%Y-%m-%d %H:%M}'.format(now)): # 同一分では1回のみレポート
                        watch_over.put_report() # レポートを出力
                        Rep_last_Minute = '{0:%Y-%m-%d %H:%M}'.format(now) # 時刻をセット
                elif obj['Action'] == 'ALT': # この時間帯にゼロ件だったらアラート報告
                    __from_time = obj['Parameter'][0:5] # From HH:MMを抽出
                    __to_time = obj['Parameter'][6:11] # To HH:MMを抽出
                    if (__to_time == '{0:%H:%M}'.format(now) and # アラートのTo時刻で
                        Alrt_last_Minute != '{0:%H:%M}'.format(now)): # 一度も送信していなければ
                        watch_over.put_alert(now, __from_time, __to_time) # アラート
                        Alrt_last_Minute = '{0:%H:%M}'.format(now)
            sleep(CHECK_INTERVAL) # 指定秒数の間隔で繰り返す
except KeyboardInterrupt:
    pass
finally:
    watch_over.__del__()
    GPIO.cleanup()

恐らくもっと簡潔にコーディングが出来るのだとは思うけどPython初心者なのでご容赦いただきたい。

特にクラスのメソッド間で共有したい変数をself.XXXとしているけど、このやり方で良いのかgobal変数として定義した方が良いのかイマイチ分からないのでご存知の方はご指摘を頂けるとありがたいです。

またロジックで不明な点があればコメント欄に書き込んでください。

説明

ソースにはコメントを入れているが一部不明瞭なところがあると思うので内容の説明をする。

Pathを明示的に指定

14行目でPathを明示的に指定しているのは16行目でimportしているgspreadモジュールがsystemdでプログラムを自動起動させると「ModuleNotFoundError: No module named ‘gspread’」とエラーになってしまう為、プログラム中で明示的に指定することにした。

login名にはgspreadをインストールした時のlogin名が入る。

ログインユーザのホームディレクトリにgspreadのモジュールがインストールされてしまったせいでPathが通らなくなってしまったのだと想像される(13行目)。

import sys # ErrorLog出力用
sys.path.append('/home/login名/.local/lib/python3.7/site-packages') # Pathを明示的に指定
import RPi.GPIO as GPIO
import gspread # Google SpreadSheet

尚、モジュールがどこにインストールされたのかは以下のコマンドで調べた。

pip show gspread

Topicについて

23~26行目でMQTTでAWSで IoT CoreにPublish(ラズパイ側からデータをIoT Coreに送信すること)する時のTopicを指定している。

Topicは以前の記事でも書いたが、AWSのデザインペーパーのMQTT Telemetry Topic Syntaxの章を参考にしている。

dt/WatchOver/own/ClientID/countの形式でTopicを構成することにした。

ClientIDは端末毎に異なる(23行目)。

TOPIC1 = 'dt' # AWS Publish時のTOPICS 1~5で構成される。データを表す
TOPIC2 = '/WatchOver' # アプリケーション名
TOPIC3 = '/own' # context
TOPIC5 = '/count' # カウント TOPIC4はself.client_idがセットされる

PublishとCSVへの保存

64~68行目でJSON形式のメッセージを組み立ててIoT CoreにPublishしている。

またバックアップの意味でローカルのディスクにPublishした検知件数のデータをCSV形式で保存している(64行目)。

            __message = {'ClientID':self.client_id, # Client ID
                         'CountTimeFrom':'{0:%Y-%m-%d %H:%M:%S}'.format(from_now), # カウント開始日付時刻
                         'CountTimeTo':'{0:%Y-%m-%d %H:%M:%S}'.format(now), # カウント終了日付時刻
                         'Count':find_count} # 検知件数
            self.client.publish(TOPIC1+TOPIC2+TOPIC3+'/'+self.client_id+TOPIC5, json.dumps(__message)) # AWS IoTに送信(Publish)
            # Publishした検知件数をCSVに書き込み
            if (os.path.isfile(COUNT_CSV)): # ファイルが存在しているとき
                __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
            else: # ファイルが無ければヘッダーを作成
                __df = pd.DataFrame(columns=['CountTimeFrom', 'CountTimeTo', 'Count'])
            __count = pd.Series( ['{0:%Y-%m-%d %H:%M:%S}'.format(from_now), '{0:%Y-%m-%d %H:%M:%S}'.format(now), find_count], index=__df.columns)
            __df = __df.append(__count, ignore_index=True ) # dataframeを作成して行追加、ignore_indexで新たな行番号を振っている
            __df.to_csv(COUNT_CSV, index=False) # CSV書き込み indexは書き込まない

例外時にエラーログ

例外時にエラーのメッセージを拾ってエラーログファイル(定数ERROR_LOGで指定)に出力している。

何らかの原因でエラーになった際はこのファイルを参照して原因究明を行う(77行目)。

        except: # 例外時
            ex, ms, tb = sys.exc_info()
            self.put_error_log(type(ms)) # エラーログ処理

EXP指定日

このシステムではGoogle Sheetにて

  • Dailyのレポートの配信時間
  • 指定した時間帯で検知件数がゼロの時のアラーム通知
  • アラーム通知の除外日

が指定できるようになっている。

シートの指定例

Googleスプレッドシートは以下のフォーマットにしている。

Googleスプレッドシート設定例

Client_id(A列)

複数のラズパイを設置する前提で端末別に設定が可能なように端末IDをA列に持たせた

Action(B列)

  • RPT:直近24時間の件数報告を行う時刻
  • ALT:指定した時刻の範囲で検知がゼロだった場合、アラート通知を行う
  • EXP:上記ALTを除外する日付

Parameter(C列)

  • ActionがRPT:HH:MM(24時間表記、ゼロ埋めで指定する)
  • ActionがALT:HH:MM-HH:MM(24時間表記、10分単位、ゼロ埋めで指定する)
  • ActionがEXP:YY-MM-DD(上記ALTを実行しない日付を指定する)

Description(D列)

説明欄

プログラムでは使用しないのでコメント等、好きなように使用できる

98~100行目でアラームの除外日かどうかの判定を行っている。

exp_flg=1の時は除外日なのでALT(アラート)の時間帯で検知件数がゼロでもアラート送信を行わない(98行目)。

※事前に旅行などで留守が分かっている時に指定する。

                    if (__records[i][1] == 'EXP' and # EXPで当日が指定されていたらアラートフラグをオン
                        __records[i][2] == '{0:%Y-%m-%d}'.format(dt.datetime.now())):
                        self.exp_flg = 1

Dailyレポート

読み込んだCSVファイルは10分間隔で検知件数が記録されている。

それをそのままLINEに送ると24時間分で144行になってしまうので1時間単位で合計(最大24行)をしている。

CountTimeFromからYY-MM-DD HH(時間)までを抽出して新たな列を作成してその列(Hour)で集計したのち、直近24時間の以内のデータのみを抽出している。

pandasだとこの辺の操作(集計、絞り込み)が簡単にできるのでやりやすかった(110行目)。

                __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
                __df['Hour'] = __df['CountTimeFrom'].apply(lambda x: x[0:13]) # YYYY-MM-DD HHまでを抽出 
                __df_grouped = __df.groupby(['Hour'], as_index=False)['Count'].sum() # Hourをindexにしない
                yesterday = (dt.datetime.now() + dt.timedelta(hours=-24)).strftime('%Y-%m-%d %H') # 24時間前
                __df_grouped = __df_grouped[__df_grouped['Hour'] >= yesterday] # 24時間以内のみを抽出

過去30日分のみをCSV保存

AWS IoT CoreにPublishしたメッセージをCSVに保存しているのはあくまでもバックアップの意味合い。

全てのデータを保存してしまってはローカルのディスクを圧迫するので30日以内のデータのみにしている(122行目)。

                # CSVから30日以内のデータのみを残す
                delete_day = (dt.datetime.now() + dt.timedelta(days=-30)).strftime('%Y-%m-%d %H') # 30日前
                __df = __df[__df['Hour'] >= delete_day] # 30日以内を抽出
                __df = __df.drop(columns='Hour') # Hour列を削除
                __df.to_csv(COUNT_CSV, index=False) # 保存する

指定した時間帯で検知件数がゼロ

指定した時間帯で検知件数がゼロの時にアラート通知を行うためのロジック。

必ず活動している時間帯を指定する事によりその時間帯で活動がなければ分かるようにしている。

読み込んだCSVに、

  • 日付
  • From時刻
  • To時刻

の項目を追加して指定した日付、時刻内の検知件数を合計してゼロであれば通知する(135行目)。

                    __df = pd.read_csv(COUNT_CSV, header=0, encoding='UTF-8') # 読み込み 0行目がヘッダー(有り)
                    __df['Day'] = __df['CountTimeFrom'].apply(lambda x: x[0:10]) # YYYY-MM-DDまでを抽出 
                    __df['FromHour'] = __df['CountTimeFrom'].apply(lambda x: x[11:16]) # FromのHH:MMを抽出 
                    __df['ToHour'] = __df['CountTimeTo'].apply(lambda x: x[11:16]) # ToのHH:MMを抽出 
                    __df = __df[__df['Day'] >= '{0:%Y-%m-%d}'.format(now)] # 今日のデータのみ
                    __df = __df[__df['FromHour'] >= from_time] # FromとToが範囲内
                    __df = __df[__df['ToHour'] <= to_time]
                    __df_grouped = __df.groupby(['Day'], as_index=False)['Count'].sum() # 日付でグルーピング(基本的に1件のみになる)
                    __total = 0
                    for i in range(__df_grouped.shape[0]):
                        __total = __total + __df_grouped.iat[i, 1]
                    if __total == 0: # 検知件数がゼロ
                        # LINE用のメッセージを組み立てる
                        __headers = {'Authorization' : 'Bearer ' + self.token}
                        __message = '\n***注意!!検知件数がゼロ***\n' + from_time + ' ~ ' + to_time + ' の間で検知件数がゼロでした'
                        __payload = {'message' : __message}
                        requests.post(LINE_URL, headers=__headers, params=__payload) # LINEメッセージを送信

Googleスプレッドシートの再読み込み

プログラムの起動時とPublish(10分に1回)後にGoogleスプレッドシートを再読み込みしている。

Sheetの値を変更すると、このタイミングでプログラムに反映されるのだがシートの再読み込みはそれなりに時間のかかる処理(数秒)なので本来であればスプレッドシートを変更した時だけメッセージがサブスクライブ(IoT Coreからラズパイにメッセージ送信)されて値の変更が反映される仕組みにしたい所。

これはそのうちに対応したいと思っている(193行目)。

                rule = watch_over.get_rule() # Publish毎にGoogleSheetよりルールを取得する

アラート報告

アラート報告の際のFromとToの時刻をGoogleスプレッドシートから取り出している。

ここもポジションの決め打ち(0:5)で時刻を取り出しているが本来であればバリデーションチェック(入力チェック)を行った方がよい。

今の所、自分しか入力しないので手抜きをしているがそのうちきちんとエラー処理を入れたいと思う(200行目)。

                elif obj['Action'] == 'ALT': # この時間帯にゼロ件だったらアラート報告
                    __from_time = obj['Parameter'][0:5] # From HH:MMを抽出
                    __to_time = obj['Parameter'][6:11] # To HH:MMを抽出

続く

今回のプログラミングの記事は以上で終了する。

次回はsystemdによる電源ON後の自動起動の設定や遠隔操作の記事とする。

動画

プログラムについて動画で説明をしているので、参考にしてみて欲しい。

souichirou

やった事を忘れない為の備忘録 同じような事をやりたい人の参考になればと思ってブログにしてます。 主にレゴ、AWS(Amazon Web Services)、WordPress、Deep Learning、RaspberryPiに関するブログを書いています。 仕事では工場に協働ロボットの導入や中小企業へのAI/IoT導入のアドバイザーをやっています。 2019年7月にJDLA(一般社団法人 日本デイープラーニング協会)Deep Learning for GENERALに合格しました。 質問は記事一番下にあるコメントかメニュー上部の問い合わせからお願いします。

おすすめ

質問やコメントや励ましの言葉などを残す

名前、メール、サイト欄は任意です。
またメールアドレスは公開されません。