【シストレ開発#2】ROS2の独自メッセージ型を作成してみた!

ROSがそれなりに使えるようになってきました。たっきん(Twitter)です!

ROSを使ったシストレ開発ブログ第2弾です。

今回の記事はブログというより、ROSの使い方の備忘録的な感じで書いていこうと思います。

ちなみに、ROSには旧バージョンのROS1次世代バージョンのROS2があるらしく、現在僕が使っているROSは次世代バージョンのROS2となります。

ROS2を使い始めてから2ヵ月くらい経ち、お試しノードを作成したり、通信のサンプルを動かしてみたりしてROSの要領を掴んできました。

そして今回、初めてトピック通信の「独自メッセージ型」を定義して実際に動かすことに成功しました!

独自メッセージ型の作成手順は一通り覚えたものの、日が経つと忘れてしまいそうな感じがしたので、今回はROS2の「独自メッセージ型」の作成方法についての備忘録を書いていきます。

Sponsored Link

ROS2のメッセージ通信

独自メッセージ型の作成手順を説明する前に、まずはROS2の通信手法について簡単に理解しておきましょう!

ROS2のノード間の通信は基本的にメッセージの送受信で行われます。

そして、このメッセージの通信方式には以下の2種類があります。

  • トピック(TOPIC)通信
  • サービス(SERVICE)通信

トピック(TOPIC)通信

トピック通信を形式的に説明するなら「一方通行の非同期通信」になりますが、通信方式に詳しくない人からするとイメージが付きにくいと思います。

なので、より簡単に説明するなら「送信ノードが受信ノードに対し、一方的にメッセージを投げつける」イメージになります。

ちなみに、トピックを送信する側のノードをPublisher(バブリッシャ)、受信する側のノードをSubscriber(サブスクライバ)と言います。

ちなみに、トピック通信は1対多の通信が可能なため、1つのPublisherに対し、複数のSubscriberにメッセージを投げつけることができます。

サービス(SERVICE)通信

トピック通信はメッセージを一方的に投げつける通信でした。

しかし、時には投げつけたメッセージに対し、応答が必要な場合もあると思います。

例えば、加算処理。

送信元ノードに2つの変数a、bがあり、この2つ変数を加算処理を持つノードへ送信したとします。

加算処理を持つノードは受け取った2つの変数a、bを加算しましたが、計算結果を送信元へ送ってあげなければ送信元ノードは結果を得ることができません。

このような場合にサービス通信が役に立ちます。

サービス通信の場合、メッセージを投げつける側のノード(サービスを受ける側のノード)をClient、受け取ったメッセージに対し、結果を送り返す側のノード(サービスを提供する側のノード)をServerと言います。

Clientノードはメッセージに2つの変数a=2、b=3を載せてServerへRequestメッセージを送信します。

受け取ったServerノードはa+bを計算し、計算結果ans=5としてReplyメッセージをClientノードへ送り返します。

トピック通信と違い、サービス通信は送信元ノードと受信元ノードとの1対1での通信となります。

メッセージの型

トピックにもサービスにもメッセージには型があり、決められた型でしか送受信することができません。

ROS2には、スタンダードで定義されている型があり、以下のような組み込み型に対応した型が存在します。

ROSメッセージ型C++Python 3
Int8 int8_t int
UInt8 uint8_t int
Int16 int16_t int
UInt16 uint16_t int
Int32 int32_t int
UInt32 uint32_t int
Int64 int64_t int
UInt64 uint64_t int
Float32 float float
Float64 double float
String std::string str
Bool uint8_t bool

上記は一例で、その他にもスタンダードで使えるROSメッセージ型がありますのでもっと知りたい方は以下のリンクを参照してください。

http://wiki.ros.org/std_msgs

基本的にはスタンダードで定義されている型を使うのが好ましいらしいですが、場合によっては型を自分で定義したい場合もあります。

そのような場合は自分で型を定義することができます。

実際に「独自メッセージ型」を作成してみる

情報のピックアップ

OANDA APIで提供されている「PricingStream」では以下の為替情報をOANDAサーバーから取得することができます。

  • type
  • time
  • bids:price
  • bids:liquidity
  • asks:price
  • asks:liquidity
  • closeoutBid
  • closeoutAsk
  • status
  • tradeable
  • instrument

各パラメータの意味はdeveloper.oanda.comに載っています。

