本ページではROSマルチスレッドのしくみについて、説明していきます。
ROSマルチスレッドのしくみ
ROSのマルチスレッドを理解する上でまずはExecutor(エグゼキュータ)とCallback groups(コールバック・グループ)の概念を理解する必要があります。
Executor(エグゼキュータ)
ROSはイベント駆動型プログラミングのため、基本的にコールバック関数による割り込みでイベント処理がされていきます。
ROSのコールバックには以下のような種類があります。
このコールバック関数の実行管理を担っているのが、Executorになります。
Executorには下記の2種類があり、マルチ・スレッドで実行したい場合はMultiThreadedExecutorを選択する必要があります。
- SingleThreadedExecutor(デフォルト)
- 単一スレッドでコールバック処理を一度に1つずつ実行する。
- 新しいコールバック処理が実行を開始するときに前のコールバック処理が実行中の場合は、実行完了後に新しいコールバック処理が実行開始される。
- MultiThreadedExecutor
- 複数スレッドでコールバック処理を同時に実行できる。
- コールバック・グループの設定で動作の異なる2種類の同時処理実行方法を設定できる。
MultiThreadedExecutorを選択しただけでは並行処理は実行されません。
ここからさらにコールバック・グループの設定を行うことで初めて並行処理が実行可能になります。
rclpy.spin()
を呼び出すときに、第2引数であるexecutor
の指定を省略した場合はデフォルトのExecutorが設定されます。
デフォルトはSingleThreadedExecutor
です。
# The global spin functions need an executor to do the work
# A persistent executor can re-run async tasks that yielded in rclpy.spin*().
__executor = None
def get_global_executor() -> 'Executor':
global __executor
if __executor is None:
# imported locally to avoid loading extensions on module import
from rclpy.executors import SingleThreadedExecutor
__executor = SingleThreadedExecutor()
context = get_default_context()
def reset_executor():
global __executor
__executor.shutdown()
__executor = None
context.on_shutdown(reset_executor)
return __executor
def spin(node: 'Node', executor: 'Executor' = None) -> None:
"""
Execute work and block until the context associated with the executor is shutdown.
Callbacks will be executed by the provided executor.
This function blocks.
:param node: A node to add to the executor to check for work.
:param executor: The executor to use, or the global executor if ``None``.
"""
executor = get_global_executor() if executor is None else executor
try:
executor.add_node(node)
while executor.context.ok():
executor.spin_once()
finally:
executor.remove_node(node)
Callback groups(コールバック・グループ)
MultiThreadedExecutorと組み合わせて同時処理の実行方法をコールバック・グループで設定します。
同時処理実行方法として、ROSでは次の2種類が提供されています。
- Mutually Exclusive Callback Group(デフォルト)
- グループ内のコールバック処理を一度に1つずつ実行する(並行して実行されない)。
- つまり、グループ内のコールバック処理がSingleThreadedExecutorによって実行されたときと同じ挙動となる。
- Reentrant Callback Group
- 文字通り、Reentrant(再入可能)であるため、グループ内のコールバック処理も並行して実行される。
異なるコールバック・グループに属するコールバック処理は、互いに阻害されることなく、常に並行して実行される。
Mutually Exclusive Callback Groupは並行して実行されることがないため、非スレッド・セーフなリソースにアクセスするコールバック処理は、全て同一のMutually Exclusive Callback Groupに設定するのが好ましいです。
一方、Reentrant Callback Groupは全て並行して実行されてしまうので、使いどころの判断が難しそうですね~
まさにその通りで、Reentrant Callback Groupは全て並行処理されてしまうため、コールバック処理内でアクセスしているリソースがスレッド・セーフであるか否かをよく確認して使用するようにしましょう!
引数でコールバック・グループが設定可能なメソッド(create_service()
、create_timer()
など)でコールバック・グループの引数設定を省略した場合、デフォルトのコールバック・グループが設定されます。
デフォルトはMutuallyExclusiveCallbackGroup
です。
class Node:
"""
A Node in the ROS graph.
A Node is the primary entrypoint in a ROS system for communication.
It can be used to create ROS entities such as publishers, subscribers, services, etc.
"""
PARAM_REL_TOL = 1e-6
"""
Relative tolerance for floating point parameter values' comparison.
See `math.isclose` documentation.
"""
def __init__(
self,
node_name: str,
*,
context: Optional[Context] = None,
cli_args: Optional[List[str]] = None,
namespace: Optional[str] = None,
use_global_arguments: bool = True,
enable_rosout: bool = True,
start_parameter_services: bool = True,
parameter_overrides: Optional[List[Parameter]] = None,
allow_undeclared_parameters: bool = False,
automatically_declare_parameters_from_overrides: bool = False,
enable_logger_service: bool = False
) -> None:
"""
Create a Node.
:param node_name: A name to give to this node. Validated by :func:`validate_node_name`.
:param context: The context to be associated with, or ``None`` for the default global
context.
:param cli_args: A list of strings of command line args to be used only by this node.
These arguments are used to extract remappings used by the node and other ROS specific
settings, as well as user defined non-ROS arguments.
:param namespace: The namespace to which relative topic and service names will be prefixed.
Validated by :func:`validate_namespace`.
:param use_global_arguments: ``False`` if the node should ignore process-wide command line
args.
:param enable_rosout: ``False`` if the node should ignore rosout logging.
:param start_parameter_services: ``False`` if the node should not create parameter
services.
:param parameter_overrides: A list of overrides for initial values for parameters declared
on the node.
:param allow_undeclared_parameters: True if undeclared parameters are allowed.
This flag affects the behavior of parameter-related operations.
:param automatically_declare_parameters_from_overrides: If True, the "parameter overrides"
will be used to implicitly declare parameters on the node during creation.
:param enable_logger_service: ``True`` if ROS2 services are created to allow external nodes
to get and set logger levels of this node. Otherwise, logger levels are only managed
locally. That is, logger levels cannot be changed remotely.
"""
self.__handle = None
self._context = get_default_context() if context is None else context
self._parameters: dict = {}
self._publishers: List[Publisher] = []
self._subscriptions: List[Subscription] = []
self._clients: List[Client] = []
self._services: List[Service] = []
self._timers: List[Timer] = []
self._guards: List[GuardCondition] = []
self.__waitables: List[Waitable] = []
self._default_callback_group = MutuallyExclusiveCallbackGroup()
self._pre_set_parameters_callbacks: List[Callable[[List[Parameter]], List[Parameter]]] = []
self._on_set_parameters_callbacks: \
List[Callable[[List[Parameter]], SetParametersResult]] = []
self._post_set_parameters_callbacks: List[Callable[[List[Parameter]], None]] = []
self._rate_group = ReentrantCallbackGroup()
self._allow_undeclared_parameters = allow_undeclared_parameters
self._parameter_overrides = {}
self._descriptors = {}
コールバック・グループの挙動の違いを確認してみる
それでは、コールバック・グループのMutually ExclusiveとReentrantの挙動の違いを確認していきましょう!
まずは2つのコールバック・グループ①、②を設定します。
そして、各コールバック・グループにはには、下記のようにタイマーCallback①、②、③を設定します。
- コールバック・グループ①
- タイマーCallback①、タイマーCallback②を設定(実行時間に1sラップあり)
- コールバック・グループ②
- タイマーCallback③を設定(実行時間に1sラップあり)
それでは、上記の条件でそれぞれ、コードを書いて実行してみましょう!
以降で紹介するサンプルコードは全てROSパッケージ化した状態でGitHubに載せています。
自分の環境で実行して挙動を確認してみたい場合は下記からクローンして使用してください。
本ページで使用するコード
Mutually Exclusiveのコードと実行結果
コードは下記となります。
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.node import Node
import datetime
import time
class TimerCallbackGroupTest(Node):
"""
タイマー・コールバックグループ・テストノード
(MutuallyExclusiveCallbackGroup)
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("timer_callbackgroup_test")
# ロガー取得
self.logger = self.get_logger()
# コールバック・グループ01
cb_grp_01 = MutuallyExclusiveCallbackGroup()
# コールバック・グループ02
cb_grp_02 = MutuallyExclusiveCallbackGroup()
# タイマー01の定義
self.timer01 = self.create_timer(10.0, self._timer_callback_01, cb_grp_01)
time.sleep(1.0) # 1.0秒停止
# タイマー02の定義
self.timer02 = self.create_timer(10.0, self._timer_callback_02, cb_grp_01)
time.sleep(3.0) # 3.0秒停止
self.timer03 = self.create_timer(5.0, self._timer_callback_03, cb_grp_02)
# 開始時刻を取得
self._start_time = datetime.datetime.now()
def _timer_callback_01(self) -> None:
"""
timerコールバック01
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<01>timer_callback:開始".format(elps))
time.sleep(2.0) # 2.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<01>timer_callback:終了".format(elps))
def _timer_callback_02(self) -> None:
"""
timerコールバック02
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<02>timer_callback:開始".format(elps))
time.sleep(2.0) # 2.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<02>timer_callback:終了".format(elps))
def _timer_callback_03(self) -> None:
"""
timerコールバック03
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<03>timer_callback:開始".format(elps))
time.sleep(6.0) # 6.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<03>timer_callback:終了".format(elps))
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# マルチスレッドExecutorを生成
executor = MultiThreadedExecutor()
# timer_callbackgroup_testノードの作成
tcgt = TimerCallbackGroupTest()
tcgt.get_logger().info("timer_callbackgroup_test start!")
try:
# ノードの実行開始
rclpy.spin(tcgt, executor)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
tcgt.destroy_node()
26~29行目でコールバック・グループをMutually Exclusiveに設定しています。
# コールバック・グループ01
cb_grp_01 = MutuallyExclusiveCallbackGroup()
# コールバック・グループ02
cb_grp_02 = MutuallyExclusiveCallbackGroup()
以下、実行結果です。
$ ros2 run ros2_example timer_cbg_mutually_exclusive
[INFO][timer_callbackgroup_test]: order_server start!
[INFO][timer_callbackgroup_test]: [0:00:05.002341]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:06.017345]<01>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:08.003856]<01>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:08.007077]<02>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:10.009405]<02>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:11.009280]<03>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:11.011711]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:16.016655]<01>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:17.018077]<03>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:17.028020]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:18.001231]<01>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:18.010971]<02>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:20.016284]<02>timer_callback:終了
Mutually Exclusiveは同ーのコールバック・グループに属するコールバック処理は並行して実行されません。
そのため、ラップしていた1sのタイムラグが発生して下図のタイミングチャートのように実行されます。
Reentrantのコードと実行結果
コードは下記となります。
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
import datetime
import time
class TimerCallbackGroupTest(Node):
"""
タイマー・コールバックグループ・テストノード
(ReentrantCallbackGroup)
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("timer_callbackgroup_test")
# ロガー取得
self.logger = self.get_logger()
# コールバック・グループ01
cb_grp_01 = ReentrantCallbackGroup()
# コールバック・グループ02
cb_grp_02 = ReentrantCallbackGroup()
# タイマー01の定義
self.timer01 = self.create_timer(10.0, self._timer_callback_01, cb_grp_01)
time.sleep(1.0) # 1.0秒停止
# タイマー02の定義
self.timer02 = self.create_timer(10.0, self._timer_callback_02, cb_grp_01)
time.sleep(3.0) # 3.0秒停止
self.timer03 = self.create_timer(5.0, self._timer_callback_03, cb_grp_02)
# 開始時刻を取得
self._start_time = datetime.datetime.now()
def _timer_callback_01(self) -> None:
"""
timerコールバック01
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<01>timer_callback:開始".format(elps))
time.sleep(2.0) # 2.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<01>timer_callback:終了".format(elps))
def _timer_callback_02(self) -> None:
"""
timerコールバック02
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<02>timer_callback:開始".format(elps))
time.sleep(2.0) # 2.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<02>timer_callback:終了".format(elps))
def _timer_callback_03(self) -> None:
"""
timerコールバック03
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<03>timer_callback:開始".format(elps))
time.sleep(6.0) # 6.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<03>timer_callback:終了".format(elps))
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# マルチスレッドExecutorを生成
executor = MultiThreadedExecutor()
# timer_callbackgroup_testノードの作成
tcgt = TimerCallbackGroupTest()
tcgt.get_logger().info("timer_callbackgroup_test start!")
try:
# ノードの実行開始
rclpy.spin(tcgt, executor)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
tcgt.destroy_node()
26~29行目でコールバック・グループをReentrantに設定しています。
# コールバック・グループ01
cb_grp_01 = ReentrantCallbackGroup()
# コールバック・グループ02
cb_grp_02 = ReentrantCallbackGroup()
以下、実行結果です。
$ ros2 run ros2_example timer_cbg_reentrant
[INFO][timer_callbackgroup_test]: timer_callbackgroup_test start!
[INFO][timer_callbackgroup_test]: [0:00:05.000746]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:06.019213]<01>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:07.000313]<02>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:08.003798]<01>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:09.004936]<02>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:10.001023]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:11.004389]<03>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:15.001372]<03>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:16.017866]<01>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:16.009200]<03>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:17.018796]<02>timer_callback:開始
[INFO][timer_callbackgroup_test]: [0:00:18.005943]<01>timer_callback:終了
[INFO][timer_callbackgroup_test]: [0:00:19.005161]<02>timer_callback:終了
Reentrantは再入可能のため、同ーのコールバック・グループに属するコールバック処理は全て並行して実行されます。
そのため、下図のタイミングチャートのように実行されます。
まとめ
ROS並行処理(マルチスレッド)
- Executor(エグゼキュータ)
- SingleThreadedExecutor
- 単一スレッドでコールバック処理を一度に1つずつ実行する。
- 新しいコールバック処理が実行を開始するときに前のコールバック処理が実行中の場合は、実行完了後に新しいコールバック処理が実行開始される。
- MultiThreadedExecutor
- 複数スレッドでコールバック処理を同時に実行できる。
- コールバック・グループの設定で動作の異なる2種類の同時処理実行方法を設定できる。
- SingleThreadedExecutor
- Callback groups(コールバック・グループ)
- Mutually Exclusive Callback Group
- グループ内のコールバック処理を一度に1つずつ実行する(並行して実行されない)。
- つまり、グループ内のコールバック処理がSingleThreadedExecutorによって実行されたときと同じ挙動となる。
- Reentrant Callback Group
- 文字通り、Reentrant(再入可能)であるため、グループ内のコールバック処理も並行して実行される。
- Mutually Exclusive Callback Group
コメント