【ROS×Python】トピック(Topic)通信を実装しよう!

 ※ 本記事は広告・プロモーションを含みます。
Python

それでは、実際に独自メッセージを定義し、独自メッセージトピックの送受信プログラムを書いて、動作を確認してみましょう!

独自メッセージ型を定義する

それでは、ASKBIDの2つの為替レートを持った独自メッセージ型を定義してみましょう!

独自メッセージ定義のファイルは拡張子を.msgとする必要があります。

今回定義するファイル名は「CustomPricing.msg」とします。

そしてファイルには「データ型:float32」の2つの変数askbidを記述します。

# Customized Pricing

# Ask pricing
float32 ask

# Bid pricing
float32 bid

独自メッセージ定義ファイルを新規追加した場合は忘れずにCMakeLists.txtにも追加したファイル名を追記しましょう!

・・・
・・・
・・・
set(msg_files
  # "msg/***.msg"
  "msg/CustomPricing.msg" ← 追加
)
・・・
・・・
・・・

独自メッセージ定義ファイルの追加作業はこれで完了になります。

あとはcolconビルドすることで、各プログラミング言語に対応したアクセスライブラリが自動で生成され、各ノードで使用可能となります。

では、先ほど定義した独自メッセージを使って為替レートASKBIDを送受信するプログラムを書いてみましょう。

送信側ノード(パブリッシャー)の作成

前のページで作成した送信側ノード(パブリッシャー)のコードを独自メッセージに置き換えると下記のようになります。

import time
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSHistoryPolicy, QoSReliabilityPolicy
from ros2_example_msgs.msg import CustomPricing
from ..utils.waveform import SineWave


class CustomPricingPublisher(Node):
    """
    独自型為替レートトピックPublisherノード
    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("custom_pricing_publisher")

        # 為替レートを正弦波で生成
        self._sin_wave = SineWave(30, 0.05, 100)

        qos_profile = QoSProfile(
            history=QoSHistoryPolicy.KEEP_ALL, reliability=QoSReliabilityPolicy.RELIABLE
        )

        # 独自型のcustom_pricingトピックを送信するpublisherの定義
        self.pub = self.create_publisher(CustomPricing, "custom_pricing", qos_profile)

        # 送信周期毎にtimer_callbackを呼び出し(送信周期は0.5秒)
        self.timer = self.create_timer(0.5, self.timer_callback)

        self._start_time = time.time()

    def timer_callback(self) -> None:
        """
        ROSタイマーコールバック
        """
        elapsed_time = time.time() - self._start_time
        # 現在為替レートを取得
        pricing = self._sin_wave.get_value(elapsed_time)

        # custom_pricingトピックにmsgを送信
        msg = CustomPricing()
        msg.ask = pricing + 1.0
        msg.bid = pricing - 1.0
        self.pub.publish(msg)

        # 送信msgの中身をログ出力
        self.get_logger().info(
            "<送信>経過時間:[{:.2f}] 現在レート:Ask={:.3f}, Bid={:.3f}".format(
                elapsed_time, msg.ask, msg.bid
            )
        )


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # custom_pricing_publisherノードの作成
    spp = CustomPricingPublisher()
    spp.get_logger().info("custom_pricing_publisher start!")

    try:
        # ノードの実行開始
        rclpy.spin(spp)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        spp.destroy_node()

6行目で独自メッセージ「CustomPricing」をインポートしています。

from ros2_example_msgs.msg import CustomPricing

29行目でCustomPricingメッセージ型」「custom_pricing」という名前を付けて送信するトピックを定義しています。

self.pub = self.create_publisher(CustomPricing, "custom_pricing", qos_profile)

45~48行目で独自メッセージの変数askbidに現在レート値を代入し、トピック送信しています。

msg = CustomPricing()
msg.ask = pricing + 1.0
msg.bid = pricing - 1.0
self.pub.publish(msg)

受信側ノード(サブスクライバー)の作成

前のページで作成した受信側ノード(サブスクライバー)のコードを独自メッセージに置き換えると下記のようになります。

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSHistoryPolicy, QoSReliabilityPolicy
from ros2_example_msgs.msg import CustomPricing


class CustomPricingSubscriber(Node):
    """
    独自型為替レートトピックSubscriberノード
    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("custom_pricing_subscriber")

        qos_profile = QoSProfile(
            history=QoSHistoryPolicy.KEEP_ALL, reliability=QoSReliabilityPolicy.RELIABLE
        )

        # 独自型のcustom_pricingトピックを受信するsubscriptionの定義
        # (listener_callbackは受信毎に呼び出されるコールバック関数)
        self.sub = self.create_subscription(
            CustomPricing, "custom_pricing", self._listener_callback, qos_profile
        )

    def _listener_callback(self, msg: CustomPricing) -> None:
        """
        subscriptionコールバック
        """
        # 受信msgの中身をログ出力
        self.get_logger().info(
            "<受信>現在レート:Ask={:.3f}, Bid={:.3f}".format(msg.ask, msg.bid)
        )


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # custom_pricing_subscriberノードの作成
    cps = CustomPricingSubscriber()
    cps.get_logger().info("custom_pricing_subscriber start!")

    try:
        # ノードの実行開始
        rclpy.spin(cps)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        cps.destroy_node()

