记一次解决rosbag2内存错误问题过程

问题背景

由于工作需要,需要开发一个程序,根据记录的信息,从rosbag2保存的数据包根据打点的时间戳,提取出一段段单独的数据包。 为了存储空间考虑,保存时以及提取时都指定了使用压缩的方式进行存储。压缩器和压缩模式如下:

1
2
compression_format: zstd
compression_mode: message

rosbag2不仅有命令行工具,同时还开放API给开发者使用。rosbag2还提供了convert功能,可以对录下来的bagfile进行二次提取。因此开发这部分功能可以参考源码里面的bag_rewriter.cpp

开发环境

ROS: ROS2-humble

OS: ubuntu 22.04

language: cpp17

rosbag2: 0.15.4-2jammy.20230118.031755

问题描述

完成上述数据提取功能后进行测试,发现提取的时候如果同时提取多个目标(构建多个writer,读取message后进行写操作,rosbag2源码参考部分,并且都进行压缩的时候,会有core dump,错误信息:

1
2
corrupted size vs. prev_size
[ros2run]: Aborted

然而在其他情况:1.同时提取多个目标,但是不进行压缩;2.提取一个目标,进行压缩;的时候,程序都可以正常运行

为了确保不是自己编写的程序导致的问题,又使用rosbag2 cli进行类似的尝试。根据converting-bags的说明,我使用下面的output_options.yaml进行数据转换:

1
2
3
4
5
6
7
8
9
output_bags:
- uri: out_bag01
all: true
compression_format: zstd
compression_mode: message
- uri: out_bag02
all: true
compression_format: zstd
compression_mode: message

然后使用如下命令进行提取

1
ros2 bag convert -i ./path/to/bag/file -o output_options.yaml

果不其然,直接使用rosbag2的包也会有core问题,不过报的信息会有区别,不同次运行报的信息还不一样,主要是类似下面这样:

1
2
3
malloc(): unsorted double linked list corrupted
free(): corrupted unsorted chunks
corrupted double-linked list

问题复现

简单起见可以直接使用rosbag2 cli复现这个问题 1. 准备一个ros2的bag包 2. 新建一个配置文件命名为:output_options.yaml,并填入下面内容:

1
2
3
4
5
6
7
8
9
output_bags:
- uri: out_bag01
all: true
compression_format: zstd
compression_mode: message
- uri: out_bag02
all: true
compression_format: zstd
compression_mode: message
3. 执行数据转换命令
1
2
source /opt/ros/humble/setup.bash
ros2 bag convert -i ./path/to/bag/file -o output_options.yaml

问题定位

增加日志

由于已经确认不是自己写的程序的问题,所以其实可以跳过这一步

gdb检查

cpp程序排查还是需要上gdb,不要自己瞎折腾瞎改代码。文末记录如何在ubuntu上生成core文件。 运行一次程序,生成core文件后使用gdb检查错误堆栈

1
2
3
gdb ros_ws/install/package/lib/package/excutable /path/to/corefile

bt # gdb命令,打印错误堆栈

可以看到如下的报错信息

backtraces

虽然是在子线程中出错,但是大概也能看到是在rosbag2_compression::SequentialCompressonWriter::compression_thread_fn()函数中出错。 这也符合了现象,只在进行压缩的时候会出错。

同时,从错误堆栈里可以看出应该是rosbag2_compression_zstd::ZstdCompressor在进行压缩处理的时候调用rcutils_uint8_array_resize函数出错。

对比不同的writer

由于在进行多个writers写数据,但是不执行压缩的时候并没有相关问题发生,所以可以对比一下普通的writerSequentialCompressonWriter在写函数中的区别

普通的writer,即rosbag2_cpp::SequentialWriter

普通的writer,即rosbag2_cpp::SequentialWriter中,void SequentialWriter::write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)函数比较简单,核心代码是这几行

1
2
3
4
5
6
7
8
if (storage_options_.max_cache_size == 0u) {
// If cache size is set to zero, we write to storage directly
storage_->write(converted_msg);
++topic_information->message_count;
} else {
// Otherwise, use cache buffer
message_cache_->push(converted_msg);
}

由于bag_rewrite在调用writer的时候,max_cache_size始终设置为0,因此执行的是if分支,也就是直接调用底层的数据库封装类直接写入硬盘。

带压缩的writer即rosbag2_compression::SequentialCompressionWriter

带压缩功能的writer这部分的逻辑增加了一个单独线程做压缩处理,函数void SequentialCompressionWriter::write( std::shared_ptr<rosbag2_storage::SerializedBagMessage> message)的核心部分主要是这几行

1
2
3
4
5
6
std::lock_guard<std::mutex> lock(compressor_queue_mutex_);
while (compressor_message_queue_.size() > compression_options_.compression_queue_size) {
compressor_message_queue_.pop();
}
compressor_message_queue_.push(message);
compressor_condition_.notify_one();

其中的compressor_message_queue_在另一个线程,即函数void SequentialCompressionWriter::compression_thread_fn()被处理。 这里面有一个对传入的SerializedMessageSharedPtr对象中的内存直接进行处理

重新查看调用API的代码

调用rosbag2 API进行数据写操作的主要过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// create rosbag2_storage::StorageOptions
rosbag2_storage::StorageOptions storageOptions;
// create reader
auto inputReader = rosbag2_transport::ReaderWriterFactory::make_reader(storageOptions);
// open file
inputReader->open(storageOptions);

