ROS机器人编程实践

第一部分 基础知识

第1章 概述

ROS 虽然名称里面带个OS,但其实它并不是 OS,而是一个开发框架,就像 Express.js web 框架一样,区别在于它是面向机器人领域,同时市面上还有很多基于该框架的第三方工具包,共同组成一个生态,让整个机器人的开发变得更加容易。

为了降低耦合程度,提高各个模块的通用性,ROS 主要使用消息协议将不同的模块组织起来,进行协同工作;

第2章 预备知识

ROS 图

由于 ROS 的设计理论是各个独立程序之间相互通信以达成目标,因此,使用图(节点代表程序、边代表通信)可以很好的对设计概念进行可视化;

roscore

roscore 用于协助实现节点间的通信,某种程度来说,它有点像是一个路由器,或者说有点像 k8s 中的服务发现;节点通过它获得另外一个节点的地址,以便和其他节点建立通信;

另外,roscore 还提供了一个参数服务器,方便节点存储和读取数据;

catkin、工作区及 ROS 程序包

catkin 是一个构建工具,猜测有点像 cmake;后来发现它真的是基于 cmake 的扩展,添加了一些额外的功能;

问:为什么 ROS 使用自己的构建工具,而不是使用现有的构建工具如 cmake?

答:原来是需要进行功能上的扩展,同时也基于 cmake

工作区:代码文件的组织方式,以便文件之间可以相互导入或引用,并且与其他工作区的文件实现隔离;

catkin_init_workspace 命令可快速初始化一个工作区,有点像 IDE 里面新建了一个项目一样,之后通常将代码放在src 文件夹中;

1
2
3
4
5
6
7
8
9
 # 用于在当前 shell 环境中批量设置环境变量,也可以将命令放到 .bashrc 中,以便每次启动新的命令行窗口时,能够自动运行;
source /opt/ros/indigo/setup.bash

mkdir -p ~/catkin_ws/src
cd ~/catkin_ws/src
catkin_init_workspace

cd ~/catkin_ws
catkin_make # 用于生成 CmakeLists 文件

ROS 程序包:包存储在 src 文件夹中,包中包含有一个 CmakeLists 文件和一个 package.xml 文件,package.xml 主要用于描述一些包的相关信息,如包名称、版本号、作者等等;感觉有点点像 js 里面的 package.json 文件;

1
2
cd ~/catkin_ws/src
catkin_create_pkg my_awesome_code rospy # 创建一个名为 my_awesome_code 的包,该包依赖 rospy 包

rosrun

本质上每个节点都类似一个可执行程序,由于不同的可执行程序,存放在不同的文件夹中,导致运行该程序时,通常需要输入长长的路径前缀。rosrun 可以简化命令的输入,它会自动寻找可执行程序的位置,运行它,同时还可以传递参数给它;

1
2
# 命令格式如下
rosrun <pakcage_executable> [ARGS]

注意:运行任意的 ros 程序前,都需要确保 roscore 已经在运行,因为它是服务发现的中枢;

rqt_graph 命令可用于绘制并显示节点图,类似下面这个样子

rosrun 从可用于启动单个节点(程序),如果要批量启动节点,则可以使用 roslaunch 命令

命名、命名空间以及重映射

ROS 通过使用命名空间来避免节点的命名冲突,命名空间类似路径名称,使用斜杠做为分隔符,例如

  • /left/image
  • /right/image

命名空间支持重映射,类似起了个别名,这样其他程序可以根据别名来通信;这个机制可以降低程序之间的耦合性;

1
2
# 将命名空间 right/image 重映射到 image 上
./image_view image:=right/image
1
2
3
4
5
6
# 使用 ns 参数设置程序的命名空间,这样同一个程序,可以在不同的命名空间创建出实例
./camera __ns:=right

# 使用 name 参数设置程序的名称,这样使用同一个程序,可以创建出多个实例
./talker __name:=talker1
./talker __name:=talker2

roslaunch

roslaunch 用于批量启动多个节点,因此它需要配套一个 launch 文件,以便从文件中读取多个节点信息,并启动它们;

1
2
# 命令格式
roslaunch <PACKAGE LAUNCH_FILE

launch 文件示例

tab 键

由于很多命令都比较长,可使用 tab 来自动补全命令,这样能够加快输入速度,

tf:坐标系转换

位置:由 (x, y, z) 三维向量表示

朝向:由 (roll, pitch, yaw) 三维向量表示,分别代表绕各个坐标轴旋转了多少角度;

位姿:由 (x, y, z, roll, pitch, yaw) 组成的六维向量表示;

ROS 使用 /tf 消息主题在节点间共享坐标变换数据,只需在代码中订阅该消息,即可查询有关节点在某个时刻的坐标信息;这些信息包括:

  • 参与变换的两个坐标系名称(分别为 parent 和child)
  • 坐标系之间的相对位置和朝向;
  • 变换的时间截;

第3章 话题

话题:某个拥有特定数据类型的信息流;有点像 MQ 中的消息队列,有消息的发布者,也有消息的订阅者

