49 changes: 11 additions & 38 deletions cpp/src/io/csv/reader_impl.cu
@@ -20,8 +20,6 @@
 #include <cudf/detail/iterator.cuh>
 #include <cudf/detail/utilities/cuda.cuh>
 #include <cudf/detail/utilities/cuda_memcpy.hpp>
-#include <cudf/detail/utilities/host_worker_pool.hpp>
-#include <cudf/detail/utilities/stream_pool.hpp>
 #include <cudf/detail/utilities/vector_factories.hpp>
 #include <cudf/detail/utilities/visitor_overload.hpp>
 #include <cudf/io/csv.hpp>
@@ -965,43 +963,39 @@ table_with_metadata read_csv(cudf::io::datasource* source,
     }
   }
 
-  // Process string columns with doublequote handling in parallel using thread pool
+  // Process string columns with doublequote handling
   auto const num_string_cols = string_col_indices.size();
   if (num_string_cols > 0) {
     auto const quotechar = parse_opts.quotechar;
     cudf::string_scalar quotechar_scalar(std::string(1, quotechar), true, stream);
     cudf::string_scalar dblquotechar_scalar(std::string(2, quotechar), true, stream);
-    constexpr size_t max_tasks = 4;
-    auto const cols_per_task = cudf::util::div_rounding_up_safe(num_string_cols, max_tasks);
-    auto const num_tasks = cudf::util::div_rounding_up_safe(num_string_cols, cols_per_task);
-    auto streams = cudf::detail::fork_streams(stream, num_tasks);
 
-    auto process_string_column = [&](size_t str_col_idx, rmm::cuda_stream_view col_stream) {
+    for (size_t str_col_idx = 0; str_col_idx < num_string_cols; ++str_col_idx) {
       auto const col_idx = string_col_indices[str_col_idx];
       auto const& is_quoted = device_span<bool>(is_quoted_flags[str_col_idx]);
       auto* buffer = &out_buffers[col_idx];
 
       // Count how many rows were quoted to determine the fast path
-      auto const num_quoted = thrust::count(
-        rmm::exec_policy_nosync(col_stream), is_quoted.begin(), is_quoted.end(), true);
+      auto const num_quoted =
+        thrust::count(rmm::exec_policy(stream), is_quoted.begin(), is_quoted.end(), true);
       if (num_quoted == 0) {
         // Fast path: no rows were quoted, skip replacement entirely
-        out_columns[col_idx] = make_column(*buffer, nullptr, std::nullopt, col_stream);
+        out_columns[col_idx] = make_column(*buffer, nullptr, std::nullopt, stream);
       } else {
         auto replaced_all_col = cudf::strings::detail::replace(
-          cudf::make_strings_column(*buffer->_strings, col_stream)->view(),
+          cudf::make_strings_column(*buffer->_strings, stream)->view(),
          dblquotechar_scalar,
          quotechar_scalar,
          -1,
-          col_stream,
+          stream,
          mr);
        if (std::cmp_equal(num_quoted, num_records)) {
          // Fast path: all rows were quoted, apply replacement to all
          out_columns[col_idx] = std::move(replaced_all_col);
        } else {
          // Need to replace only the quoted rows
          auto const replaced_all_view =
-            cudf::column_device_view::create(replaced_all_col->view(), col_stream);
+            cudf::column_device_view::create(replaced_all_col->view(), stream);
          auto const replaced_all_iter = cudf::detail::make_optional_iterator<cudf::string_view>(
            *replaced_all_view, cudf::nullate::DYNAMIC{replaced_all_col->nullable()});
 
@@ -1014,7 +1008,7 @@ table_with_metadata read_csv(cudf::io::datasource* source,
            auto const& p = original_pairs[idx];
            return p.first != nullptr
                     ? cuda::std::optional<cudf::string_view>{cudf::string_view{p.first,
-                                                                                p.second}}
+                                                                                p.second}}
                     : cuda::std::nullopt;
          }));
 
@@ -1023,35 +1017,14 @@ table_with_metadata read_csv(cudf::io::datasource* source,
            replaced_all_iter + num_records,
            original_iter,
            [is_quoted] __device__(size_type idx) { return is_quoted[idx]; },
-            col_stream,
+            stream,
            mr);
        }
      }
-    };
-
-    std::vector<std::future<void>> tasks;
-    tasks.reserve(num_tasks);
-
-    for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
-      auto const start_col = task_id * cols_per_task;
-      auto const end_col = std::min(start_col + cols_per_task, num_string_cols);
-      auto const col_stream = streams[task_id];
-      tasks.emplace_back(
-        cudf::detail::host_worker_pool().submit_task([&, start_col, end_col, col_stream]() {
-          for (size_t str_col_idx = start_col; str_col_idx < end_col; ++str_col_idx) {
-            process_string_column(str_col_idx, col_stream);
-          }
-        }));
    }
-
-    for (auto& task : tasks) {
-      task.get();
-    }
-
-    cudf::detail::join_streams(streams, stream);
  }
 
-  // Create output columns for the columns that were not processed in the parallel loop
+  // Create output columns for non-string columns
  for (size_t i = 0; i < column_types.size(); ++i) {
    if (!out_columns[i]) {
      out_columns[i] = make_column(out_buffers[i], nullptr, std::nullopt, stream);
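The per-column loop introduced above picks one of three paths from a single count: if no row was quoted it skips the replacement entirely, if every row was quoted it uses the fully replaced column, and otherwise it merges replaced and original values row by row. Below is a minimal, self-contained sketch of that count-then-branch selection using plain Thrust; the vectors, the example transform (multiply by 10), and the names are illustrative assumptions rather than cudf code, and the device lambdas assume compilation with nvcc and --extended-lambda.

// Sketch of the count-then-branch fast path (illustrative, not cudf code).
#include <thrust/count.h>
#include <thrust/device_vector.h>
#include <thrust/transform.h>
#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  std::vector<int> h_values{1, 2, 3, 4};
  std::vector<char> h_flagged{0, 1, 0, 1};  // stands in for the per-row is_quoted flags
  thrust::device_vector<int> values(h_values.begin(), h_values.end());
  thrust::device_vector<char> flagged(h_flagged.begin(), h_flagged.end());

  // Count flagged rows first, mirroring the thrust::count over is_quoted above.
  auto const num_flagged = thrust::count(flagged.begin(), flagged.end(), char{1});

  thrust::device_vector<int> out(values.size());
  if (num_flagged == 0) {
    // Fast path: nothing was flagged, reuse the input as-is.
    out = values;
  } else if (static_cast<std::size_t>(num_flagged) == values.size()) {
    // Fast path: every row was flagged, apply the transform unconditionally.
    thrust::transform(values.begin(), values.end(), out.begin(),
                      [] __device__(int v) { return v * 10; });
  } else {
    // Mixed case: transform only the flagged rows, analogous to copy_if_else
    // choosing between the replaced column and the original string pointers.
    thrust::transform(values.begin(), values.end(), flagged.begin(), out.begin(),
                      [] __device__(int v, char f) { return f ? v * 10 : v; });
  }
  std::printf("flagged rows: %lld\n", static_cast<long long>(num_flagged));
  return 0;
}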
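For context on the deleted machinery: the removed code handed each worker task its own stream via cudf::detail::fork_streams and synchronized them back with join_streams before continuing on the caller's stream. The sketch below shows that fork/join ordering with raw CUDA streams and events; it is a generic illustration of the pattern under those assumptions, not cudf's stream-pool implementation, and the function name is made up.

// Generic fork/join over CUDA streams (illustrative; not cudf's stream pool).
#include <cuda_runtime.h>
#include <vector>

void fork_join_sketch(cudaStream_t main_stream, int num_children)
{
  // Fork: children must not start before work already queued on main_stream.
  cudaEvent_t forked;
  cudaEventCreateWithFlags(&forked, cudaEventDisableTiming);
  cudaEventRecord(forked, main_stream);

  std::vector<cudaStream_t> children(num_children);
  for (auto& s : children) {
    cudaStreamCreate(&s);
    cudaStreamWaitEvent(s, forked, 0);
    // ... enqueue per-column work on s here ...
  }

  // Join: main_stream waits until every child's queued work has finished.
  for (auto& s : children) {
    cudaEvent_t done;
    cudaEventCreateWithFlags(&done, cudaEventDisableTiming);
    cudaEventRecord(done, s);
    cudaStreamWaitEvent(main_stream, done, 0);
    cudaEventDestroy(done);
    cudaStreamDestroy(s);
  }
  cudaEventDestroy(forked);
}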