上記パラメータの中で実際にシストレ開発で必要になりそうな情報をピックアップすると以下のようになりました。

  • time:データを取得した時間
  • closeoutBid :売り決済価格
  • closeoutAsk:買い 決済価格
  • tradeable:トレード可能/不可能フラグ
  • instrument:通貨ペア

よって今回は上記5つの情報を盛り込んだ独自メッセージ型を定義していきます。

独自メッセージ専用パッケージの作成

pythonを使ったROS2のパッケージではCMakeLists.txtが生成されないため、別途メッセージ定義用ROS2パッケージを作成する必要があります。

今回はoanda_api_msgsというパッケージ名で作成していきます。

$ ros2 pkg create --build-type ament_cmake oanda_api_msgs

独自メッセージ型の定義ファイルはmsgディレクトリの下に作成していきます。

トピックの独自メッセージ型の拡張子は.msgとするようです。

今回はPricing.msgというファイル名で作成していきます。

# Pricing Definitions of OANDA-API.
# Reference:
#    https://developer.oanda.com/rest-live-v20/pricing-df/
#    https://oanda-api-v20.readthedocs.io/en/latest/endpoints/pricing/pricingstream.html

# The date/time when the Price was created
string time

# The Price’s Instrument.
string instrument

# The closeout bid Price.
float32 closeout_bid

# The closeout ask Price.
float32 closeout_ask

# Flag indicating if the Price is tradeable or not
bool tradeable

ここまでで、ファイルツリー構造は以下のようになっているはずです。

oanda_api_msgs
  ├── msg
  │   └── Pricing.msg
  ├── CMakeLists.txt
  └── package.xml

Pricing.msgを作成しただけでは独自メッセージ型を使うことができません。
package.xmlとCMakeLists.txtも修正する必要があります。

<package.xml>

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>oanda_api_msgs</name>
  <version>0.1.0</version>
  <description>
    oanda_api msg package.
  </description>
  <maintainer email="takkin.takilog@gmail.com">takkin</maintainer>
  <license>Apache License 2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>rosidl_default_generators</buildtool_depend>

  <exec_depend>std_msgs</exec_depend>
  <exec_depend>action_msgs</exec_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

ポイントとなるのは以下の3行を追加することですね。

13:  <buildtool_depend>rosidl_default_generators</buildtool_depend>

17:  <exec_depend>rosidl_default_runtime</exec_depend>

19:  <member_of_group>rosidl_interface_packages</member_of_group>

CMakeLists.txtは以下のようになります。

<CMakeLists.txt>

cmake_minimum_required(VERSION 3.5)
project(oanda_api_msgs)

set(CMAKE_CXX_STANDARD 14)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(std_msgs REQUIRED)
find_package(action_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/Pricing.msg"
  DEPENDENCIES std_msgs action_msgs
)

ament_export_dependencies(rosidl_default_runtime)

ament_package()

以上で独自メッセージ専用パッケージの作成が完了しました。

Subscriberの作成

次は作成した独自メッセージ型のSubscriberを作成していきましょう!

今回は前回の記事で作成したoanda_apiパッケージを修正して盛り込んでいきます。

ガチでシストレ開発始めます!たっきん(Twitter)です! 今まではOANDA APIを使って窓埋めや仲値&ゴトー日の解析を行ってきてチャ...

実際に修正したコードの全容は以下のようになります。

import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from oanda_api_msgs.msg import Pricing
from oandapyV20 import API
from oandapyV20.endpoints.pricing import PricingStream
from oandapyV20.exceptions import V20Error, StreamTerminated