将消息发布到话题

在工作区的 src 目录,创建一个名称为 basics 的包

1
catkin_create_pkg basics rospy

在 basics 包中的 src 目录中,创建一个 topic_publisher.py 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3
# 以上这行很重要 ,不可省略,如果没有写,有可能 rosrun 会找不到 python 可执行程序

import rospy
from std_msgs.msg import Int32

rospy.init_node("topic_publisher")
pub = rospy.Publisher('counter', Int32) # Int32 参数用来指定话题中的数据类型为 Int32
rate = rospy.Rate(2) # 设置消息速率
count = 0

while not rospy.is_shutdown():
pub.publish(count)
count += 1
rate.sleep()

以上代码有依赖 std_msgs.msg 包,因此需要在 pakcage.xml 文件中添加依赖信息,以便可以构建成功;

1
<depend package="std_msgs"></depend>

使用 source 设置当前目录做为工作空间,导入相关配置,以便可以运行以上述代码;

1
source devel/setup.bash

将 topic_publisher.py 文件设置为可执行文件权限

1
chmod u+x topic_publisher.py

在工作区中 src 目录中,输入以下命令行,运行 topic_publisher.py

1
rosrun basics top_publisher.py

只需确保 top_publisher.py 在 basics 目录即可,因为 rosrun 会自动查询可执行文件的位置;

运行起来后,即可通过以下命令,在另外一个终端查看主题的状态

1
2
3
4
5
rostopic list # 查看当前主题列表
rostopic echo counter -n 5 # 查看 counter 主题最近的5条消息
rostopic hz counter # 查看 counter 主题的消息频率
rostopic info counter # 查看 counter 主题的基本信息
rostopic find std_msgs/Int32 # 通过消息类型查询相关的主题

订阅一个话题

编写 topic_subscriber.py 文件,订阅话题

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python3

import rospy
from std_msgs.msg import Int32

def callback(msg):
print(msg.data)

rospy.init_node("topic_subscriber")
sub = rospy.Subsciber("counter", Int32, callback)

rospy.spin()

ROS 使用事件驱动的机制,类似 js,因此不可避免会大量用到回调函数

运行订阅文件

1
2
3
source devel/setup.bash # 设置环境变量
chmod u+x basics/src/topic_subscriber.py # 设置为可执行文件
rosrun basics topic_subscriber.py # 运行

锁存话题

如果将某个话题标记为锁存的,那么订阅者在成功订阅话题后,将会收到该话题的最后一条消息;

有了锁存话题,就可以确保那么在发布者之后创建的订阅者,也能够收到发布者之前发布的消息。因为有可能发布者只发布一次消息,而不是不断的发布消息;

1
2
# 通过 latched 参数将话题标记为锁存话题
pub = rospy.Publisher("map", nav_msgs/OccupancyGrid, latched=True)

定义自己的消息类型

ROS 的内置消息类型

除了内置的消息类型外,也可以自定义消息类型;ROS 为了让消息的数据格式能够在不同的语言中通用,增加了一层抽象,使用一个单独的 msg 文件来定义消息的数据格式。该文件由 catkin_make 编译后,即可由不同的语言编写的模块所读取和识别,即 A 语言编写的节点,能够订阅 B 语言编写的节点发布的消息。代价是每次消息的数据格式有变化,都需要重新运行一次 catkin_make,不然无法生效;

1
2
3
# Complext.msg 数据文件示例 
float32 real
float32 imaginary

在运行 catkin_make 之前,我们需要在 package.yml 中添加构建信息

1
2
<build_depend>message_generation</build_depend>
<run_depend>message_runtime</run_depend>

之后还需要修改一 下 CMakeLists 文件,,以便 catkin_make 知道如何自动构建与消息生成相关的代码,需要添加更新的内容包括

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
find_package(catkin REQUIRED COMPONENTS
roscpp
rospy
std_msgs
message_generation # 明确应查找 message_generation 包,以便用于编译
)

catkin_package(
CATKIN_DEPENDS message_runtime # 运行时的依赖
)

add_message_files(
FILES Complex.msg # 指定要编译的消息文件
)

generate_messages(
DEPENDENCIES # 声明创建信息时需要的依赖
std_msgs
)

消息格式定义好了后,就可以在代码中进行引用了

让发布者和订阅者协同工作

第4章 服务

在节点间传递数据,除了使用话题来发布和订阅消息数据外,还有一种方法是“服务”(跨进程函数调用,个人感觉有点点像 RPC 远程进程调用,不过并非点对点的,而是在中间增加了一层抽象);

话题比较适用于高频传递数据的场景,服务比较适合低频场景,并且数据能够被快速处理,无须花费太长的时间,以免造成阻塞;

定义服务

1
2
3
4
# WordCount.srv 服务定义文件的示例,一般会用一个单独的 srv 目录来统一存放服务定义文件
string words
--- # 输入和输出的分隔符,上面是输入,下面是输出
uint32 count

同话题一样,也需要在 package.yml 文件中定义依赖

