ROS2是机器人领域的著名SDK,可以加速学习效率。ROS2主要解决了两个问题:
- 通信:基于Node,包含topic、service、action通信方式
- 代码架构:模块化开发,简化维护
一、架构
userland: code
client: rclc rclcpp rclpy etc…
abstract: rcl
middleware: rmw rmw_implementation rmw_fastrtps etc …
dds: fastdds Context_RTI etc…
二、实现
ROS2代码惯例:
./
../
include/... # 头文件
src/... # 实现
1、Node实现
推荐继承Node实现业务功能, 主要选择合理的通信方式,注册回调函数。
namespace palomino
{
class VincentDriver : public rclcpp::Node
{
// ...
};
}
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<palomino::VincentDriver>());
rclcpp::shutdown();
return 0;
}
Node的声明:
// node.hpp
class Node : public std::enable_shared_from_this<Node>
{
public:
explicit Node(
const std::string & node_name,
const NodeOptions & options = NodeOptions());
explicit Node(
const std::string & node_name,
const std::string & namespace_,
const NodeOptions & options = NodeOptions());
virtual ~Node();
// 创建发布者
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name,
const rclcpp::QoS & qos,
const PublisherOptionsWithAllocator<AllocatorT> & options =
PublisherOptionsWithAllocator<AllocatorT>()
);
// 创建订阅者
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType
>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
const rclcpp::QoS & qos,
CallbackT && callback,
const SubscriptionOptionsWithAllocator<AllocatorT> & options =
SubscriptionOptionsWithAllocator<AllocatorT>(),
typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
MessageMemoryStrategyT::create_default()
)
);
// 创建service的客户端
template<typename ServiceT>
typename rclcpp::Client<ServiceT>::SharedPtr
create_client(
const std::string & service_name,
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
rclcpp::CallbackGroup::SharedPtr group = nullptr);
// 创建service的服务端
template<typename ServiceT, typename CallbackT>
typename rclcpp::Service<ServiceT>::SharedPtr
create_service(
const std::string & service_name,
CallbackT && callback,
const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
rclcpp::CallbackGroup::SharedPtr group = nullptr);
// 参数相关操作方法
// 其他服务方法
protected:
Node(
const Node & other,
const std::string & sub_namespace);
private:
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph_;
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_;
rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr node_topics_;
rclcpp::node_interfaces::NodeServicesInterface::SharedPtr node_services_;
rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters_;
rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr node_time_source_;
rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr node_type_descriptions_;
rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_;
const rclcpp::NodeOptions node_options_;
const std::string sub_namespace_;
const std::string effective_namespace_;
class NodeImpl;
// This member is meant to be a place to backport features into stable distributions,
// and new features targeting Rolling should not use this.
// See the comment in node.cpp for more information.
std::shared_ptr<NodeImpl> hidden_impl_{nullptr};
};
2、启动
上面的代码示例中,main函数调用了三个ROS2 C++客户端库的函数:
- init: 初始化通信,设置全局信号处理
- spin: 自旋监听、处理事件
- shutdown: 关闭
1、init
// utilities.cpp
void
init(
int argc,
char const * const * argv,
const InitOptions & init_options,
SignalHandlerOptions signal_handler_options)
{
using rclcpp::contexts::get_global_default_context;
get_global_default_context()->init(argc, argv, init_options);
// Install the signal handlers.
install_signal_handlers(signal_handler_options);
}
调用上下文init方法,进行具体的初始化:
// context.cpp
void
Context::init(
int argc,
char const * const * argv,
const rclcpp::InitOptions & init_options)
{
// 加锁
std::lock_guard<std::recursive_mutex> init_lock(init_mutex_);
if (this->is_valid()) {
throw rclcpp::ContextAlreadyInitialized();
}
this->clean_up();
rcl_context_t * context = new rcl_context_t;
if (!context) {
throw std::runtime_error("failed to allocate memory for rcl context");
}
*context = rcl_get_zero_initialized_context();
// 调用rcl_init
rcl_ret_t ret = rcl_init(argc, argv, init_options.get_rcl_init_options(), context);
if (RCL_RET_OK != ret) {
delete context;
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to initialize rcl");
}
rcl_context_.reset(context, __delete_context);
...
// 设置日志器
// 检测未解析参数
// 内部数据维护
}
Context init又调用了rcl库的init函数:
// rcl/src/rcl/init.c
rcl_ret_t
rcl_init(
int argc,
char const * const * argv,
const rcl_init_options_t * options,
rcl_context_t * context)
{
rcl_ret_t fail_ret = RCL_RET_ERROR;
// 参数解析
// 检测上下文实现
// 初始化rcl上下文
// 获取domain_id
size_t * domain_id = &context->impl->init_options.impl->rmw_init_options.domain_id;
if (RCL_DEFAULT_DOMAIN_ID == *domain_id) {
// Get actual domain id based on environment variable.
ret = rcl_get_default_domain_id(domain_id);
}
// 发现配置
const rmw_discovery_options_t original_discovery_options =
options->impl->rmw_init_options.discovery_options;
rmw_discovery_options_t * discovery_options =
&context->impl->init_options.impl->rmw_init_options.discovery_options;
// 很长的一串代码,发现配置
rmw_security_options_t * security_options =
&context->impl->init_options.impl->rmw_init_options.security_options;
ret = rcl_get_security_options_from_environment(
context->impl->init_options.impl->rmw_init_options.enclave,
&context->impl->allocator,
security_options);
// Initialize rmw_init.
rmw_ret_t rmw_ret = rmw_init(
&(context->impl->init_options.impl->rmw_init_options),
&(context->impl->rmw_context));
TRACETOOLS_TRACEPOINT(rcl_init, (const void *)context);
return RCL_RET_OK;
fail:
__cleanup_context(context);
return fail_ret;
}
rcl init函数又调用了rmw的初始化函数:
// rmw_implementation/src/functions.cpp
rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
{
prefetch_symbols();
if (!symbol_rmw_init) {
symbol_rmw_init = get_symbol("rmw_init");
}
if (!symbol_rmw_init) {
return RMW_RET_ERROR;
}
typedef rmw_ret_t (* FunctionSignature)(const rmw_init_options_t *, rmw_context_t *);
FunctionSignature func = reinterpret_cast<FunctionSignature>(symbol_rmw_init);
return func(options, context);
}
rmw是DDS中间件实现的抽象层,根据环境变量加载DDS实现,fastdds的中间件实现如下:
// rmw_fasttps/rmw_fastrtps_cpp/src/rmw_init.cpp
rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
{
// 参数检查
// domain_id
auto restore_context = rcpputils::make_scope_exit(
[context]() {*context = rmw_get_zero_initialized_context();});
context->instance_id = options->instance_id;
context->implementation_identifier = eprosima_fastrtps_identifier;
context->actual_domain_id =
RMW_DEFAULT_DOMAIN_ID == options->domain_id ? 0uL : options->domain_id;
// 设置上下文实现
context->impl = new (std::nothrow) rmw_context_impl_t();
auto cleanup_impl = rcpputils::make_scope_exit(
[context]() {delete context->impl;});
context->impl->is_shutdown = false;
context->options = rmw_get_zero_initialized_init_options();
rmw_ret_t ret = rmw_init_options_copy(options, &context->options);
cleanup_impl.cancel();
restore_context.cancel();
return RMW_RET_OK;
}
2、spin
spin有两个重载的函数实现:
// rclcpp/src/rclcpp/executors.cpp
// 重载入口函数
void
rclcpp::spin(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
rclcpp::ExecutorOptions options;
options.context = node_ptr->get_context();
// 执行器
rclcpp::executors::SingleThreadedExecutor exec(options);
// 添加节点
exec.add_node(node_ptr);
// 自旋,执行逻辑
exec.spin();
// 去除节点
exec.remove_node(node_ptr);
}
// 入口函数
void
rclcpp::spin(rclcpp::Node::SharedPtr node_ptr)
{
rclcpp::spin(node_ptr->get_node_base_interface());
}
SingleThreadedExecutor spin方法的实现:
// rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
void
SingleThreadedExecutor::spin()
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;
// 单循环中获取可执行的任务并执行
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
// 获取成功返回
// 否则超时重试一次
/*
内部调用
bool
Executor::get_next_ready_executable(AnyExecutable & any_executable);
该函数依次从timer、subscription、services、clients、waitables、callback_group获取可执行任务
*/
if (get_next_executable(any_executable)) {
// 根据上一步获取的任务类型,通过dispatch分发给回调函数执行
execute_any_executable(any_executable);
}
}
}
3、shutdown
init的逆操作:
// rclcpp/src/rclcpp/utilities.cpp
bool
shutdown(Context::SharedPtr context, const std::string & reason)
{
using rclcpp::contexts::get_global_default_context;
auto default_context = get_global_default_context();
if (nullptr == context) {
context = default_context;
}
bool ret = context->shutdown(reason);
if (context == default_context) {
uninstall_signal_handlers();
}
return ret;
}
3、信号处理
在init和shutdown的过程中都有信号处理的设置:
// init
install_signal_handlers(signal_handler_options);
// shutdown
uninstall_signal_handlers();
ROS2使用一个单独的线程处理上下文的shutdown,保证安全退出:
bool
SignalHandler::install(SignalHandlerOptions signal_handler_options)
{
// 只保留了核心函数
// 加锁,保证线程安全
// 原子操作
...
setup_wait_for_signal();
signal_received_.store(false);
SignalHandler::signal_handler_type handler_argument;
handler_argument = &this->signal_handler;
// 注册信号处理函数
old_sigint_handler_ = set_signal_handler(SIGINT, handler_argument);
old_sigterm_handler_ = set_signal_handler(SIGTERM, handler_argument);
// 新线程,检测signal_received_是否被设置
// 被设置后获取所有contexts,调用shutdown
signal_handler_thread_ = std::thread(&SignalHandler::deferred_signal_handler, this);
return true;
}
内部实现使用sigaction或signal信号处理机制,sigaction是 POSIX 标准推荐的信号处理方式,支持更精细的控制(如阻塞其他信号、获取信号上下文信息等)。