【ROS×Python】並行処理(マルチスレッド)を実装しよう!

 ※ 本記事は広告・プロモーションを含みます。
Python

たっきん( X (旧Twitter) )です!

今日はROSで「並行処理(マルチスレッド)」を行う方法について紹介したいと思います。

並行処理(マルチスレッド)は文字通り処理を並行して実行することであり、この機能を使うことでリアルタイム性を向上することができるようになります。

この機能はPythonのthreadingモジュールを使用しても実現はできるのですが、ROSにも標準で並行処理(マルチスレッド)機能が実装されています。

ROSは基本的にコールバック処理を主体としたイベント駆動型プログラミングになるのですが、このイベント駆動に調和する形でマルチスレッド機能がROSモジュール化されているため、ROSで並行処理(マルチスレッド)を実装するならthreadingモジュールではなく、ROS標準機能のマルチスレッドを使用した方が良いです。

マルチスレッドはトレースが難解になりやすく、プログラミング初心者にとってはハードルが高い分野になってきますが、リアルタイムプログラミングを実践する上で避けては通れない道となりますので、頑張って使いこなせるようになりましょう!

マルチスレッドを使わないとどうなるのか?

まずはリアルタイム性が要求されるシステムについて、マルチスレッドではなくシングルスレッドで実行した場合、どうなってしまうのかを簡単に説明します。

ROSはコールバック処理を主体としたイベント駆動型プログラミングであることは先ほど説明しましたが、実行タイミングによっては、このコールバックのイベント処理が重複してしまうことがあります。

例えば、下図のように「タイマーCallbackイベント」「発注Callbackイベント」の2つのイベント処理が定期的に実行される場合、発注イベントが重複しない場合は即座に発注処理が実行されますが、発注イベントが重複した場合は実行中のイベント処理が終了した後に発注処理が実行されるため、発注実行までにタイムラグが発生してしまいます。

サンプルコードを使って上記の挙動を実際に確認してみましょう!

発注リクエストを送信する側(クライアント)

次のコードはROSサービスで発注リクエストを送信する側(クライアント)のノードになります。

発注リクエストは5秒周期で送信するようになってます。

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.parameter import Parameter
from ros2_example_msgs.srv import OrderCreate
import datetime
import time


class OrderClient(Node):
    """
    発注サービスClientノード

    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("order_client")

        # ロガー取得
        self.logger = self.get_logger()

        # パラメータの定義
        self.declare_parameter("instrument", Parameter.Type.STRING)
        self.declare_parameter("units", Parameter.Type.INTEGER)

        # order_createサービスclientの定義
        self.cli = self.create_client(
            OrderCreate,
            "order_create",
        )

        # 開始時刻を取得
        self._start_time = datetime.datetime.now()

        # サービスServerが有効になるまで待機
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.logger.info("サービスServerが有効になるまで待機中・・・")

        time.sleep(1.0)  # 1.0秒停止

        # 5.0秒周期で実行されるROSタイマーの定義
        self.timer = self.create_timer(5.0, self._timer_callback)

    def _timer_callback(self) -> None:
        """
        timerコールバック
        """
        # order_createサービスの引数
        req = OrderCreate.Request()
        req.instrument = self.get_parameter("instrument").value
        req.units = self.get_parameter("units").value

        # 開始経過時刻を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info(
            "[{}]<CLT>発注リクエスト送信(通貨ペア[{}] 数量[{}])".format(
                elps, req.instrument, req.units
            )
        )

        # サービスの非同期実行
        future = self.cli.call_async(req)


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # order_clientノードの作成
    oc = OrderClient()
    oc.get_logger().info("order_client start!")
    try:
        # ノードの実行開始
        rclpy.spin(oc)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        oc.destroy_node()

発注リクエストを受信する側(サーバー)<シングルスレッド>

次のコードはROSサービスの発注リクエストを受信する側(サーバー)のノードになります。

「タイマーCallbackイベント」「発注Callbackイベント」の2つのイベント処理をシングルスレッドで実装しています。

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from ros2_example_msgs.srv import OrderCreate
import datetime
import time


class OrderServer(Node):
    """
    発注サービスServerノード

    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("order_server")

        # ロガー取得
        self.logger = self.get_logger()

        # order_createサービスserverの定義
        self.srv = self.create_service(
            OrderCreate,
            "order_create",
            self._order_create,
        )

        # 開始時刻を取得
        self._start_time = datetime.datetime.now()

        # 5.0秒周期で実行されるROSタイマーの定義
        self.timer = self.create_timer(5.0, self._timer_callback)

    def _timer_callback(self) -> None:
        """
        timerコールバック
        """
        # 開始経過時刻を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info("[{}]<SVR>timer_callback:開始".format(elps))

        time.sleep(3.0)  # 3.0秒停止

        # 終了経過時間を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info("[{}]<SVR>timer_callback:終了".format(elps))

    def _order_create(self, req, rsp):
        """
        order_createサービスコールバック
        """
        # 開始経過時刻を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info(
            "[{}]<SVR>発注リクエスト受信(通貨ペア[{}] 数量[{}])".format(
                elps, req.instrument, req.units
            )
        )

        time.sleep(1.0)  # 1.0秒停止

        # 終了経過時間を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info(
            "[{}]<SVR>発注レスポンス送信(通貨ペア[{}] 数量[{}])".format(
                elps, req.instrument, req.units
            )
        )

        rsp.result = True
        return rsp


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # order_serverノードの作成
    os = OrderServer()
    os.get_logger().info("order_server start!")
    try:
        # ノードの実行開始
        rclpy.spin(os)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        os.destroy_node()

