Skip to content

Commit 093d87e

Browse files
authored
Accept device spans instead of buffers in hybrid scan APIs. (rapidsai#21026)
Closes rapidsai/hercules-planning#14. Closes rapidsai#20939. This PR updates the hybrid scan APIs to accept `device_span`s instead of `rmm::device_buffer`s, allowing the user to coalesce and/or combine column chunk IO into larger buffers as needed and create spans out of them. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) - Matthew Murray (https://github.com/Matt711) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Matthew Murray (https://github.com/Matt711) - Bradley Dice (https://github.com/bdice) URL: rapidsai#21026
1 parent fdf0a1c commit 093d87e

26 files changed

+491
-490
lines changed

cpp/benchmarks/io/parquet/experimental/parquet_dictionary_page_filter.cpp

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,32 +65,51 @@ cudf::host_span<uint8_t const> fetch_page_index_bytes(
6565
}
6666

6767
/**
68-
* @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers
68+
* @brief Converts a span of device buffers into a vector of corresponding device spans
69+
*
70+
* @tparam T Type of output device spans
71+
* @param buffers Host span of device buffers
72+
* @return Device spans corresponding to the input device buffers
73+
*/
74+
template <typename T>
75+
std::vector<cudf::device_span<T const>> make_device_spans(
76+
cudf::host_span<rmm::device_buffer const> buffers)
77+
requires(sizeof(T) == 1)
78+
{
79+
std::vector<cudf::device_span<T const>> device_spans(buffers.size());
80+
std::transform(buffers.begin(), buffers.end(), device_spans.begin(), [](auto const& buffer) {
81+
return cudf::device_span<T const>{static_cast<T const*>((buffer.data())), buffer.size()};
82+
});
83+
return device_spans;
84+
}
85+
86+
/**
87+
* @brief Fetches a list of byte ranges from a host buffer into device buffers
6988
*
7089
* @param host_buffer Host buffer span
7190
* @param byte_ranges Byte ranges to fetch
7291
* @param stream CUDA stream
92+
* @param mr Device memory resource
7393
*
74-
* @return Vector of device buffers
94+
* @return Device buffers
7595
*/
7696
std::vector<rmm::device_buffer> fetch_byte_ranges(
7797
cudf::host_span<uint8_t const> host_buffer,
7898
cudf::host_span<cudf::io::text::byte_range_info const> byte_ranges,
79-
rmm::cuda_stream_view stream)
99+
rmm::cuda_stream_view stream,
100+
rmm::device_async_resource_ref mr)
80101
{
81-
std::vector<rmm::device_buffer> buffers{};
82-
buffers.reserve(byte_ranges.size());
102+
std::vector<rmm::device_buffer> buffers(byte_ranges.size());
83103

84104
std::transform(
85-
byte_ranges.begin(),
86-
byte_ranges.end(),
87-
std::back_inserter(buffers),
88-
[&](auto const& byte_range) {
105+
byte_ranges.begin(), byte_ranges.end(), buffers.begin(), [&](auto const& byte_range) {
89106
auto const chunk_offset = host_buffer.data() + byte_range.offset();
90-
auto const chunk_size = byte_range.size();
91-
auto buffer = rmm::device_buffer(chunk_size, stream);
92-
CUDF_CUDA_TRY(cudaMemcpyAsync(
93-
buffer.data(), chunk_offset, chunk_size, cudaMemcpyHostToDevice, stream.value()));
107+
auto const chunk_size = static_cast<size_t>(byte_range.size());
108+
auto buffer = rmm::device_buffer(chunk_size, stream, mr);
109+
cudf::detail::cuda_memcpy_async(
110+
cudf::device_span<uint8_t>{static_cast<uint8_t*>(buffer.data()), chunk_size},
111+
cudf::host_span<uint8_t const>{chunk_offset, chunk_size},
112+
stream);
94113
return buffer;
95114
});
96115

@@ -157,9 +176,10 @@ void BM_parquet_filter_string_row_groups_with_dicts_common(nvbench::state& state
157176
// If we have dictionary page byte ranges, filter row groups with dictionary pages
158177
CUDF_EXPECTS(dict_page_byte_ranges.size() > 0, "No dictionary page byte ranges found");
159178

160-
// Fetch dictionary page buffers from the input file buffer
161-
std::vector<rmm::device_buffer> dictionary_page_buffers =
162-
fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream);
179+
// Fetch dictionary page buffers and corresponding device spans from the input file buffer
180+
auto dictionary_page_buffers = fetch_byte_ranges(
181+
file_buffer_span, dict_page_byte_ranges, stream, cudf::get_current_device_resource_ref());
182+
auto dictionary_page_data = make_device_spans<uint8_t>(dictionary_page_buffers);
163183

164184
auto mem_stats_logger = cudf::memory_stats_logger();
165185
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
@@ -168,7 +188,7 @@ void BM_parquet_filter_string_row_groups_with_dicts_common(nvbench::state& state
168188
try_drop_l3_cache();
169189
timer.start();
170190
std::ignore = reader->filter_row_groups_with_dictionary_pages(
171-
dictionary_page_buffers, input_row_group_indices, read_opts, stream);
191+
dictionary_page_data, input_row_group_indices, read_opts, stream);
172192
timer.stop();
173193
});
174194

