【ROS×Python】トピック(Topic)通信を実装しよう!

 ※ 本記事は広告・プロモーションを含みます。
Python

それでは、実際にトピックの送受信プログラムを書いて、動作を確認してみましょう!

以降で紹介するサンプルコードは全てROSパッケージ化した状態でGitHubに載せています。

自分の環境で実行して挙動を確認してみたい場合は下記からクローンして使用してください。

本ページで使用するコード

トピックの送受信プログラムを動かしてみよう!

それでは、実際にトピックの送受信プログラムを書いて、動作を確認してみましょう!

送受信になりますので、「送信側ノード(パブリッシャー)」「受信側ノード(サブスクライバー)」を作成していきます。

送信側ノード(パブリッシャー)の作成

まずは送信側ノード(パブリッシャー)を作成していきます。

下記のコードは正弦波で疑似的に生成した為替レートを0.5秒周期でトピック送信する処理内容になります。

import time
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSHistoryPolicy, QoSReliabilityPolicy
from std_msgs.msg import Float32
from ..utils.waveform import SineWave


class SimplePricingPublisher(Node):
    """
    シンプル為替レートトピックPublisherノード
    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("simple_pricing_publisher")

        # 為替レートを正弦波で生成
        self._sin_wave = SineWave(30, 0.05, 100)

        qos_profile = QoSProfile(
            history=QoSHistoryPolicy.KEEP_ALL, reliability=QoSReliabilityPolicy.RELIABLE
        )

        # Float32型のsimple_pricingトピックを送信するpublisherの定義
        self.pub = self.create_publisher(Float32, "simple_pricing", qos_profile)

        # 送信周期毎にtimer_callbackを呼び出し(送信周期は0.5秒)
        self.timer = self.create_timer(0.5, self.timer_callback)

        self._start_time = time.time()

    def timer_callback(self) -> None:
        """
        ROSタイマーコールバック
        """
        elapsed_time = time.time() - self._start_time
        # 現在為替レートを取得
        pricing = self._sin_wave.get_value(elapsed_time)

        # simple_pricingトピックにmsgを送信
        msg = Float32()
        msg.data = pricing
        self.pub.publish(msg)

        # 送信msgの中身をログ出力
        self.get_logger().info(
            "<送信>経過時間:[{:.2f}] 現在レート:{:.3f}".format(elapsed_time, msg.data)
        )


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # simple_pricing_publisherノードの作成
    spp = SimplePricingPublisher()
    spp.get_logger().info("simple_pricing_publisher start!")

    try:
        # ノードの実行開始
        rclpy.spin(spp)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        spp.destroy_node()

トピック送信(パブリッシャー)の定義はcreate_publisher()関数を使用します。

引数には下記を指定します。

  • 第1引数:メッセージ型
  • 第2引数:トピック名
  • 第3引数:QoS(Quality of Service)の設定

QoS(Quality of Service)はネットワーク経由で他のコンピュータにROS通信でデータ送信する際の品質を設定する項目になりますが、この記事では詳細は説明しません。

QoS(Quality of Service)はまた別の機会で紹介記事を書く予定です。

上記コードでは29行目で「Float32メッセージ型」「simple_pricing」という名前を付けて送信するトピックを定義しています。

self.pub = self.create_publisher(Float32, "simple_pricing", qos_profile)

「Float32メッセージ型」はROS2をインストールするとデフォルトで使用できるスタンダート・メッセージとなっており、定義内容は次のようになります。

float32 data

このメッセージをROSタイマーを使用して0.5秒周期でトピック送信します。

送信している行は45~47行目になります。

msg = Float32()
msg.data = pricing
self.pub.publish(msg)

Float32メッセージのインスタンスを作成後に、変数dataに為替レートの値pricingを代入してからpublish()関数の引数に渡して送信となります。

では実際に送信ノードを実行してみましょう!

$ ros2 run ros2_example simple_pricing_publisher
[simple_pricing_publisher]: simple_pricing_publisher start!
[simple_pricing_publisher]: <送信>経過時間:[0.50] 現在レート:104.704
[simple_pricing_publisher]: <送信>経過時間:[1.00] 現在レート:109.280
[simple_pricing_publisher]: <送信>経過時間:[1.50] 現在レート:113.630
[simple_pricing_publisher]: <送信>経過時間:[2.00] 現在レート:117.637
[simple_pricing_publisher]: <送信>経過時間:[2.50] 現在レート:121.220
[simple_pricing_publisher]: <送信>経過時間:[3.00] 現在レート:124.275

上記のように0.5秒間隔で<送信>と表示されていると思いますが、これでは本当にトピックが送信できているかがわかりにくいですよね?

ROSには開発者向けデバッグツールとしてrqtが提供されていますので、このツールを使ってトピックがちゃんと送信されているか確認してみましょう!

rqtはROS2をインストールすると自動でインストールされます。

$ rqt

上記コマンドを実行すると、GUIが立ち上がります。

rqtの使い方については詳細は割愛しますが、トピック送信データを確認するには下記の3つを表示させれば必要十分です。

  • Node Graph
  • Plot画面
  • Topic Monitor

「Node Graph」より、実行中のノード「simple_pricing_publisher」と、送信中のトピック名「simple_pricing」が確認できました。

「Plot画面」より、トピックで送信している為替レートの値が時系列で確認できます。

為替レートは正弦波で疑似的に生成してましたが、グラフを見てもちゃんと正弦波で送信できていることが確認できますね!

「Topic Monitor」より、トピックが2Hz(=0.5秒)で送信できていることが確認できます。

以上のことから、ちゃんとトピックデータが送信できていることが確認できました。

ROSアプリ開発でrqtは使用頻度が非常に高くなると思いますので、ぜひ使いこなせるようになっておきましょう。

rqtが使いこなせるようになると、デバッグ作業が捗ります。

受信側ノード(サブスクライバー)の作成

次は受信側ノード(サブスクライバー)を作成していきます。

下記のコードは「Float32メッセージ型」「simple_pricing」という名前のトピックを受信したときに、受信メッセージの中身をログ出力する処理内容になります。

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSHistoryPolicy, QoSReliabilityPolicy
from std_msgs.msg import Float32


class SimplePricingSubscriber(Node):
    """
    シンプル為替レートトピックSubscriberノード
    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("simple_pricing_subscriber")

        qos_profile = QoSProfile(
            history=QoSHistoryPolicy.KEEP_ALL, reliability=QoSReliabilityPolicy.RELIABLE
        )

        # Float32型のsimple_pricingトピックを受信するsubscriptionの定義
        # (listener_callbackは受信毎に呼び出されるコールバック関数)
        self.sub = self.create_subscription(
            Float32, "simple_pricing", self._listener_callback, qos_profile
        )

    def _listener_callback(self, msg: Float32) -> None:
        """
        subscriptionコールバック
        """
        # 受信msgの中身をログ出力
        self.get_logger().info("<受信>現在レート:{:.3f}".format(msg.data))


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # simple_pricing_subscriberノードの作成
    sps = SimplePricingSubscriber()
    sps.get_logger().info("simple_pricing_subscriber start!")

    try:
        # ノードの実行開始
        rclpy.spin(sps)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        sps.destroy_node()