1
2
<build_depend>message_generation</build_depend>
<run_depend>message_runtime</run_depend>

并在 CMakeLists 文件中定义构建信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
find_package(catkin REQUIRED COMPONENTS
roscpp
rospy
std_msgs
message_generation # 明确应查找 message_generation 包,以便用于编译
)

catkin_package(
CATKIN_DEPENDS message_runtime # 运行时的依赖
)

add_service_files(
FILES
WordCount.srv # 指定要编译的服务文件
)

generate_messages(
DEPENDENCIES # 声明创建消息时需要的依赖
std_msgs
)

注:rossrv list 命令查看所有可用服务;

运行 catkin_make 命令后,基于服务定义的文件,会生成三个类,它们分别是:

  • WordCount
  • WordCountRequest
  • WordCountResponse

以上类可以被代码文件导入并使用;

实现服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python3
# service_server.py

import rospy

from basic.srv import WordCount, WordCountResponse # 导入 catkin_make 基于配置文件生成的类

def count_words(request):
return WordCountResponse(len(request.words.split()))

rospy.init_node("service_server")
service = rospy.Service("word_count", WordCount, count_words)

rospy.spin()

回调函数的返回值可以有多种形式,例如元组、列表或者字典;不管使用哪种形式,ROS 底层代码都会将其翻译成 WordCountResponse 对象;

1
2
3
4
5
6
7
8
def count_words(request):
return len(request.words.split()) # 返回单个数值

def count_words(request):
return [len(request.words.split())] # 返回列表

def count_words(request):
return { "count": len(request.words.split())} # 返回字典

使用服务

可使用 rosservice call 在命令行中调用服务

另外也可以在代码中调用服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3
# service_client.py

import rospy

from basic.srv import WordCount

import sys

rospy.init_node("service_client")
rospy.wait_for_service("word_count") # 此行很重要,因为服务是同步的,需要进行等待,确保服务开启后才能调用
word_counter = rospy.ServiceProxy("word_count", WordCount) # 创建一个变量,引用远程服务的函数
words = " ".join(sys.argv[1:])
word_count = word_counter(words) # 通过变量调用远程服务的函数
print(words, "->", word_count)

由于服务的调用是同步的,因此当提供服务的节点不可用时,调用方的代码将会因此阻塞,一直要等到服务恢复正常时,才会继续往下;

第5章 动作

服务比较适用于简单的值查询,如果某个操作耗时较长或者用时未知,则服务显然用起来很不方便;对于时间不确定,且以目标导航型的操作来说,例如指挥机器人到达某个位置,那么使用”动作“机制更加合适;

”动作“是异步的,在执行动作的过程中,会实时提供反馈,并且还支持中途取消;

动作的定义

跟主题和服务一样,动作同样使用文件进行定义(文件名后缀为 .action),动作的定义由三部分构成,分别是:

  • 目标:由动作的客户端发送
  • 结果:由动作的服务端发送
  • 反馈:由动作的服务定期发送

这三部分对应三种消息格式,本质上,动作是对主题消息的更高级的一种封装;

调用 catkin_make 构建动作定义文件后,会生成目标、结果、反馈等相关的类,可在代码中引入并使用

流程大致为:动作服务器启动后,等待客户端给它下命令;服务器接到命令后,开始干活,同时定期发送进度反馈给客户端;客户端可中途取消动作;如果不取消,服务端在动作完成后,还会给客户端发送一个结果;

action 动作文件一般放在包文件夹中的单独的 action 文件夹中,文件内容示例如下:

1
2
3
4
5
6
7
8
9
10
# 第1部分:目标
duration time_to_wait
---
# 第2部分:结果
duration time_elapsed
uint32 updates_sent
---
# 第3部分:反馈
duration time_elapsed
duration time_remaining

注:此处的 duration 是内置的原始类型;

实现一个基本的动作服务器

通过创建一个定时器,来模拟创建动作服务器;原理跟话题差不多,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#!/usr/bin/env python3
# simple_action_server.py

import rospy

import time
import actionlib
from basics.msg import TimerAction, TimerGoal, TimerResult # catkin_make 解析定义文件后自动生成的类

def do_timer(goal):
start_time = time.time()
time.sleep(goal.time_to_wait.to_sec())

result = TimerResult()
result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time)
result.updates_sent = 0

server.set_succeded(result)

rospy.init_node("timer_action_server")
# 创建一个动作服务器,参数分别为:名称、动作类、回调函数、是否自动启动(应始终禁用,否则容易出现异常)
server = actionlib.SimpleActionServer("timer", TimerAction, do_timer, False)
server.start()
rospy.spin()

功能检查

编写好 simple_action_server.py 文件后,在运行之前,需要准备几件事情:

  • 确保在当前 terminal 运行了 source ./devel/setup.bash,以便能够让环境变量中的 PYTHONPATH 指向当前目录中的 lib,这样才能够找到并导入已编译的 basics 程序包;
  • 确保运行 chmod u+x simple_action_server.py,以便将文件设置为可执行的状态;