以下、「タイマーCallbackイベント」実装箇所です。

ROSタイマーを使って5秒周期でコールバック処理が呼び出されるように実装しています。

実行時間は3秒に設定しています。

# 5.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(5.0, self._timer_callback)
def _timer_callback(self) -> None:
    """
    timerコールバック
    """
    # 開始経過時刻を取得
    elps = datetime.datetime.now() - self._start_time
    self.logger.info("[{}]<SVR>timer_callback:開始".format(elps))

    time.sleep(3.0)  # 3.0秒停止

    # 終了経過時間を取得
    elps = datetime.datetime.now() - self._start_time
    self.logger.info("[{}]<SVR>timer_callback:終了".format(elps))

以下、「発注Callbackイベント」実装箇所です。

クライアント側からの発注リクエストを受信するとコールバック処理が実行されます。

# order_createサービスserverの定義
self.srv = self.create_service(
    OrderCreate,
    "order_create",
    self._order_create,
)
def _order_create(self, req, rsp):
    """
    order_createサービスコールバック
    """
    # 開始経過時刻を取得
    elps = datetime.datetime.now() - self._start_time
    self.logger.info(
        "[{}]<SVR>発注リクエスト受信(通貨ペア[{}] 数量[{}])".format(
            elps, req.instrument, req.units
        )
    )

    time.sleep(1.0)  # 1.0秒停止

    # 終了経過時間を取得
    elps = datetime.datetime.now() - self._start_time
    self.logger.info(
        "[{}]<SVR>発注レスポンス送信(通貨ペア[{}] 数量[{}])".format(
            elps, req.instrument, req.units
        )
    )

    rsp.result = True
    return rsp

Launchファイル

上記2つのクライアント、サーバーノードを実行するためのLaunchファイルになります。

from launch.launch_description import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
    # Launch引数名の定義
    VN_INSTRUMENT = "instrument"
    VN_UNITS = "units"

    # Launch引数の値を受け取るための変数定義
    lc_instrument = LaunchConfiguration(VN_INSTRUMENT)
    lc_units = LaunchConfiguration(VN_UNITS)

    # Launch引数の宣言
    #   ※"default_value"を設定した場合、外部から呼び出されたときに引数が指定されていないと
    #    "default_value"の値が初期値として設定される
    #   ※"default_value"を設定しない場合、外部から呼び出されたときに引数が指定されていないと
    #    エラーとなり、処理を停止する。
    declare_instrument_usdjpy_cmd = DeclareLaunchArgument(
        VN_INSTRUMENT,
        default_value="USD/JPY",
        description="通貨ペア",
    )
    declare_units_cmd = DeclareLaunchArgument(
        VN_UNITS,
        default_value="100",
        description="発注数量",
    )

    # ノード・アクションの定義
    node_act_os = Node(
        package="ros2_example",
        executable="order_server_st",
        name=None,
        namespace=None,
        parameters=None,
        output="screen",
    )
    node_act_oc = Node(
        package="ros2_example",
        executable="order_client",
        name=None,
        namespace=None,
        exec_name="order_client_uj",
        parameters=[{"instrument": lc_instrument, "units": lc_units}],
        output="screen",
    )

    # LaunchDescriptionオブジェクトの生成
    ld = LaunchDescription()

    # Launch引数アクションの追加
    ld.add_action(declare_instrument_usdjpy_cmd)
    ld.add_action(declare_units_cmd)

    # ノード・アクションの追加
    ld.add_action(node_act_os)
    ld.add_action(node_act_oc)

    return ld

