ROS2是机器人领域的著名SDK,可以加速学习效率。ROS2主要解决了两个问题:

  • 通信:基于Node,包含topic、service、action通信方式
  • 代码架构:模块化开发,简化维护

一、架构

userland: code

client: rclc rclcpp rclpy etc…

abstract: rcl

middleware: rmw rmw_implementation rmw_fastrtps etc …

dds: fastdds Context_RTI etc…

二、实现

ROS2代码惯例:

./
../
include/... # 头文件
src/...     # 实现

1、Node实现

推荐继承Node实现业务功能, 主要选择合理的通信方式,注册回调函数。

namespace palomino
{
    class VincentDriver : public rclcpp::Node
    {
        // ...
    };
}

int main(int argc, char * argv[])
{
    rclcpp::init(argc, argv);
    rclcpp::spin(std::make_shared<palomino::VincentDriver>());
    rclcpp::shutdown();
    return 0;
}

Node的声明:


// node.hpp

class Node : public std::enable_shared_from_this<Node>
{
public:
  explicit Node(
    const std::string & node_name,
    const NodeOptions & options = NodeOptions());

  explicit Node(
    const std::string & node_name,
    const std::string & namespace_,
    const NodeOptions & options = NodeOptions());

  virtual ~Node();

  // 创建发布者
  template<
    typename MessageT,
    typename AllocatorT = std::allocator<void>,
    typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
  std::shared_ptr<PublisherT>
  create_publisher(
    const std::string & topic_name,
    const rclcpp::QoS & qos,
    const PublisherOptionsWithAllocator<AllocatorT> & options =
    PublisherOptionsWithAllocator<AllocatorT>()
  );
  
  // 创建订阅者
  template<
    typename MessageT,
    typename CallbackT,
    typename AllocatorT = std::allocator<void>,
    typename SubscriptionT = rclcpp::Subscription<MessageT, AllocatorT>,
    typename MessageMemoryStrategyT = typename SubscriptionT::MessageMemoryStrategyType
  >
  std::shared_ptr<SubscriptionT>
  create_subscription(
    const std::string & topic_name,
    const rclcpp::QoS & qos,
    CallbackT && callback,
    const SubscriptionOptionsWithAllocator<AllocatorT> & options =
    SubscriptionOptionsWithAllocator<AllocatorT>(),
    typename MessageMemoryStrategyT::SharedPtr msg_mem_strat = (
      MessageMemoryStrategyT::create_default()
    )
  );

  // 创建service的客户端
  template<typename ServiceT>
  typename rclcpp::Client<ServiceT>::SharedPtr
  create_client(
    const std::string & service_name,
    const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
    rclcpp::CallbackGroup::SharedPtr group = nullptr);

  // 创建service的服务端
  template<typename ServiceT, typename CallbackT>
  typename rclcpp::Service<ServiceT>::SharedPtr
  create_service(
    const std::string & service_name,
    CallbackT && callback,
    const rclcpp::QoS & qos = rclcpp::ServicesQoS(),
    rclcpp::CallbackGroup::SharedPtr group = nullptr);

  // 参数相关操作方法
  // 其他服务方法

protected:
  Node(
    const Node & other,
    const std::string & sub_namespace);

private:

  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_;
  rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph_;
  rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
  rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_;
  rclcpp::node_interfaces::NodeTopicsInterface::SharedPtr node_topics_;
  rclcpp::node_interfaces::NodeServicesInterface::SharedPtr node_services_;
  rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
  rclcpp::node_interfaces::NodeParametersInterface::SharedPtr node_parameters_;
  rclcpp::node_interfaces::NodeTimeSourceInterface::SharedPtr node_time_source_;
  rclcpp::node_interfaces::NodeTypeDescriptionsInterface::SharedPtr node_type_descriptions_;
  rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_;

  const rclcpp::NodeOptions node_options_;
  const std::string sub_namespace_;
  const std::string effective_namespace_;

  class NodeImpl;
  // This member is meant to be a place to backport features into stable distributions,
  // and new features targeting Rolling should not use this.
  // See the comment in node.cpp for more information.
  std::shared_ptr<NodeImpl> hidden_impl_{nullptr};
};