1
2
3
4
5
source ./devel/setup.bash

chmod u+x simple_action_server.py

rosrun basics simple_action_server.py
1
2
# 在新窗口运行如下命令
rostopc list

运行后应该可以看到以下几个新增加的 /timer 话题

动作的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python3
# simple_action_client.py

import rospy

import actionlib
from basics.msg import TimerAction, TimerGoal, TimerResult


rospy.init_node("timer_action_client")
client = actionlib.SimpleActionClient("timer", TimerAction) # 此处的动作名称和类名需要和 server 保持一致
client.wait_for_server() # 等待服务器启动,如果服务器未启动,代码会暂时阻塞在这里,直到 server 启动为止
goal = TimerGoal() # 创建动作目标
goal.time_to_wait = rospy.Duration.from_sec(5.0) # 在 goal 属性上设置数据,以发给 server
client.send_goal(goal)
client.wait_for_result() # 此处也会阻塞,直到结果返回
print("Time elapsed: ", client.get_result().time_elapsed.to_sec())

实现一个更复杂的动作服务器

总体来说,动作跟服务很像,区别在于服务的调用是同步的,而动作的调用是异步的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/usr/bin/env python3
# fancy_action_server.py

import rospy
import time, actionlib
from basics.msg import TimerAction, TimerGoal, TimerResult, TimerFeedback

def do_timer(goal):
start_time = time.time()
update_count = 0

if goal.time_to_wait.to_sec() > 60.0:
result = TimerResult()
result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time)
result.updates_sent = update_count
server.set_aborted(result, "Timer aborted due to too long wait")
return

while(time.time() - start_time) < goal.time_to_wait.to_sec():
if server.is_preempt_requested(): # 如果发生抢占,即动作未完成,又进来了新动作
result = TimerResult()
result.time_elapsed = \
rospy.Duration.from_sec(time.time() - start_time)
result.updates_sent = update_count
server.set_preempted(result, "Timer preempted")
return

feedback = TimerFeedback()
feedback.time_elapsed = \
rospy.Duration.from_sec(time.time() - start_time)
feedback.time_remaining = goal.time_to_wait - feedback.time_elapsed
server.publish_feedback(feedback)
update_count += 1

time.sleep(1.0)

result = TimerResult()
result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time)
result.updates_sent = update_count
server.set_succeeded(result, "Timer completed successfully")

rospy.init_node("fancy_action_server")
server = actionlib.SimpleActionServer("timer", TimerAction, do_timer, False)
server.start()
rospy.spin()

使用更复杂的动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python3
# fancy_action_client.py

import rospy
import time, actionlib
from basics.msg import TimerAction, TimerGoal, TimerResult, TimerFeedback

def feedback_cb(feedback):
print("[Feedback] Time elapsed: ", feedback.time_elapsed.to_sec())
print("[Feedback] Time remains: ", feedback.time_remaining.to_sec())

rospy.init_node("timer_action_client")
client = actionlib.SimpleActionClient("timer", TimerAction)
client.wait_for_server()
goal = TimerGoal()
goal.time_to_wait = rospy.Duration.from_sec(5.0)

# 测试 server 端取消动作
# goal.time_to_wait = rospy.Duration.from_sec(500.0)

client.send_goal(goal, feedback_cb=feedback_cb)


# 测试取消动作
# time.sleep(3.0)
# client.cancel_goal()

client.wait_for_result() # 此处会阻塞,直到动作完成或取消,才会往下执行
print("[Result] State: ", client.get_state())
print("[Result] Status: ", client.get_goal_status_text())
print("[Result] Time elapsed: ", client.get_result().time_elapsed.to_sec())
print("[Result] Updates sent: ", client.get_result().updates_sent)

小结

话题、服务、动作三者的区别:

动作比服务更具通用性,因此除非有特殊需求,否则一般都优先使用动作;

第6章 机器人与仿真器

子系统

将大问题拆分成小问题是一种常见的解决问题的思路,在 ROS 里面同样如此。通过将一个复杂的系统,拆分成多个小型的子系统,能够有效的降低复杂度;一种常见的子系统拆分方案是拆分成执行、感知、计算三个子系统;

执行主要负责与轮子、机械臂等部件进行交互;感知主要负责与传感器交互,例如摄像头、激光雷达等;计算主要是收集前两个子系统的数据,进行运算得出下一步行动的结果,以完成任务目标;

执行子系统:移动平台

差分驱动:有两个轮子,当两个轮子同向转动时,机器人会向前进;当反向转运时,机器人会原地转动;

由于两个轮子在静止时无法保持稳定,因此差分底盘一般还会配备几个没有动力的脚轮,用来提供支撑;这样当机器人静止时,也能够保持稳定,不会倒下去;

当然,不配备支撑脚轮也是可以的,这就需要机器的人底盘实时处于运动的状态,以实现自平衡,即使机器人实际上处于静止状态;;平衡轮的直径可以尽量设计的大一些,这种过障碍时,会更稳定,波动更小;该设计的优点是狭小空间的通过性更好,缺点是断电后会倒掉;

