-
Notifications
You must be signed in to change notification settings - Fork 703
Refactor streamer batch management in table_data #3107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
ashotshakhkyan
commented
Jan 8, 2026
- Remove move constructors and assignments for batch_data, making it non-movable and non-copyable.
- Update batch initialization logic to use promise and mutex, removing atomic flags.
- Update get_sample, value, and value_ptr methods to use new batch_data structure and promise-based initialization.
- Update create_streamer to initialize column_to_batches and batch promises, removing streamer object creation.
- Remove move constructors and assignments for batch_data, making it non-movable and non-copyable. - Update batch initialization logic to use promise and mutex, removing atomic flags. - Update get_sample, value, and value_ptr methods to use new batch_data structure and promise-based initialization. - Update create_streamer to initialize column_to_batches and batch promises, removing streamer object creation.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR refactors the streamer batch management system in table_data by replacing the streaming-based approach with a promise-based batch initialization model. The key changes simplify the architecture by eliminating the column_streamer objects and atomic flags in favor of promises that are resolved once per batch.
- Replaces streaming-based batch fetching with promise-based batch initialization
- Makes
batch_datanon-movable and non-copyable by deleting move constructors/assignments - Updates default configuration values for threading and shared memory usage
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| cpp/deeplake_pg/table_data.hpp | Removes move operations from batch_data, replaces atomic flags with promises, simplifies column_data to a vector of batches, and updates reset() to cancel promises |
| cpp/deeplake_pg/table_data_impl.hpp | Refactors create_streamer to initialize batches with promises instead of creating streamer objects; updates batch access methods to use promise-based initialization with mutex locking |
| cpp/deeplake_pg/extension_init.cpp | Changes default thread count to use hardware concurrency and sets shared memory usage default to false |
| cpp/deeplake_pg/duckdb_deeplake_scan.cpp | Adds workaround to copy string values immediately to prevent issues with temporary nd::array object destruction |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| const auto range_end = std::min<int64_t>(range_start + batch_size_, row_count); | ||
| auto p = async::run_on_main([cv, range_start, range_end, row_count]() { | ||
| return cv->request_range( | ||
| range_start, range_end, storage::fetch_options(static_cast<int>(row_count - range_start))); |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fetch_options calculation uses row_count - range_start which represents the remaining rows from the start of this batch to the end of the dataset. This should likely be range_end - range_start to represent the actual size of the current batch being fetched, or the calculation may need adjustment based on the API's expectations.
| range_start, range_end, storage::fetch_options(static_cast<int>(row_count - range_start))); | |
| range_start, range_end, storage::fetch_options(static_cast<int>(range_end - range_start))); |
| bool is_filter_pushdown_enabled = true; | ||
| int32_t max_streamable_column_width = 128; | ||
| int32_t max_num_threads_for_global_state = 4; | ||
| int32_t max_num_threads_for_global_state = std::thread::hardware_concurrency(); |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::thread::hardware_concurrency() can return 0 if the value is not well-defined or not computable. This conflicts with the minimum value of 1 defined for this GUC parameter at line 203. Initialize with std::max(1u, std::thread::hardware_concurrency()) instead.
| for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) { | ||
| const int64_t row_idx = current_row + row_in_batch; | ||
| auto value = td.get_streamers().value<std::string_view>(col_idx, row_idx); | ||
| // workaround. value is not always remain valid. Trying to make a copy as soon as possible. |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected grammar: 'value is not always remain valid' should be 'value does not always remain valid'.
| // workaround. value is not always remain valid. Trying to make a copy as soon as possible. | |
| // workaround. value does not always remain valid. Trying to make a copy as soon as possible. |
| std::lock_guard lock(batch.mutex_); | ||
| if (static_cast<bool>(batch.promise_)) { | ||
| batch.owner_ = batch.promise_.get_future().get(); | ||
| batch.promise_ = async::promise<nd::array>(); |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resetting the promise by assigning a default-constructed promise is unclear. Consider adding a comment explaining that this marks the batch as initialized, or use a more explicit pattern such as a separate boolean flag or a method like batch.mark_initialized() to make the intent clearer.
| batches[i].initialized_.store(true, std::memory_order_release); | ||
| } | ||
| auto& batch = column_to_batches[column_number][batch_index]; | ||
| if (static_cast<bool>(batch.promise_)) [[unlikely]] { |
Copilot
AI
Jan 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The static_cast<bool>(batch.promise_) pattern is used to check if the promise is valid. Consider documenting what constitutes a valid vs invalid promise state, or use a more self-documenting approach such as a helper method batch.is_initialized() to improve code readability across all three usages (lines 570, 593, 612).