class StreamApi(Node):

    def __init__(self):
        super().__init__("stream_api")

        # Set logger lebel
        self.__logger = super().get_logger()
        self.__logger.set_level(rclpy.logging.LoggingSeverity.DEBUG)

        PRMNM_ACCOUNT_NUMBER = "account_number"
        PRMNM_ACCESS_TOKEN = "access_token"
        PRMNM_INSTRUMENTS = "instruments"
        TPCNM_PRICING = "pricing_"
        TPCNM_ACT_FLG = "activate_flag"

        # Declare parameter
        self.declare_parameter(PRMNM_ACCOUNT_NUMBER)
        self.declare_parameter(PRMNM_ACCESS_TOKEN)
        self.declare_parameter(PRMNM_INSTRUMENTS)

        # initialize
        self.__act_flg = True

        account_number = self.get_parameter(PRMNM_ACCOUNT_NUMBER).value
        access_token = self.get_parameter(PRMNM_ACCESS_TOKEN).value
        instrumentslist = self.get_parameter(PRMNM_INSTRUMENTS).value
        self.__logger.debug("[OANDA]Account Number:%s" % account_number)
        self.__logger.debug("[OANDA]Access Token:%s" % access_token)
        self.__logger.debug("[OANDA]Instruments:%s" % instrumentslist)

        self.__api = API(access_token=access_token)
        instruments = ",".join(instrumentslist)
        params = {"instruments": instruments}
        self.__ps = PricingStream(account_number, params)

        # Declare publisher and subscriber
        self.__pub_dict = {}
        for instrument in instrumentslist:
            suffix = instrument.replace("_", "").lower()
            pub = self.create_publisher(Pricing, TPCNM_PRICING + suffix)
            self.__pub_dict[instrument] = pub.publish

        self.__sub_act = self.create_subscription(Bool, TPCNM_ACT_FLG,
                                                  self.__act_flg_callback)

    def background(self):
        if self.__act_flg:
            self.__request()

    def __act_flg_callback(self, msg):
        if msg.data:
            self.__act_flg = True
        else:
            self.__act_flg = False

    def __request(self):
        TIME = "time"
        INSTRUMENT = "instrument"
        BID = "closeoutBid"
        ASK = "closeoutAsk"
        TRADEABLE = "tradeable"

        try:
            for rsp in self.__api.request(self.__ps):

                rclpy.spin_once(self, timeout_sec=0)
                if not self.__act_flg:
                    self.__logger.debug("terminate PricingStream")
                    self.__ps.terminate()

                if TRADEABLE in rsp.keys():
                    msg = Pricing()
                    msg.time = rsp[TIME]
                    msg.instrument = rsp[INSTRUMENT]
                    msg.closeout_bid = float(rsp[BID])
                    msg.closeout_ask = float(rsp[ASK])
                    msg.tradeable = rsp[TRADEABLE]

                    # Publish topics
                    self.__pub_dict[msg.instrument](msg)

        except V20Error as e:
            print("Error: {}".format(e))
        except StreamTerminated as e:
            print("StreamTerminated: {}".format(e))

    def __handler(self, func, msg):
        func(msg)


def main(args=None):
    rclpy.init(args=args)
    stream_api = StreamApi()
    try:
        while True:
            rclpy.spin_once(stream_api, timeout_sec=1.0)
            stream_api.background()
    except KeyboardInterrupt:
        stream_api.destroy_node()
        rclpy.shutdown()

実際にメッセージを作成しているのは81~86行目になります。

 4:    from oanda_api_msgs.msg import Pricing

81:    msg = Pricing()
82:    msg.time = rsp[TIME]
83:    msg.instrument = rsp[INSTRUMENT]
84:    msg.closeout_bid = float(rsp[BID])
85:    msg.closeout_ask = float(rsp[ASK])
86:    msg.tradeable = rsp[TRADEABLE]

また上記のコードを実行するにはパラメータの設定が必要になります。

以下のパラメータファイルを用意しましょう!

account_numberaccess_tokenは自身で取得した値に置き換えてください。

stream_api:
    ros__parameters:
        account_number: "999-999-9999999-999"
        access_token: "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz-zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
        instruments: ["USD_JPY", "EUR_JPY", "EUR_USD"]

動作を確認してみる

では実際に今回作成した独自メッセージ:Pricing型を動かしてみましょう!

実行コマンドは以下です。

$ ros2 run oanda_api price_stream_publisher __params:=ファイルの絶対パス/oanda_params.yaml 

実行するとノード:stream_apiが起動し、以下のようにドル円ユーロ円ユーロドルの3通貨のメッセージが送信されているはずです。

トピックモニタで値を確認してみても、ちゃんと値が取得できているのがわかります。

さいごに

今回はROSの独自メッセージ型を定義し、実際に動かすまでの手順をまとめてみました。

自分自身でメッセージ型を定義できるとROS通信の拡張性が増して、シストレ開発の効率が上がりそうな感じがしています。

ROSを使っていて今のところかゆいところに手が届かないようなことはないので、このままROSを使って開発を進めていこうと思います。

また何か進捗があったら、ブログなり備忘録なりを書いていこうと思っています。

また、今回作成したROSパッケージ:oanda_apioanda_api_msgsはGitHubにアップしておきました。

Sponsored Link