
Conversation

@jefferyann-db
Contributor

  • Create the MCAP Spark data source reader
  • Folder-level glob filtering for *.mcap files
  • Basic encoders for data parsing
  • Topic filtering at read time
  • Sequence number support in the output DataFrame
  • Documentation updates
  • Test coverage
  • Makefile

@dmoore247 dmoore247 self-requested a review November 13, 2025 18:16
@dmoore247 dmoore247 added the enhancement label Nov 13, 2025
Collaborator

@dmoore247 dmoore247 left a comment

Quick summary
I reviewed the feat/mcap branch, focusing on the files under mcap/ (source, tests, README, requirements). I found several bugs and usability/installation problems that will frustrate users, plus a couple of robustness/security issues to fix before publishing.

Highest-priority items (fix before release)
Crash when message encoding is missing (high severity — bug / causes runtime crash)
Evidence:
File: mcap_datasource.py
Code (around line 76):
enc = (channel.message_encoding or schema.encoding).lower()
Problem: if both channel.message_encoding and schema.encoding are None, calling .lower() will raise AttributeError and crash the reader before any decoder error handling runs.
Impact:
Reading MCAP files that lack encoding metadata will crash the job (executor failure) instead of producing readable fallback output.
Suggested fix:
Normalize the encoding safely: check the raw value before calling .lower(). Example (conceptual):

enc_raw = channel.message_encoding or getattr(schema, "encoding", None)
if not enc_raw:
    enc = "fallback"
else:
    enc = enc_raw.lower()

Then fall back to the fallback decoder when the encoding is unknown.
How to verify:
Unit tests or a small script that feeds a sample MCAP message with missing encoding; reader should not raise AttributeError and should yield rows with encoding="fallback" and data containing hex raw_data.
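A minimal pytest sketch of that check, under the assumption that the normalization is factored into a small helper (normalize_encoding is a hypothetical name, not existing code):

```python
from types import SimpleNamespace

def normalize_encoding(channel, schema):
    # Safe version of: (channel.message_encoding or schema.encoding).lower()
    raw = channel.message_encoding or getattr(schema, "encoding", None)
    return raw.lower() if raw else "fallback"

def test_missing_encoding_falls_back():
    channel = SimpleNamespace(message_encoding=None)
    schema = SimpleNamespace(encoding=None)
    assert normalize_encoding(channel, schema) == "fallback"

def test_encoding_is_lowercased():
    channel = SimpleNamespace(message_encoding="JSON")
    assert normalize_encoding(channel, None) == "json"
```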
recursiveFileLookup option declared but not implemented (high/medium severity — user frustration)
Evidence:
mcap_datasource.py sets self.recursiveFileLookup = bool(self.options.get("recursiveFileLookup", "false")) (as an aside, bool() of any non-empty string — including "false" — is True, so this parsing is itself suspect).
_path_handler(path, glob_pattern) has no parameter to enable recursion and ignores recursiveFileLookup.
README documents recursiveFileLookup as an option.
Impact:
Users enabling recursiveFileLookup will expect subdirectory traversal; they won't get it, and files in subdirectories are silently skipped.
Suggested fix:
Update _path_handler to accept a recursive flag and, when true, use Path.rglob(glob_pattern) or walk the directory tree. For example:

if path_obj.is_dir():
    files = sorted(path_obj.rglob(glob_pattern)) if recursive else sorted(path_obj.glob(glob_pattern))

Ensure MCAPDataSourceReader passes self.recursiveFileLookup to _path_handler.
How to verify:
Create nested directories with .mcap files and assert that with recursiveFileLookup=true they are discovered.
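A sketch of what the recursion-aware helper could look like; the signature and return type are assumptions inferred from the snippets quoted above:

```python
from pathlib import Path
from typing import List

def _path_handler(path: str, glob_pattern: str = "*.mcap",
                  recursive: bool = False) -> List[str]:
    path_obj = Path(path)
    if path_obj.is_dir():
        # rglob traverses subdirectories; glob stays at the top level.
        matches = path_obj.rglob(glob_pattern) if recursive else path_obj.glob(glob_pattern)
        return sorted(str(p) for p in matches if p.is_file())
    return [str(path_obj)]
```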
Test script import path and registration are incorrect / will fail locally (high user-friction)
Evidence:
File: test_mcap_datasource.py
The test does sys.path.insert(0, str(Path(__file__).parent)) (which inserts the test directory), then from mcap_datasource import MCAPDataSource. The actual implementation lives under src/, so the module won't be found unless the package is installed or PYTHONPATH includes src.
The test calls spark.dataSource.register(MCAPDataSource); this Python Data Source registration API exists only in PySpark 4.0+, so it will fail on older Spark versions unless the version requirement is documented.
Impact:
Users running the test as-is will get ImportError or registration errors. This blocks simple local validation.
Suggested fixes:
Adjust test to add src to PYTHONPATH (or recommend installing package via editable install in dev instructions).
Example change in the test: sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
Document the Spark version required by spark.dataSource.register(...) (PySpark 4.0+), or replace it with an approach that works for the intended Spark version. If the source is to be used without packaging, instruct users to set PYTHONPATH (or install the package with pip install -e .) and then call spark.read.format("mcap")....
Provide a small wrapper module or a setup.py/pyproject to make mcap importable (pip install -e mcap/), which is the cleaner developer experience.
How to verify:
Run the test after adding src to PYTHONPATH. Ensure import works and that reading a sample test.mcap proceeds or errors meaningfully.
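A sketch of the corrected test bootstrap, assuming a src/ layout next to the test file and PySpark 4.0+ (where spark.dataSource.register exists); the sample file path is hypothetical:

```python
import sys
from pathlib import Path

# Make src/ importable without packaging.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from pyspark.sql import SparkSession
from mcap_datasource import MCAPDataSource

spark = SparkSession.builder.appName("mcap-test").getOrCreate()
spark.dataSource.register(MCAPDataSource)  # Python Data Source API, Spark 4.0+

# Hypothetical sample file; replace with a real test fixture.
df = spark.read.format("mcap").load("test/test.mcap")
df.show(truncate=False)
```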
Medium-priority items (should fix before publication)
Incomplete or inconsistent README / missing example files (medium)
Evidence:
README.md references mcap_spark_example.py and mcap_reader, but these files are not present in the tree (there's mcap_datasource.py, but no example script under mcap/).
The README install instructions list pip install pyspark, but requirements.txt doesn't include pyspark. The README also doesn't mention installing the mcap package from this repo (editable install) for local dev testing.
Impact:
Friction for new users trying to run examples or tests; missing example scripts lead to confusion.
Suggested fixes:
Add a short mcap_spark_example.py (or include a usage snippet in the README and make the import paths clear); a sketch follows below.
Document how to make the package importable for development (e.g., cd mcap && pip install -e . or PYTHONPATH=./mcap/src), or add a minimal pyproject.toml/setup.cfg to support an editable install.
Either include pyspark in requirements.txt under a [dev] extras section or explicitly recommend installation steps in the README.
How to verify:
Follow README steps from a fresh environment and confirm example runs.
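A possible mcap_spark_example.py along these lines; the data path is a placeholder, and it presumes the package is importable and PySpark 4.0+:

```python
from pyspark.sql import SparkSession
from mcap_datasource import MCAPDataSource  # needs editable install or PYTHONPATH=./mcap/src

spark = SparkSession.builder.appName("mcap-example").getOrCreate()
spark.dataSource.register(MCAPDataSource)

df = (
    spark.read.format("mcap")
    .option("recursiveFileLookup", "true")  # option documented in the README
    .load("data/")                          # hypothetical directory of .mcap files
)
df.printSchema()
df.show(truncate=False)
```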
Partitioning logic: potential uneven partitions / off-by-one for small lists (medium)
Evidence:
In MCAPDataSourceReader.partitions():
partition_size_max = int(max(1, length / self.numPartitions))
start is incremented by partition_size_max
If length < numPartitions, this produces partition_size_max = 1 and one partition per file — OK. But the logic can also produce more partitions than requested (e.g., length=10 and numPartitions=4 gives a size of 2 and five partitions) or an uneven distribution.
Impact:
Not a security risk, but it can produce suboptimal parallelization. Document the behavior or adjust the algorithm to produce at most numPartitions partitions, as requested.
Suggested fixes:
Compute the base partition size with integer division and distribute the remainder across partitions to keep the partition count ≤ numPartitions; see the sketch below.
How to verify:
Tests that exercise various combinations of length and numPartitions.
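A sketch of that approach (plan_partitions is a hypothetical stand-in for the body of partitions()):

```python
from typing import List, Tuple

def plan_partitions(length: int, num_partitions: int) -> List[Tuple[int, int]]:
    # Never plan more partitions than files, and never fewer than one.
    n = min(max(1, num_partitions), max(1, length))
    base, remainder = divmod(length, n)
    bounds, start = [], 0
    for i in range(n):
        size = base + (1 if i < remainder else 0)  # spread the remainder over the first partitions
        bounds.append((start, start + size))
        start += size
    return bounds

assert plan_partitions(10, 4) == [(0, 3), (3, 6), (6, 8), (8, 10)]
```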
Lower-priority / robustness and style
Broad excepts and re-raising without context (low-medium)
Evidence:
except Exception as e: logs "Error reading MCAP file {file_path}: {e}" and then re-raises the original exception. Re-raising preserves the stack for the caller, but consider logger.exception() so the full traceback also lands in the logs.
Suggested improvement:
Use logger.exception("...") where appropriate so tracebacks appear in logs.
How to verify:
Induce an error and check that the logs show tracebacks.
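A runnable sketch of the pattern (the failing reader call is simulated):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcap_datasource")

def read_rows(file_path: str):
    raise OSError(f"cannot open {file_path}")  # stand-in failure for the demo

try:
    read_rows("test.mcap")
except Exception:
    logger.exception("Error reading MCAP file %s", "test.mcap")
    raise  # re-raise so the caller still sees the failure, with the traceback already logged
```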
Potential decoder API mismatch or fragile assumptions (investigate)
Evidence:
DecoderFactory().decoder_for(message.log_time, schema) — unusual argument ordering; passing a timestamp where the factory likely expects a message encoding or schema looks wrong. I couldn't verify against the mcap-protobuf-support API in this environment.
Impact:
Runtime decoder errors if the API differs; the current code may fail for some protobuf messages.
Suggested action:
Confirm the DecoderFactory.decoder_for signature against the mcap-protobuf-support version pinned in requirements.txt, and add unit tests around protobuf decoding. Add a defensive try/except with a clearly logged message in case the decoder API differs.
How to verify:
Add a small unit test decoding a protobuf-encoded message and assert fields are converted to dict.
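A defensive wrapper sketch; it assumes the decoder-factory convention decoder_for(message_encoding, schema) -> Optional[callable], which must still be verified against the pinned mcap-protobuf-support version:

```python
from mcap_protobuf.decoder import DecoderFactory  # pinned in requirements.txt

factory = DecoderFactory()

def decode_or_fallback(channel, schema, message):
    # Assumed signature: decoder_for(message_encoding, schema); verify before relying on it.
    decoder = factory.decoder_for(channel.message_encoding, schema)
    if decoder is None:
        return {"encoding": "fallback", "data": message.data.hex()}
    try:
        return decoder(message.data)
    except Exception:
        # Falling back keeps the reader alive if the API or payload differs.
        return {"encoding": "fallback", "data": message.data.hex()}
```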
Logging might expose sensitive data in production (low)
Evidence:
The reader logs file paths and may log exceptions with message content; if MCAP files include sensitive payloads, logging whole messages or JSON could leak data.
Suggested mitigation:
Avoid logging raw message content. Keep logs to filenames, counts, and error summaries, and provide an optional debug flag to enable detailed logging (see the sketch below).
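A small sketch of that policy (debugPayloads is a hypothetical option name):

```python
import logging

logger = logging.getLogger("mcap_datasource")
DEBUG_PAYLOADS = False  # hypothetical flag, e.g. from options.get("debugPayloads")

def log_decode_failure(file_path: str, payload: bytes, err: Exception) -> None:
    # Default: file name, payload size, and error summary — no message content.
    logger.error("Decode failed in %s (%d bytes): %s", file_path, len(payload), err)
    if DEBUG_PAYLOADS:
        logger.debug("Payload hex: %s", payload.hex())  # opt-in only; avoid in production
```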
Installation & configuration issues that will frustrate users
No packaging is provided for the mcap data source (missing pyproject/setup). Developers must adjust PYTHONPATH to include src, or a minimal pyproject.toml should be added so pip install -e mcap/ works.
Test script (test_mcap_datasource.py) assumes module import from current dir; update to insert src into sys.path or recommend editable install in README.
README references example files that are not present — add them or update docs.
The README's installation bullets don't mention an editable install or packaging, and they don't state version constraints for pyspark (which could matter for compatibility). Consider adding a "Development" section that explains how to run tests locally.
Concrete actionable checklist (prioritized)
Fix encoding None bug:
Update _read_mcap_file to compute enc safely, without calling .lower() on None, and fall back to "fallback".
Implement recursiveFileLookup:
Accept recursive flag in _path_handler and use rglob or os.walk when true; pass the flag from MCAPDataSourceReader.
Make tests runnable:
Update test_mcap_datasource.py to add src to PYTHONPATH OR provide packaging (pip install -e mcap/) and update README test instructions.
Remove or document spark.dataSource.register usage; provide a working local example for PySpark (explain how to add the data source to PYTHONPATH).
Improve logging on exceptions to use logger.exception() for tracebacks.
Add a minimal packaging (pyproject.toml) in mcap or a short README dev section explaining editable install and how to run the test.
Add a couple of unit tests:
One that exercises missing encoding flows to ensure fallback behavior.
One that tests recursive file discovery behavior.
Verify the DecoderFactory usage against the mcap-protobuf-support API. Add defensive code if necessary.
How to validate locally (quick checks)
After fixes, run:
Make mcap importable: cd to the repo root and either:
export PYTHONPATH to include src, OR
create a pyproject.toml and run pip install -e mcap/
Run the test script:
python test_mcap_datasource.py
Check that reading a small test.mcap yields rows and does not crash when encoding metadata is missing.
Add unit tests to cover the two main bugs and run pytest on the test directory.
Files I inspected
mcap_datasource.py — core code; primary location of bugs and missing features.
test_mcap_datasource.py — the test expects a different import layout and uses a registration call that requires PySpark 4.0+.
README.md — installation and examples; several inconsistencies vs repo contents.
requirements.txt — pinned mcap/protobuf dependencies; consider adding dev packaging instructions.
Final notes and recommendations
The most urgent fixes are the encoding None crash and implementing recursiveFileLookup so that behavior matches the README and user expectations.
Improve local developer experience by adding minimal packaging (pyproject) or clearer README dev instructions; this will drastically reduce friction when running tests or examples.
Add a couple of small unit tests (fallback encoding & recursive discovery) to prevent regressions.
Optionally add CI that runs a subset of tests with a small MCAP sample (if you can include a small MCAP file under test data with permissive license).
If you want, I can:

Draft the exact small patches for the encoding fix and recursiveFileLookup (no commit; I will only show the suggested diff), OR
Update the test to be runnable locally and create a minimal pyproject.toml for mcap to support pip install -e mcap/ (again, I will only propose changes; you asked for no changes now unless you want them).
Which would you like me to do next?

… data source with recursive file lookup and update README for usage examples
@jefferyann-db
Contributor Author

All changes implemented

Collaborator

@dmoore247 dmoore247 left a comment

LGTM!

@dmoore247 dmoore247 merged commit 6f3c1d8 into main Dec 18, 2025
1 check passed
@dmoore247 dmoore247 deleted the feat/mcap branch December 18, 2025 18:06