トピック受信(サブスクライバー)の定義はcreate_subscription()関数を使用します。

引数には下記を指定します。

  • 第1引数:メッセージ型
  • 第2引数:トピック名
  • 第3引数:コールバック関数
  • 第4引数:QoS(Quality of Service)の設定

上記コードでは25~27行目で定義しています。

self.sub = self.create_subscription(
    Float32, "simple_pricing", self._listener_callback, qos_profile
)

ここでは送信側ノードのトピックを受信するため、第1、第2引数は送信側ノードと同じメッセージ型、トピック名を指定します。

  • 第1引数:メッセージ型 → Float32
  • 第2引数:トピック名 → simple_pricing

これで、トピックを受信した際に、第3引数で指定したコールバック関数self._listener_callback()が呼び出されるようになります。

コールバック関数の定義は上記コードの29~34行目で定義しています。

def _listener_callback(self, msg: Float32) -> None:
    """
    subscriptionコールバック
    """
    # 受信msgの中身をログ出力
    self.get_logger().info("<受信>現在レート:{:.3f}".format(msg.data))

サブスクライバーのコールバック関数は引数で受信メッセージが渡されるので、関数定義の際は引数(上記の例だとmsg)を指定しておきましょう。

では実際に受信ノードを実行してみましょう!

$ ros2 run ros2_example simple_pricing_subscriber
[simple_pricing_subscriber]: simple_pricing_subscriber start!

