개발하는 핑구
article thumbnail

Create Custom ROS2 Interfaces (Msg and Srv)

ROS2 can communicate with interfaces (msg or srv).

How is it possible that you can use those interfaces directly in your code?

Let’s say you create a message definition inside a package. when you use the colcon build, the message will be passed by the build system. the source code will be generated for this message in any ros2 surpported language C++, Python, etc…

So that’s why you can directly include the message header in you C++ code, because the build system is generated this header file.

Create and build Custom Msg

cd ros2_ws/src
ros2 pkg create my_robot_interfaces
cd my_robot_interfaces
rm -rf include/
rm -r src/
mkdir msg
  • package.xml

  • CMakeLists.txt

  • HardwareStatus.msg
cd msg
touch HardwareStatus.msg

HardwareStatus.msg

int64 temperature
bool are_motors_ready
string debug_message
  • build
cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
  • where they are installed
cd ~/ros2_ws/install/my_robot_interfaces/lib/python3.8/site-packages/my_robot_interfaces/msg
ls
# _hardware_status.py  _hardware_status_s.c  __init__.py

cd ~/ros2_ws/install/my_robot_interfaces/include/my_robot_interfaces/msg
ls
"
detail
hardware_status.h
hardware_status.hpp
rosidl_generator_c__visibility_control.h
rosidl_typesupport_fastrtps_cpp__visibility_control.h
rosidl_typesupport_fastrtps_c__visibility_control.h
rosidl_typesupport_introspection_c__visibility_control.h
"
  • show interfaces
ros2 interface show my_robot_interfaces/msg/HardwareStatus 
"
int64 temperature
bool are_motors_ready
string debug_message
"

Use your Custom Msg in a Node

In Python

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

from my_robot_interfaces.msg import HardwareStatus
 
class HardwareStatusPublisherNode(Node):
    def __init__(self):
        super().__init__("hardware_status_publisher")
        self.hw_status_publisher_ = self.create_publisher(HardwareStatus, "hardware_status", 10)
        self.timer_ = self.create_timer(1.0, self.publish_hw_status)
        self.get_logger().info("Hardware status publisher has been started.")

    def publish_hw_status(self):
        msg = HardwareStatus()
        msg.temperature = 45
        msg.are_motors_ready = True
        msg.debug_message = "Nothing special here"
        self.hw_status_publisher_.publish(msg)

 
 
def main(args=None):
    rclpy.init(args=args)
    node = HardwareStatusPublisherNode()
    rclpy.spin(node)
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

if you can’t import my_robot_interfaces

  • Ctrl + , and click “Edit in settings.json

  • add lines in extraPaths

In C++

#include "rclcpp/rclcpp.hpp"
#include "my_robot_interfaces/msg/hardware_status.hpp"

class HardWareStatusPublisher : public rclcpp::Node
{
public:
    HardWareStatusPublisher() : Node("hw_status_publisher")
    {
        pub_ = this->create_publisher<my_robot_interfaces::msg::HardwareStatus>(
            "hardware_status", 10);
        timer_ = this->create_wall_timer(
            std::chrono::seconds(1),
            std::bind(&HardWareStatusPublisher::publishNumber, this));
        RCLCPP_INFO(this->get_logger(), "Hardware status publisher has been started.");
    }

private:
    void publishNumber()
    {
        auto msg = my_robot_interfaces::msg::HardwareStatus();
        msg.temperature = 57;
        msg.are_motors_ready = false;
        msg.debug_message = "Motors are too hot!";
        pub_->publish(msg);
    }

    rclcpp::Publisher<my_robot_interfaces::msg::HardwareStatus>::SharedPtr pub_;
    rclcpp::TimerBase::SharedPtr timer_;
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<HardWareStatusPublisher>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}
  • if you can’t include my_robot_interfaces, add line path in ~/ros2_ws/src/.vscode

  • and to use the interfaces, add line depend in package.xml, CMakeLists.txt

Create and Build Custom Srv

cd ~/ros2_ws/src/my_robot_interfaces/
mkdir srv
cd srv
touch ComputeRectangleArea.srv

ComputeRectangleArea.srv

float64 length
float64 width
---
float64 area

Add line in CMakeLists.txt

Debug Msg and Srv with ROS2 tools

show interface list