2、启动

上面的代码示例中,main函数调用了三个ROS2 C++客户端库的函数:

  • init: 初始化通信,设置全局信号处理
  • spin: 自旋监听、处理事件
  • shutdown: 关闭

1、init


// utilities.cpp

void
init(
  int argc,
  char const * const * argv,
  const InitOptions & init_options,
  SignalHandlerOptions signal_handler_options)
{
  using rclcpp::contexts::get_global_default_context;
  get_global_default_context()->init(argc, argv, init_options);
  // Install the signal handlers.
  install_signal_handlers(signal_handler_options);
}

调用上下文init方法,进行具体的初始化:


// context.cpp

void
Context::init(
  int argc,
  char const * const * argv,
  const rclcpp::InitOptions & init_options)
{
  // 加锁
  std::lock_guard<std::recursive_mutex> init_lock(init_mutex_);
  if (this->is_valid()) {
    throw rclcpp::ContextAlreadyInitialized();
  }
  this->clean_up();
  rcl_context_t * context = new rcl_context_t;
  if (!context) {
    throw std::runtime_error("failed to allocate memory for rcl context");
  }
  *context = rcl_get_zero_initialized_context();

  // 调用rcl_init
  rcl_ret_t ret = rcl_init(argc, argv, init_options.get_rcl_init_options(), context);
  if (RCL_RET_OK != ret) {
    delete context;
    rclcpp::exceptions::throw_from_rcl_error(ret, "failed to initialize rcl");
  }
  rcl_context_.reset(context, __delete_context);
  
  ...
  // 设置日志器
  // 检测未解析参数
  // 内部数据维护
}

Context init又调用了rcl库的init函数:

// rcl/src/rcl/init.c 

rcl_ret_t
rcl_init(
  int argc,
  char const * const * argv,
  const rcl_init_options_t * options,
  rcl_context_t * context)
{
  rcl_ret_t fail_ret = RCL_RET_ERROR;

  // 参数解析
  // 检测上下文实现
  // 初始化rcl上下文
  // 获取domain_id
  size_t * domain_id = &context->impl->init_options.impl->rmw_init_options.domain_id;
  if (RCL_DEFAULT_DOMAIN_ID == *domain_id) {
    // Get actual domain id based on environment variable.
    ret = rcl_get_default_domain_id(domain_id);
  }
  
  // 发现配置
  const rmw_discovery_options_t original_discovery_options =
    options->impl->rmw_init_options.discovery_options;
  rmw_discovery_options_t * discovery_options =
    &context->impl->init_options.impl->rmw_init_options.discovery_options;

  // 很长的一串代码,发现配置

  rmw_security_options_t * security_options =
    &context->impl->init_options.impl->rmw_init_options.security_options;
  ret = rcl_get_security_options_from_environment(
    context->impl->init_options.impl->rmw_init_options.enclave,
    &context->impl->allocator,
    security_options);

  // Initialize rmw_init.
  rmw_ret_t rmw_ret = rmw_init(
    &(context->impl->init_options.impl->rmw_init_options),
    &(context->impl->rmw_context));

  TRACETOOLS_TRACEPOINT(rcl_init, (const void *)context);

  return RCL_RET_OK;
fail:
  __cleanup_context(context);
  return fail_ret;
}

rcl init函数又调用了rmw的初始化函数:

// rmw_implementation/src/functions.cpp 

rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
{
  prefetch_symbols();
  if (!symbol_rmw_init) {
    symbol_rmw_init = get_symbol("rmw_init");
  }
  if (!symbol_rmw_init) {
    return RMW_RET_ERROR;
  }

  typedef rmw_ret_t (* FunctionSignature)(const rmw_init_options_t *, rmw_context_t *);
  FunctionSignature func = reinterpret_cast<FunctionSignature>(symbol_rmw_init);
  return func(options, context);
}

rmw是DDS中间件实现的抽象层,根据环境变量加载DDS实现,fastdds的中间件实现如下:

// rmw_fasttps/rmw_fastrtps_cpp/src/rmw_init.cpp

