从 ROS Bag 文件中提取源数据指南

📋 概述

本文档介绍如何从 ROS .bag 文件中提取图像和点云等源数据,无需安装完整的 ROS 环境。

🛠️ 环境准备

1. 安装必要的 Python 包

pip install rosbags opencv-python numpy

2. 验证安装

python -c "import rosbags, cv2, numpy; print('环境就绪')"

🔍 分析 Bag 文件内容

查看文件信息

# inspect_bag.py
from rosbags.rosbag1 import Reader as Reader1
from rosbags.rosbag2 import Reader as Reader2
import os

file_path = "converted_ros1_bag.bag"  # 你的 bag 文件路径

if file_path.endswith('.bag'):
    Reader = Reader1
else:
    Reader = Reader2

with Reader(file_path) as reader:
    print("=== 话题信息 ===")
    for connection in reader.connections:
        print(f"话题: {connection.topic} | 类型: {connection.msgtype} | 数量: {connection.msgcount}")

    print("\n=== 文件概览 ===")
    total_msg_count = sum(conn.msgcount for conn in reader.connections)
    print(f"总消息数: {total_msg_count}")
    duration_ns = reader.duration
    print(f"时长: {duration_ns / 1e9:.2f} 秒")

运行命令:

python inspect_bag.py

📸 提取图像数据

分析图像数据格式

python -c "
from rosbags.rosbag1 import Reader

with Reader('converted_ros1_bag.bag') as reader:
    count = 0
    for connection, timestamp, rawdata in reader.messages():
        if connection.topic == '/rs_camera/rgb' and count < 2:
            print(f'消息 {count}: 长度={len(rawdata)} 字节')
            print(f'前50字节(hex): {rawdata[:50].hex()}')
            print(f'后50字节(hex): {rawdata[-50:].hex()}')
            count += 1
"

在这里插入图片描述

提取所有图像

# extract_images.py
from rosbags.rosbag1 import Reader
import cv2
import numpy as np
import os

def extract_all_images():
    os.makedirs('extracted_images', exist_ok=True)
    count = 0
    
    with Reader('converted_ros1_bag.bag') as reader:
        for connection, timestamp, rawdata in reader.messages():
            if connection.topic == '/rs_camera/rgb':
                try:
                    # 根据分析得到的图像尺寸调整参数
                    # 示例:1920x1080 RGB 图像
                    width, height = 1920, 1080
                    expected_size = width * height * 3
                    
                    # 提取图像数据(通常在消息末尾)
                    image_data = rawdata[-expected_size:]
                    img_array = np.frombuffer(image_data, dtype=np.uint8).reshape(height, width, 3)
                    
                    # 保存为 PNG
                    cv2.imwrite(f'extracted_images/image_{count:06d}.png', img_array)
                    count += 1
                    
                    if count % 100 == 0:
                        print(f'已提取 {count} 张图像')
                        
                except Exception as e:
                    print(f'处理图像 {count} 时出错: {e}')
                    continue
    
    print(f'成功提取 {count} 张图像到 extracted_images/')

if __name__ == "__main__":
    extract_all_images()

运行命令:

python extract_images.py

🌐 提取点云数据

提取点云数据

# extract_pointclouds.py
from rosbags.rosbag1 import Reader
import os

def extract_all_pointclouds():
    os.makedirs('extracted_pointclouds', exist_ok=True)
    count = 0
    
    with Reader('converted_ros1_bag.bag') as reader:
        for connection, timestamp, rawdata in reader.messages():
            if connection.topic == '/rs_lidar/points':
                try:
                    # 保存原始点云二进制数据
                    with open(f'extracted_pointclouds/pointcloud_{count:06d}.bin', 'wb') as f:
                        f.write(rawdata)
                    
                    # 保存元数据信息
                    with open(f'extracted_pointclouds/pointcloud_{count:06d}_info.txt', 'w') as f:
                        f.write(f"时间戳: {timestamp}\n")
                        f.write(f"话题: {connection.topic}\n")
                        f.write(f"消息类型: {connection.msgtype}\n")
                        f.write(f"数据长度: {len(rawdata)} 字节\n")
                    
                    count += 1
                    
                    if count % 50 == 0:
                        print(f'已提取 {count} 帧点云')
                        
                except Exception as e:
                    print(f'处理点云 {count} 时出错: {e}')
                    continue
    
    print(f'成功提取 {count} 帧点云数据到 extracted_pointclouds/')