ros2 interface list
"
Messages:
    action_msgs/msg/GoalInfo
    action_msgs/msg/GoalStatus
    action_msgs/msg/GoalStatusArray
    actionlib_msgs/msg/GoalID
    actionlib_msgs/msg/GoalStatus
    actionlib_msgs/msg/GoalStatusArray
    builtin_interfaces/msg/Duration
    builtin_interfaces/msg/Time
    diagnostic_msgs/msg/DiagnosticArray
    diagnostic_msgs/msg/DiagnosticStatus
    diagnostic_msgs/msg/KeyValue
    example_interfaces/msg/Bool
    example_interfaces/msg/Byte
		...
"
ros2 interface package sensor_msgs
"
sensor_msgs/msg/NavSatFix
sensor_msgs/msg/RegionOfInterest
sensor_msgs/msg/BatteryState
sensor_msgs/msg/MagneticField
sensor_msgs/msg/JoyFeedback
sensor_msgs/msg/LaserScan
sensor_msgs/msg/JoyFeedbackArray
...
"

find interface from node

ros2 node list
"
/hardware_status_publisher
/add_two_ints_server
"

ros2 node info /hardware_status_publisher
"
/hardware_status_publisher
  Subscribers:

  Publishers:
    **/hardware_status: my_robot_interfaces/msg/HardwareStatus <-- that's a topic**
    /parameter_events: rcl_interfaces/msg/ParameterEvent
    /rosout: rcl_interfaces/msg/Log
  Service Servers:
    /hardware_status_publisher/describe_parameters: rcl_interfaces/srv/DescribeParameters
    /hardware_status_publisher/get_parameter_types: rcl_interfaces/srv/GetParameterTypes
    /hardware_status_publisher/get_parameters: rcl_interfaces/srv/GetParameters
    /hardware_status_publisher/list_parameters: rcl_interfaces/srv/ListParameters
    /hardware_status_publisher/set_parameters: rcl_interfaces/srv/SetParameters
    /hardware_status_publisher/set_parameters_atomically: rcl_interfaces/srv/SetParametersAtomically
  Service Clients:

  Action Servers:

  Action Clients:
"

we can know it is a publisher and publishing topic. So let’s find the topic’s info

ros2 topic list
"
/hardware_status
/parameter_events
/rosout
"

ros2 topic info /hardware_status
"
Type: my_robot_interfaces/msg/HardwareStatus
Publisher count: 1
Subscription count: 0
"

ros2 interface show my_robot_interfaces/msg/HardwareStatus 
"
int64 temperature
bool are_motors_ready
string debug_message
"

And what is /add_two_ints_server?

ros2 node info /add_two_ints_server 
"
/add_two_ints_server
  Subscribers:

  Publishers:
    /parameter_events: rcl_interfaces/msg/ParameterEvent
    /rosout: rcl_interfaces/msg/Log
  Service Servers:
    **/add_two_ints: example_interfaces/srv/AddTwoInts**
    /add_two_ints_server/describe_parameters: rcl_interfaces/srv/DescribeParameters
    /add_two_ints_server/get_parameter_types: rcl_interfaces/srv/GetParameterTypes
    /add_two_ints_server/get_parameters: rcl_interfaces/srv/GetParameters
    /add_two_ints_server/list_parameters: rcl_interfaces/srv/ListParameters
    /add_two_ints_server/set_parameters: rcl_interfaces/srv/SetParameters
    /add_two_ints_server/set_parameters_atomically: rcl_interfaces/srv/SetParametersAtomically
  Service Clients:

  Action Servers:

  Action Clients:
"

we can know it is server and providing service. So let’s find the service’s info

ros2 service list
"
/add_two_ints
/add_two_ints_server/describe_parameters
/add_two_ints_server/get_parameter_types
/add_two_ints_server/get_parameters
/add_two_ints_server/list_parameters
/add_two_ints_server/set_parameters
/add_two_ints_server/set_parameters_atomically
/hardware_status_publisher/describe_parameters
/hardware_status_publisher/get_parameter_types
/hardware_status_publisher/get_parameters
/hardware_status_publisher/list_parameters
/hardware_status_publisher/set_parameters
/hardware_status_publisher/set_parameters_atomically
"

ros2 service type /add_two_ints
# example_interfaces/srv/AddTwoInts