cpp/examples/hybrid_scan_io/common_utils.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -136,20 +136,20 @@ std::vector<rmm::device_buffer> fetch_byte_ranges(
136136
static std::mutex mutex;
137137

138138
std::vector<rmm::device_buffer> buffers(byte_ranges.size());
139-
140139
{
141140
std::lock_guard<std::mutex> lock(mutex);
142141

143-
std::for_each(thrust::counting_iterator<size_t>(0),
144-
thrust::counting_iterator(byte_ranges.size()),
145-
[&](auto const idx) {
146-
auto const chunk_offset = host_buffer.data() + byte_ranges[idx].offset();
147-
auto const chunk_size = byte_ranges[idx].size();
148-
auto buffer = rmm::device_buffer(chunk_size, stream, mr);
149-
CUDF_CUDA_TRY(cudaMemcpyAsync(
150-
buffer.data(), chunk_offset, chunk_size, cudaMemcpyDefault, stream.value()));
151-
buffers[idx] = std::move(buffer);
152-
});
142+
std::transform(
143+
byte_ranges.begin(), byte_ranges.end(), buffers.begin(), [&](auto const& byte_range) {
144+
auto const chunk_offset = host_buffer.data() + byte_range.offset();
145+
auto const chunk_size = static_cast<size_t>(byte_range.size());
146+
auto buffer = rmm::device_buffer(chunk_size, stream, mr);
147+
cudf::detail::cuda_memcpy_async(
148+
cudf::device_span<uint8_t>{static_cast<uint8_t*>(buffer.data()), chunk_size},
149+
cudf::host_span<uint8_t const>{chunk_offset, chunk_size},
150+
stream);
151+
return buffer;
152+
});
153153
}
154154

155155
return buffers;

cpp/examples/hybrid_scan_io/common_utils.hpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,33 @@ cudf::host_span<uint8_t const> fetch_page_index_bytes(
7777
cudf::host_span<uint8_t const> buffer, cudf::io::text::byte_range_info const page_index_bytes);
7878

7979
/**
80-
* @brief Fetches a list of byte ranges from a host buffer into a vector of device buffers
80+
* @brief Converts a span of device buffers into a vector of corresponding device spans
81+
*
82+
* @tparam T Type of output device spans
83+
* @param buffers Host span of device buffers
84+
* @return Device spans corresponding to the input device buffers
85+
*/
86+
template <typename T>
87+
std::vector<cudf::device_span<T const>> make_device_spans(
88+
cudf::host_span<rmm::device_buffer const> buffers)
89+
requires(sizeof(T) == 1)
90+
{
91+
std::vector<cudf::device_span<T const>> device_spans(buffers.size());
92+
std::transform(buffers.begin(), buffers.end(), device_spans.begin(), [](auto const& buffer) {
93+
return cudf::device_span<T const>{static_cast<T const*>(buffer.data()), buffer.size()};
94+
});
95+
return device_spans;
96+
}
97+
98+
/**
99+
* @brief Fetches a list of byte ranges from a host buffer into device buffers
81100
*
82101
* @param host_buffer Host buffer span
83102
* @param byte_ranges Byte ranges to fetch
84103
* @param stream CUDA stream
85-
* @param mr Device memory resource to create device buffers with
104+
* @param mr Device memory resource
86105
*
87-
* @return Vector of device buffers
106+
* @return Device buffers
88107
*/
89108
std::vector<rmm::device_buffer> fetch_byte_ranges(
90109
cudf::host_span<uint8_t const> host_buffer,

cpp/examples/hybrid_scan_io/hybrid_scan_io.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,12 @@ auto hybrid_scan(io_source const& io_source,
141141
dict_page_byte_ranges.size()) {
142142
std::cout << "READER: Filter row groups with dictionary pages...\n";
143143
timer.reset();
144-
// Fetch dictionary page buffers from the input file buffer
145-
std::vector<rmm::device_buffer> dictionary_page_buffers =
144+
// Fetch dictionary page buffers and corresponding device spans from the input file buffer
145+
auto dictionary_page_buffers =
146146
fetch_byte_ranges(file_buffer_span, dict_page_byte_ranges, stream, mr);
147+
auto dictionary_page_data = make_device_spans<uint8_t>(dictionary_page_buffers);
147148
dictionary_page_filtered_row_group_indices = reader->filter_row_groups_with_dictionary_pages(
148-
dictionary_page_buffers, current_row_group_indices, options, stream);
149+
dictionary_page_data, current_row_group_indices, options, stream);
149150

150151
// Update current row group indices
151152
current_row_group_indices = dictionary_page_filtered_row_group_indices;
@@ -166,8 +167,9 @@ auto hybrid_scan(io_source const& io_source,
166167
mr, bloom_filter_alignment);
167168
std::cout << "READER: Filter row groups with bloom filters...\n";
168169
timer.reset();
169-
std::vector<rmm::device_buffer> bloom_filter_data =
170+
auto bloom_filter_buffers =
170171
fetch_byte_ranges(file_buffer_span, bloom_filter_byte_ranges, stream, aligned_mr);
172+
auto bloom_filter_data = make_device_spans<uint8_t>(bloom_filter_buffers);
171173
// Filter row groups with bloom filters
172174
bloom_filtered_row_group_indices = reader->filter_row_groups_with_bloom_filters(
173175
bloom_filter_data, current_row_group_indices, options, stream);
@@ -207,14 +209,15 @@ auto hybrid_scan(io_source const& io_source,
207209
reader->filter_column_chunks_byte_ranges(current_row_group_indices, options);
208210
auto filter_column_chunk_buffers =
209211
fetch_byte_ranges(file_buffer_span, filter_column_chunk_byte_ranges, stream, mr);
212+
auto filter_column_chunk_data = make_device_spans<uint8_t>(filter_column_chunk_buffers);
210213

211214
// Materialize the table with only the filter columns
212215
auto row_mask_mutable_view = row_mask->mutable_view();
213216
auto filter_table =
214217
reader
215218
->materialize_filter_columns(
216219
current_row_group_indices,
217-
std::move(filter_column_chunk_buffers),
220+
filter_column_chunk_data,
218221
row_mask_mutable_view,
219222
prune_filter_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
220223
options,
@@ -239,13 +242,14 @@ auto hybrid_scan(io_source const& io_source,
239242
reader->payload_column_chunks_byte_ranges(current_row_group_indices, options);
240243
auto payload_column_chunk_buffers =
241244
fetch_byte_ranges(file_buffer_span, payload_column_chunk_byte_ranges, stream, mr);
245+
auto payload_column_chunk_data = make_device_spans<uint8_t>(payload_column_chunk_buffers);
242246

243247
// Materialize the table with only the payload columns
244248
auto payload_table =
245249
reader
246250
->materialize_payload_columns(
247251
current_row_group_indices,
248-
std::move(payload_column_chunk_buffers),
252+
payload_column_chunk_data,
249253
row_mask->view(),
250254
prune_payload_data_pages ? use_data_page_mask::YES : use_data_page_mask::NO,
251255
options,

cpp/examples/hybrid_scan_io/hybrid_scan_pipeline.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,10 @@ struct hybrid_scan_fn {
9696
reader->all_column_chunks_byte_ranges(row_groups_indices, options);
9797
auto all_column_chunk_buffers =
9898
fetch_byte_ranges(file_buffer_span, all_column_chunk_byte_ranges, stream, mr);
99-
table.get() =
100-
std::move(reader
101-
->materialize_all_columns(
102-
row_groups_indices, std::move(all_column_chunk_buffers), options, stream)
103-
.tbl);
99+
auto all_column_chunk_data = make_device_spans<uint8_t>(all_column_chunk_buffers);
100+
table.get() = std::move(
101+
reader->materialize_all_columns(row_groups_indices, all_column_chunk_data, options, stream)
102+
.tbl);
104103
stream.synchronize_no_throw();
105104
}
106105
};
@@ -163,10 +162,14 @@ auto hybrid_scan_pipelined(io_source const& io_source,
163162

164163
timer.print_elapsed_millis();
165164

166-
std::cout << "Creating row group partitions... \n";
167-
timer.reset();
165+
if (num_partitions > 1) {
166+
std::cout << "Creating row group partitions... \n";
167+
timer.reset();
168+
}
168169

169170
if (num_partitions == 1) {
171+
std::cout << "Reading as single partition... \n";
172+
timer.reset();
170173
hybrid_scan_fn{.table = std::ref(tables.front()),
171174
.reader = std::move(readers.front()),
172175
.file_buffer_span = file_buffer_span,
@@ -175,6 +178,7 @@ auto hybrid_scan_pipelined(io_source const& io_source,
175178
.options = options,
176179
.stream = stream_pool.get_stream(),
177180
.mr = mr}();
181+
timer.print_elapsed_millis();
178182
return std::move(tables.front());
179183
}
180184

0 commit comments

Comments
 (0)