4747#include < cudf/transform.hpp>
4848
4949#include < cuda_runtime.h>
50+ #include < nvtx3/nvtx3.hpp>
5051
5152#include < filesystem>
5253#include < memory>
@@ -87,6 +88,7 @@ CudfHiveDataSource::CudfHiveDataSource(
8788 outputName);
8889
8990 auto * handle = static_cast <const hive::HiveColumnHandle*>(it->second .get ());
91+ readColumnSet_.emplace (handle->name ());
9092 readColumnNames_.emplace_back (handle->name ());
9193 }
9294
@@ -100,11 +102,9 @@ CudfHiveDataSource::CudfHiveDataSource(
100102 subfieldFilters_.emplace (k.clone (), v->clone ());
101103 // Add fields in the filter to the columns to read if not there
102104 for (const auto & [field, _] : subfieldFilters_) {
103- if (std::find (
104- readColumnNames_.begin (),
105- readColumnNames_.end (),
106- field.toString ()) == readColumnNames_.end ()) {
107- readColumnNames_.push_back (field.toString ());
105+ if (readColumnSet_.count (field.toString ()) == 0 ) {
106+ readColumnSet_.emplace (field.toString ());
107+ readColumnNames_.emplace_back (field.toString ());
108108 }
109109 }
110110 }
@@ -115,11 +115,9 @@ CudfHiveDataSource::CudfHiveDataSource(
115115 remainingFilterExprSet_ = expressionEvaluator_->compile (remainingFilter);
116116 for (const auto & field : remainingFilterExprSet_->distinctFields ()) {
117117 // Add fields in the filter to the columns to read if not there
118- if (std::find (
119- readColumnNames_.begin (),
120- readColumnNames_.end (),
121- field->name ()) == readColumnNames_.end ()) {
122- readColumnNames_.push_back (field->name ());
118+ if (readColumnSet_.count (field->name ()) == 0 ) {
119+ readColumnSet_.emplace (field->name ());
120+ readColumnNames_.emplace_back (field->name ());
123121 }
124122 }
125123
@@ -199,180 +197,80 @@ std::optional<RowVectorPtr> CudfHiveDataSource::next(
199197 auto startTimeUs = getCurrentTimeMicro ();
200198
201199 if (not useExperimentalSplitReader_) {
202- // Read a table chunk using the regular parquet reader
203- VELOX_CHECK_NOT_NULL (splitReader_, " Regular cudf split reader not present" );
200+ // Read table using the regular cudf parquet reader
201+ VELOX_CHECK_NOT_NULL (splitReader_, " cudf parquet reader not present" );
204202
205203 if (not splitReader_->has_next ()) {
206204 return nullptr ;
207205 }
208- // Read a table chunk
206+
209207 auto tableWithMetadata = splitReader_->read_chunk ();
210208 cudfTable = std::move (tableWithMetadata.tbl );
211209 metadata = std::move (tableWithMetadata.metadata );
212210 } else {
213- // Read a table chunk using the experimental parquet reader
211+ // Read table using the experimental parquet reader
214212 VELOX_CHECK_NOT_NULL (
215- exptSplitReader_, " Experimental cudf split reader not present" );
213+ exptSplitReader_, " cuDF hybrid scan reader not present" );
214+ VELOX_CHECK_NOT_NULL (hybridScanState_, " hybrid scan state not present" );
216215
217- // TODO(mh): Replace this with chunked hybrid scan APIs when available in
218- // the pinned cuDF version
219- std::call_once (*tableMaterialized_, [&]() {
216+ std::call_once (*hybridScanState_->isHybridScanSetup_ , [&]() {
220217 auto rowGroupIndices = exptSplitReader_->all_row_groups (readerOptions_);
221218
222- // Temporary options used for filtering row groups. TODO(mh): Remove this
223- // once PR https://github.com/rapidsai/cudf/pull/20604 is merged
224- auto tmpOptions = readerOptions_;
225-
226- if (readerOptions_.get_filter ().has_value ()) {
227- // Filter expression converter
228- auto exprConverter = referenceToNameConverter (
229- readerOptions_.get_filter (),
230- exptSplitReader_->parquet_metadata ().schema ,
231- readColumnNames_);
232- tmpOptions.set_filter (exprConverter.convertedExpression ());
233-
234- // Create a temporary split reader for filtering row groups. TODO(mh):
235- // Remove this once PR https://github.com/rapidsai/cudf/pull/20604 is
236- // merged or the pinned cuDF version is updated
237- auto footerBytes = fetchFooterBytes (dataSource_);
238- auto tmpExptSplitReader = std::make_unique<CudfHybridScanReader>(
239- cudf::host_span<uint8_t const >{
240- footerBytes->data (), footerBytes->size ()},
241- tmpOptions);
242- rowGroupIndices = tmpExptSplitReader->filter_row_groups_with_stats (
243- rowGroupIndices, tmpOptions, stream_);
219+ // Filter row groups using row group byte ranges
220+ if (readerOptions_.get_skip_bytes () > 0 or
221+ readerOptions_.get_num_bytes ().has_value ()) {
222+ rowGroupIndices = exptSplitReader_->filter_row_groups_with_byte_range (
223+ rowGroupIndices, readerOptions_);
244224 }
245225
246- // Workaround: Set a dummy filter expression to avoid erroneous assertion
247- // in `payload_column_chunks_byte_ranges`. TODO(mh): Remove this once PR
248- // https://github.com/rapidsai/cudf/pull/20604 is merged
249- if (not tmpOptions.get_filter ().has_value ()) {
250- auto scalar = cudf::numeric_scalar<int32_t >(0 , false , stream_);
251- auto literal = cudf::ast::literal (scalar);
252- auto filter =
253- cudf::ast::operation (cudf::ast::ast_operator::IDENTITY, literal);
254- tmpOptions.set_filter (filter);
226+ // Filter row groups using column chunk statistics
227+ if (readerOptions_.get_filter ().has_value ()) {
228+ rowGroupIndices = exptSplitReader_->filter_row_groups_with_stats (
229+ rowGroupIndices, readerOptions_, stream_);
255230 }
256231
257232 // Get column chunk byte ranges to fetch
258233 const auto columnChunkByteRanges =
259- exptSplitReader_->payload_column_chunks_byte_ranges (
260- rowGroupIndices, tmpOptions);
261- // Fetch row group data device buffers
262- std::vector<rmm::device_buffer> columnChunkBuffers (
263- columnChunkByteRanges.size ());
264- std::vector<std::future<size_t >> ioFutures{};
265- ioFutures.reserve (columnChunkByteRanges.size ());
266- std::for_each (
267- thrust::counting_iterator<size_t >(0 ),
268- thrust::counting_iterator (columnChunkByteRanges.size ()),
269- [&](auto idx) {
270- const auto & byteRange = columnChunkByteRanges[idx];
271- auto & buffer = columnChunkBuffers[idx];
272-
273- // Pad the buffer size to be a multiple of 8 bytes
274- constexpr size_t bufferPaddingMultiple = 8 ;
275- buffer = rmm::device_buffer (
276- cudf::util::round_up_safe<size_t >(
277- byteRange.size (), bufferPaddingMultiple),
278- stream_,
279- cudf::get_current_device_resource_ref ());
280- // Directly read the column chunk data to the device buffer if
281- // supported
282- if (auto bufferedInput =
283- dynamic_cast <BufferedInputDataSource*>(dataSource_.get ())) {
284- bufferedInput->enqueueForDevice (
285- static_cast <uint64_t >(byteRange.offset ()),
286- static_cast <uint64_t >(byteRange.size ()),
287- static_cast <uint8_t *>(buffer.data ()));
288- } else if (
289- dataSource_->supports_device_read () and
290- dataSource_->is_device_read_preferred (byteRange.size ())) {
291- ioFutures.emplace_back (dataSource_->device_read_async (
292- byteRange.offset (),
293- byteRange.size (),
294- static_cast <uint8_t *>(buffer.data ()),
295- stream_));
296- } else {
297- // Read the column chunk data to the host buffer and copy it to
298- // the device buffer
299- auto hostBuffer =
300- dataSource_->host_read (byteRange.offset (), byteRange.size ());
301- CUDF_CUDA_TRY (cudaMemcpyAsync (
302- buffer.data (),
303- hostBuffer->data (),
304- byteRange.size (),
305- cudaMemcpyHostToDevice,
306- stream_.value ()));
307- }
308- });
309-
310- if (auto bufferedInput =
311- dynamic_cast <BufferedInputDataSource*>(dataSource_.get ())) {
312- bufferedInput->load (stream_);
313- }
234+ exptSplitReader_->all_column_chunks_byte_ranges (
235+ rowGroupIndices, readerOptions_);
236+
237+ // Fetch column chunk byte ranges
238+ nvtxRangePush (" fetchByteRanges" );
239+
240+ // Tuple containing a vector of device buffers, a vector of device spans
241+ // for each input byte range, and a future to wait for all reads to
242+ // complete
243+ auto ioData = fetchByteRangesAsync (
244+ dataSource_,
245+ columnChunkByteRanges,
246+ stream_,
247+ cudf::get_current_device_resource_ref ());
248+
249+ // Wait for all pending reads to complete
250+ std::get<2 >(ioData).wait ();
251+ nvtxRangePop ();
252+
253+ // Save state for hybrid scan reader for future calls to `next()`
254+ hybridScanState_->columnChunkBuffers_ = std::move (std::get<0 >(ioData));
255+ hybridScanState_->columnChunkData_ = std::move (std::get<1 >(ioData));
314256
315- // Wait for all IO futures to complete
316- std::for_each (ioFutures.begin (), ioFutures.end (), [](auto & future) {
317- future.get ();
318- });
319-
320- // Convert device buffers to device spans
321- auto columnChunkData = [&]() {
322- std::vector<cudf::device_span<uint8_t const >> columnChunkData;
323- columnChunkData.reserve (columnChunkBuffers.size ());
324- std::transform (
325- columnChunkBuffers.begin (),
326- columnChunkBuffers.end (),
327- std::back_inserter (columnChunkData),
328- [](auto & buffer) {
329- return cudf::device_span<uint8_t const >{
330- static_cast <uint8_t *>(buffer.data ()), buffer.size ()};
331- });
332- return columnChunkData;
333- }();
334-
335- // Create an all true row mask to read the table in one go without output
336- // filtering. TODO(mh): Remove this once PR
337- // https://github.com/rapidsai/cudf/pull/20604 is merged
338- const auto totalRows =
339- exptSplitReader_->total_rows_in_row_groups (rowGroupIndices);
340-
341- auto const scalarTrue = cudf::numeric_scalar<bool >(true , true , stream_);
342- auto allTrueRowMask =
343- cudf::make_column_from_scalar (scalarTrue, totalRows, stream_);
344-
345- // Read the table in one go
346- auto tableWithMetadata = exptSplitReader_->materialize_payload_columns (
257+ exptSplitReader_->setup_chunking_for_all_columns (
258+ cudfHiveConfig_->maxChunkReadLimit (),
259+ cudfHiveConfig_->maxPassReadLimit (),
347260 rowGroupIndices,
348- columnChunkData,
349- allTrueRowMask->view (),
350- cudf::io::parquet::experimental::use_data_page_mask::NO,
261+ hybridScanState_->columnChunkData_ ,
351262 readerOptions_,
352263 stream_,
353264 cudf::get_current_device_resource_ref ());
354-
355- // Store the read metadata
356- metadata = std::move (tableWithMetadata.metadata );
357-
358- // Apply the subfield filter manually since we passed an all true row mask
359- if (readerOptions_.get_filter ().has_value ()) {
360- std::unique_ptr<cudf::table> table = std::move (tableWithMetadata.tbl );
361- auto filterMask = cudf::compute_column (
362- *table, readerOptions_.get_filter ().value (), stream_);
363- cudfTable = cudf::apply_boolean_mask (
364- table->view (),
365- filterMask->view (),
366- stream_,
367- cudf::get_current_device_resource_ref ());
368- } else {
369- cudfTable = std::move (tableWithMetadata.tbl );
370- }
371265 });
372266
373- if (cudfTable == nullptr ) {
267+ if (not exptSplitReader_-> has_next_table_chunk () ) {
374268 return nullptr ;
375269 }
270+
271+ auto tableWithMetadata = exptSplitReader_->materialize_all_columns_chunk ();
272+ cudfTable = std::move (tableWithMetadata.tbl );
273+ metadata = std::move (tableWithMetadata.metadata );
376274 }
377275
378276 TotalScanTimeCallbackData* callbackData =
@@ -506,18 +404,18 @@ void CudfHiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
506404 if (splitReader_ or exptSplitReader_) {
507405 splitReader_.reset ();
508406 exptSplitReader_.reset ();
509- tableMaterialized_ .reset ();
407+ hybridScanState_ .reset ();
510408 }
511409
512410 // Create a cudf split reader
513411 if (useExperimentalSplitReader_) {
514412 exptSplitReader_ = createExperimentalSplitReader ();
413+ hybridScanState_ = std::make_unique<
414+ facebook::velox::cudf_velox::connector::hive::HybridScanState>();
515415 } else {
516416 splitReader_ = createSplitReader ();
517417 }
518418
519- tableMaterialized_ = std::make_unique<std::once_flag>();
520-
521419 // TODO: `completedBytes_` should be updated in `next()` as we read more and
522420 // more table bytes
523421 try {
@@ -591,7 +489,7 @@ void CudfHiveDataSource::setupCudfDataSourceAndOptions() {
591489 }();
592490
593491 if (dataSource_ == nullptr ) {
594- dataSource_ = std::move (makeDataSourcesFromSourceInfo (sourceInfo).front ());
492+ dataSource_ = std::move (cudf::io::make_datasources (sourceInfo).front ());
595493 }
596494
597495 // Reader options
@@ -640,29 +538,30 @@ CudfHybridScanReaderPtr CudfHiveDataSource::createExperimentalSplitReader() {
640538 stream_ = cudfGlobalStreamPool ().get_stream ();
641539
642540 // Create a hybrid scan reader
643- auto const footerBytes = fetchFooterBytes (dataSource_);
644- auto exptSplitReader = std::make_unique<CudfHybridScanReader>(
645- cudf::host_span<uint8_t const >{footerBytes->data (), footerBytes->size ()},
646- readerOptions_);
541+ nvtxRangePush (" hybridScanReader" );
542+ auto const footerBuffer = fetchFooterBytes (dataSource_);
543+ auto splitReader =
544+ std::make_unique<CudfHybridScanReader>(*footerBuffer, readerOptions_);
545+ nvtxRangePop ();
647546
648547 // Setup page index if available
649- auto const pageIndexByteRange = exptSplitReader ->page_index_byte_range ();
548+ auto const pageIndexByteRange = splitReader ->page_index_byte_range ();
650549 if (not pageIndexByteRange.is_empty ()) {
651- auto const pageIndexBytes = dataSource_-> host_read (
652- pageIndexByteRange. offset (), pageIndexByteRange. size ());
653- exptSplitReader-> setup_page_index (
654- cudf::host_span< uint8_t const >{
655- pageIndexBytes-> data (), pageIndexBytes-> size ()} );
550+ nvtxRangePush ( " setupPageIndex " );
551+ auto const pageIndexBuffer =
552+ fetchPageIndexBytes (dataSource_, pageIndexByteRange);
553+ splitReader-> setup_page_index (*pageIndexBuffer);
554+ nvtxRangePop ( );
656555 }
657556
658- return exptSplitReader ;
557+ return splitReader ;
659558}
660559
661560void CudfHiveDataSource::resetSplit () {
662561 split_.reset ();
663562 splitReader_.reset ();
664563 exptSplitReader_.reset ();
665- tableMaterialized_ .reset ();
564+ hybridScanState_ .reset ();
666565 dataSource_.reset ();
667566}
668567
0 commit comments