第一部分 基础知识
第1章 概述
ROS 虽然名称里面带个OS,但其实它并不是 OS,而是一个开发框架,就像 Express.js web 框架一样,区别在于它是面向机器人领域,同时市面上还有很多基于该框架的第三方工具包,共同组成一个生态,让整个机器人的开发变得更加容易。
为了降低耦合程度,提高各个模块的通用性,ROS 主要使用消息协议将不同的模块组织起来,进行协同工作;
第2章 预备知识
ROS 图
由于 ROS 的设计理论是各个独立程序之间相互通信以达成目标,因此,使用图(节点代表程序、边代表通信)可以很好的对设计概念进行可视化;
roscore
roscore 用于协助实现节点间的通信,某种程度来说,它有点像是一个路由器,或者说有点像 k8s 中的服务发现;节点通过它获得另外一个节点的地址,以便和其他节点建立通信;
另外,roscore 还提供了一个参数服务器,方便节点存储和读取数据;
catkin、工作区及 ROS 程序包
catkin 是一个构建工具,猜测有点像 cmake;后来发现它真的是基于 cmake 的扩展,添加了一些额外的功能;
问:为什么 ROS 使用自己的构建工具,而不是使用现有的构建工具如 cmake?
答:原来是需要进行功能上的扩展,同时也基于 cmake
工作区:代码文件的组织方式,以便文件之间可以相互导入或引用,并且与其他工作区的文件实现隔离;
catkin_init_workspace 命令可快速初始化一个工作区,有点像 IDE 里面新建了一个项目一样,之后通常将代码放在src 文件夹中;
1 2 3 4 5 6 7 8 9 source /opt/ros/indigo/setup.bashmkdir -p ~/catkin_ws/srccd ~/catkin_ws/src catkin_init_workspacecd ~/catkin_ws catkin_make
ROS 程序包:包存储在 src 文件夹中,包中包含有一个 CmakeLists 文件和一个 package.xml 文件,package.xml 主要用于描述一些包的相关信息,如包名称、版本号、作者等等;感觉有点点像 js 里面的 package.json 文件;
1 2 cd ~/catkin_ws/src catkin_create_pkg my_awesome_code rospy
rosrun
本质上每个节点都类似一个可执行程序,由于不同的可执行程序,存放在不同的文件夹中,导致运行该程序时,通常需要输入长长的路径前缀。rosrun 可以简化命令的输入,它会自动寻找可执行程序的位置,运行它,同时还可以传递参数给它;
1 2 rosrun <pakcage_executable> [ARGS]
注意:运行任意的 ros 程序前,都需要确保 roscore 已经在运行,因为它是服务发现的中枢;
rqt_graph 命令可用于绘制并显示节点图,类似下面这个样子
rosrun 从可用于启动单个节点(程序),如果要批量启动节点,则可以使用 roslaunch 命令
命名、命名空间以及重映射
ROS 通过使用命名空间来避免节点的命名冲突,命名空间类似路径名称,使用斜杠做为分隔符,例如
命名空间支持重映射,类似起了个别名,这样其他程序可以根据别名来通信;这个机制可以降低程序之间的耦合性;
1 2 ./image_view image:=right/image
1 2 3 4 5 6 ./camera __ns:=right ./talker __name:=talker1 ./talker __name:=talker2
roslaunch
roslaunch 用于批量启动多个节点,因此它需要配套一个 launch 文件,以便从文件中读取多个节点信息,并启动它们;
1 2 roslaunch <PACKAGE LAUNCH_FILE
launch 文件示例
tab 键
由于很多命令都比较长,可使用 tab 来自动补全命令,这样能够加快输入速度,
tf:坐标系转换
位置:由 (x, y, z) 三维向量表示
朝向:由 (roll, pitch, yaw) 三维向量表示,分别代表绕各个坐标轴旋转了多少角度;
位姿:由 (x, y, z, roll, pitch, yaw) 组成的六维向量表示;
ROS 使用 /tf 消息主题在节点间共享坐标变换数据,只需在代码中订阅该消息,即可查询有关节点在某个时刻的坐标信息;这些信息包括:
参与变换的两个坐标系名称(分别为 parent 和child)
坐标系之间的相对位置和朝向;
变换的时间截;
第3章 话题
话题:某个拥有特定数据类型的信息流;有点像 MQ 中的消息队列,有消息的发布者,也有消息的订阅者
将消息发布到话题
在工作区的 src 目录,创建一个名称为 basics 的包
1 catkin_create_pkg basics rospy
在 basics 包中的 src 目录中,创建一个 topic_publisher.py 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import rospyfrom std_msgs.msg import Int32 rospy.init_node("topic_publisher" ) pub = rospy.Publisher('counter' , Int32) rate = rospy.Rate(2 ) count = 0 while not rospy.is_shutdown(): pub.publish(count) count += 1 rate.sleep()
以上代码有依赖 std_msgs.msg 包,因此需要在 pakcage.xml 文件中添加依赖信息,以便可以构建成功;
1 <depend package ="std_msgs" > </depend >
使用 source 设置当前目录做为工作空间,导入相关配置,以便可以运行以上述代码;
将 topic_publisher.py 文件设置为可执行文件权限
1 chmod u+x topic_publisher.py
在工作区中 src 目录中,输入以下命令行,运行 topic_publisher.py
1 rosrun basics top_publisher.py
只需确保 top_publisher.py 在 basics 目录即可,因为 rosrun 会自动查询可执行文件的位置;
运行起来后,即可通过以下命令,在另外一个终端查看主题的状态
1 2 3 4 5 rostopic list rostopic echo counter -n 5 rostopic hz counter rostopic info counter rostopic find std_msgs/Int32
订阅一个话题
编写 topic_subscriber.py 文件,订阅话题
1 2 3 4 5 6 7 8 9 10 11 12 import rospyfrom std_msgs.msg import Int32def callback (msg ): print (msg.data) rospy.init_node("topic_subscriber" ) sub = rospy.Subsciber("counter" , Int32, callback) rospy.spin()
ROS 使用事件驱动的机制,类似 js,因此不可避免会大量用到回调函数
运行订阅文件
1 2 3 source devel/setup.bash chmod u+x basics/src/topic_subscriber.py rosrun basics topic_subscriber.py
锁存话题
如果将某个话题标记为锁存的,那么订阅者在成功订阅话题后,将会收到该话题的最后一条消息;
有了锁存话题,就可以确保那么在发布者之后创建的订阅者,也能够收到发布者之前发布的消息。因为有可能发布者只发布一次消息,而不是不断的发布消息;
1 2 pub = rospy.Publisher("map" , nav_msgs/OccupancyGrid, latched=True )
定义自己的消息类型
ROS 的内置消息类型
除了内置的消息类型外,也可以自定义消息类型;ROS 为了让消息的数据格式能够在不同的语言中通用,增加了一层抽象,使用一个单独的 msg 文件来定义消息的数据格式。该文件由 catkin_make 编译后,即可由不同的语言编写的模块所读取和识别,即 A 语言编写的节点,能够订阅 B 语言编写的节点发布的消息。代价是每次消息的数据格式有变化,都需要重新运行一次 catkin_make,不然无法生效;
1 2 3 float32 real float32 imaginary
在运行 catkin_make 之前,我们需要在 package.yml 中添加构建信息
1 2 <build_depend > message_generation</build_depend > <run_depend > message_runtime</run_depend >
之后还需要修改一 下 CMakeLists 文件,,以便 catkin_make 知道如何自动构建与消息生成相关的代码,需要添加更新的内容包括
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 find_package (catkin REQUIRED COMPONENTS roscpp rospy std_msgs message_generation ) catkin_package( CATKIN_DEPENDS message_runtime ) add_message_files( FILES Complex.msg ) generate_messages( DEPENDENCIES std_msgs )
消息格式定义好了后,就可以在代码中进行引用了
让发布者和订阅者协同工作
第4章 服务
在节点间传递数据,除了使用话题来发布和订阅消息数据外,还有一种方法是“服务”(跨进程函数调用,个人感觉有点点像 RPC 远程进程调用,不过并非点对点的,而是在中间增加了一层抽象);
话题比较适用于高频传递数据的场景,服务比较适合低频场景,并且数据能够被快速处理,无须花费太长的时间,以免造成阻塞;
定义服务
1 2 3 4 string words --- uint32 count
同话题一样,也需要在 package.yml 文件中定义依赖
1 2 <build_depend > message_generation</build_depend > <run_depend > message_runtime</run_depend >
并在 CMakeLists 文件中定义构建信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 find_package (catkin REQUIRED COMPONENTS roscpp rospy std_msgs message_generation ) catkin_package( CATKIN_DEPENDS message_runtime ) add_service_files( FILES WordCount.srv ) generate_messages( DEPENDENCIES std_msgs )
注:rossrv list 命令查看所有可用服务;
运行 catkin_make 命令后,基于服务定义的文件,会生成三个类,它们分别是:
WordCount
WordCountRequest
WordCountResponse
以上类可以被代码文件导入并使用;
实现服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import rospyfrom basic.srv import WordCount, WordCountResponse def count_words (request ): return WordCountResponse(len (request.words.split())) rospy.init_node("service_server" ) service = rospy.Service("word_count" , WordCount, count_words) rospy.spin()
回调函数的返回值可以有多种形式,例如元组、列表或者字典;不管使用哪种形式,ROS 底层代码都会将其翻译成 WordCountResponse 对象;
1 2 3 4 5 6 7 8 def count_words (request ): return len (request.words.split()) def count_words (request ): return [len (request.words.split())] def count_words (request ): return { "count" : len (request.words.split())}
使用服务
可使用 rosservice call 在命令行中调用服务
另外也可以在代码中调用服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import rospyfrom basic.srv import WordCountimport sys rospy.init_node("service_client" ) rospy.wait_for_service("word_count" ) word_counter = rospy.ServiceProxy("word_count" , WordCount) words = " " .join(sys.argv[1 :]) word_count = word_counter(words) print (words, "->" , word_count)
由于服务的调用是同步的,因此当提供服务的节点不可用时,调用方的代码将会因此阻塞,一直要等到服务恢复正常时,才会继续往下;
第5章 动作
服务比较适用于简单的值查询,如果某个操作耗时较长或者用时未知,则服务显然用起来很不方便;对于时间不确定,且以目标导航型的操作来说,例如指挥机器人到达某个位置,那么使用”动作“机制更加合适;
”动作“是异步的,在执行动作的过程中,会实时提供反馈,并且还支持中途取消;
动作的定义
跟主题和服务一样,动作同样使用文件进行定义(文件名后缀为 .action),动作的定义由三部分构成,分别是:
目标:由动作的客户端发送
结果:由动作的服务端发送
反馈:由动作的服务定期发送
这三部分对应三种消息格式,本质上,动作是对主题消息的更高级的一种封装;
调用 catkin_make 构建动作定义文件后,会生成目标、结果、反馈等相关的类,可在代码中引入并使用
流程大致为:动作服务器启动后,等待客户端给它下命令;服务器接到命令后,开始干活,同时定期发送进度反馈给客户端;客户端可中途取消动作;如果不取消,服务端在动作完成后,还会给客户端发送一个结果;
action 动作文件一般放在包文件夹中的单独的 action 文件夹中,文件内容示例如下:
1 2 3 4 5 6 7 8 9 10 duration time_to_wait --- duration time_elapsed uint32 updates_sent --- duration time_elapsed duration time_remaining
注:此处的 duration 是内置的原始类型;
实现一个基本的动作服务器
通过创建一个定时器,来模拟创建动作服务器;原理跟话题差不多,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import rospyimport timeimport actionlibfrom basics.msg import TimerAction, TimerGoal, TimerResult def do_timer (goal ): start_time = time.time() time.sleep(goal.time_to_wait.to_sec()) result = TimerResult() result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) result.updates_sent = 0 server.set_succeded(result) rospy.init_node("timer_action_server" ) server = actionlib.SimpleActionServer("timer" , TimerAction, do_timer, False ) server.start() rospy.spin()
功能检查
编写好 simple_action_server.py 文件后,在运行之前,需要准备几件事情:
确保在当前 terminal 运行了 source ./devel/setup.bash,以便能够让环境变量中的 PYTHONPATH 指向当前目录中的 lib,这样才能够找到并导入已编译的 basics 程序包;
确保运行 chmod u+x simple_action_server.py,以便将文件设置为可执行的状态;
1 2 3 4 5 source ./devel/setup.bashchmod u+x simple_action_server.py rosrun basics simple_action_server.py
运行后应该可以看到以下几个新增加的 /timer 话题
动作的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import rospyimport actionlibfrom basics.msg import TimerAction, TimerGoal, TimerResult rospy.init_node("timer_action_client" ) client = actionlib.SimpleActionClient("timer" , TimerAction) client.wait_for_server() goal = TimerGoal() goal.time_to_wait = rospy.Duration.from_sec(5.0 ) client.send_goal(goal) client.wait_for_result() print ("Time elapsed: " , client.get_result().time_elapsed.to_sec())
实现一个更复杂的动作服务器
总体来说,动作跟服务很像,区别在于服务的调用是同步的,而动作的调用是异步的;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import rospyimport time, actionlibfrom basics.msg import TimerAction, TimerGoal, TimerResult, TimerFeedbackdef do_timer (goal ): start_time = time.time() update_count = 0 if goal.time_to_wait.to_sec() > 60.0 : result = TimerResult() result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) result.updates_sent = update_count server.set_aborted(result, "Timer aborted due to too long wait" ) return while (time.time() - start_time) < goal.time_to_wait.to_sec(): if server.is_preempt_requested(): result = TimerResult() result.time_elapsed = \ rospy.Duration.from_sec(time.time() - start_time) result.updates_sent = update_count server.set_preempted(result, "Timer preempted" ) return feedback = TimerFeedback() feedback.time_elapsed = \ rospy.Duration.from_sec(time.time() - start_time) feedback.time_remaining = goal.time_to_wait - feedback.time_elapsed server.publish_feedback(feedback) update_count += 1 time.sleep(1.0 ) result = TimerResult() result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) result.updates_sent = update_count server.set_succeeded(result, "Timer completed successfully" ) rospy.init_node("fancy_action_server" ) server = actionlib.SimpleActionServer("timer" , TimerAction, do_timer, False ) server.start() rospy.spin()
使用更复杂的动作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import rospyimport time, actionlibfrom basics.msg import TimerAction, TimerGoal, TimerResult, TimerFeedbackdef feedback_cb (feedback ): print ("[Feedback] Time elapsed: " , feedback.time_elapsed.to_sec()) print ("[Feedback] Time remains: " , feedback.time_remaining.to_sec()) rospy.init_node("timer_action_client" ) client = actionlib.SimpleActionClient("timer" , TimerAction) client.wait_for_server() goal = TimerGoal() goal.time_to_wait = rospy.Duration.from_sec(5.0 ) client.send_goal(goal, feedback_cb=feedback_cb) client.wait_for_result() print ("[Result] State: " , client.get_state())print ("[Result] Status: " , client.get_goal_status_text())print ("[Result] Time elapsed: " , client.get_result().time_elapsed.to_sec())print ("[Result] Updates sent: " , client.get_result().updates_sent)
小结
话题、服务、动作三者的区别:
动作比服务更具通用性,因此除非有特殊需求,否则一般都优先使用动作;
第6章 机器人与仿真器
子系统
将大问题拆分成小问题是一种常见的解决问题的思路,在 ROS 里面同样如此。通过将一个复杂的系统,拆分成多个小型的子系统,能够有效的降低复杂度;一种常见的子系统拆分方案是拆分成执行、感知、计算三个子系统;
执行主要负责与轮子、机械臂等部件进行交互;感知主要负责与传感器交互,例如摄像头、激光雷达等;计算主要是收集前两个子系统的数据,进行运算得出下一步行动的结果,以完成任务目标;
执行子系统:移动平台
差分驱动:有两个轮子,当两个轮子同向转动时,机器人会向前进;当反向转运时,机器人会原地转动;
由于两个轮子在静止时无法保持稳定,因此差分底盘一般还会配备几个没有动力的脚轮,用来提供支撑;这样当机器人静止时,也能够保持稳定,不会倒下去;
当然,不配备支撑脚轮也是可以的,这就需要机器的人底盘实时处于运动的状态,以实现自平衡,即使机器人实际上处于静止状态;;平衡轮的直径可以尽量设计的大一些,这种过障碍时,会更稳定,波动更小;该设计的优点是狭小空间的通过性更好,缺点是断电后会倒掉;
差分驱动也可以是多轮的,此时变成了滑动转向;当轮子比较多时,一般会用履带包裹,就像坦克或挖掘机那样;滑动结构与地面的摩擦力很大,因此轮子的磨损比较快,另外也容易对地面造成破坏;
阿克曼平台:后轮不转动,只转动前轮,来实现转向;家用小轿车即采用此种设计;缺点是不能横向移动;需要通过倒车来实现横移,即日常生活中常见的侧方停车;
表克纳姆底盘:优点是可以全向移动,缺点是滚轮直径小,因此只适合光滑表面,不然磨损很快;
为了实现不同底盘之间的指令复用,有必要对移动底盘进行抽象,通常会采用 twist 消息类型,用来表达三维空间中的线速度和角速度;底盘平台收到 twist 消息后,再根据自己的机械构造,拆分成相匹的轮子驱动指令;
执行子系统:机械臂
另外一个常见的执行子系统是机械臂,以便机器人能够完成操纵物体的任务,例如抓取和放置物品;
两种常见的机械臂结构:
旋转关节:常用于车间生产等工业机器人
线性关节:有点像3D打印机
理论上只需要 6 个自由度就能够让机械臂在工作环境中达到任意的位姿,但受限于机械结构,实际上很难达到,因此一般会增加 1 个自由度,来增加整个工作区的空间;但增加自由度意味着增加成本,可靠性下降,因此实际使用多少个自由度,主要取决于工作环境的需要和操作复杂程度;
传感子系统
传感器能够让机器人感知环境,并相应的调整自己的行为;传感器的种类非常丰富,并且不一定越多越好,而是根据任务需要而定。有些很复杂的操纵任务,甚至可能只需要几个二值传感器(如限位开关)就足够了;
除了二值传感器,还有一类传感器是“量传感器”,例如压力传感器;每个传感器一般会有其局限性,例如最大或最小有效距离等,因此一般需要组合多个传感器,来提高感知的准确度;
普通相机
双目立体相机的精度受限于相机的一些参数,并且要求拍摄目标有明显的纹理特征,否则难以比对两个视野的差异,例如一堵白墙的场景中,检测效果就不太好;
ROS 一般使用 sensor_msgs/Image 消息类型来传递图像信息,该消息包含了一系列与图像有关的数据,例如尺寸、格式、图像数据等;
深度相机
双目立体相机一般是被动式的工作,而深度相机则是主动式的,它会向环境投影光线,形成纹理,然后被相机捕捉;投射的光线一般使用红外波段(非可见光),以避免影响物体的颜色;
另一种深度相机是投射结构光(例如微软的 Kinect),同样也是通过相机捕捉纹理特征来重构三维空间;
还有一种深度相机是使用红外 LED 或激光发射头,原理有点像蝙蝠的声纳,区别是雷达投射的是光线,而不是声波;然后计算发出光线到收到光线反射的时间差来计算深度;
深度相机产生的数据一般是点云的形式,ROS 中的消息类型为 sensor_msgs/PointCloud2;由于相机无法保证每个区域都能正确返回数据,因此有时候在点云的可视化数据中,会存在一些“洞“,一般需要使用算法进行处理;
激光雷达
激光雷达的原理跟红外 LED 深度相机的原理相同,只是精度更高;常用于精度要求高的场景,例如自动驾驶;
ROS 中激光雷达的消息类型一般为 sensor_msgs/LaserScan;虽然不同雷达厂家的原始输出格式不同,但最终都会使用 ROS 的驱动包转换成统一的格式;
旋转编码器
旋转编码器用来感知某个可旋转部件的旋转圈数,从而计算位移,有点像汽车车轮上面的里程计;但考虑外部环境存在干扰因素,例如地面的摩擦系数,风阻等;旋转编码器直接计算出来的结果并不准确,通常都会和实际值出现一定的偏差;
旋转编码器在移动平台和机械臂上的使用方式差别很大,一个用来计算移动距离,消息类型为geometry_msgs/Transform;一个用来计算姿态,消息类型为 sensor_msgs/JointState;
计算子系统
由于 ROS 的设计架构是节点间的消息传递,因此不可避免带来了消息格式转换的额外计算开销;这种开销也导致了对处理器一定的门槛要求,例如 8 位处理器通常难以满足计算性能的要求;不过随着处理器的性能变得越来越强大,现在 ROS 已经能够运行在绝大多数的终端设备上了;
机器人系统举例
目前市面上已经有一些成熟的开箱即用的机器人开发平台,购买来后即可进行试验;
常用的机器人开发平台有:
PR2,据说售价 40 万美元;
Fetch,用于仓储业务的移动操作机器人;
Robonaut2,工作在国际空间站;
TurtleBot,用于 ROS 教育和原型开发;价格在 2000 美元以内;
仿真器
仿真软件有很多好处,最主要的好处包括:
免费,毕竟购买机器人的成本还是很高的;
不会产生机械故障,让开发者能够专注于算法的验证;
Stage:常用的 2D 仿真软件,建模语言简单,运行速度快,纯刚体世界;
Gazebo:适用于 3D 场景的仿真;可选择不同的引擎做刚体仿真;
SLAM,simultanious localization and mapping,同步定位与地图构建
2D 仿真,机器人只能在 2D 平面上运动;
市面上还有其他仿真软件,例如还有 MORSE,V-REP 等;每个都有其优缺点和取舍,因此需要根据自身的业务场景,选择最合适的;
第7章 Wander-bot
目标:编写一个可以自由行走,并自动避开障碍的机器人
创建包
创建工作区
创建程序包
编写可执行文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import rospyfrom geometry_msgs.msg import Twist cmd_vel_pub = rospy.Publisher("cmd_vel" , Twist, queue_size=1 ) rospy.init_node("red_light_green_light" ) red_light_twist = Twist() green_light_twist = Twist() green_light_twist.linear.x = 0.5 driving_forward = False light_change_time = rospy.Time.now() rate = rospy.Rate(10 )while not rospy.is_shutdown(): if driving_forward: cmd_vel_pub.publish(green_light_twist) else : cmd_vel_pub.publish(red_light_twist) if rospy.Time.now() > light_change_time: driving_forward = not driving_forward light_change_time = rospy.Time.now() + rospy.Duration(3 ) rate.sleep()
安装 ROS 程序包
1 2 3 4 5 6 7 8 sudo apt-get install ros-noetic-joy ros-noetic-teleop-twist-joy \ ros-noetic-teleop-twist-keyboard ros-noetic-laser-proc \ ros-noetic-rgbd-launch ros-noetic-rosserial-arduino \ ros-noetic-rosserial-python ros-noetic-rosserial-client \ ros-noetic-rosserial-msgs ros-noetic-amcl ros-noetic-map-server \ ros-noetic-move-base ros-noetic-urdf ros-noetic-xacro \ ros-noetic-compressed-image-transport ros-noetic-rqt* ros-noetic-rviz \ ros-noetic-gmapping ros-noetic-navigation ros-noetic-interactive-markers
安装 TurtleBot3 程序包
1 2 3 sudo apt install ros-noetic-dynamixel-sdksudo apt install ros-noetic-turtlebot3-msgssudo apt install ros-noetic-turtlebot3
安装 TurtleBot3 simulations 仿真器程序包
1 2 3 cd ~/wanderbot_ws/src/ git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3_simulations.gitcd ~/wanderbot_ws && catkin_make
设置当前文件来为新的工作区
启动仿真器,检查是否成功安装
1 2 export TURTLEBOT3_MODEL=waffle roslaunch turtlebot3_gazebo turtlebot3_world.launch
若成功安装,会出现以下界面
另外开启一个 terminal 终端,运行编写好的可执行文件
1 ./red_light_green_light.py
如果运行成功,每隔3秒钟,在预览窗口里面,可以看见小车开始移动了起来;不过移动的方向不太直,偶尔会自己打偏,不知为何;
读取传感器数据
如果想要在 ROS 中获取某个话题的讯息,可以考虑将其打印在终端上面,这样可以直观的看到消息是否在发布,同时确保消息类型与预期相符;
在新的终端窗口中,运行以下命令
便会不断打印出 scan 话题的信息流,格式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 --- header: seq : 45 stamp: secs: 11094 nsecs: 949000000 frame_id: "base_scan" angle_min: 0.0 angle_max: 6.28318977355957 angle_increment: 0.017501922324299812 time_increment: 0.0 scan_time: 0.0 range_min: 0.11999999731779099 range_max: 3.5 ranges: [0.1380552351474762, 0.12576285004615784, 0.12553857266902924, 0.14307521283626556, 0.12852567434310913, 0.15272736549377441, 0.12958106398582458, 0.11999999731779099, 0.13228699564933777, 0.11999999731779099, 0.14200454950332642, 0.11999999731779099, ... 0.14552319049835205] intensities: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... 0.0, 0.0, 0.0]
计算障碍物的距离
1 2 3 4 5 6 7 8 bearing = msg.angle_min + i * msg.angle_max / len (msg.ranges) range_ahead = msg.ranges[len (msg.ranges) * 0.5 ] closest_range = min (msg.ranges)
创建一个能够计算机器人正前方障碍物距离的节点(进程)
1 2 3 4 5 6 7 8 9 10 11 12 13 import rospyfrom sensor_mgs.msg import LaserScandef scan_callback (msg ): range_ahead = msg.ranges[len (msg.ranges) / 2 ] print ("range ahead: " , range_ahead) rospy.init_node("range_ahead" ) scan_sub = rospy.Subscriber("scan" , LaserScan, scan_callback) rospy.spin()
运行起来后,它会持续打印出前方障碍物的距离,类似下面这个样子
感知环境并移动
组合移动和感知两部分功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import rospyfrom geometry_msgs.msg import Twistfrom sensor_msgs.msg import LaserScandef scan_callback (msg ): global g_range_ahead g_range_ahead = min (msg.ranges) g_range_ahead = 1 scan_sub = rospy.Subscriber("scan" , LaserScan, scan_callback) cmd_vel_pub = rospy.Publisher("cmd_vel" , Twist, queue_size=1 ) rospy.init_node("wander" ) state_change_time = rospy.Time.now() driving_forward = True rate = rospy.Rate(10 ) safe_dist = 0.4 while not rospy.is_shutdown(): print ("range ahead" , g_range_ahead) if driving_forward: if (g_range_ahead < safe_dist or rospy.Time.now() > state_change_time): driving_forward = False state_change_time = rospy.Time.now() + rospy.Duration(5 ) else : if rospy.Time.now() > state_change_time: driving_forward = True state_change_time = rospy.Time.now() + rospy.Duration(30 ) twist = Twist() if driving_forward: twist.linear.x = 0.5 twist.angular.z = 0 else : twist.angular.z = 1 twist.linear.x = 0 cmd_vel_pub.publish(twist) rate.sleep()
第二部分 使用 ROS 驱动机器人行走
第8章 遥控机器人
目标:编写一个支持通过键盘进行遥控的机器人
开发模式
思路:将任务分解成多个节点(进程),每个节点负责分工一个小事情,节点之间通过话题交换数据;
对于遥控场景,一种分解方式如下:
节点A:监听键盘事件,广播消息;
节点B:监听广播,输出 Twist 消息作为响应;
键盘驱动
编写一个节点,监听键盘的输入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import sys, select, tty, termiosimport rospyfrom std_msgs.msg import Stringif __name__ == "__main__" : key_pub = rospy.Publisher("keys" , String, queue_size=1 ) rospy.init_node("keyboard_driver" ) rate = rospy.Rate(100 ) old_attr = termios.tcgetattr(sys.stdin) tty.setcbreak(sys.stdin.fileno()) print ("Publishing keystrokers. Press Ctrl-C to exit..." ) while not rospy.is_shutdown(): if select.select([sys.stdin], [], [], 0 )[0 ] == [sys.stdin]: key_pub.publish(sys.stdin.read(1 )) rate.sleep() termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_attr)
运动生成器
编写一个节点,用来对键盘输入做出响应,当按下 wsadx 等键时,分别做出前进、停止、左转、右转、后退等动作;同时机器持续执行最后一条指令,直到有新的指令进来,以便能够保证机器人一直处于运动的状态;因为 ROS 默认如果一定的时间内(如500ms)没有新指令进来,就会暂停机器人;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import rospyfrom std_msgs.msg import Stringfrom geometry_msgs.msg import Twist key_mapping = { "w" : [0 , 1 ], "x" : [0 , -1 ], "a" : [-1 , 0 ], "d" : [1 , 0 ], "s" : [0 , 0 ] } g_last_twist = None def keys_cb (msg, twist_pub ): global g_last_twist if len (msg.data) == 0 or msg.data[0 ] not in key_mapping: return vels = key_mapping[msg.data[0 ]] g_last_twist.angular.z = vels[0 ] g_last_twist.linear.x = vels[1 ] twist_pub.publish(g_last_twist) if __name__ == "__main__" : rospy.init_node("keys_to_twist" ) twist_pub = rospy.Publisher("cmd_vel" , Twist, queue_size=1 ) rospy.Subscriber("keys" , String, keys_cb, twist_pub) rate = rospy.Rate(10 ) g_last_twist = Twist() while not rospy.is_shutdown(): twist_pub.publish(g_last_twist) rate.sleep()
ROS 有内置一个 rqt_plot 命令行工具,用于可视化消息流中的数据,此处运行的命令为:
1 rqt_plot cmd_vel/linear/x cmd_vel/angular/z
运行后显示的窗口,可看到监听字段的实时数值,它会不断滚动刷新;
参数服务器
同一段代码,可能使用于不同的场景,例如移动平台在公路上(如自动驾驶)和在室内走廊中(如扫地机),显然会使用不同的速度进行移动;因此参数不适合写死在代码中,最好是支持外部传入。可以有很多种传递方法,例如消息、或者命令行也行;
1 2 ./keys_to_twist.py _linear_scale:=0.5 _angular_scale=0.4
1 2 3 if rospy.has_param("~linear_scale" ): scale = rospy.get_param("~linear_scale" ) pass
此处之所以能够使用 has_param() 和 get_param() 来查询和读取参数值,显然 ROS 将参数保存到了某个全局的位置,即参数服务器;
速度斜坡曲线
从物理定律来说,机器人无法瞬间启动或停止;如果发动机瞬时切换到过高的速度,也会带来一系列负面作用,例如打滑,甚至造成损坏;因此,更好的做法是引入加速度控制机制,按照可控的加速率,将速度逐步升高到目标值;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 import rospyimport mathfrom std_msgs.msg import Stringfrom geometry_msgs.msg import Twist key_mapping = {"w" : [0 , 1 ], "x" : [0 , -1 ], "a" : [-1 , 0 ], "d" : [1 , 0 ], "s" : [0 , 0 ]} g_twist_pub = None g_target_twist = None g_last_twist = None g_last_twist_send_time = None g_vel_scales = [0.1 , 0.1 ] g_vel_ramps = [1 , 1 ] def ramped_vel (v_prev, v_target, t_prev, t_now, ramp_rate ): step = ramp_rate * (t_now - t_prev).to_sec() sign = 1.0 if (v_target > v_prev) else -1.0 error = math.fabs(v_target - v_prev) if error < step: return v_target else : return v_prev + sign * step def ramped_twist (prev, target, t_prev, t_now, ramps ): tw = Twist() tw.angular.z = ramped_vel(prev.angular.z, target.angular.z, t_prev, t_now, ramps[0 ]) tw.linear.x = ramped_vel(prev.linear.x, target.linear.x, t_prev, t_now, ramps[1 ]) return twdef send_twist (): global g_last_twist_send_time, g_target_twist, g_last_twist, g_vel_scales, g_vel_ramps, g_twist_pub t_now = rospy.Time.now() g_last_twist = ramped_twist( g_last_twist, g_target_twist, g_last_twist_send_time, t_now, g_vel_ramps ) g_last_twist_send_time = t_now g_twist_pub.publish(g_last_twist)def keys_cb (msg ): global g_target_twist, g_last_twist, g_vel_scales if len (msg.data) == 0 or msg.data[0 ] not in key_mapping: return vels = key_mapping[msg.data[0 ]] g_target_twist.angular.z = vels[0 ] * g_vel_scales[0 ] g_target_twist.linear.x = vels[1 ] * g_vel_scales[1 ]def fetch_param (name, default ): if rospy.has_param(name): return rospy.get_param(name) else : print ("parameter {} not defined. Default to {}" .format (name, default)) return defaultif __name__ == "__main__" : rospy.init_node("keys_to_twist" ) g_last_twist_send_time = rospy.Time.now() g_twist_pub = rospy.Publisher("cmd_vel" , Twist, queue_size=1 ) rospy.Subscriber("keys" , String, keys_cb) g_target_twist = Twist() g_last_twist = Twist() g_vel_scales[0 ] = fetch_param("~angular_scale" , 0.1 ) g_vel_scales[1 ] = fetch_param("~linear_scale" , 0.1 ) g_vel_ramps[0 ] = fetch_param("~angular_accel" , 1.0 ) g_vel_ramps[1 ] = fetch_param("~angular_accel" , 1.0 ) rate = rospy.Rate(20 ) while not rospy.is_shutdown(): send_twist() rate.sleep()
开车
1 2 3 python3 ./keys_to_twist_with_ramps.py _linear_scale:=0.5 \ _angular_scale:=1.0 _linear_accel:=1.0 _angular_accel:=1.0
之后在键盘监听窗口,通过 wsadx 键控制小车的行进和转向;
rviz
rviz 是 ros visualization 的缩写,从名称即可看出来这是一个可视化的工具;
在仿真器中,相机可以安装在任何位置,不受任何物理限制,非常便于开发和测试;不过一般来说,相机通常会安装在机器人身上,尤其是肩上或脖子以上的位置,因为这个位置有助于获得更好的视野,同时相机也会跟随机器人进行移动,方便代入机器的视角来设计相应的算法;
运行以下命令,会启动 rviz 窗口;窗口中有很多参数项,可根据需要进行配置,例如设置视角的位置;设置好了后,还可以保存配置文件,以便后续复用,避免每次手工重复设置;
启动后的窗口类似下面这个样子;rviz 自带了不少插件,可以借助这些插件,将数据可视化;