Preprocess parquet repetition and definition levels #21139
base: main
Conversation
… improve/refactor common code with string offset memory accounting
BEFORE benchmarks: parquet_read_decode[0], parquet_read_chunks[0] (NVIDIA RTX A5000)
AFTER benchmarks: parquet_read_decode[0], parquet_read_chunks[0] (NVIDIA RTX A5000)
return max_depth_valid_count;
}

// is the page marked nullable or not
Moved to a common header
(s->input_row_count <= last_row)) {
int next_valid_count;
block.sync();
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
Same for all cases
}
block.sync();

if (!t) {
This is still present; it is just lower in the diff.
// the core loop. decode batches of level stream data using rle_stream objects
// and pass the results to update_page_sizes
int processed = 0;
while (processed < s->page.num_input_values) {
We no longer need to loop for the rep/def buffers; update_page_sizes() can be called in one shot.
return total_len;
}

/**
The old code path is finally no longer needed; it has been superseded by rle_stream.
__syncthreads();

// do something with the level data
while (start_val < processed) {
Most of the changes in here are due to nuking this inner loop (we no longer need to buffer the level decode) and, of course, removing the decode itself. I highly recommend hiding whitespace diffs when reviewing.
// Fixed length byte array: Offsets are fixed, no need to allocate offset buffer
if (chunk.physical_type == Type::FIXED_LEN_BYTE_ARRAY) { return 0; }

// Estimate number of offsets based on page.num_input_values
Optimize how the number of string offsets is determined, combining the logic with the new level decode logic.
mhaseeb123 left a comment
Quick glance, will do a detailed review a bit later. Couple of questions and minor comments.
rmm::exec_policy_nosync(stream),
iter,
iter + pages.size(),
We can just iterate over pages here since the functor only uses the page_idx to access pages[page_idx] anyway. In that case, we can also remove the pages struct member.
No, we want to utilize the GPU parallelism. If we loop over pages here then we get no parallelism.
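To make the parallelism point concrete, here is a minimal standalone sketch of the pattern (ToyPage and page_offset_size_functor are hypothetical stand-ins, not the real cuDF types): thrust::transform parallelizes across the indices produced by the counting iterator, so each pages[page_idx] lookup runs as its own unit of parallel work, whereas looping over all pages inside a single invocation would serialize everything.

```cpp
// Sketch only: a per-page functor driven by a counting iterator, assuming toy
// types rather than cudf::io::parquet's PageInfo and device_span members.
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/transform.h>

#include <cstddef>
#include <cstdint>

struct ToyPage {            // stand-in for PageInfo
  int32_t num_input_values;
};

struct page_offset_size_functor {
  ToyPage const* pages;     // device pointer to all pages

  __device__ std::size_t operator()(std::size_t page_idx) const
  {
    // each invocation handles exactly one page, selected by page_idx
    return static_cast<std::size_t>(pages[page_idx].num_input_values) * sizeof(int32_t);
  }
};

int main()
{
  thrust::device_vector<ToyPage> pages(4, ToyPage{100});
  thrust::device_vector<std::size_t> sizes(pages.size());

  auto iter = thrust::make_counting_iterator<std::size_t>(0);
  // one parallel work item per page index; a host-side loop over pages
  // (or a loop inside the functor) would give up this parallelism
  thrust::transform(thrust::device,
                    iter,
                    iter + pages.size(),
                    sizes.begin(),
                    page_offset_size_functor{thrust::raw_pointer_cast(pages.data())});
  return 0;
}
```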
s, pp, chunks, min_row, num_rows, all_types_filter{}, page_processing_stage::PREPROCESS)) {
  return;
}
We might need code to skip pages based on subpass_page_mask here and in the other kernels?
No code is needed here. We won't use the def/rep levels for these pages at all so there's nothing to set.
I have updated allocate_level_decode_space() to skip allocating memory for rep/def levels, though, since we don't need it.
struct compute_page_string_offset_size {
  device_span<PageInfo const> pages;
  device_span<ColumnChunkDesc const> chunks;
  size_t skip_rows;
We can also update this functor's operator() to iterate directly over pages instead of page_idx.
No, this is executed by thrust::transform. If we loop over pages within the operator then we get no parallelism.
The parquet repetition and definition levels are decoded multiple times throughout the total decoding process: not just during the decode itself but also during setup in compute_page_sizes_kernel() and compute_string_page_bounds_kernel(). And during chunked reads even these setup steps are run multiple times, exploding the cost of re-decoding them.

Instead, we decode the levels just once per subpass into a temporary buffer and simply read those results wherever they are needed. This dramatically speeds up the list and chunked cuDF benchmarks, as highlighted below.
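As a rough, self-contained illustration of the approach (decode_levels_once, count_nonzero_levels, and fake_decode below are hypothetical stand-ins, not the actual cuDF kernels): the decoded levels are written to a device buffer in one pass, and later passes simply read that buffer instead of re-running the level decode.

```cpp
// Sketch of "decode once, read many times"; the decode here is a trivial
// placeholder for the real RLE/bit-packed level decode.
#include <cuda_runtime.h>
#include <cstdio>

// stand-in for an expensive per-value level decode
__device__ int fake_decode(int encoded) { return encoded & 0x7; }

// pass 1: decode every level value exactly once into a reusable buffer
__global__ void decode_levels_once(int const* encoded, int* levels, int n)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) { levels[i] = fake_decode(encoded[i]); }
}

// later passes read the preprocessed buffer instead of re-decoding
__global__ void count_nonzero_levels(int const* levels, int n, int* out)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n && levels[i] != 0) { atomicAdd(out, 1); }
}

int main()
{
  constexpr int n = 1024;
  int *encoded, *levels, *count;
  cudaMallocManaged(&encoded, n * sizeof(int));
  cudaMallocManaged(&levels, n * sizeof(int));
  cudaMallocManaged(&count, sizeof(int));
  for (int i = 0; i < n; ++i) { encoded[i] = i; }
  *count = 0;

  decode_levels_once<<<(n + 255) / 256, 256>>>(encoded, levels, n);
  // every consumer reuses the same decoded buffer
  count_nonzero_levels<<<(n + 255) / 256, 256>>>(levels, n, count);
  cudaDeviceSynchronize();
  std::printf("non-zero levels: %d\n", *count);

  cudaFree(encoded); cudaFree(levels); cudaFree(count);
  return 0;
}
```

In the PR itself, the single decode happens once per subpass and the results feed the setup kernels (compute_page_sizes_kernel(), compute_string_page_bounds_kernel()) and the decode kernels that previously re-decoded the levels themselves.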
Centralizing this grants several advantages. First, the old (non-rle_stream) rep/def decode is now ripped entirely out of decode_split_page_data_kernel(), decode_page_data(), and the delta decode kernels, simplifying maintenance. Less shared memory is needed in the decode kernels for the rle_run and result buffers. As the decode kernel complexity decreases, unnecessary buffer loops are removed and the register count drops. And future improvements to rle_stream decode can be studied in their own isolated environment (although dictionary and bool decode still need it).

Benchmarks
Checklist