ros2 interface show example_interfaces/srv/AddTwoInts
"
int64 a
int64 b
---
int64 sum
"

Activity

In this activity you will implement the battery + led panel example that we used to understand Services in the previous section. When the battery is empty we power on a LED, and when the battery is full we power it off.

So, here’s the initial state:

Blue boxes are for nodes, orange for services, and green for topics.

You can simply represent the state of the battery by a “battery_state” variable inside the battery node, and the LED panel by an integer array, inside the LED panel node.

At first, the battery is full, and all the LEDs are powered off ([0, 0, 0]).

Now, you will fake the battery state evolution. Let’s say that the battery is empty after 4 seconds, and then after 6 more seconds it’s full again. And so on.

When the battery is empty, the battery node will send a request to the LED panel, to power on one LED.

And, 6 seconds later, when the battery is full again, it will send another request to power off the LED.

And you can continue looping between those 2 states indefinitely (until you press CTRL+C).

You will have to create:

  • 1 node for the battery
  • 1 node for the LED panel
  • A custom msg definition for the “led_states” topic
  • A custom srv definition for the “set_led” service

This activity really sums up everything you’ve seen before. If you skipped some of the previous activities, don’t skip that one!

Of course there is not only one solution. You may succeed with a different code that the one I will provide.

And, as usual, I’ll see you in the next lecture for the solution. For more clarity and to help you navigate easily between the steps, I will separate the solution in 3 lectures.

Solution

LedStateArray.msg

int64[] led_states

SetLed.srv

int64 led_number
int64 state
---
bool success

led_panel.py (server/publisher)

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
 
from my_robot_interfaces.msg import LedStateArray
from my_robot_interfaces.srv import SetLed
 
class LedPanelNode(Node):
    def __init__(self):
        super().__init__("led_panel")
        self.led_states_ = [0, 0, 0]
        self.led_states_publisher_ = self.create_publisher(
            LedStateArray, "led_states", 10)
        self.led_states_timer_ = self.create_timer(4, self.publish_led_states)  
        self.led_service_ = self.create_service(
            SetLed, "set_led", self.callback_set_led)

        self.get_logger().info("LED panel has been started.")

    def publish_led_states(self):
        msg = LedStateArray()
        msg.led_states = self.led_states_
        self.led_states_publisher_.publish(msg)

    def callback_set_led(self, request, response):
        led_number = request.led_number
        state = request.state

        if led_number>len(self.led_states_) or led_number<=0:
            response.success = False
            return response

        if state not in [0, 1]:
            response.success = False
            return response

        self.led_states_[led_number-1] = state
        response.success = True
        self.publish_led_states()
        return response
 
def main(args=None):
    rclpy.init(args=args)
    node = LedPanelNode()
    rclpy.spin(node)
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

battery.py (client)

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from functools import partial

from my_robot_interfaces.srv import SetLed
 
 
class BatteryNode(Node):
    def __init__(self):
        super().__init__("battery") 
        self.battery_state_ = "full"
        self.last_time_battery_state_changed_ = self.get_current_time_seconds()
        self.battery_timer_ = self.create_timer(0.1, self.check_battery_state)
        self.get_logger().info("Battery node has been started")

    def get_current_time_seconds(self):
        secs, nsecs = self.get_clock().now().seconds_nanoseconds()
        return secs + nsecs/1000000000.0

    def check_battery_state(self):
        time_now = self.get_current_time_seconds()
        if self.battery_state_ == "full":
            if time_now - self.last_time_battery_state_changed_ > 4.0:
                self.battery_state_ = "empty"
                self.get_logger().info("Battery is empty! Charging battery...")
                self.last_time_battery_state_changed_ = time_now
                self.call_set_led_server(3, 1)
        else:
            if time_now - self.last_time_battery_state_changed_ > 6.0:
                self.battery_state_ = "full"
                self.get_logger().info("Battery is now full again")
                self.last_time_battery_state_changed_ = time_now
                self.call_set_led_server(3, 0)

    def call_set_led_server(self, led_number, state):
        client = self.create_client(SetLed, "set_led")
        while not client.wait_for_service(1.0):
            self.get_logger().warn("Waiting for Server Set LED...")

        request = SetLed.Request()
        request.led_number = led_number
        request.state = state

        future = client.call_async(request)
        future.add_done_callback(
            partial(self.callback_call_set_led, led_number=led_number, state=state))

    def callback_call_set_led(self, future, led_number, state):
        try:
            response = future.result()
            self.get_logger().info(str(response.success))
        except Exception as e:
            self.get_logger().error("Service call failed %r" % (e,))
 