5行目で独自メッセージ「CustomPricing」をインポートしています。

from ros2_example_msgs.msg import CustomPricing

25~27行目でトピック受信(サブスクライバー)の定義をしています。

self.sub = self.create_subscription(
    CustomPricing, "custom_pricing", self._listener_callback, qos_profile
)

34~36行目で受信したaskbidの値をログ出力しています。

self.get_logger().info(
    "<受信>現在レート:Ask={:.3f}, Bid={:.3f}".format(msg.ask, msg.bid)
)

送受信ノードを動かしてみる

では、双方のノードを起動してみましょう!

$ ros2 run ros2_example custom_pricing_publisher 
[custom_pricing_publisher]: custom_pricing_publisher start!
[custom_pricing_publisher]: <送信>経過時間:[0.51] 現在レート:Ask=105.748, Bid=103.748
[custom_pricing_publisher]: <送信>経過時間:[1.00] 現在レート:Ask=110.285, Bid=108.285
[custom_pricing_publisher]: <送信>経過時間:[1.50] 現在レート:Ask=114.636, Bid=112.636
[custom_pricing_publisher]: <送信>経過時間:[2.00] 現在レート:Ask=118.636, Bid=116.636
[custom_pricing_publisher]: <送信>経過時間:[2.50] 現在レート:Ask=122.223, Bid=120.223
[custom_pricing_publisher]: <送信>経過時間:[3.00] 現在レート:Ask=125.277, Bid=123.277
[custom_pricing_publisher]: <送信>経過時間:[3.50] 現在レート:Ask=127.731, Bid=125.731
[custom_pricing_publisher]: <送信>経過時間:[4.01] 現在レート:Ask=129.549, Bid=127.549
[custom_pricing_publisher]: <送信>経過時間:[4.50] 現在レート:Ask=130.633, Bid=128.633
[custom_pricing_publisher]: <送信>経過時間:[5.01] 現在レート:Ask=131.000, Bid=129.000
$ ros2 run ros2_example custom_pricing_subscriber 
[custom_pricing_subscriber]: custom_pricing_subscriber start!
[custom_pricing_subscriber]: <受信>現在レート:Ask=105.748, Bid=103.748
[custom_pricing_subscriber]: <受信>現在レート:Ask=110.285, Bid=108.285
[custom_pricing_subscriber]: <受信>現在レート:Ask=114.636, Bid=112.636
[custom_pricing_subscriber]: <受信>現在レート:Ask=118.636, Bid=116.636
[custom_pricing_subscriber]: <受信>現在レート:Ask=122.223, Bid=120.223
[custom_pricing_subscriber]: <受信>現在レート:Ask=125.277, Bid=123.277
[custom_pricing_subscriber]: <受信>現在レート:Ask=127.731, Bid=125.731
[custom_pricing_subscriber]: <受信>現在レート:Ask=129.549, Bid=127.549
[custom_pricing_subscriber]: <受信>現在レート:Ask=130.633, Bid=128.633
[custom_pricing_subscriber]: <受信>現在レート:Ask=131.000, Bid=129.000

ASKBIDの2値がちゃんと送受信できてますね!

rqtを見ても問題なさそうです。

基本的にメッセージ通信は独自型を定義して使用することがほとんどです。

また、メッセージの変数に配列を指定したり、定数を指定することもできます。

ROSアプリケーションを作成する前にメッセージ定義で出来ることを一通り調べておくと良いかもしれませんね!

まとめ

ROSトピック(Topic)

  • 送信側ノード(パブリッシャー)
    • create_publisher()でパブリッシャー定義
      • 第1引数:メッセージ型
      • 第2引数:トピック名
      • 第3引数:QoS(Quality of Service)の設定
    • publish()で送信
  • 受信側ノード(サブスクライバー)
    • create_subscription()でサブスクライバー定義
      • 第1引数:メッセージ型
      • 第2引数:トピック名
      • 第3引数:コールバック関数
      • 第4引数:QoS(Quality of Service)の設定
  • メッセージ型は独自に定義可能
    • ファイル名は.msgで作成する。
    • colconビルド後、各プログラミング言語に対応したアクセスライブラリが自動で生成される。
スポンサーリンク

コメント

タイトルとURLをコピーしました