cpp/include/rapidsmpf/integrations/cudf/partition.hpp (33 additions, 0 deletions)

@@ -209,4 +209,37 @@ std::vector<PackedData> unspill_partitions(
std::shared_ptr<Statistics> statistics = Statistics::disabled()
);

/// @brief The amount of extra memory to reserve for packing, per column.
constexpr size_t packing_wiggle_room_per_column = 1024; ///< 1 KiB per column

/**
* @brief The total amount of extra memory to reserve for packing.
*
* @param table The table to pack.
* @return The total amount of extra memory to reserve for packing.
*/
inline size_t total_packing_wiggle_room(cudf::table_view const& table) {
return packing_wiggle_room_per_column * static_cast<size_t>(table.num_columns());
Contributor:

note: This is likely not enough if the table has many nested columns.
}
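Regarding the note above about nested columns: a minimal sketch of one possible refinement (hypothetical, not part of this PR) would count every column view in the hierarchy rather than only the top-level columns, assuming cudf::column_view's num_children()/child() accessors:

inline size_t count_column_views(cudf::column_view const& col) {
    // Count this column plus all of its (possibly nested) children.
    size_t count = 1;
    for (cudf::size_type i = 0; i < col.num_children(); ++i) {
        count += count_column_views(col.child(i));
    }
    return count;
}

inline size_t total_packing_wiggle_room_nested(cudf::table_view const& table) {
    // Hypothetical alternative to total_packing_wiggle_room() above.
    size_t count = 0;
    for (auto const& col : table) {
        count += count_column_views(col);
    }
    return packing_wiggle_room_per_column * count;
}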

/**
 * @brief Pack a table into a `PackedData` using `cudf::chunked_pack` with a device bounce buffer.
*
 * All device operations will be performed on the stream of @p bounce_buf.
 * `cudf::chunked_pack` requires the bounce buffer to be at least 1 MiB in size.
*
* @param table The table to pack.
* @param bounce_buf A device bounce buffer to use for packing.
 * @param data_res Memory reservation for the data buffer. If the final packed size
 * exceeds this reservation by no more than the wiggle room, @p data_res will be
 * padded (re-reserved) to the packed buffer size.
*
* @return A `PackedData` containing the packed table.
*
* @throws std::runtime_error If the memory allocation fails.
* @throws std::invalid_argument If the bounce buffer is not in device memory.
*/
PackedData chunked_pack(
cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res
);
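A minimal usage sketch, mirroring the test added in this PR (it assumes a BufferResource* br, a cudf::table_view table, and a stream are in scope; a sketch, not a definitive recipe):

// Reserve and allocate a 1 MiB device bounce buffer (the minimum chunked_pack accepts).
auto [bounce_res, overbooking] = br->reserve(MemoryType::DEVICE, 1 << 20, true);
auto bounce_buf = br->allocate(1 << 20, stream, bounce_res);

// Reserve space for the packed output in the desired memory type (host here).
auto data_res = br->reserve_or_fail(estimated_memory_usage(table, stream), MemoryType::HOST);

// Pack; the resulting buffer lives in the memory type of data_res.
PackedData packed = chunked_pack(table, *bounce_buf, data_res);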

} // namespace rapidsmpf
cpp/src/integrations/cudf/partition.cpp (50 additions, 0 deletions)

@@ -267,4 +267,54 @@ std::vector<PackedData> unspill_partitions(
statistics->add_bytes_stat("spill-bytes-host-to-device", non_device_size);
return ret;
}

PackedData chunked_pack(
cudf::table_view const& table, Buffer& bounce_buf, MemoryReservation& data_res
) {
RAPIDSMPF_EXPECTS(
bounce_buf.mem_type() == MemoryType::DEVICE,
"bounce buffer must be in device memory",
std::invalid_argument
);
// all copies will be done on the bounce buffer's stream
auto const& stream = bounce_buf.stream();
auto* br = data_res.br();
size_t chunk_size = bounce_buf.size;

cudf::chunked_pack packer(table, chunk_size, stream, br->device_mr());
auto const packed_size = packer.get_total_contiguous_size();

// if the packed size > data reservation, and it is within the wiggle room, pad the
// data reservation to the packed size from the same memory type.
if (packed_size > data_res.size()) {
if (packed_size <= data_res.size() + total_packing_wiggle_room(table)) {
data_res =
data_res.br()->reserve(data_res.mem_type(), packed_size, true).first;
}
}

// allocate the data buffer
auto data_buf = br->allocate(packed_size, stream, data_res);

bounce_buf.write_access([&](std::byte* bounce_buf_ptr, rmm::cuda_stream_view) {
// all copies are done on the same stream, so we can omit the stream parameter
cudf::device_span<uint8_t> buf_span(
reinterpret_cast<uint8_t*>(bounce_buf_ptr), chunk_size
Comment on lines +320 to +322

Contributor:

Let us use cuda::std::span.

Also, the comment is meaningless; device_span has no stream parameter to its ctor.

Contributor Author:

The comment refers to the stream_view arg in L299 😇

);
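A minimal sketch of the suggestion above, assuming cuda::std::span from <cuda/std/span> (libcu++); whether cudf::chunked_pack accepts such a span directly is not verified here:

// Hypothetical drop-in for the construction above; only the span type changes.
cuda::std::span<uint8_t> buf_span(
    reinterpret_cast<uint8_t*>(bounce_buf_ptr), chunk_size
);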

data_buf->write_access([&](std::byte* data_ptr, rmm::cuda_stream_view) {
size_t offset = 0;
while (packer.has_next()) {
size_t n_bytes = packer.next(buf_span);
RAPIDSMPF_CUDA_TRY(cudaMemcpyAsync(
data_ptr + offset, buf_span.data(), n_bytes, cudaMemcpyDefault, stream
));
offset += n_bytes;
}
});
});

return {packer.build_metadata(), std::move(data_buf)};
}

} // namespace rapidsmpf
cpp/src/streaming/cudf/table_chunk.cpp (48 additions, 22 deletions)

@@ -5,6 +5,7 @@

#include <memory>

#include <rapidsmpf/integrations/cudf/partition.hpp>
#include <rapidsmpf/integrations/cudf/utils.hpp>
#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>
@@ -169,32 +169,57 @@ TableChunk TableChunk::copy(MemoryReservation& reservation) const {
// serialize `table_view()` into a packed_columns and then we move
// the packed_columns' gpu_data to a new host buffer.

// TODO: use `cudf::chunked_pack()` with a bounce buffer. Currently,
// `cudf::pack()` allocates device memory we haven't reserved.
auto packed_columns =
cudf::pack(table_view(), stream(), br->device_mr());
packed_data = std::make_unique<PackedData>(
std::move(packed_columns.metadata),
br->move(std::move(packed_columns.gpu_data), stream())
// make a reservation for packing
auto [pack_res, overbooking] = br->reserve(
MemoryType::DEVICE,
estimated_memory_usage(table_view(), stream()),
true
);

// Handle the case where `cudf::pack` allocates slightly more than the
// input size. This can occur because cudf uses aligned allocations,
// which may exceed the requested size. To accommodate this, we
// allow some wiggle room.
if (packed_data->data->size > reservation.size()) {
auto const wiggle_room =
1024 * static_cast<std::size_t>(table_view().num_columns());
if (packed_data->data->size <= reservation.size() + wiggle_room) {
reservation =
br->reserve(
MemoryType::HOST, packed_data->data->size, true
)
.first;
if (overbooking > 0) {
// there is not enough memory to pack the table.
size_t avail_dev_mem = pack_res.size() - overbooking;
Contributor:

issue: Integer overflow. Suppose we can't make a reservation, so pack_res.size() is zero and overbooking is estimated_memory_usage(...). Then this will be (size_t)(-overbooking).

Contributor Author:

@wence- I'm not sure if this is true. The reservation is made with overbooking:
https://github.com/nirandaperera/rapidsmpf/blob/Make-unbounded-fanout-state-spillable/cpp/src/memory/buffer_resource.cpp#L58-L64
So, it will be

{MemoryReservation(mem_type, this, size), overbooking};

I think the case you are referring to is the one without overbooking.

Contributor:

Overbooking is positive. Can it happen that overbooking is larger than pack_res.size()? If yes, then you have integer overflow.
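A minimal overflow-safe variant of the subtraction under discussion (a sketch; it simply clamps to zero when overbooking exceeds the reservation):

// Hypothetical guard against unsigned wrap-around; the RAPIDSMPF_EXPECTS
// below then rejects the zero case as "not enough device memory".
size_t avail_dev_mem =
    pack_res.size() > overbooking ? pack_res.size() - overbooking : 0;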

RAPIDSMPF_EXPECTS(
avail_dev_mem > 1 << 20,
"not enough device memory for the bounce buffer",
std::runtime_error
);
auto bounce_buf = br->allocate(avail_dev_mem, stream(), pack_res);

packed_data = std::make_unique<PackedData>(
chunked_pack(table_view(), *bounce_buf, reservation)
);
} else {
// if there is enough memory to pack the table, use `cudf::pack`
auto packed_columns =
cudf::pack(table_view(), stream(), br->device_mr());
// clear the reservation as we are done with it.
pack_res.clear();
packed_data = std::make_unique<PackedData>(
std::move(packed_columns.metadata),
br->move(std::move(packed_columns.gpu_data), stream())
);

// Handle the case where `cudf::pack` allocates slightly more than
// the input size. This can occur because cudf uses aligned
// allocations, which may exceed the requested size. To
// accommodate this, we allow some wiggle room.
if (packed_data->data->size > reservation.size()) {
if (packed_data->data->size
<= reservation.size()
+ total_packing_wiggle_room(table_view()))
{
reservation =
br->reserve(
MemoryType::HOST, packed_data->data->size, true
)
.first;
}
Member:

Can we move this to a helper function? I think nesting becomes a problem.
It would also be worth exploring if we even need the non-chunked version.

Contributor:

Yeah, unless there are real performance issues with chunked_pack we should try to always use it. Can we check?

Also agreed, let's make a helper function that encapsulates this (so that we can use it outside of TableChunk::copy as well; this cudf::pack pattern appears in many places).

Contributor Author:

Okay, let me add a benchmark.
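
A rough sketch of such a helper (hypothetical; the name and signature are assumptions, and it only reuses calls already present in this diff):

// Hypothetical helper: pack `table` with cudf::pack and grow `reservation` by
// at most total_packing_wiggle_room(table) if the packed buffer turns out
// slightly larger than what was reserved.
std::unique_ptr<PackedData> pack_with_wiggle_room(
    cudf::table_view const& table,
    rmm::cuda_stream_view stream,
    BufferResource* br,
    MemoryReservation& reservation
) {
    auto packed_columns = cudf::pack(table, stream, br->device_mr());
    auto packed_data = std::make_unique<PackedData>(
        std::move(packed_columns.metadata),
        br->move(std::move(packed_columns.gpu_data), stream)
    );
    if (packed_data->data->size > reservation.size()
        && packed_data->data->size
               <= reservation.size() + total_packing_wiggle_room(table))
    {
        // Pad the reservation (allowing overbooking) to the actual packed size.
        reservation =
            br->reserve(reservation.mem_type(), packed_data->data->size, true).first;
    }
    // Move the packed device buffer into the reserved memory type.
    packed_data->data = br->move(std::move(packed_data->data), reservation);
    return packed_data;
}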

}
// finally copy the packed data device buffer to HOST memory
packed_data->data =
br->move(std::move(packed_data->data), reservation);
}
packed_data->data =
br->move(std::move(packed_data->data), reservation);
}
return TableChunk(std::move(packed_data));
}
cpp/tests/test_partition.cpp (88 additions, 0 deletions)

@@ -132,3 +132,91 @@ TEST_F(SpillingTest, SpillUnspillRoundtripPreservesDataAndMetadata) {
auto actual = br->move_to_host_buffer(std::move(back_on_host[0].data), res);
EXPECT_EQ(actual->copy_to_uint8_vector(), payload);
}

class NumOfRows : public ::testing::TestWithParam<std::tuple<int, MemoryType>> {
protected:
// cudf::chunked_pack requires at least a 1 MiB bounce buffer
static constexpr size_t chunk_size = 1 << 20;

void SetUp() override {
auto mem_type = std::get<1>(GetParam());

std::unordered_map<MemoryType, BufferResource::MemoryAvailable> memory_available;
// disable all memory types except mem_type
for (auto mt : MEMORY_TYPES) {
if (mt != mem_type) {
memory_available[mt] = []() { return 0; };
}
}

br = std::make_unique<BufferResource>(
cudf::get_current_device_resource_ref(), memory_available
);
stream = cudf::get_default_stream();
}

std::unique_ptr<BufferResource> br;
rmm::cuda_stream_view stream;
};

// test different `num_rows` and `MemoryType`.
INSTANTIATE_TEST_SUITE_P(
ChunkedPack,
NumOfRows,
::testing::Combine(
::testing::Values(0, 9, 1'000, 1'000'000, 10'000'000),
::testing::ValuesIn(MEMORY_TYPES)
),
[](const testing::TestParamInfo<NumOfRows::ParamType>& info) {
return "nrows_" + std::to_string(std::get<0>(info.param)) + "_type_"
+ MEMORY_TYPE_NAMES[static_cast<std::size_t>(std::get<1>(info.param))];
}
);

TEST_P(NumOfRows, chunked_pack) {
auto const [num_rows, mem_type] = GetParam();
std::int64_t const seed = 42;

cudf::table input_table = random_table_with_index(seed, num_rows, 0, 10);

// Get result from split_and_pack with empty splits (single partition)
std::vector<cudf::size_type> empty_splits{};
auto split_result =
rapidsmpf::split_and_pack(input_table, empty_splits, stream, br.get());
ASSERT_EQ(split_result.size(), 1);
auto& split_packed = split_result.at(0);

auto [bounce_buf_res, _] = br->reserve(MemoryType::DEVICE, chunk_size, true);
auto bounce_buf = br->allocate(chunk_size, stream, bounce_buf_res);

auto data_res =
br->reserve_or_fail(estimated_memory_usage(input_table, stream), mem_type);

// Get result from chunked_pack
auto chunked_packed = rapidsmpf::chunked_pack(input_table, *bounce_buf, data_res);

EXPECT_EQ(mem_type, chunked_packed.data->mem_type());

// Unpack both and compare the resulting tables
cudf::packed_columns split_columns{
std::move(split_packed.metadata),
std::make_unique<rmm::device_buffer>(
split_packed.data->data(), split_packed.data->size, stream, br->device_mr()
)
};
cudf::packed_columns chunked_columns{
std::move(chunked_packed.metadata),
std::make_unique<rmm::device_buffer>(
chunked_packed.data->data(),
chunked_packed.data->size,
stream,
br->device_mr()
)
};

stream.synchronize();

auto split_table = cudf::unpack(split_columns);
auto chunked_table = cudf::unpack(chunked_columns);
CUDF_TEST_EXPECT_TABLES_EQUIVALENT(split_table, chunked_table);
}