差分驱动也可以是多轮的,此时变成了滑动转向;当轮子比较多时,一般会用履带包裹,就像坦克或挖掘机那样;滑动结构与地面的摩擦力很大,因此轮子的磨损比较快,另外也容易对地面造成破坏;

阿克曼平台:后轮不转动,只转动前轮,来实现转向;家用小轿车即采用此种设计;缺点是不能横向移动;需要通过倒车来实现横移,即日常生活中常见的侧方停车;

表克纳姆底盘:优点是可以全向移动,缺点是滚轮直径小,因此只适合光滑表面,不然磨损很快;

为了实现不同底盘之间的指令复用,有必要对移动底盘进行抽象,通常会采用 twist 消息类型,用来表达三维空间中的线速度和角速度;底盘平台收到 twist 消息后,再根据自己的机械构造,拆分成相匹的轮子驱动指令;

执行子系统:机械臂

另外一个常见的执行子系统是机械臂,以便机器人能够完成操纵物体的任务,例如抓取和放置物品;

两种常见的机械臂结构:

  • 旋转关节:常用于车间生产等工业机器人
  • 线性关节:有点像3D打印机

理论上只需要 6 个自由度就能够让机械臂在工作环境中达到任意的位姿,但受限于机械结构,实际上很难达到,因此一般会增加 1 个自由度,来增加整个工作区的空间;但增加自由度意味着增加成本,可靠性下降,因此实际使用多少个自由度,主要取决于工作环境的需要和操作复杂程度;

传感子系统

传感器能够让机器人感知环境,并相应的调整自己的行为;传感器的种类非常丰富,并且不一定越多越好,而是根据任务需要而定。有些很复杂的操纵任务,甚至可能只需要几个二值传感器(如限位开关)就足够了;

除了二值传感器,还有一类传感器是“量传感器”,例如压力传感器;每个传感器一般会有其局限性,例如最大或最小有效距离等,因此一般需要组合多个传感器,来提高感知的准确度;

普通相机

双目立体相机的精度受限于相机的一些参数,并且要求拍摄目标有明显的纹理特征,否则难以比对两个视野的差异,例如一堵白墙的场景中,检测效果就不太好;

ROS 一般使用 sensor_msgs/Image 消息类型来传递图像信息,该消息包含了一系列与图像有关的数据,例如尺寸、格式、图像数据等;

深度相机

双目立体相机一般是被动式的工作,而深度相机则是主动式的,它会向环境投影光线,形成纹理,然后被相机捕捉;投射的光线一般使用红外波段(非可见光),以避免影响物体的颜色;

另一种深度相机是投射结构光(例如微软的 Kinect),同样也是通过相机捕捉纹理特征来重构三维空间;

还有一种深度相机是使用红外 LED 或激光发射头,原理有点像蝙蝠的声纳,区别是雷达投射的是光线,而不是声波;然后计算发出光线到收到光线反射的时间差来计算深度;

深度相机产生的数据一般是点云的形式,ROS 中的消息类型为 sensor_msgs/PointCloud2;由于相机无法保证每个区域都能正确返回数据,因此有时候在点云的可视化数据中,会存在一些“洞“,一般需要使用算法进行处理;

激光雷达

激光雷达的原理跟红外 LED 深度相机的原理相同,只是精度更高;常用于精度要求高的场景,例如自动驾驶;

ROS 中激光雷达的消息类型一般为 sensor_msgs/LaserScan;虽然不同雷达厂家的原始输出格式不同,但最终都会使用 ROS 的驱动包转换成统一的格式;

旋转编码器

旋转编码器用来感知某个可旋转部件的旋转圈数,从而计算位移,有点像汽车车轮上面的里程计;但考虑外部环境存在干扰因素,例如地面的摩擦系数,风阻等;旋转编码器直接计算出来的结果并不准确,通常都会和实际值出现一定的偏差;

旋转编码器在移动平台和机械臂上的使用方式差别很大,一个用来计算移动距离,消息类型为geometry_msgs/Transform;一个用来计算姿态,消息类型为 sensor_msgs/JointState;

计算子系统

由于 ROS 的设计架构是节点间的消息传递,因此不可避免带来了消息格式转换的额外计算开销;这种开销也导致了对处理器一定的门槛要求,例如 8 位处理器通常难以满足计算性能的要求;不过随着处理器的性能变得越来越强大,现在 ROS 已经能够运行在绝大多数的终端设备上了;

机器人系统举例

目前市面上已经有一些成熟的开箱即用的机器人开发平台,购买来后即可进行试验;

常用的机器人开发平台有:

  • PR2,据说售价 40 万美元;
  • Fetch,用于仓储业务的移动操作机器人;
  • Robonaut2,工作在国际空间站;
  • TurtleBot,用于 ROS 教育和原型开发;价格在 2000 美元以内;

仿真器

仿真软件有很多好处,最主要的好处包括:

  • 免费,毕竟购买机器人的成本还是很高的;
  • 不会产生机械故障,让开发者能够专注于算法的验证;

