Create Custom ROS2 Interfaces (Msg and Srv)
ROS2 can communicate with interfaces (msg or srv).
How is it possible that you can use those interfaces directly in your code?
Let’s say you create a message definition inside a package. when you use the colcon build, the message will be passed by the build system. the source code will be generated for this message in any ros2 surpported language C++, Python, etc…
So that’s why you can directly include the message header in you C++ code, because the build system is generated this header file.
- you can see ros2 built-in-type from https://docs.ros.org/en/rolling/Concepts/About-ROS-Interfaces.html
- and you also see interfaces with msg or srv defined in ros2 github
- https://github.com/ros2/example_interfaces
- https://github.com/ros2/common_interfaces (it is useful for ros2 project)
Create and build Custom Msg
cd ros2_ws/src
ros2 pkg create my_robot_interfaces
cd my_robot_interfaces
rm -rf include/
rm -r src/
mkdir msg
- package.xml
- CMakeLists.txt
- HardwareStatus.msg
cd msg
touch HardwareStatus.msg
HardwareStatus.msg
int64 temperature
bool are_motors_ready
string debug_message
- build
cd ~/ros2_ws
colcon build --packages-select my_robot_interfaces
- where they are installed
cd ~/ros2_ws/install/my_robot_interfaces/lib/python3.8/site-packages/my_robot_interfaces/msg
ls
# _hardware_status.py _hardware_status_s.c __init__.py
cd ~/ros2_ws/install/my_robot_interfaces/include/my_robot_interfaces/msg
ls
"
detail
hardware_status.h
hardware_status.hpp
rosidl_generator_c__visibility_control.h
rosidl_typesupport_fastrtps_cpp__visibility_control.h
rosidl_typesupport_fastrtps_c__visibility_control.h
rosidl_typesupport_introspection_c__visibility_control.h
"
- show interfaces
ros2 interface show my_robot_interfaces/msg/HardwareStatus
"
int64 temperature
bool are_motors_ready
string debug_message
"
Use your Custom Msg in a Node
In Python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import HardwareStatus
class HardwareStatusPublisherNode(Node):
def __init__(self):
super().__init__("hardware_status_publisher")
self.hw_status_publisher_ = self.create_publisher(HardwareStatus, "hardware_status", 10)
self.timer_ = self.create_timer(1.0, self.publish_hw_status)
self.get_logger().info("Hardware status publisher has been started.")
def publish_hw_status(self):
msg = HardwareStatus()
msg.temperature = 45
msg.are_motors_ready = True
msg.debug_message = "Nothing special here"
self.hw_status_publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = HardwareStatusPublisherNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
if you can’t import my_robot_interfaces
- Ctrl + , and click “Edit in settings.json
- add lines in extraPaths
In C++
#include "rclcpp/rclcpp.hpp"
#include "my_robot_interfaces/msg/hardware_status.hpp"
class HardWareStatusPublisher : public rclcpp::Node
{
public:
HardWareStatusPublisher() : Node("hw_status_publisher")
{
pub_ = this->create_publisher<my_robot_interfaces::msg::HardwareStatus>(
"hardware_status", 10);
timer_ = this->create_wall_timer(
std::chrono::seconds(1),
std::bind(&HardWareStatusPublisher::publishNumber, this));
RCLCPP_INFO(this->get_logger(), "Hardware status publisher has been started.");
}
private:
void publishNumber()
{
auto msg = my_robot_interfaces::msg::HardwareStatus();
msg.temperature = 57;
msg.are_motors_ready = false;
msg.debug_message = "Motors are too hot!";
pub_->publish(msg);
}
rclcpp::Publisher<my_robot_interfaces::msg::HardwareStatus>::SharedPtr pub_;
rclcpp::TimerBase::SharedPtr timer_;
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<HardWareStatusPublisher>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
- if you can’t include my_robot_interfaces, add line path in ~/ros2_ws/src/.vscode
- and to use the interfaces, add line depend in package.xml, CMakeLists.txt
Create and Build Custom Srv
cd ~/ros2_ws/src/my_robot_interfaces/
mkdir srv
cd srv
touch ComputeRectangleArea.srv
ComputeRectangleArea.srv
float64 length
float64 width
---
float64 area
Add line in CMakeLists.txt
Debug Msg and Srv with ROS2 tools
show interface list
ros2 interface list
"
Messages:
action_msgs/msg/GoalInfo
action_msgs/msg/GoalStatus
action_msgs/msg/GoalStatusArray
actionlib_msgs/msg/GoalID
actionlib_msgs/msg/GoalStatus
actionlib_msgs/msg/GoalStatusArray
builtin_interfaces/msg/Duration
builtin_interfaces/msg/Time
diagnostic_msgs/msg/DiagnosticArray
diagnostic_msgs/msg/DiagnosticStatus
diagnostic_msgs/msg/KeyValue
example_interfaces/msg/Bool
example_interfaces/msg/Byte
...
"
ros2 interface package sensor_msgs
"
sensor_msgs/msg/NavSatFix
sensor_msgs/msg/RegionOfInterest
sensor_msgs/msg/BatteryState
sensor_msgs/msg/MagneticField
sensor_msgs/msg/JoyFeedback
sensor_msgs/msg/LaserScan
sensor_msgs/msg/JoyFeedbackArray
...
"
find interface from node
ros2 node list
"
/hardware_status_publisher
/add_two_ints_server
"
ros2 node info /hardware_status_publisher
"
/hardware_status_publisher
Subscribers:
Publishers:
**/hardware_status: my_robot_interfaces/msg/HardwareStatus <-- that's a topic**
/parameter_events: rcl_interfaces/msg/ParameterEvent
/rosout: rcl_interfaces/msg/Log
Service Servers:
/hardware_status_publisher/describe_parameters: rcl_interfaces/srv/DescribeParameters
/hardware_status_publisher/get_parameter_types: rcl_interfaces/srv/GetParameterTypes
/hardware_status_publisher/get_parameters: rcl_interfaces/srv/GetParameters
/hardware_status_publisher/list_parameters: rcl_interfaces/srv/ListParameters
/hardware_status_publisher/set_parameters: rcl_interfaces/srv/SetParameters
/hardware_status_publisher/set_parameters_atomically: rcl_interfaces/srv/SetParametersAtomically
Service Clients:
Action Servers:
Action Clients:
"
we can know it is a publisher and publishing topic. So let’s find the topic’s info
ros2 topic list
"
/hardware_status
/parameter_events
/rosout
"
ros2 topic info /hardware_status
"
Type: my_robot_interfaces/msg/HardwareStatus
Publisher count: 1
Subscription count: 0
"
ros2 interface show my_robot_interfaces/msg/HardwareStatus
"
int64 temperature
bool are_motors_ready
string debug_message
"
And what is /add_two_ints_server?
ros2 node info /add_two_ints_server
"
/add_two_ints_server
Subscribers:
Publishers:
/parameter_events: rcl_interfaces/msg/ParameterEvent
/rosout: rcl_interfaces/msg/Log
Service Servers:
**/add_two_ints: example_interfaces/srv/AddTwoInts**
/add_two_ints_server/describe_parameters: rcl_interfaces/srv/DescribeParameters
/add_two_ints_server/get_parameter_types: rcl_interfaces/srv/GetParameterTypes
/add_two_ints_server/get_parameters: rcl_interfaces/srv/GetParameters
/add_two_ints_server/list_parameters: rcl_interfaces/srv/ListParameters
/add_two_ints_server/set_parameters: rcl_interfaces/srv/SetParameters
/add_two_ints_server/set_parameters_atomically: rcl_interfaces/srv/SetParametersAtomically
Service Clients:
Action Servers:
Action Clients:
"
we can know it is server and providing service. So let’s find the service’s info
ros2 service list
"
/add_two_ints
/add_two_ints_server/describe_parameters
/add_two_ints_server/get_parameter_types
/add_two_ints_server/get_parameters
/add_two_ints_server/list_parameters
/add_two_ints_server/set_parameters
/add_two_ints_server/set_parameters_atomically
/hardware_status_publisher/describe_parameters
/hardware_status_publisher/get_parameter_types
/hardware_status_publisher/get_parameters
/hardware_status_publisher/list_parameters
/hardware_status_publisher/set_parameters
/hardware_status_publisher/set_parameters_atomically
"
ros2 service type /add_two_ints
# example_interfaces/srv/AddTwoInts
ros2 interface show example_interfaces/srv/AddTwoInts
"
int64 a
int64 b
---
int64 sum
"
Activity
In this activity you will implement the battery + led panel example that we used to understand Services in the previous section. When the battery is empty we power on a LED, and when the battery is full we power it off.
So, here’s the initial state:
Blue boxes are for nodes, orange for services, and green for topics.
You can simply represent the state of the battery by a “battery_state” variable inside the battery node, and the LED panel by an integer array, inside the LED panel node.
At first, the battery is full, and all the LEDs are powered off ([0, 0, 0]).
Now, you will fake the battery state evolution. Let’s say that the battery is empty after 4 seconds, and then after 6 more seconds it’s full again. And so on.
When the battery is empty, the battery node will send a request to the LED panel, to power on one LED.
And, 6 seconds later, when the battery is full again, it will send another request to power off the LED.
And you can continue looping between those 2 states indefinitely (until you press CTRL+C).
You will have to create:
- 1 node for the battery
- 1 node for the LED panel
- A custom msg definition for the “led_states” topic
- A custom srv definition for the “set_led” service
This activity really sums up everything you’ve seen before. If you skipped some of the previous activities, don’t skip that one!
Of course there is not only one solution. You may succeed with a different code that the one I will provide.
And, as usual, I’ll see you in the next lecture for the solution. For more clarity and to help you navigate easily between the steps, I will separate the solution in 3 lectures.
Solution
LedStateArray.msg
int64[] led_states
SetLed.srv
int64 led_number
int64 state
---
bool success
led_panel.py (server/publisher)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import LedStateArray
from my_robot_interfaces.srv import SetLed
class LedPanelNode(Node):
def __init__(self):
super().__init__("led_panel")
self.led_states_ = [0, 0, 0]
self.led_states_publisher_ = self.create_publisher(
LedStateArray, "led_states", 10)
self.led_states_timer_ = self.create_timer(4, self.publish_led_states)
self.led_service_ = self.create_service(
SetLed, "set_led", self.callback_set_led)
self.get_logger().info("LED panel has been started.")
def publish_led_states(self):
msg = LedStateArray()
msg.led_states = self.led_states_
self.led_states_publisher_.publish(msg)
def callback_set_led(self, request, response):
led_number = request.led_number
state = request.state
if led_number>len(self.led_states_) or led_number<=0:
response.success = False
return response
if state not in [0, 1]:
response.success = False
return response
self.led_states_[led_number-1] = state
response.success = True
self.publish_led_states()
return response
def main(args=None):
rclpy.init(args=args)
node = LedPanelNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
battery.py (client)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from functools import partial
from my_robot_interfaces.srv import SetLed
class BatteryNode(Node):
def __init__(self):
super().__init__("battery")
self.battery_state_ = "full"
self.last_time_battery_state_changed_ = self.get_current_time_seconds()
self.battery_timer_ = self.create_timer(0.1, self.check_battery_state)
self.get_logger().info("Battery node has been started")
def get_current_time_seconds(self):
secs, nsecs = self.get_clock().now().seconds_nanoseconds()
return secs + nsecs/1000000000.0
def check_battery_state(self):
time_now = self.get_current_time_seconds()
if self.battery_state_ == "full":
if time_now - self.last_time_battery_state_changed_ > 4.0:
self.battery_state_ = "empty"
self.get_logger().info("Battery is empty! Charging battery...")
self.last_time_battery_state_changed_ = time_now
self.call_set_led_server(3, 1)
else:
if time_now - self.last_time_battery_state_changed_ > 6.0:
self.battery_state_ = "full"
self.get_logger().info("Battery is now full again")
self.last_time_battery_state_changed_ = time_now
self.call_set_led_server(3, 0)
def call_set_led_server(self, led_number, state):
client = self.create_client(SetLed, "set_led")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for Server Set LED...")
request = SetLed.Request()
request.led_number = led_number
request.state = state
future = client.call_async(request)
future.add_done_callback(
partial(self.callback_call_set_led, led_number=led_number, state=state))
def callback_call_set_led(self, future, led_number, state):
try:
response = future.result()
self.get_logger().info(str(response.success))
except Exception as e:
self.get_logger().error("Service call failed %r" % (e,))
def main(args=None):
rclpy.init(args=args)
node = BatteryNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
led_panel.cpp
#include "rclcpp/rclcpp.hpp"
#include "my_robot_interfaces/msg/led_state_array.hpp"
#include "my_robot_interfaces/srv/set_led.hpp"
class LedPanelNode : public rclcpp::Node
{
public:
LedPanelNode() : Node("led_panel"), led_states_(3, 0)
{
led_states_publisher_ =
this->create_publisher<my_robot_interfaces::msg::LedStateArray>("led_states", 10);
led_states_timer_ =
this->create_wall_timer(std::chrono::seconds(4),
std::bind(&LedPanelNode::publishLedStates, this));
set_led_service_ = this->create_service<my_robot_interfaces::srv::SetLed>(
"set_led",
std::bind(&LedPanelNode::callbackSetLed, this, std::placeholders::_1, std::placeholders::_2));
RCLCPP_INFO(this->get_logger(), "Led panel node has been started");
}
private:
void publishLedStates()
{
auto msg = my_robot_interfaces::msg::LedStateArray();
msg.led_states = led_states_;
led_states_publisher_->publish(msg);
}
void callbackSetLed(const my_robot_interfaces::srv::SetLed::Request::SharedPtr request,
const my_robot_interfaces::srv::SetLed::Response::SharedPtr response)
{
int64_t led_number = request->led_number;
int64_t state = request->state;
if (led_number > (int64_t)led_states_.size() || led_number <= 0)
{
response->success = false;
return;
}
if (state != 0 && state != 1)
{
response->success = false;
return;
}
led_states_.at(led_number - 1) = state;
response->success = true;
publishLedStates();
}
std::vector<int64_t> led_states_;
rclcpp::Publisher<my_robot_interfaces::msg::LedStateArray>::SharedPtr led_states_publisher_;
rclcpp::TimerBase::SharedPtr led_states_timer_;
rclcpp::Service<my_robot_interfaces::srv::SetLed>::SharedPtr set_led_service_;
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<LedPanelNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
Conclusion
In this section you have seen how to create your own ROS2 interfaces, for Topics and Services.
Topics and Services are the communication layer. Interfaces are the actual content that you send.
To recap, here’s how to create a custom interface:
- Create a new package only for your msg and srv definitions.
- Setup the package (CMakeLists.txt and package.xml)
- Create a msg/ and srv/ folders, place your custom msg definitions and srv definitions here.
Once you’ve setup your package, adding a new interface is really simple:
- Add a new file in the right folder: msg/ or srv/
- Add one line into CMakeLists.txt
- Compile with “colcon build”
- And don’t forget to source your ROS2 workspace when you want to use those messages!
Here’s what you can use inside a msg or srv definition:
- Any primitive type defined by ROS2 (most common ones: int64, float64, bool, string, and array of those)
- Any message you’ve already created in this package.
- Any message from another package. In this case don’t forget to add a dependency for the other package in both package.xml and CMakeLists.txt.
And now, when you compile the definitions, new interfaces will be created, along with headers/modules ready to be included in your C++ or Python nodes.
You’re almost ready to start your own ROS2 application! In the next section you’ll see how to add settings to your nodes at run time with ROS2 parameters.
'Robotics > ROS2' 카테고리의 다른 글
[ROS2 시작하기 - 7] Launch (0) | 2023.03.02 |
---|---|
[ROS2 시작하기 - 6] Parameter (0) | 2023.03.01 |
[ROS2 시작하기 - 4] Service (0) | 2023.02.24 |
[ROS2 시작하기 - 3] Topic (0) | 2023.02.23 |
[ROS2 시작하기 - 2] Tools (0) | 2023.02.21 |