上記のようなメッセージが表示されたら受信ノードの起動は完了です。

Float32型のトピック「simple_pricing」を受信したときに、ちゃんとself._listener_callback()コールバック関数が呼ばれるか確認してみましょう!

再びrqtを立ち上げます。

$ rqt

rqtにはトピック・メッセージを任意に定義して送信する機能「Message Publisher」がありますので、これを使って確認していきます。

上記のようにrqtからトピックにdata = 100を設定して送信したところ、下記のように表示されました。

$ ros2 run ros2_example simple_pricing_subscriber
[simple_pricing_subscriber]: simple_pricing_subscriber start!
[simple_pricing_subscriber]: <受信>現在レート:100.000
[simple_pricing_subscriber]: <受信>現在レート:100.000
[simple_pricing_subscriber]: <受信>現在レート:100.000

「現在レート = 100」で表示されているので、ちゃんとトピックデータが受信できていることが確認できました。

送信側ノードと受信側ノードを連結してみる

ここまで送信側ノード(パブリッシャー)受信側ノード(サブスクライバー)を個別に起動し、rqtを使ってそれぞれ正常に動作していることが確認できましたので、最後は双方のノードを同時に起動し、互いのノード間で送受信が正常に行えるかを確認していきます。

では、双方のノードを起動してみましょう!

$ ros2 run ros2_example simple_pricing_publisher
[simple_pricing_publisher]: simple_pricing_publisher start!
[simple_pricing_publisher]: <送信>経過時間:[0.50] 現在レート:104.710
[simple_pricing_publisher]: <送信>経過時間:[1.00] 現在レート:109.278
[simple_pricing_publisher]: <送信>経過時間:[1.50] 現在レート:113.628
[simple_pricing_publisher]: <送信>経過時間:[2.00] 現在レート:117.644
[simple_pricing_publisher]: <送信>経過時間:[2.50] 現在レート:121.220
[simple_pricing_publisher]: <送信>経過時間:[3.00] 現在レート:124.273
[simple_pricing_publisher]: <送信>経過時間:[3.50] 現在レート:126.735
[simple_pricing_publisher]: <送信>経過時間:[4.00] 現在レート:128.535
[simple_pricing_publisher]: <送信>経過時間:[4.50] 現在レート:129.632
[simple_pricing_publisher]: <送信>経過時間:[5.00] 現在レート:130.000
$ ros2 run ros2_example simple_pricing_subscriber
[simple_pricing_subscriber]: simple_pricing_subscriber start!
[simple_pricing_subscriber]: <受信>現在レート:104.710
[simple_pricing_subscriber]: <受信>現在レート:109.278
[simple_pricing_subscriber]: <受信>現在レート:113.628
[simple_pricing_subscriber]: <受信>現在レート:117.644
[simple_pricing_subscriber]: <受信>現在レート:121.220
[simple_pricing_subscriber]: <受信>現在レート:124.273
[simple_pricing_subscriber]: <受信>現在レート:126.735
[simple_pricing_subscriber]: <受信>現在レート:128.535
[simple_pricing_subscriber]: <受信>現在レート:129.632
[simple_pricing_subscriber]: <受信>現在レート:130.000

送信ノード側現在レート受信ノード側現在レートの値が一致していることがわかると思います。

これで互いのノード間で正常にトピックの送受信を行えることができました。

rqtNode Graphも確認しておきます。

2つのノード間でトピックの送受信が成立しているときは、下図のようなノード・グラフとなります。

独自メッセージ型

ここまで為替レートを送受信するメッセージはスタンダートで定義されているFloat32型を使用してきました。

float32 data

FXをやったことがある人ならわかると思いますが、為替レートにはASKBIDの2種類があります。

もし、ROSでシステムトレードを製作しようとした場合、Float32型では変数dataが1つしか存在しないため、1つの為替レートしか送信することができません。

しかし、ROSではメッセージ型を独自に定義することができるため、メッセージにASKとBIDの2つの変数を定義したメッセージを作成してあげれば、2つの為替レートを1つのメッセージで送信できるようになります。

次のページでは独自メッセージの定義方法と、ASKとBIDの2つの為替レートを送受信するプログラムを書いて、動作を確認していきます。

スポンサーリンク

コメント

タイトルとURLをコピーしました