実行してみる

以下、実行結果です。

$ ros2 launch ros2_example order_singlethread_launch.py
[order_server_st-1] [INFO][order]: order_server start!
[order_client_uj-2] [INFO][order]: order_client start!
[order_server_st-1] [INFO][order]: [0:00:05.000702]<SVR>timer_callback:開始
[order_client_uj-2] [INFO][order]: [0:00:06.002584]<CLT>発注リクエスト送信(通貨ペア[USD/JPY] 数量[100])
[order_server_st-1] [INFO][order]: [0:00:08.005699]<SVR>timer_callback:終了
[order_server_st-1] [INFO][order]: [0:00:08.009491]<SVR>発注リクエスト受信(通貨ペア[USD/JPY] 数量[100])
[order_server_st-1] [INFO][order]: [0:00:09.013245]<SVR>発注レスポンス送信(通貨ペア[USD/JPY] 数量[100])

実行してから6秒後にクライアントから発注リクエストを送信していますが、サーバー側で受信処理が実行されたのは8秒後となりました。

タイミングチャートとしては下記のようになり、発注リクエストを受信してから実際に発注処理が開始されるまで2秒のタイムラグが発生しています。

システムトレードのようにリアルタイム性が要求されるアプリケーションの場合、このタイムラグは致命的な欠陥になる場合があります。

このタイムラグを解決する手段としてマルチスレッドを使うのが最適でしょう!

マルチスレッドでコードを書き直してみる

では、先ほどのシングルスレッド動作のコードをマルチスレッド動作に変更したコードで挙動の変化を確認していきましょう!

マルチスレッドで実装すれば並行処理が可能になるため、発注イベントが重複しても即座に発注処理が実行されるはずです。

発注リクエストを受信する側(サーバー)<マルチスレッド>

変更箇所はハイライトされている行になります。

import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.node import Node
from ros2_example_msgs.srv import OrderCreate
import datetime
import time


class OrderServer(Node):
    """
    発注サービスServerノード

    """

    def __init__(self) -> None:
        """
        ノードの初期化
        """
        super().__init__("order_server")

        # ロガー取得
        self.logger = self.get_logger()

        # コールバック・グループ①(発注サービス用)
        cb_grp_or = MutuallyExclusiveCallbackGroup()
        # コールバック・グループ②(タイマー用)
        cb_grp_tm = MutuallyExclusiveCallbackGroup()

        # order_createサービスserverの定義
        self.srv = self.create_service(
            OrderCreate, "order_create", self._order_create, callback_group=cb_grp_or
        )

        # 開始時刻を取得
        self._start_time = datetime.datetime.now()

        # 5.0秒周期で実行されるROSタイマーの定義
        self.timer = self.create_timer(5.0, self._timer_callback, cb_grp_tm)

    def _timer_callback(self) -> None:
        """
        timerコールバック
        """
        # 開始経過時刻を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info("[{}]<SVR>timer_callback:開始".format(elps))

        time.sleep(3.0)  # 3.0秒停止

        # 終了経過時間を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info("[{}]<SVR>timer_callback:終了".format(elps))

    def _order_create(self, req, rsp):
        """
        order_createサービスコールバック
        """
        # 開始経過時刻を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info(
            "[{}]<SVR>発注リクエスト受信(通貨ペア[{}] 数量[{}])".format(
                elps, req.instrument, req.units
            )
        )

        time.sleep(1.0)  # 1.0秒停止

        # 終了経過時間を取得
        elps = datetime.datetime.now() - self._start_time
        self.logger.info(
            "[{}]<SVR>発注レスポンス送信(通貨ペア[{}] 数量[{}])".format(
                elps, req.instrument, req.units
            )
        )

        rsp.result = True
        return rsp