Stage:常用的 2D 仿真软件,建模语言简单,运行速度快,纯刚体世界;

Gazebo:适用于 3D 场景的仿真;可选择不同的引擎做刚体仿真;

SLAM,simultanious localization and mapping,同步定位与地图构建

2D 仿真,机器人只能在 2D 平面上运动;

市面上还有其他仿真软件,例如还有 MORSE,V-REP 等;每个都有其优缺点和取舍,因此需要根据自身的业务场景,选择最合适的;

第7章 Wander-bot

目标:编写一个可以自由行走,并自动避开障碍的机器人

创建包

创建工作区

创建程序包

编写可执行文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python3
# red_light_green_light.py

import rospy
from geometry_msgs.msg import Twist

# queue_size 参数用来设置队列中的消息数量上限,如果有新的消息进来,超过了消息数量上限
# 则丢弃旧的,仅保留最新的消息
cmd_vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=1)
rospy.init_node("red_light_green_light")

red_light_twist = Twist() # Twist 构建的对象初始字段的默认值为0,意味着停止状态
green_light_twist = Twist()
green_light_twist.linear.x = 0.5 # 线性度设置为 0.5 米/秒

driving_forward = False
light_change_time = rospy.Time.now()
rate = rospy.Rate(10)

while not rospy.is_shutdown():
if driving_forward:
# 通过 while 不断发布前进的命令,否则如长时间没有收到新命令,会触发超时保护,自动停下来
cmd_vel_pub.publish(green_light_twist)
else:
cmd_vel_pub.publish(red_light_twist)

if rospy.Time.now() > light_change_time: # 定时翻转运动状态,走3秒,停3秒
driving_forward = not driving_forward
light_change_time = rospy.Time.now() + rospy.Duration(3)

rate.sleep()

安装 ROS 程序包

1
2
3
4
5
6
7
8
sudo apt-get install ros-noetic-joy ros-noetic-teleop-twist-joy \
ros-noetic-teleop-twist-keyboard ros-noetic-laser-proc \
ros-noetic-rgbd-launch ros-noetic-rosserial-arduino \
ros-noetic-rosserial-python ros-noetic-rosserial-client \
ros-noetic-rosserial-msgs ros-noetic-amcl ros-noetic-map-server \
ros-noetic-move-base ros-noetic-urdf ros-noetic-xacro \
ros-noetic-compressed-image-transport ros-noetic-rqt* ros-noetic-rviz \
ros-noetic-gmapping ros-noetic-navigation ros-noetic-interactive-markers

安装 TurtleBot3 程序包

1
2
3
sudo apt install ros-noetic-dynamixel-sdk
sudo apt install ros-noetic-turtlebot3-msgs
sudo apt install ros-noetic-turtlebot3

安装 TurtleBot3 simulations 仿真器程序包

1
2
3
cd ~/wanderbot_ws/src/
git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3_simulations.git
cd ~/wanderbot_ws && catkin_make

设置当前文件来为新的工作区

1
source devel/setup.sh

启动仿真器,检查是否成功安装

1
2
export TURTLEBOT3_MODEL=waffle
roslaunch turtlebot3_gazebo turtlebot3_world.launch

若成功安装,会出现以下界面

另外开启一个 terminal 终端,运行编写好的可执行文件

1
./red_light_green_light.py

如果运行成功,每隔3秒钟,在预览窗口里面,可以看见小车开始移动了起来;不过移动的方向不太直,偶尔会自己打偏,不知为何;

读取传感器数据

如果想要在 ROS 中获取某个话题的讯息,可以考虑将其打印在终端上面,这样可以直观的看到消息是否在发布,同时确保消息类型与预期相符;

在新的终端窗口中,运行以下命令

1
rostopic echo scan

便会不断打印出 scan 话题的信息流,格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
---
header:
seq: 45
stamp:
secs: 11094
nsecs: 949000000
frame_id: "base_scan"
angle_min: 0.0
angle_max: 6.28318977355957
angle_increment: 0.017501922324299812
time_increment: 0.0
scan_time: 0.0
range_min: 0.11999999731779099
range_max: 3.5
ranges: [0.1380552351474762, 0.12576285004615784, 0.12553857266902924, 0.14307521283626556, 0.12852567434310913, 0.15272736549377441, 0.12958106398582458, 0.11999999731779099, 0.13228699564933777, 0.11999999731779099, 0.14200454950332642, 0.11999999731779099, ... 0.14552319049835205]
intensities: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... 0.0, 0.0, 0.0]

计算障碍物的距离

1
2
3
4
5
6
7
8
# i 是 ranges 中的 item 索引
bearing = msg.angle_min + i * msg.angle_max / len(msg.ranges)

# 机器人正前方的障碍物距离,那么 i 取 len(msg.ranges) * 0.5
range_ahead = msg.ranges[len(msg.ranges) * 0.5]

# 离机器人最近的障碍物距离
closest_range = min(msg.ranges)

创建一个能够计算机器人正前方障碍物距离的节点(进程)

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python3
# range_ahead.py