rmw_ret_t
rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
{
  // 参数检查
  
  // domain_id
  auto restore_context = rcpputils::make_scope_exit(
    [context]() {*context = rmw_get_zero_initialized_context();});

  context->instance_id = options->instance_id;
  context->implementation_identifier = eprosima_fastrtps_identifier;
  context->actual_domain_id =
    RMW_DEFAULT_DOMAIN_ID == options->domain_id ? 0uL : options->domain_id;

  // 设置上下文实现
  context->impl = new (std::nothrow) rmw_context_impl_t();

  auto cleanup_impl = rcpputils::make_scope_exit(
    [context]() {delete context->impl;});

  context->impl->is_shutdown = false;
  context->options = rmw_get_zero_initialized_init_options();
  rmw_ret_t ret = rmw_init_options_copy(options, &context->options);

  cleanup_impl.cancel();
  restore_context.cancel();
  return RMW_RET_OK;
}

2、spin

spin有两个重载的函数实现:


// rclcpp/src/rclcpp/executors.cpp
// 重载入口函数
void
rclcpp::spin(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
{
  rclcpp::ExecutorOptions options;
  options.context = node_ptr->get_context();

  // 执行器
  rclcpp::executors::SingleThreadedExecutor exec(options);

  // 添加节点
  exec.add_node(node_ptr);
  // 自旋,执行逻辑
  exec.spin();
  // 去除节点
  exec.remove_node(node_ptr);
}

// 入口函数
void
rclcpp::spin(rclcpp::Node::SharedPtr node_ptr)
{
  rclcpp::spin(node_ptr->get_node_base_interface());
}

SingleThreadedExecutor spin方法的实现:


// rclcpp/src/rclcpp/executors/single_threaded_executor.cpp

void
SingleThreadedExecutor::spin()
{
  if (spinning.exchange(true)) {
    throw std::runtime_error("spin() called while already spinning");
  }
  RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););

  // Clear any previous result and rebuild the waitset
  this->wait_result_.reset();
  this->entities_need_rebuild_ = true;

  // 单循环中获取可执行的任务并执行
  while (rclcpp::ok(this->context_) && spinning.load()) {
    rclcpp::AnyExecutable any_executable;

    // 获取成功返回
    // 否则超时重试一次

    /*
    内部调用
    bool
    Executor::get_next_ready_executable(AnyExecutable & any_executable);

    该函数依次从timer、subscription、services、clients、waitables、callback_group获取可执行任务
    */
    if (get_next_executable(any_executable)) {
      // 根据上一步获取的任务类型,通过dispatch分发给回调函数执行
      execute_any_executable(any_executable);
    }
  }
}

3、shutdown

init的逆操作:


// rclcpp/src/rclcpp/utilities.cpp

bool
shutdown(Context::SharedPtr context, const std::string & reason)
{
  using rclcpp::contexts::get_global_default_context;
  auto default_context = get_global_default_context();
  if (nullptr == context) {
    context = default_context;
  }
  bool ret = context->shutdown(reason);
  if (context == default_context) {
    uninstall_signal_handlers();
  }
  return ret;
}

3、信号处理

在init和shutdown的过程中都有信号处理的设置:

  // init
  install_signal_handlers(signal_handler_options);

  // shutdown
  uninstall_signal_handlers();

ROS2使用一个单独的线程处理上下文的shutdown,保证安全退出:

bool
SignalHandler::install(SignalHandlerOptions signal_handler_options)
{
  // 只保留了核心函数
  // 加锁,保证线程安全
  // 原子操作

  ...

  setup_wait_for_signal();
  signal_received_.store(false);

  SignalHandler::signal_handler_type handler_argument;
  handler_argument = &this->signal_handler;

  // 注册信号处理函数
  old_sigint_handler_ = set_signal_handler(SIGINT, handler_argument);
  old_sigterm_handler_ = set_signal_handler(SIGTERM, handler_argument);

  // 新线程,检测signal_received_是否被设置
  // 被设置后获取所有contexts,调用shutdown
  signal_handler_thread_ = std::thread(&SignalHandler::deferred_signal_handler, this);
  return true;
}

内部实现使用sigaction或signal信号处理机制,sigaction是 POSIX 标准推荐的信号处理方式,支持更精细的控制(如阻塞其他信号、获取信号上下文信息等)。