[ISSUE #27] poll_batches() API call and IT tests #127
c9850bb to 1ea2031
Hi @luoyuxia, PTAL 🙏
leekeiabstraction
left a comment
Thank you for the PR! Hope you don't mind me reviewing it, left a comment.
@leekeiabstraction I've split the logic into two different scanner types. PTAL 🙏
79b260f to 3b0708f
@fresh-borzoni Thanks for the PR. I'll have a look when I find some time.
Pull request overview
This PR introduces a new batch-oriented scanning API (RecordBatchLogScanner) for analytics workloads, providing compile-time separation between per-record and batch access patterns. The implementation shares common logic via LogScannerInner while exposing distinct public APIs for each mode.
- Created `RecordBatchLogScanner` for direct Arrow `RecordBatch` access alongside existing `LogScanner`
- Implemented shared `LogScannerInner` containing common scanning logic for both scanner types
- Added comprehensive integration tests covering basic functionality, projections, order preservation, and offset tracking
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| crates/fluss/src/client/table/scanner.rs | Core implementation: Added RecordBatchLogScanner and LogScannerInner with separate poll_batches() and collect_batches() methods for batch-level access |
| crates/fluss/src/client/table/mod.rs | Exports new RecordBatchLogScanner type in public API |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Extended CompletedFetch trait with fetch_batches() method and added next_fetched_batch() helper for direct batch extraction |
| crates/fluss/src/record/arrow.rs | Added LogRecordBatch::record_batch() method for efficient batch-level access without row iteration |
| crates/fluss/tests/integration/table.rs | Added 5 comprehensive integration tests covering basic functionality, empty results, projection, order preservation, and offset continuation |
luoyuxia
left a comment
@fresh-borzoni Thanks for the PR. Only left minor comments. PTAL
@luoyuxia Thanks for the review.
luoyuxia
left a comment
@fresh-borzoni Thanks for the quick update. LGTM!
Purpose
Linked issue: close #27
New feature for analytics use cases. Provides scanner variants with compile-time mode separation between per-record and batch access patterns.
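The compile-time separation described above can be illustrated with a minimal sketch. It is not the code from this PR: `Batch`, `ScanRecord`, `Inner`, `RecordScanner`, and `BatchScanner` are hypothetical stand-ins (plain structs instead of Arrow types, an in-memory queue instead of network fetches). It only shows the shape of the idea: two public wrapper types share one inner implementation behind an `Arc`, and each exposes a single `poll()` with its own return type, so no runtime mode flag is needed.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for an Arrow RecordBatch: rows whose first offset is `base_offset`.
#[derive(Clone, Debug, PartialEq)]
pub struct Batch {
    pub base_offset: i64,
    pub rows: Vec<String>,
}

// Stand-in for a per-record result that carries offset metadata.
#[derive(Debug, PartialEq)]
pub struct ScanRecord {
    pub offset: i64,
    pub value: String,
}

// Shared state and logic (hypothetical simplification of a shared inner scanner).
pub struct Inner {
    pending: Mutex<Vec<Batch>>,
}

impl Inner {
    pub fn new(batches: Vec<Batch>) -> Arc<Self> {
        Arc::new(Inner { pending: Mutex::new(batches) })
    }

    // Take every batch that is currently buffered, leaving the buffer empty.
    fn drain(&self) -> Vec<Batch> {
        std::mem::take(&mut *self.pending.lock().unwrap())
    }
}

// Per-record view: poll() can only ever return records.
pub struct RecordScanner {
    inner: Arc<Inner>,
}

// Batch view: poll() can only ever return whole batches.
pub struct BatchScanner {
    inner: Arc<Inner>,
}

impl RecordScanner {
    pub fn new(inner: Arc<Inner>) -> Self {
        Self { inner }
    }

    // Flatten buffered batches into records, assigning each row its offset.
    pub fn poll(&self) -> Vec<ScanRecord> {
        self.inner
            .drain()
            .into_iter()
            .flat_map(|b| {
                let base = b.base_offset;
                b.rows
                    .into_iter()
                    .enumerate()
                    .map(move |(i, value)| ScanRecord { offset: base + i as i64, value })
            })
            .collect()
    }
}

impl BatchScanner {
    pub fn new(inner: Arc<Inner>) -> Self {
        Self { inner }
    }

    // Hand buffered batches to the caller without iterating over rows.
    pub fn poll(&self) -> Vec<Batch> {
        self.inner.drain()
    }
}
```

Because the access mode is fixed by which wrapper type the caller constructs, mixing per-record and batch polling on one scanner becomes a type error rather than a runtime error.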
Brief change log
**Scanner architecture:**
- `RecordBatchLogScanner` for batch access (separate from `LogScanner`)
- `LogScannerInner` holding shared implementation
- `Arc<LogScannerInner>` with distinct public APIs:
  - `LogScanner::poll()` → `Result<ScanRecords>` (per-record with offset/timestamp metadata)
  - `RecordBatchLogScanner::poll()` → `Result<Vec<RecordBatch>>` (direct batch access)

**Implementation details:**
- `TableScan::create_record_batch_log_scanner()` constructor
- `CompletedFetch` trait with `fetch_batches()` method
- `LogRecordBatch::record_batch()` for direct batch extraction

**Cleanup:**
- `poll_mode: AtomicU8` and mode checking logic
- `Error::IllegalState` variant (no longer needed)
- `LogScanner` API unchanged

Tests
Integration Tests:
Added test_poll_batches IT test with scenarios:
- Basic functionality and data correctness
- Timeout behavior on empty table
- Field projection support
- Order preservation across writes
- Offset tracking across consecutive polls
- Starting from a non-zero offset (the batch should be sliced)

Existing record-level tests continue to pass
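The last two scenarios (offset tracking across polls, and slicing when the start offset falls inside a batch) can be sketched as follows. This is an illustration under stated assumptions, not the PR's test or implementation: `OffsetBatch`, `slice_from`, and `poll_from` are hypothetical names, and a plain `Vec<i32>` stands in for an Arrow `RecordBatch` (where the equivalent operation would presumably be a zero-copy slice).

```rust
// Stand-in for one fetched batch; `base_offset` is the log offset of rows[0].
#[derive(Clone, Debug, PartialEq)]
pub struct OffsetBatch {
    pub base_offset: i64,
    pub rows: Vec<i32>,
}

impl OffsetBatch {
    // Offset of the first row that would follow this batch.
    pub fn next_offset(&self) -> i64 {
        self.base_offset + self.rows.len() as i64
    }
}

// Hypothetical helper: drop rows that come before `start`.
// Returns None when the whole batch lies before `start`.
pub fn slice_from(batch: &OffsetBatch, start: i64) -> Option<OffsetBatch> {
    if start >= batch.next_offset() {
        return None;
    }
    // Number of leading rows to skip; zero when the batch begins at or after `start`.
    let skip = (start - batch.base_offset).max(0) as usize;
    Some(OffsetBatch {
        base_offset: batch.base_offset + skip as i64,
        rows: batch.rows[skip..].to_vec(),
    })
}

// Hypothetical poll step: returns the (possibly sliced) batches at or after
// `start`, plus the offset from which the next poll should continue.
pub fn poll_from(log: &[OffsetBatch], start: i64) -> (Vec<OffsetBatch>, i64) {
    let mut next = start;
    let mut out = Vec::new();
    for batch in log {
        if let Some(sliced) = slice_from(batch, next) {
            next = sliced.next_offset();
            out.push(sliced);
        }
    }
    (out, next)
}
```

With a log of two batches covering offsets 0..3 and 3..5, polling from offset 2 yields a one-row slice of the first batch followed by the full second batch, and the returned continuation offset is 5; a second poll from 5 returns nothing new.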
API and Format
New API: