前のページではrclpy.spin_until_future_complete()
を使った「待ち」が発生し、メイン処理が妨げられる可能性がある実装方法を紹介しました。
このページでは「待ち」を発生させずにリアルタイム性を維持した実装方法を紹介していきます。
本ページで紹介するサンプルコードは全てROSパッケージ化した状態でGitHubに載せています。
自分の環境で実行して挙動を確認してみたい場合は下記からクローンして使用してください。
本ページで使用するコード
リアルタイム性を維持したサービス実装例
今回はリアルタイム性の高いシステムトレードを作成することを想定したサービス実装例で紹介します。
サービス処理内容は、
- サービス要求があった場合、現時点で最新の為替レートを返す。
となります。
サービス定義ファイル(.srv)
まずはサービス定義ファイルを作成しましょう。
サービス処理内容は「最新の為替レートを返す」処理になるので引数は不要になります。
必要なのは返り値用の為替レート変数のみになります。
#--------------------------------------------------
# 要求を受けた時点の為替レートを返すサービス定義
#--------------------------------------------------
# なし
---
# 最新の為替レート
float32 price
サービス提供ノード(サーバー)
次はサービス提供ノード(サーバー)を作成していきます。
サービス要求を受けたら最新の為替レートを返り値に代入して返却する処理内容です。
import time
import datetime
import random
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from ros2_example_msgs.srv import PriceQuery
class PriceServer(Node):
"""
為替レートサービスServerノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("price_server")
# ロガー取得
self.logger = self.get_logger()
# circleサービスserverの定義
self.srv = self.create_service(PriceQuery, "price_query", self._get_price)
# 為替レート初期値
self._price = 100.0
def _get_price(self, req, rsp):
"""
price_queryサービスコールバック
"""
now = datetime.datetime.now()
self.logger.info("[{}]サービス処理中・・・".format(now.time()))
# 重たい処理の代わり
time.sleep(3) # 3秒停止
# 乱数で為替レートの変動を模擬
self._price += random.uniform(10.0, -10.0)
rsp.price = self._price
now = datetime.datetime.now()
self.logger.info("[{}]完了".format(now.time()))
return rsp
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# price_serverノードの作成
ps = PriceServer()
ps.get_logger().info("price_server start!")
try:
# ノードの実行開始
rclpy.spin(ps)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
ps.destroy_node()
サービス処理の内容は30~47行目になります。
def _get_price(self, req, rsp):
"""
price_queryサービスコールバック
"""
now = datetime.datetime.now()
self.logger.info("[{}]サービス処理中・・・".format(now.time()))
# 重たい処理の代わり
time.sleep(3) # 3秒停止
# 乱数で為替レートの変動を模擬
self._price += random.uniform(10.0, -10.0)
rsp.price = self._price
now = datetime.datetime.now()
self.logger.info("[{}]完了".format(now.time()))
return rsp
本来であれば最新の為替レートはAPIサーバーから受け取ることになると思いますが、今回はあくまでも実装例になりますので、最新の為替レートは乱数を使用してレート変動を模擬しています。
そして、クライアント側で「待ち」状態を発生させないサービス非同期処理のリアルタイム性が実感できるように「3秒」のスリープを重たい処理の代わりとして実装しています。
まとめると、「サービス要求を受けてから3秒後に最新為替レートを返却する」サービス処理となります。
サービス受領ノード(クライアント)<非同期実装例①>
ではサービス受領ノード(クライアント)を作成していきます。
実装のポイントは「待ち」が発生しないようにしてメインルーチン処理を阻害させないことです。
import datetime
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.task import Future
from ros2_example_msgs.srv import PriceQuery
class PriceClient(Node):
"""
為替レートサービスClientノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("price_client_async")
# ロガー取得
self.logger = self.get_logger()
# circleサービスclientの定義
self.cli = self.create_client(PriceQuery, "price_query")
# サービスServerが有効になるまで待機
while not self.cli.wait_for_service(timeout_sec=1.0):
self.logger.info("サービスServerが有効になるまで待機中・・・")
# 1.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(1.0, self._main_routine)
# 5.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(5.0, self._service_call_async)
# Future初期化
self._future = Future()
# 為替レート初期値
self._price = 100.0
def _main_routine(self) -> None:
"""
timerコールバック(メインルーチン)
"""
# price_queryサービスの応答がある場合は、為替レートを更新する。
if self._future.done():
rsp = self._future.result()
self._future = Future()
if rsp is not None:
self.logger.info("<サービス応答>為替レート更新")
self._price = rsp.price
# 現在時刻をログ出力
now = datetime.datetime.now()
self.logger.info("<メインルーチン処理>[{}]為替レート:{:.3f}".format(now.time(), self._price))
def _service_call_async(self) -> None:
"""
price_queryサービス呼び出し(非同期)
"""
# price_queryサービスの引数
req = PriceQuery.Request()
# サービスの非同期実行
self.logger.info("<サービス要求>送信")
self._future = self.cli.call_async(req)
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# price_clientノードの作成
pc = PriceClient()
pc.get_logger().info("price_client start!")
try:
# ノードの実行開始
rclpy.spin(pc)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
pc.destroy_node()
上記の実装では2つの処理をROSタイマーを使って実装しています。(30~34行目)
- メインルーチン処理(
self._main_routine()
):1秒周期で実行 - サービス要求処理(
self._service_call_async()
):5秒周期で実行
# 1.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(1.0, self._main_routine)
# 5.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(5.0, self._service_call_async)
「メインルーチン処理を阻害しなこと」が本実装のポイントになりますので、言い換えるなら、サービス実行中は「1秒周期でのメインルーチン・コールバックが阻害されないこと」になります。
動作確認の際はこの点に着目しましょう。
サービス要求処理(self._service_call_async()
)は57~66行目になります。
def _service_call_async(self) -> None:
"""
price_queryサービス呼び出し(非同期)
"""
# price_queryサービスの引数
req = PriceQuery.Request()
# サービスの非同期実行
self.logger.info("<サービス要求>送信")
self._future = self.cli.call_async(req)
call_async()
の非同期での処理を要求していますので、返り値はself._future
で受け取っておきます。
ただし、サービス処理(サーバー側)は3秒を要するため、self._future
受け取り直後はサービス処理は完了していないため、返り値の取り出しは後回しにします。
メインルーチン処理(self._main_routine()
)は41~55行目になります。
def _main_routine(self) -> None:
"""
timerコールバック(メインルーチン)
"""
# price_queryサービスの応答がある場合は、為替レートを更新する。
if self._future.done():
rsp = self._future.result()
self._future = Future()
if rsp is not None:
self.logger.info("<サービス応答>為替レート更新")
self._price = rsp.price
# 現在時刻をログ出力
now = datetime.datetime.now()
self.logger.info("<メインルーチン処理>[{}]為替レート:{:.3f}".format(now.time(), self._price))
サービス処理が完了し、返り値が取り出せるようになるとself._future.done()
がTrue
となりますので、後回しにしていた返り値の取り出しはここで実行します。
そしてその後に、最新為替レートを使ってメイン処理を行っていく流れになります。
(本コードは実装例になりますので、”最新為替レートのログ表示”をメイン処理の代わりとしています。)
# 現在時刻をログ出力
now = datetime.datetime.now()
self.logger.info("<メインルーチン処理>[{}]為替レート:{:.3f}".format(now.time(), self._price))
実行してみる
それでは実行してみましょう。
<サーバー側>
$ ros2 run ros2_example price_server
[price_server]: price_server start!
[price_server]: [14:21:36.823213]サービス処理中・・・
[price_server]: [14:21:39.824826]完了
[price_server]: [14:21:41.828317]サービス処理中・・・
[price_server]: [14:21:44.832883]完了
<クライアント側>
$ ros2 run ros2_example price_client_async
[price_client_async]: サービスServerが有効になるまで待機中・・・
[price_client_async]: サービスServerが有効になるまで待機中・・・
[price_client_async]: price_client start!
[price_client_async]: <メインルーチン処理>[14:21:32.820918]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:33.820062]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:34.819964]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:35.820240]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:36.820362]為替レート:100.000
[price_client_async]: <サービス要求>送信
[price_client_async]: <メインルーチン処理>[14:21:37.820212]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:38.820515]為替レート:100.000
[price_client_async]: <メインルーチン処理>[14:21:39.820073]為替レート:100.000
[price_client_async]: <サービス応答>為替レート更新
[price_client_async]: <メインルーチン処理>[14:21:40.821257]為替レート:98.756
[price_client_async]: <メインルーチン処理>[14:21:41.821251]為替レート:98.756
[price_client_async]: <サービス要求>送信
[price_client_async]: <メインルーチン処理>[14:21:42.820134]為替レート:98.756
[price_client_async]: <メインルーチン処理>[14:21:43.820021]為替レート:98.756
[price_client_async]: <メインルーチン処理>[14:21:44.820731]為替レート:98.756
[price_client_async]: <サービス応答>為替レート更新
[price_client_async]: <メインルーチン処理>[14:21:45.821878]為替レート:104.697
[price_client_async]: <メインルーチン処理>[14:21:46.820579]為替レート:104.697
着目ポイントはサービス実行中(“<サービス要求>送信“から”<サービス応答>為替レート更新“までの間)で<メインルーチン処理>が阻害されることなく実行できていることです。
上記のようにクライアント側を実装することでメインルーチン処理が阻害されずにリアルタイム性が維持された状態でサービス実行することができます。
サービス受領ノード(クライアント)<非同期実装例②>
もう1つクライアント側の実装例を紹介します。
1つ前のコードでは後回しにしたサービス返り値をメインルーチン処理内で常時監視し、サービス処理が完了していたら返り値を取り出す処理としていました。
この他にもサービス処理が完了した時点でコールバック関数が呼び出されて返り値を受け取る方法もありますので実装例を紹介しますね。
import datetime
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.task import Future
from ros2_example_msgs.srv import PriceQuery
class PriceClient(Node):
"""
為替レートサービスClientノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("price_client_async_callback")
# ロガー取得
self.logger = self.get_logger()
# circleサービスclientの定義
self.cli = self.create_client(PriceQuery, "price_query")
# サービスServerが有効になるまで待機
while not self.cli.wait_for_service(timeout_sec=1.0):
self.logger.info("サービスServerが有効になるまで待機中・・・")
# 1.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(1.0, self._main_routine)
# 5.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(5.0, self._service_call_async)
# 為替レート初期値
self._price = 100.0
def _main_routine(self) -> None:
"""
timerコールバック(メインルーチン)
"""
# 現在時刻をログ出力
now = datetime.datetime.now()
self.logger.info("<メインルーチン処理>[{}]為替レート:{:.3f}".format(now.time(), self._price))
def _service_call_async(self) -> None:
"""
price_queryサービス呼び出し(非同期)
"""
# price_queryサービスの引数
req = PriceQuery.Request()
# サービスの非同期実行
self.logger.info("<サービス要求>送信")
future = self.cli.call_async(req)
# サービス応答時の返り値取得用コールバック関数の登録
future.add_done_callback(self._done_callback)
def _done_callback(self, future: Future) -> None:
"""
price_queryサービス処理完了コールバック
"""
rsp = future.result()
if rsp is not None:
self.logger.info("<サービス応答>為替レート更新")
self._price = rsp.price
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# price_clientノードの作成
pc = PriceClient()
pc.get_logger().info("price_client start!")
try:
# ノードの実行開始
rclpy.spin(pc)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
pc.destroy_node()
サービス処理完了時に呼び出されるコールバック関数の設定は54~58行目になります。
# サービスの非同期実行
self.logger.info("<サービス要求>送信")
future = self.cli.call_async(req)
# サービス応答時の返り値取得用コールバック関数の登録
future.add_done_callback(self._done_callback)
self._service_call_async()
の返り値で受け取ったfuture
にはadd_done_callback()
メソッドがあり、このメソッドでサービス完了時に呼び出される関数(上記コードではself._done_callback()
)を登録することができます。
コールバック関数の中身は下記のようになってます。(60~67行目)
def _done_callback(self, future: Future) -> None:
"""
price_queryサービス処理完了コールバック
"""
rsp = future.result()
if rsp is not None:
self.logger.info("<サービス応答>為替レート更新")
self._price = rsp.price
関数の引数がfuture
となっているので引数より返り値を取り出し、為替レートを更新しています。
実行してみる
それでは実行してみましょう。
<サーバー側>
$ ros2 run ros2_example price_server
[price_server]: price_server start!
[price_server]: [15:04:34.419257]サービス処理中・・・
[price_server]: [15:04:37.421308]完了
[price_server]: [15:04:39.421077]サービス処理中・・・
[price_server]: [15:04:42.423590]完了
<クライアント側>
$ ros2 run ros2_example price_client_async_callback
[price_client_async_callback]: price_client start!
[price_client_async_callback]: <メインルーチン処理>[15:04:30.416237]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:31.416519]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:32.415953]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:33.415714]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:34.416215]為替レート:100.000
[price_client_async_callback]: <サービス要求>送信
[price_client_async_callback]: <メインルーチン処理>[15:04:35.416519]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:36.415840]為替レート:100.000
[price_client_async_callback]: <メインルーチン処理>[15:04:37.415754]為替レート:100.000
[price_client_async_callback]: <サービス応答>為替レート更新
[price_client_async_callback]: <メインルーチン処理>[15:04:38.416704]為替レート:98.992
[price_client_async_callback]: <メインルーチン処理>[15:04:39.415855]為替レート:98.992
[price_client_async_callback]: <サービス要求>送信
[price_client_async_callback]: <メインルーチン処理>[15:04:40.416391]為替レート:98.992
[price_client_async_callback]: <メインルーチン処理>[15:04:41.416203]為替レート:98.992
[price_client_async_callback]: <メインルーチン処理>[15:04:42.416087]為替レート:98.992
[price_client_async_callback]: <サービス応答>為替レート更新
[price_client_async_callback]: <メインルーチン処理>[15:04:43.416636]為替レート:100.437
[price_client_async_callback]: <メインルーチン処理>[15:04:44.415725]為替レート:100.437
コールバック関数を使用した場合もサービス実行中(”<サービス要求>送信“から”<サービス応答>為替レート更新“までの間)で<メインルーチン処理>が阻害されることなく実行できていることがわかります。
コールバック関数を使用しない非同期実装例①、コールバック関数を使用する非同期実装例②のどちらでも実現したいことが実行できていますので、好きな方で実装すれば良いと思います。
まとめ
ROSサービス(Service)
- サービス提供ノード(サーバー)
create_service()
でサービス・サーバー定義- 第1引数:サービス型
- 第2引数:サービス名
- 第3引数:コールバック関数
- サービス受領ノード(クライアント)
create_client()
でサービス・クライアント定義- 第1引数:サービス型
- 第2引数:サービス名
call_async()
でサービス呼び出し(非同期)- 返り値:
future
で受け取り(値の取り出しは後回し)future.done()
:サービス実行完了時→True
future.result()
:返り値の取り出しfuture.add_done_callback()
:サービス実行完了時に呼び出されるコールバック関数
- 返り値:
コメント