# MCAP Spark Data Source

A custom Apache Spark data source for reading MCAP (ROS 2 bag) files.

## Overview

This data source allows you to read MCAP files directly into Spark DataFrames, making it easy to process and analyze robotics data at scale using Spark SQL and DataFrame APIs.

## Features

- ✅ Read MCAP files with multiple message types (protobuf, JSON, etc.)
- ✅ Automatic message decoding
- ✅ Partitioned reading for parallel processing
- ✅ Support for glob patterns to read multiple files
- ✅ JSON output for flexible schema handling
- ✅ Compatible with Delta Lake and Parquet formats

## Installation

### Requirements

```bash
pip install pyspark
pip install mcap
pip install mcap-protobuf-support
pip install protobuf
```

### Files

- `mcap_datasource.py` - The main data source implementation
- `mcap_reader` - Standalone reader (non-Spark)

## Usage

### Basic Usage

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MCAP Reader") \
    .getOrCreate()

# Read a single MCAP file
df = spark.read.format("mcap") \
    .option("path", "/path/to/file.mcap") \
    .load()

df.show()
```
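
Depending on how the data source is packaged, the short name `mcap` may need to be registered with the session before `spark.read.format("mcap")` resolves. A minimal sketch, assuming the `MCAPDataSource` class in `mcap_datasource.py` follows the PySpark Python Data Source API (Spark 4.0+):

```python
from pyspark.sql import SparkSession

from mcap_datasource import MCAPDataSource  # assumes the class is importable from this module

spark = SparkSession.builder \
    .appName("MCAP Reader") \
    .getOrCreate()

# Register the Python data source so format("mcap") can be resolved by name
spark.dataSource.register(MCAPDataSource)
```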

### Reading Multiple Files

```python
# Read all MCAP files in a directory (non-recursive)
df = spark.read.format("mcap") \
    .option("path", "/path/to/mcap/directory") \
    .option("pathGlobFilter", "*.mcap") \
    .load()

# Read all MCAP files recursively including subdirectories
df = spark.read.format("mcap") \
    .option("path", "/path/to/mcap/directory") \
    .option("recursiveFileLookup", "true") \
    .load()
```

### Filtering by Topic at Read Time

For better performance, filter by topic during the read operation instead of after loading:

```python
# Read only "pose" topic messages (more efficient than df.filter())
df = spark.read.format("mcap") \
    .option("path", "/path/to/file.mcap") \
    .option("topicFilter", "pose") \
    .load()

# Read all topics (default behavior)
df = spark.read.format("mcap") \
    .option("path", "/path/to/file.mcap") \
    .option("topicFilter", "*") \
    .load()

# Or simply omit topicFilter to read all topics
df = spark.read.format("mcap") \
    .option("path", "/path/to/file.mcap") \
    .load()
```

### Options

| Option | Default | Description |
|--------|---------|-------------|
| `path` | *(required)* | Path to MCAP file(s) or directory |
| `pathGlobFilter` | `*.mcap` | Glob pattern for file matching |
| `numPartitions` | `4` | Number of partitions for parallel processing |
| `recursiveFileLookup` | `false` | Recursively search subdirectories |
| `topicFilter` | *(none)* | Filter by specific topic name. Use `*` or omit to read all topics |
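
These options can be combined in a single read, for example:

```python
# Recursively read all .mcap files and keep only the "pose" topic
df = spark.read.format("mcap") \
    .option("path", "/path/to/mcap/directory") \
    .option("pathGlobFilter", "*.mcap") \
    .option("recursiveFileLookup", "true") \
    .option("numPartitions", "8") \
    .option("topicFilter", "pose") \
    .load()
```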

### Schema

The data source produces a DataFrame with the following schema:

| Column | Type | Description |
|--------|------|-------------|
| `sequence` | BIGINT | Sequential message number within partition (starts at 0) |
| `topic` | STRING | The message topic (e.g., "pose", "camera_jpeg") |
| `schema` | STRING | The schema name (e.g., "foxglove.PoseInFrame") |
| `encoding` | STRING | The encoding type (protobuf, json, etc.) |
| `log_time` | BIGINT | The message timestamp in nanoseconds |
| `data` | STRING | JSON string containing all message fields |
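
Because the schema is fixed, the DataFrame can also be queried with plain Spark SQL by registering it as a temporary view:

```python
df.createOrReplaceTempView("mcap_messages")

# Message counts per topic and encoding
spark.sql("""
    SELECT topic, encoding, COUNT(*) AS message_count
    FROM mcap_messages
    GROUP BY topic, encoding
    ORDER BY message_count DESC
""").show()
```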

### Working with JSON Data

Extract specific fields from the JSON `data` column:

```python
from pyspark.sql.functions import get_json_object, col

# Extract position coordinates from pose messages
pose_df = df.filter(col("topic") == "pose") \
    .select(
        "log_time",
        get_json_object(col("data"), "$.position.x").alias("pos_x"),
        get_json_object(col("data"), "$.position.y").alias("pos_y"),
        get_json_object(col("data"), "$.position.z").alias("pos_z"),
        get_json_object(col("data"), "$.orientation.w").alias("orient_w")
    )

pose_df.show()
```
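
When many fields are needed, parsing the JSON payload once with `from_json` and an explicit schema avoids repeated `get_json_object` calls. A sketch, assuming the pose payload has the `position`/`orientation` layout shown above:

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StructField, StructType

# Assumed layout of the pose payload; adjust to match your actual messages
pose_schema = StructType([
    StructField("position", StructType([
        StructField("x", DoubleType()),
        StructField("y", DoubleType()),
        StructField("z", DoubleType()),
    ])),
    StructField("orientation", StructType([
        StructField("x", DoubleType()),
        StructField("y", DoubleType()),
        StructField("z", DoubleType()),
        StructField("w", DoubleType()),
    ])),
])

# Parse the JSON once, then select nested fields directly
parsed_df = df.filter(col("topic") == "pose") \
    .withColumn("msg", from_json(col("data"), pose_schema)) \
    .select("log_time", "msg.position.*", col("msg.orientation.w").alias("orient_w"))
```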

### Filtering by Topic

```python
# Get only camera images
camera_df = df.filter(col("topic") == "camera_jpeg")

# Get only microphone data
audio_df = df.filter(col("topic") == "microphone")

# Multiple topics
events_df = df.filter(col("topic").isin(["mouse", "keyboard"]))

# Order by sequence to maintain message order
ordered_df = df.orderBy("sequence")
```

### Aggregations

```python
# Count messages by topic
df.groupBy("topic").count().show()

# Get time range per topic
from pyspark.sql.functions import min, max

df.groupBy("topic") \
    .agg(
        min("log_time").alias("start_time"),
        max("log_time").alias("end_time")
    ).show()
```

### Saving to Delta Lake / Parquet

```python
# Save as Parquet (partitioned by topic)
df.write.mode("overwrite") \
    .partitionBy("topic") \
    .parquet("/path/to/output/parquet")

# Save as Delta Lake (if Delta is configured)
df.write.mode("overwrite") \
    .format("delta") \
    .partitionBy("topic") \
    .save("/path/to/output/delta")
```
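
Partitioning the output by `topic` lets downstream reads prune irrelevant files when filtering on that column:

```python
from pyspark.sql.functions import col

# Reading back only the "pose" partition skips the other topics' files
pose_parquet = spark.read.parquet("/path/to/output/parquet") \
    .filter(col("topic") == "pose")
```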

## Example: Complete Pipeline

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, get_json_object, from_unixtime

spark = SparkSession.builder \
    .appName("MCAP Analysis") \
    .getOrCreate()

# Read MCAP file
df = spark.read.format("mcap") \
    .option("path", "/path/to/demo.mcap") \
    .option("numPartitions", "8") \
    .load()

# Add human-readable timestamp
df = df.withColumn(
    "timestamp",
    from_unixtime(col("log_time") / 1e9)
)

# Process pose data
pose_df = df.filter(col("topic") == "pose") \
    .select(
        "timestamp",
        get_json_object(col("data"), "$.position.x").cast("double").alias("x"),
        get_json_object(col("data"), "$.position.y").cast("double").alias("y"),
        get_json_object(col("data"), "$.position.z").cast("double").alias("z")
    )

# Calculate statistics
pose_df.describe().show()

# Save results
pose_df.write.mode("overwrite") \
    .parquet("/path/to/output/pose_data")

spark.stop()
```

## Architecture

### Components

The implementation is organized into four pieces (a structural sketch follows this list):

1. **MCAPDataSource**: Main data source class
   - Implements the Spark DataSource interface
   - Defines schema and creates readers

2. **MCAPDataSourceReader**: Reader implementation
   - Handles file discovery and partitioning
   - Coordinates parallel reading across executors

3. **Partition Functions**:
   - `_path_handler`: Discovers files matching glob patterns
   - `_read_mcap_partition`: Reads MCAP files in a partition
   - `_read_mcap_file`: Decodes individual MCAP files

4. **Decoders**:
   - `decode_protobuf_message`: Handles protobuf messages
   - `decode_json_message`: Handles JSON messages
   - `decode_fallback`: Handles unknown formats
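
A simplified structural sketch of how these pieces fit together under the PySpark Python Data Source API. The class and method names follow the components above, but the bodies are illustrative placeholders, not the project's actual implementation:

```python
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class MCAPDataSource(DataSource):
    @classmethod
    def name(cls):
        # Short name used by spark.read.format("mcap")
        return "mcap"

    def schema(self):
        # Fixed schema matching the table in the Schema section
        return ("sequence bigint, topic string, schema string, "
                "encoding string, log_time bigint, data string")

    def reader(self, schema: StructType):
        return MCAPDataSourceReader(schema, self.options)

class MCAPDataSourceReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def partitions(self):
        # Discover files (e.g. via a helper like _path_handler) and group them
        # into roughly numPartitions chunks for parallel reading
        ...

    def read(self, partition):
        # Open each MCAP file in the partition, decode messages
        # (protobuf / JSON / fallback), and yield one row per message
        ...
```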

### Data Flow

```
MCAP Files → File Discovery → Partitioning → Parallel Read → Decode → JSON → DataFrame
```

## Performance Tips

1. **Topic Filtering at Read Time**: Use the `topicFilter` option for the best performance
   ```python
   # BEST: Filter during read (skips unwanted messages early)
   df = spark.read.format("mcap") \
       .option("path", "/path/to/file.mcap") \
       .option("topicFilter", "pose") \
       .load()

   # GOOD: Filter after read (all topics are decoded first, then filtered)
   df = spark.read.format("mcap") \
       .option("path", "/path/to/file.mcap") \
       .load() \
       .filter(col("topic") == "pose")
   ```

2. **Partitioning**: Adjust `numPartitions` based on cluster size
   ```python
   .option("numPartitions", "16")  # For larger clusters
   ```

3. **Select Only Needed Fields**: Extract JSON fields early
   ```python
   df.select("log_time", get_json_object(col("data"), "$.position"))
   ```

4. **Persist for Multiple Actions**: Cache the DataFrame if you reuse it
   ```python
   df = df.filter(...).persist()
   ```

## Troubleshooting

### "No MCAP files found"
- Check that the path exists and contains `.mcap` files (a quick local check is shown below)
- Verify the `pathGlobFilter` option matches your files
- Check file permissions
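
A quick way to sanity-check the path and pattern outside Spark is to list matching files with Python's standard library (hypothetical path shown):

```python
import glob

# Should print the .mcap files Spark is expected to pick up
print(glob.glob("/path/to/mcap/directory/**/*.mcap", recursive=True))
```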

### Decoder Errors
- Messages with unknown encodings fall back to hex-encoded `raw_data` (see the sketch below for recovering the bytes)
- Check protobuf dependencies if protobuf decoding fails
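
If needed, the hex payload from the fallback decoder can be turned back into bytes in Spark. A sketch, assuming the fallback stores the hex string under a `raw_data` key in the JSON `data` column:

```python
from pyspark.sql.functions import col, get_json_object, unhex

# Recover the original bytes for messages that hit the fallback decoder
raw_df = df.select(
    "topic",
    "log_time",
    unhex(get_json_object(col("data"), "$.raw_data")).alias("raw_bytes")
)
```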

## 📄 Third-Party Package Licenses

© 2025 Databricks, Inc. All rights reserved. The source in this project is provided subject to the [Databricks License](https://databricks.com/db-license-source). All included or referenced third party libraries are subject to the licenses set forth below.

| Package | Purpose | License | Source |
| ------- | ------- | ------- | ------ |
| mcap | MCAP file format reader | MIT | https://pypi.org/project/mcap/ |
| mcap-protobuf-support | Protobuf decoder for MCAP | MIT | https://pypi.org/project/mcap-protobuf-support/ |
| protobuf | Protocol Buffers serialization | BSD-3-Clause | https://pypi.org/project/protobuf/ |
| pyspark | Apache Spark Python API | Apache-2.0 | https://pypi.org/project/pyspark/ |

## References

- [MCAP Format Specification](https://mcap.dev/)
- [Apache Spark Data Source API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataSource.html)
- [ROS 2 Documentation](https://docs.ros.org/)
- [Foxglove Studio](https://foxglove.dev/)