import rospy
from sensor_mgs.msg import LaserScan

def scan_callback(msg):
range_ahead = msg.ranges[len(msg.ranges) / 2]
print("range ahead: ", range_ahead)

rospy.init_node("range_ahead")
scan_sub = rospy.Subscriber("scan", LaserScan, scan_callback)
rospy.spin()

运行起来后,它会持续打印出前方障碍物的距离,类似下面这个样子

感知环境并移动

组合移动和感知两部分功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python3
# wander.py

import rospy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan

def scan_callback(msg):
global g_range_ahead
g_range_ahead = min(msg.ranges)

g_range_ahead = 1 # 使用全局变量记录前方障碍物距离,用来判断是否停止移动并转向

scan_sub = rospy.Subscriber("scan", LaserScan, scan_callback)
cmd_vel_pub = rospy.Publisher("cmd_vel", Twist, queue_size=1)
rospy.init_node("wander")
state_change_time = rospy.Time.now()
driving_forward = True
rate = rospy.Rate(10)

safe_dist = 0.4


while not rospy.is_shutdown():
print("range ahead", g_range_ahead)
if driving_forward:
if (g_range_ahead < safe_dist or rospy.Time.now() > state_change_time):
driving_forward = False
state_change_time = rospy.Time.now() + rospy.Duration(5)
else:
if rospy.Time.now() > state_change_time:
driving_forward = True
state_change_time = rospy.Time.now() + rospy.Duration(30)
twist = Twist()
if driving_forward:
twist.linear.x = 0.5
twist.angular.z = 0
else:
twist.angular.z = 1
twist.linear.x = 0
cmd_vel_pub.publish(twist)

rate.sleep()

第二部分 使用 ROS 驱动机器人行走

第8章 遥控机器人

目标:编写一个支持通过键盘进行遥控的机器人

开发模式

思路:将任务分解成多个节点(进程),每个节点负责分工一个小事情,节点之间通过话题交换数据;

对于遥控场景,一种分解方式如下:

  • 节点A:监听键盘事件,广播消息;
  • 节点B:监听广播,输出 Twist 消息作为响应;

键盘驱动

编写一个节点,监听键盘的输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python3
# key_publisher.py

import sys, select, tty, termios
import rospy
from std_msgs.msg import String

if __name__ == "__main__":
key_pub = rospy.Publisher("keys", String, queue_size=1)
rospy.init_node("keyboard_driver")
rate = rospy.Rate(100)
old_attr = termios.tcgetattr(sys.stdin) # 备份旧属性
tty.setcbreak(sys.stdin.fileno())
print("Publishing keystrokers. Press Ctrl-C to exit...")
while not rospy.is_shutdown():
# 此处的select 参数 0 为超时时间,0 表示不等待,立即返回
if select.select([sys.stdin], [], [], 0)[0] == [sys.stdin]:
key_pub.publish(sys.stdin.read(1))
rate.sleep()
# 恢复之前备份的属性
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)

运动生成器

编写一个节点,用来对键盘输入做出响应,当按下 wsadx 等键时,分别做出前进、停止、左转、右转、后退等动作;同时机器持续执行最后一条指令,直到有新的指令进来,以便能够保证机器人一直处于运动的状态;因为 ROS 默认如果一定的时间内(如500ms)没有新指令进来,就会暂停机器人;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python3
# keys_to_twist_using_rate.py

import rospy
from std_msgs.msg import String
from geometry_msgs.msg import Twist

key_mapping = {
"w": [0, 1],
"x": [0, -1],
"a": [-1, 0],
"d": [1, 0],
"s": [0, 0]
}

g_last_twist = None

def keys_cb(msg, twist_pub):
global g_last_twist
if len(msg.data) == 0 or msg.data[0] not in key_mapping:
return
vels = key_mapping[msg.data[0]]
g_last_twist.angular.z = vels[0]
g_last_twist.linear.x = vels[1]
twist_pub.publish(g_last_twist)

if __name__ == "__main__":
rospy.init_node("keys_to_twist")
twist_pub = rospy.Publisher("cmd_vel", Twist, queue_size=1)
rospy.Subscriber("keys", String, keys_cb, twist_pub)
rate = rospy.Rate(10)
g_last_twist = Twist()
while not rospy.is_shutdown():
twist_pub.publish(g_last_twist)
rate.sleep() # 每10ms 发布一次指令

ROS 有内置一个 rqt_plot 命令行工具,用于可视化消息流中的数据,此处运行的命令为:

1
rqt_plot cmd_vel/linear/x cmd_vel/angular/z

运行后显示的窗口,可看到监听字段的实时数值,它会不断滚动刷新;

image-20241016181114044

参数服务器

同一段代码,可能使用于不同的场景,例如移动平台在公路上(如自动驾驶)和在室内走廊中(如扫地机),显然会使用不同的速度进行移动;因此参数不适合写死在代码中,最好是支持外部传入。可以有很多种传递方法,例如消息、或者命令行也行;