def main(args=None):
    rclpy.init(args=args)
    node = BatteryNode()
    rclpy.spin(node)
    rclpy.shutdown()
 
 
if __name__ == "__main__":
    main()

led_panel.cpp

#include "rclcpp/rclcpp.hpp"
#include "my_robot_interfaces/msg/led_state_array.hpp"
#include "my_robot_interfaces/srv/set_led.hpp"

class LedPanelNode : public rclcpp::Node
{
public:
    LedPanelNode() : Node("led_panel"), led_states_(3, 0)
    {
        led_states_publisher_ =
            this->create_publisher<my_robot_interfaces::msg::LedStateArray>("led_states", 10);
        led_states_timer_ =
            this->create_wall_timer(std::chrono::seconds(4),
                                    std::bind(&LedPanelNode::publishLedStates, this));
        set_led_service_ = this->create_service<my_robot_interfaces::srv::SetLed>(
            "set_led",
            std::bind(&LedPanelNode::callbackSetLed, this, std::placeholders::_1, std::placeholders::_2));
        RCLCPP_INFO(this->get_logger(), "Led panel node has been started");
    }

private:
    void publishLedStates()
    {
        auto msg = my_robot_interfaces::msg::LedStateArray();
        msg.led_states = led_states_;
        led_states_publisher_->publish(msg);
    }

    void callbackSetLed(const my_robot_interfaces::srv::SetLed::Request::SharedPtr request,
                        const my_robot_interfaces::srv::SetLed::Response::SharedPtr response)
    {
        int64_t led_number = request->led_number;
        int64_t state = request->state;

        if (led_number > (int64_t)led_states_.size() || led_number <= 0)
        {
            response->success = false;
            return;
        }

        if (state != 0 && state != 1)
        {
            response->success = false;
            return;
        }

        led_states_.at(led_number - 1) = state;
        response->success = true;
        publishLedStates();
    }

    std::vector<int64_t> led_states_;

    rclcpp::Publisher<my_robot_interfaces::msg::LedStateArray>::SharedPtr led_states_publisher_;
    rclcpp::TimerBase::SharedPtr led_states_timer_;
    rclcpp::Service<my_robot_interfaces::srv::SetLed>::SharedPtr set_led_service_;
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<LedPanelNode>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

Conclusion

In this section you have seen how to create your own ROS2 interfaces, for Topics and Services.

Topics and Services are the communication layer. Interfaces are the actual content that you send.

To recap, here’s how to create a custom interface:

  • Create a new package only for your msg and srv definitions.
  • Setup the package (CMakeLists.txt and package.xml)
  • Create a msg/ and srv/ folders, place your custom msg definitions and srv definitions here.

Once you’ve setup your package, adding a new interface is really simple:

  • Add a new file in the right folder: msg/ or srv/
  • Add one line into CMakeLists.txt
  • Compile with “colcon build”
  • And don’t forget to source your ROS2 workspace when you want to use those messages!

Here’s what you can use inside a msg or srv definition:

  • Any primitive type defined by ROS2 (most common ones: int64, float64, bool, string, and array of those)
  • Any message you’ve already created in this package.
  • Any message from another package. In this case don’t forget to add a dependency for the other package in both package.xml and CMakeLists.txt.

And now, when you compile the definitions, new interfaces will be created, along with headers/modules ready to be included in your C++ or Python nodes.

You’re almost ready to start your own ROS2 application! In the next section you’ll see how to add settings to your nodes at run time with ROS2 parameters.

'Robotics > ROS2' 카테고리의 다른 글

[ROS2 시작하기 - 7] Launch  (0) 2023.03.02
[ROS2 시작하기 - 6] Parameter  (0) 2023.03.01
[ROS2 시작하기 - 4] Service  (0) 2023.02.24
[ROS2 시작하기 - 3] Topic  (0) 2023.02.23
[ROS2 시작하기 - 2] Tools  (0) 2023.02.21
profile

개발하는 핑구

@sinq

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!