if __name__ == "__main__":
    extract_all_pointclouds()

运行命令:

python extract_pointclouds.py

🎯 一键提取脚本

批量提取所有数据

# extract_all.py
from rosbags.rosbag1 import Reader
import cv2
import numpy as np
import os

def extract_all_data():
    # 创建输出目录
    image_dir = 'extracted_data/images'
    pcd_dir = 'extracted_data/pointclouds'
    os.makedirs(image_dir, exist_ok=True)
    os.makedirs(pcd_dir, exist_ok=True)
    
    image_count = 0
    pcd_count = 0
    
    with Reader('converted_ros1_bag.bag') as reader:
        print("开始提取所有传感器数据...")
        
        for connection, timestamp, rawdata in reader.messages():
            try:
                if connection.topic == '/rs_camera/rgb':
                    # 提取图像
                    width, height = 1920, 1080
                    expected_size = width * height * 3
                    image_data = rawdata[-expected_size:]
                    img_array = np.frombuffer(image_data, dtype=np.uint8).reshape(height, width, 3)
                    cv2.imwrite(f'{image_dir}/image_{image_count:06d}.png', img_array)
                    image_count += 1
                    
                elif connection.topic == '/rs_lidar/points':
                    # 提取点云
                    with open(f'{pcd_dir}/pointcloud_{pcd_count:06d}.bin', 'wb') as f:
                        f.write(rawdata)
                    pcd_count += 1
                
                # 进度显示
                if (image_count + pcd_count) % 100 == 0:
                    print(f'处理进度: {image_count} 图像, {pcd_count} 点云')
                    
            except Exception as e:
                continue
    
    print(f"提取完成!")
    print(f"- 图像: {image_count} 张 -> {image_dir}")
    print(f"- 点云: {pcd_count} 帧 -> {pcd_dir}")

if __name__ == "__main__":
    extract_all_data()

运行命令:

python extract_all.py

🔧 故障排除

常见问题及解决方案

  1. 编码错误

    # 使用正确的 typestore
    from rosbags.typesys import get_typestore, Stores
    typestore = get_typestore(Stores.ROS1_NOETIC)
    
  2. 图像尺寸不匹配

    # 尝试常见图像尺寸
    sizes_to_try = [
        (480, 640),     # 640x480
        (720, 1280),    # 1280x720
        (1080, 1920),   # 1920x1080
    ]
    
  3. 数据位置不确定

    # 尝试不同的偏移量
    for offset in [0, 100, 200, 500, 1000]:
        image_data = rawdata[offset:offset+expected_size]
    

📊 验证提取结果

# 检查提取的文件
ls -la extracted_images/ | head -5
ls -la extracted_pointclouds/ | head -5

# 统计文件数量
echo "图像数量: $(ls extracted_images/*.png 2>/dev/null | wc -l)"
echo "点云数量: $(ls extracted_pointclouds/*.bin 2>/dev/null | wc -l)"

# 检查文件完整性
file extracted_images/*.png 2>/dev/null | head -3

💡 使用技巧

  1. 增量提取: 如果数据量很大,可以分批提取
  2. 进度监控: 定期打印进度信息
  3. 错误处理: 使用 try-except 避免单个文件错误影响整体提取
  4. 元数据保存: 同时保存时间戳等元数据信息

📁 输出文件结构

extracted_data/
├── images/
│   ├── image_000000.png
│   ├── image_000001.png
│   └── ...
└── pointclouds/
    ├── pointcloud_000000.bin
    ├── pointcloud_000000_info.txt
    └── ...
Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