処理負荷は最低限に抑えるのが僕のモットーです! たっきん(Twitter)です!
今日はROS機能の1つである「ROSタイマー(Timer)」の解説をし、サンプルコード実装して動作を確認していきましょう!
「ROSタイマー(Timer)」機能は読んで字の如く、時間・時刻に関わる処理を行う機能になります。
大雑把に「ROSタイマー(Timer)」を使用する利点は次の1点に尽きると思います。
ROSタイマー(Timer)の利点
- CPU処理負荷を減らし、リアルタイム性を高める。
時間・時刻を測定して決まった時間に処理を行えることが利点ではないのですか?
半分は合っていますが、半分は間違っていますね。
事実、時間・時刻を測定して決まった時間に処理するだけならROSタイマーを使わなくても実現できます。
ROSタイマーを使うことで決まった時間に効率よく処理が行えるようになり、その結果、各プロセスのリアルタイム性が維持できるようになります。
ここで例として、カップ麺にお湯を入れてから3分経過を知らせるタイマーのプログラムを実装してみます。
ROSタイマーを使用しない時間計測の実装例
まずはROSタイマーを使わない例です。
下記のコードはwhileループを使って3分経過するまで待ち続ける実装例になります。
import datetime
def wait_3minutes_by_while():
# 経過時間「3分」を取得
td3m = datetime.timedelta(minutes=3)
# 「現在時間」を取得
now = datetime.datetime.now()
# 「現在時間 + 3分」を取得
after_3m = now + td3m
print("[{}]カップ麺にお湯を入れました。".format(now))
# 3分経過するまでwhileループ
while datetime.datetime.now() < after_3m:
pass
# 「現在時間」を取得
now = datetime.datetime.now()
print("[{}]3分経過しました。".format(now))
if __name__ == "__main__":
wait_3minutes_by_while()
"""
<実行結果>
[2023-09-08 01:20:00.519126]カップ麺にお湯を入れました。
[2023-09-08 01:23:00.519322]3分経過しました。
"""
間違っても上記のようなwhileループで処理を停止させるプログラムを組むのは絶対に止めましょう!
このようなプログラムはCPU負荷が極限まで高まり、ROSの最大の利点であるリアルタイム性が損なわれてしまいます!
もう1つROSタイマーを使用しない実装例を紹介します。
下記のコードはtimeパッケージのsleep()関数を使った実装例になります。
import time
import datetime
def wait_3minutes_by_sleep():
# 「現在時間」を取得
now = datetime.datetime.now()
print("[{}]カップ麺にお湯を入れました。".format(now))
# 3分間停止
time.sleep(3 * 60)
# 「現在時間」を取得
now = datetime.datetime.now()
print("[{}]3分経過しました。".format(now))
if __name__ == "__main__":
wait_3minutes_by_sleep()
"""
<実行結果>
[2023-09-08 01:30:13.444207]カップ麺にお湯を入れました。
[2023-09-08 01:33:13.456904]3分経過しました。
"""
一見問題なさそうに見えますが、上記プログラムには致命的な欠点があります。
それは、sleep()関数で処理が停止している間は他の処理が実行できなくなることです。
例えばカップ麺にお湯を入れて3分待っている間に、電話がかかってきたとしましょう。
上記コードでは「電話に出る」処理を入れ込むことができないため、他のことが何もできずに3分間待ち続けることしかできなくなってしまいます。
リアルタイム性が求められないアプリケーションであれば問題ないかもしれませんが、上記のようなプログラムを組むのは僕的には非推奨ですね。
全てのアプリケーションは基本的に実行効率が求められますので、sleep()関数のような停止処理を入れ込むのは好ましくないです。
ROSタイマーを使用した時間計測の実装例
では本題のROSタイマーを使用した実装例を紹介します。
import rclpy
from rclpy.node import Node
import datetime
class CupNoodle(Node):
"""
カップ麺ノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("cup_noodle")
# ロガー取得
self.logger = self.get_logger()
# 3分周期で実行されるタイマーの定義
# (timer_callbackは3分経過する度に呼び出されるコールバック関数)
self.timer = self.create_timer(3 * 60, self._timer_callback)
def _timer_callback(self) -> None:
"""
timerコールバック
"""
# 現在時刻を取得
now = datetime.datetime.now()
self.logger.info("[{}]3分経過しました。".format(now))
# タイマー停止
self.timer.cancel()
# システム終了
raise SystemExit
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# simple_timerノードの作成
cn = CupNoodle()
cn.get_logger().info("カップ麺ノードを実行開始します。")
# 「現在時間」を取得
now = datetime.datetime.now()
cn.get_logger().info("[{}]カップ麺にお湯を入れました。".format(now))
try:
# ノードの実行開始
rclpy.spin(cn)
except SystemExit:
cn.get_logger().info("カップ麺ノードを実行終了します。")
# ROSのシャットダウン
rclpy.shutdown()
# ノードの破棄
cn.destroy_node()
"""
<実行結果>
[cup_noodle]: カップ麺ノードを実行開始します。
[cup_noodle]: [2023-09-08 13:30:39.592403]カップ麺にお湯を入れました。
[cup_noodle]: [2023-09-08 13:33:39.582332]3分経過しました。
[cup_noodle]: カップ麺ノードを実行終了します。
"""
プログラムが実行(ノードが起動)してからちょうど3分後にメッセージが表示されました。
しかし、このままでは挙動がROSタイマーを使用しない場合と大差ありませんよね。
そこで、上記のコードに「電話に出る」処理を追加してみましょう!
import rclpy
from rclpy.node import Node
import datetime
import time
from std_srvs.srv import Empty
class CupNoodle(Node):
"""
カップ麺ノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("cup_noodle")
# ロガー取得
self.logger = self.get_logger()
# 3分周期で実行されるROSタイマーの定義
# (timer_callbackは3分経過する度に呼び出されるコールバック関数)
self.timer = self.create_timer(3 * 60, self._timer_callback)
# 電話"phone"ROSサービスの定義
# (take_callはROSサービスクライアントからリクエストを受けた際に
# 呼び出されるコールバック関数)
self.phone = self.create_service(Empty, "phone", self._take_call)
def _timer_callback(self) -> None:
"""
timerコールバック
"""
# 現在時刻を取得
now = datetime.datetime.now()
self.logger.info("[{}]3分経過しました。".format(now))
# タイマー停止
self.timer.cancel()
# システム終了
raise SystemExit
def _take_call(self, req, rsp) -> None:
"""
"phone"serviceコールバック
"""
# 現在時刻を取得
now = datetime.datetime.now()
self.logger.info("[{}]電話に出ました。".format(now))
# 重たい処理の代わり(10秒停止)
time.sleep(10)
# 現在時刻を取得
now = datetime.datetime.now()
self.logger.info("[{}]電話を切りました。".format(now))
return rsp
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# simple_timerノードの作成
cn = CupNoodle()
cn.get_logger().info("カップ麺ノードを実行開始します。")
# 「現在時間」を取得
now = datetime.datetime.now()
cn.get_logger().info("[{}]カップ麺にお湯を入れました。".format(now))
try:
# ノードの実行開始
rclpy.spin(cn)
except SystemExit:
cn.get_logger().info("カップ麺ノードを実行終了します。")
# ROSのシャットダウン
rclpy.shutdown()
# ノードの破棄
cn.destroy_node()
"""
<実行結果>
[cup_noodle]: カップ麺ノードを実行開始します。
[cup_noodle]: [2023-09-08 18:50:20.287031]カップ麺にお湯を入れました。
[cup_noodle]: [2023-09-08 18:50:40.587724]電話に出ました。
[cup_noodle]: [2023-09-08 18:50:50.599175]電話を切りました。
[cup_noodle]: [2023-09-08 18:53:20.273301]3分経過しました。
[cup_noodle]: カップ麺ノードを実行終了します。
"""
「電話に出る」処理はROS通信のサービス(Service)を使用しました。
ROSサービス通信についてはまた別の記事で詳しい解説をしようと思いますので今回は割愛しますが、上記コードを実行するとカップ麺にお湯を入れてから3分待っている間に「電話に出る」処理を実行することができました。
このように、ROSタイマーを使用することで他の処理を妨げることなく、かつ最小限の処理負荷でプログラムを実行させることができます。
システム設計者を目指すなら「処理負荷」は常に意識しましょう!
システムを作り上げてから「処理負荷」を改善しようとしても大規模改修になり兼ねない場合があり、痛い目を見る可能性があります。
なんとなく、ROSタイマーの挙動は理解できましたでしょうか?
次のページからROSタイマーについての詳細を解説していきます。
コメント