def main(args: list[str] | None = None) -> None:
    # ROSの初期化
    rclpy.init(args=args)
    # マルチスレッドExecutorを生成
    executor = MultiThreadedExecutor()
    # order_serverノードの作成
    os = OrderServer()
    os.get_logger().info("order_server start!")
    try:
        # ノードの実行開始
        rclpy.spin(os, executor)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
    else:
        # ROSのシャットダウン
        rclpy.shutdown()
    finally:
        # ノードの破棄
        os.destroy_node()

Launchファイル

Launchファイルは下記のようになります。

from launch.launch_description import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
    # Launch引数名の定義
    VN_INSTRUMENT = "instrument"
    VN_UNITS = "units"

    # Launch引数の値を受け取るための変数定義
    lc_instrument = LaunchConfiguration(VN_INSTRUMENT)
    lc_units = LaunchConfiguration(VN_UNITS)

    # Launch引数の宣言
    #   ※"default_value"を設定した場合、外部から呼び出されたときに引数が指定されていないと
    #    "default_value"の値が初期値として設定される
    #   ※"default_value"を設定しない場合、外部から呼び出されたときに引数が指定されていないと
    #    エラーとなり、処理を停止する。
    declare_instrument_usdjpy_cmd = DeclareLaunchArgument(
        VN_INSTRUMENT,
        default_value="USD/JPY",
        description="通貨ペア",
    )
    declare_units_cmd = DeclareLaunchArgument(
        VN_UNITS,
        default_value="100",
        description="発注数量",
    )

    # ノード・アクションの定義
    node_act_os = Node(
        package="ros2_example",
        executable="order_server_mt",
        name=None,
        namespace=None,
        parameters=None,
        output="screen",
    )
    node_act_oc = Node(
        package="ros2_example",
        executable="order_client",
        name=None,
        namespace=None,
        exec_name="order_client_uj",
        parameters=[{"instrument": lc_instrument, "units": lc_units}],
        output="screen",
    )

    # LaunchDescriptionオブジェクトの生成
    ld = LaunchDescription()

    # Launch引数アクションの追加
    ld.add_action(declare_instrument_usdjpy_cmd)
    ld.add_action(declare_units_cmd)

    # ノード・アクションの追加
    ld.add_action(node_act_os)
    ld.add_action(node_act_oc)

    return ld

実行してみる

以下、実行結果です

$ ros2 launch ros2_example order_multithread_launch.py
[order_server_mt-1] [INFO][order]: order_server start!
[order_client_uj-2] [INFO][order]: order_client start!
[order_server_mt-1] [INFO][order]: [0:00:05.001574]<SVR>timer_callback:開始
[order_client_uj-2] [INFO][order]: [0:00:06.002757]<CLT>発注リクエスト送信(通貨ペア[USD/JPY] 数量[100])
[order_server_mt-1] [INFO][order]: [0:00:06.006588]<SVR>発注リクエスト受信(通貨ペア[USD/JPY] 数量[100])
[order_server_mt-1] [INFO][order]: [0:00:07.011495]<SVR>発注レスポンス送信(通貨ペア[USD/JPY] 数量[100])
[order_server_mt-1] [INFO][order]: [0:00:08.006895]<SVR>timer_callback:終了

タイマーCallbackイベント処理中に発注イベントを発生させたとしても、タイマーCallback処理に阻害されることなく狙ったタイミングで発注処理が行えていることがわかると思います。

このようにマルチスレッドを使用することで並行処理によるリアルタイム処理が可能になることはわかってもらえたでしょうか?

マルチスレッドを使用する利点は大きいですが、反面、非スレッド・セーフなリソースにアクセスする場合は細心の注意を払う必要があります。

後述するコールバック・グループの設定を上手く使い分けてスレッド・セーフなプログラムになるよう心がけましょう!

それでは、次のページからROSマルチスレッドのしくみについて、説明していきます。

スポンサーリンク

コメント

タイトルとURLをコピーしました