1
2
# 此处通过命令行传递参数,之后在代码中可通过 has_param() 和 get_param() 读取外部传入的参数
./keys_to_twist.py _linear_scale:=0.5 _angular_scale=0.4
1
2
3
if rospy.has_param("~linear_scale"):
scale = rospy.get_param("~linear_scale")
pass

此处之所以能够使用 has_param() 和 get_param() 来查询和读取参数值,显然 ROS 将参数保存到了某个全局的位置,即参数服务器;

速度斜坡曲线

从物理定律来说,机器人无法瞬间启动或停止;如果发动机瞬时切换到过高的速度,也会带来一系列负面作用,例如打滑,甚至造成损坏;因此,更好的做法是引入加速度控制机制,按照可控的加速率,将速度逐步升高到目标值;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python3
# keys_to_twist_with_ramps.py

import rospy
import math
from std_msgs.msg import String
from geometry_msgs.msg import Twist

key_mapping = {"w": [0, 1], "x": [0, -1], "a": [-1, 0], "d": [1, 0], "s": [0, 0]}

g_twist_pub = None
g_target_twist = None
g_last_twist = None
g_last_twist_send_time = None
g_vel_scales = [0.1, 0.1] # 起始速度的缩放倍率,设置较小的初始值,以便从低速开始启动
g_vel_ramps = [1, 1] # 加速度,单位:米/平方秒

def ramped_vel(v_prev, v_target, t_prev, t_now, ramp_rate):
# 计算步进
step = ramp_rate * (t_now - t_prev).to_sec()
sign = 1.0 if (v_target > v_prev) else -1.0
error = math.fabs(v_target - v_prev) # float abs 取浮点数绝对值
if error < step:
return v_target
else:
return v_prev + sign * step # 向目标收敛

def ramped_twist(prev, target, t_prev, t_now, ramps):
tw = Twist()
tw.angular.z = ramped_vel(prev.angular.z, target.angular.z, t_prev, t_now, ramps[0])
tw.linear.x = ramped_vel(prev.linear.x, target.linear.x, t_prev, t_now, ramps[1])
return tw

def send_twist():
global g_last_twist_send_time, g_target_twist, g_last_twist, g_vel_scales, g_vel_ramps, g_twist_pub
t_now = rospy.Time.now()
g_last_twist = ramped_twist(
g_last_twist, g_target_twist, g_last_twist_send_time, t_now, g_vel_ramps
)
g_last_twist_send_time = t_now
g_twist_pub.publish(g_last_twist)

def keys_cb(msg):
global g_target_twist, g_last_twist, g_vel_scales
if len(msg.data) == 0 or msg.data[0] not in key_mapping:
return
vels = key_mapping[msg.data[0]]
g_target_twist.angular.z = vels[0] * g_vel_scales[0]
g_target_twist.linear.x = vels[1] * g_vel_scales[1]

def fetch_param(name, default):
if rospy.has_param(name):
return rospy.get_param(name)
else:
print("parameter {} not defined. Default to {}".format(name, default))
return default

if __name__ == "__main__":
rospy.init_node("keys_to_twist")
g_last_twist_send_time = rospy.Time.now()
g_twist_pub = rospy.Publisher("cmd_vel", Twist, queue_size=1)
rospy.Subscriber("keys", String, keys_cb)
g_target_twist = Twist()
g_last_twist = Twist()
g_vel_scales[0] = fetch_param("~angular_scale", 0.1)
g_vel_scales[1] = fetch_param("~linear_scale", 0.1)
g_vel_ramps[0] = fetch_param("~angular_accel", 1.0)
g_vel_ramps[1] = fetch_param("~angular_accel", 1.0)

rate = rospy.Rate(20)
while not rospy.is_shutdown():
send_twist()
rate.sleep()

开车

1
2
3
# 新建终端,setup 环境,然后运行如下命令
python3 ./keys_to_twist_with_ramps.py _linear_scale:=0.5 \
_angular_scale:=1.0 _linear_accel:=1.0 _angular_accel:=1.0

之后在键盘监听窗口,通过 wsadx 键控制小车的行进和转向;

rviz

rviz 是 ros visualization 的缩写,从名称即可看出来这是一个可视化的工具;

在仿真器中,相机可以安装在任何位置,不受任何物理限制,非常便于开发和测试;不过一般来说,相机通常会安装在机器人身上,尤其是肩上或脖子以上的位置,因为这个位置有助于获得更好的视野,同时相机也会跟随机器人进行移动,方便代入机器的视角来设计相应的算法;

运行以下命令,会启动 rviz 窗口;窗口中有很多参数项,可根据需要进行配置,例如设置视角的位置;设置好了后,还可以保存配置文件,以便后续复用,避免每次手工重复设置;

1
rosrun rviz rviz

启动后的窗口类似下面这个样子;rviz 自带了不少插件,可以借助这些插件,将数据可视化;


ROS机器人编程实践
https://ccw1078.github.io/2024/10/10/ROS机器人编程实践/
作者
ccw
发布于
2024年10月10日
许可协议