// create multiplt writers
...

// perform write
while (inputReader->has_next())
{
std::shared_ptr<rosbag2_storage::SerializedBagMessage> nextMsg = nullptr;
nextMsg = inputReader->read_next();
for (auto & writer : outputWriters):
{
writer->write(nextMsg);
}
}

可以看到,我们从reader中拿到的SerializedBagMessage是以共享指针的形式使用,当我们把共享指针传给不同的writer时,不同的writer内部同时对同一块内存进行数据压缩处理,因此也就导致了不同线程操作同一块地址空间的错误。

修复

由于不想更改rosbag2的代码,因此想办法在调用层解决。既然问题共享指针指向的内存被不同线程同时处理,那么我们就在每一次调用writer->write(nextMsg)的时候对nextMsg执行一次深拷贝,让不同的writer操作不同内存即可。 这里比较复杂的是SerializedBagMessage中使用一个指向rcutils_uint8_array_t的指针对内存进行管理。而rcutils_uint8_array_t里面又使用一个裸uint8_t*指针指向管理的内存。同时rcutils的文档不够详细,也没有提供函数对rcutils_uint8_array_t执行深拷贝操作。不过只要能够明白上面的内存管理关系,修复起来不太困难,关于rcutils_uint8_array_t的操作可以参考rosbag2里面的代码,最终修复的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
...create reader && writer
// perform write
while (inputReader->has_next())
{
std::shared_ptr<rosbag2_storage::SerializedBagMessage> nextMsg = nullptr;
nextMsg = inputReader->read_next();
for (auto & writer : outputWriters):
{
std::shared_ptr<rosbag2_storage::SerializedBagMessage> nextMsg_copy = std::make_shared<rosbag2_storage::SerializedBagMessage>();
// 新建rcutils_uint8_array_t需要按照下面这样的操作
nextMsg_copy->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

// 新建后需要执行内存初始化
rcutils_allocator_t allocator = rcutils_get_default_allocator();
auto ret = rcutils_uint8_array_init(
nextMsg_copy->serialized_data.get(),
nextMsg->serialized_data->buffer_capacity,
&allocator);

// 检查是否初始化成功
if (ret != RCUTILS_RET_OK)
{
RCLCPP_ERROR(get_logger(), "Init arrry failed with ret code = %d", ret);
}

// 复制内容
nextMsg_copy->topic_name = nextMsg->topic_name;
nextMsg_copy->time_stamp = nextMsg->time_stamp;
nextMsg_copy->serialized_data->buffer_capacity = nextMsg->serialized_data->buffer_capacity;
nextMsg_copy->serialized_data->buffer_length = nextMsg->serialized_data->buffer_length;
// 复制内存
memcpy((void*)nextMsg_copy->serialized_data->buffer,
(void*)nextMsg->serialized_data->buffer, nextMsg->serialized_data->buffer_length);
writer->write(nextMsg_copy);
}
}

Issue 提交及 PR

这个bug同时提交到rosbag2仓库issues1262中。根据issue的反馈,这个bug在最新的rolling分支已经被修复。不过rolling分支的API与humble 的API差别较大,因此没法直接合并到humble分支中,因此在humble分支这个bug还没有被修复。

PR过程

  1. fork仓库
  2. 克隆fork后的仓库到本地
  3. 从humble分支切出bug_fix分支
  4. 先用原始代码编译并执行test
  5. 先新增一个单元测试用例,并执行编译测试,原始代码无法在新的单元测试下通过
  6. 修复代码
  7. 编译并测试,通过所有测试用例

PS:在修复过程中一开始总是无法通过测试用例,提示无法生成测试结果文件。后面查看发现运行过程中会core dump。经过gdb调试(为了调试能够输出最多的core dump信息,需要colcon build --cmake-args -DCMAKE_BUILD_TYPE=Debug)后发现,是因为声明了共享指针之后没有对齐进行初始化,后面又调用了该指针的成员变量。哎,还是太菜呀~

具体的PR

附录A:ubuntu core文件生成

  1. ubuntu有一个默认的错误搜集系统apport,必须要关掉之后才能生成core文件
1
2
3
sudo service apport status
sudo service apport stop
sudo systemctl apport disble # 似乎无法关闭开机启动,每次需要的时候需要手工确认是否关闭
  1. 设置core文件大小,默认是0,所以不会生成core文件
1
2
3
4
ulimit -c  # 若结果是0,则不会生成core文件
ulimit -c unlimited # 不限制core文件大小
echo "ulimit -c unlimited" >> ~/.bashrc # 选择当前使用的终端
echo "ulimit -c unlimited" >> ~/.zshrc
  1. 设置corefile路径(注意,路径必须普通用户可读!)
1
2
3
sudo mkdir /corefile
sudo chmod 777 /corefile
sysctl -w kernel.core_pattern=/corefile/core-%e-%p-%t
  1. CMakeLists.txt加上``Debug模式下不进行编译器优化
1
2
# In CMakeLists.txt
SET(CMAKE_CXX_FLAGS_DEBUG "$ENV{CXXFLAGS} -O0 -Wall -g -ggdb -Wno-unused-parameter")

注意在编译ROS2包的时候要指明Debug

1
colcon build --packages-select package_name --cmake-args -DCMAKE_BUILD_TYPE=Debug