Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 41 additions & 35 deletions cpp/src/arrow/csv/column_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ namespace csv {

class BlockParser;

class ConcreteColumnBuilder : public ColumnBuilder {
class ConcreteColumnBuilder : public ColumnBuilder,
public std::enable_shared_from_this<ConcreteColumnBuilder> {
public:
explicit ConcreteColumnBuilder(MemoryPool* pool, std::shared_ptr<TaskGroup> task_group,
int32_t col_index = -1)
Expand Down Expand Up @@ -133,8 +134,8 @@ class ConcreteColumnBuilder : public ColumnBuilder {
class NullColumnBuilder : public ConcreteColumnBuilder {
public:
explicit NullColumnBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
const std::shared_ptr<TaskGroup>& task_group)
: ConcreteColumnBuilder(pool, task_group), type_(type) {}
std::shared_ptr<TaskGroup> task_group)
: ConcreteColumnBuilder(pool, std::move(task_group)), type_(type) {}

void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;

Expand All @@ -152,15 +153,17 @@ void NullColumnBuilder::Insert(int64_t block_index,
const int32_t num_rows = parser->num_rows();
DCHECK_GE(num_rows, 0);

task_group_->Append([this, block_index, num_rows]() -> Status {
std::unique_ptr<ArrayBuilder> builder;
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
std::shared_ptr<Array> res;
RETURN_NOT_OK(builder->AppendNulls(num_rows));
RETURN_NOT_OK(builder->Finish(&res));

return SetChunk(block_index, res);
});
// `self` keeps us alive, `this` allows easy access to derived / protected member vars
task_group_->Append(
[self = shared_from_this(), this, block_index, num_rows]() -> Status {
std::unique_ptr<ArrayBuilder> builder;
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
std::shared_ptr<Array> res;
RETURN_NOT_OK(builder->AppendNulls(num_rows));
RETURN_NOT_OK(builder->Finish(&res));

return SetChunk(block_index, res);
});
}

//////////////////////////////////////////////////////////////////////////
Expand All @@ -169,11 +172,11 @@ void NullColumnBuilder::Insert(int64_t block_index,
class TypedColumnBuilder : public ConcreteColumnBuilder {
public:
TypedColumnBuilder(const std::shared_ptr<DataType>& type, int32_t col_index,
const ConvertOptions& options, MemoryPool* pool,
const std::shared_ptr<TaskGroup>& task_group)
: ConcreteColumnBuilder(pool, task_group, col_index),
std::shared_ptr<ConvertOptions> options, MemoryPool* pool,
std::shared_ptr<TaskGroup> task_group)
: ConcreteColumnBuilder(pool, std::move(task_group), col_index),
type_(type),
options_(options) {}
options_(std::move(options)) {}

Status Init();

Expand All @@ -185,13 +188,13 @@ class TypedColumnBuilder : public ConcreteColumnBuilder {
std::shared_ptr<DataType> type_;
// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
// thousands of columns), so avoid copying it in each TypedColumnBuilder.
const ConvertOptions& options_;
std::shared_ptr<ConvertOptions> options_;

std::shared_ptr<Converter> converter_;
};

Status TypedColumnBuilder::Init() {
ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, options_, pool_));
ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, *options_, pool_));
return Status::OK();
}

Expand All @@ -202,7 +205,7 @@ void TypedColumnBuilder::Insert(int64_t block_index,
ReserveChunks(block_index);

// We're careful that all references in the closure outlive the Append() call
task_group_->Append([this, parser, block_index]() -> Status {
task_group_->Append([self = shared_from_this(), this, parser, block_index]() -> Status {
return SetChunk(block_index, converter_->Convert(*parser, col_index_));
});
}
Expand All @@ -212,11 +215,11 @@ void TypedColumnBuilder::Insert(int64_t block_index,

class InferringColumnBuilder : public ConcreteColumnBuilder {
public:
InferringColumnBuilder(int32_t col_index, const ConvertOptions& options,
MemoryPool* pool, const std::shared_ptr<TaskGroup>& task_group)
: ConcreteColumnBuilder(pool, task_group, col_index),
options_(options),
infer_status_(options) {}
InferringColumnBuilder(int32_t col_index, std::shared_ptr<ConvertOptions> options,
MemoryPool* pool, std::shared_ptr<TaskGroup> task_group)
: ConcreteColumnBuilder(pool, std::move(task_group), col_index),
options_(std::move(options)),
infer_status_(*options_) {}

Status Init();

Expand All @@ -237,7 +240,8 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {

// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
// thousands of columns), so avoid copying it in each InferringColumnBuilder.
const ConvertOptions& options_;
// However, it needs to be owned because of async task execution, hence shared_ptr.
std::shared_ptr<ConvertOptions> options_;

// Current inference status
InferStatus infer_status_;
Expand All @@ -257,7 +261,9 @@ Status InferringColumnBuilder::UpdateType() {
}

void InferringColumnBuilder::ScheduleConvertChunk(int64_t chunk_index) {
task_group_->Append([this, chunk_index]() { return TryConvertChunk(chunk_index); });
task_group_->Append([self = shared_from_this(), this, chunk_index]() {
return TryConvertChunk(chunk_index);
});
}

Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
Expand Down Expand Up @@ -361,26 +367,26 @@ Result<std::shared_ptr<ChunkedArray>> InferringColumnBuilder::Finish() {

Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::Make(
MemoryPool* pool, const std::shared_ptr<DataType>& type, int32_t col_index,
const ConvertOptions& options, const std::shared_ptr<TaskGroup>& task_group) {
auto ptr =
std::make_shared<TypedColumnBuilder>(type, col_index, options, pool, task_group);
std::shared_ptr<ConvertOptions> options, std::shared_ptr<TaskGroup> task_group) {
auto ptr = std::make_shared<TypedColumnBuilder>(type, col_index, std::move(options),
pool, std::move(task_group));
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::Make(
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
const std::shared_ptr<TaskGroup>& task_group) {
auto ptr =
std::make_shared<InferringColumnBuilder>(col_index, options, pool, task_group);
MemoryPool* pool, int32_t col_index, std::shared_ptr<ConvertOptions> options,
std::shared_ptr<TaskGroup> task_group) {
auto ptr = std::make_shared<InferringColumnBuilder>(col_index, std::move(options), pool,
std::move(task_group));
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::MakeNull(
MemoryPool* pool, const std::shared_ptr<DataType>& type,
const std::shared_ptr<TaskGroup>& task_group) {
return std::make_shared<NullColumnBuilder>(type, pool, task_group);
std::shared_ptr<TaskGroup> task_group) {
return std::make_shared<NullColumnBuilder>(type, pool, std::move(task_group));
}

} // namespace csv
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/csv/column_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ class ARROW_EXPORT ColumnBuilder {
/// Construct a strictly-typed ColumnBuilder.
static Result<std::shared_ptr<ColumnBuilder>> Make(
MemoryPool* pool, const std::shared_ptr<DataType>& type, int32_t col_index,
const ConvertOptions& options,
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
std::shared_ptr<ConvertOptions> options,
std::shared_ptr<arrow::internal::TaskGroup> task_group);

/// Construct a type-inferring ColumnBuilder.
static Result<std::shared_ptr<ColumnBuilder>> Make(
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
MemoryPool* pool, int32_t col_index, std::shared_ptr<ConvertOptions> options,
std::shared_ptr<arrow::internal::TaskGroup> task_group);

/// Construct a ColumnBuilder for a column of nulls
/// (i.e. not present in the CSV file).
static Result<std::shared_ptr<ColumnBuilder>> MakeNull(
MemoryPool* pool, const std::shared_ptr<DataType>& type,
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
std::shared_ptr<arrow::internal::TaskGroup> task_group);

protected:
explicit ColumnBuilder(std::shared_ptr<arrow::internal::TaskGroup> task_group)
Expand Down
24 changes: 14 additions & 10 deletions cpp/src/arrow/csv/column_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ using ChunkData = std::vector<std::vector<std::string>>;

class ColumnBuilderTest : public ::testing::Test {
public:
// Test helper: moves `options` into a newly allocated ConvertOptions and returns
// the owning shared_ptr, the form in which ColumnBuilder::Make() takes its options.
std::shared_ptr<ConvertOptions> MakeOptions(ConvertOptions options) {
return std::make_shared<ConvertOptions>(std::move(options));
}

void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
const ChunkData& chunks, bool validate_full,
std::shared_ptr<ChunkedArray>* out) {
Expand Down Expand Up @@ -76,8 +80,8 @@ class ColumnBuilderTest : public ::testing::Test {
std::shared_ptr<ChunkedArray> expected, bool validate_full = true) {
std::shared_ptr<ColumnBuilder> builder;
std::shared_ptr<ChunkedArray> actual;
ASSERT_OK_AND_ASSIGN(builder,
ColumnBuilder::Make(default_memory_pool(), 0, options, tg));
ASSERT_OK_AND_ASSIGN(
builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg));
AssertBuilding(builder, csv_data, validate_full, &actual);
AssertChunkedEqual(*actual, *expected);
}
Expand All @@ -96,8 +100,8 @@ class ColumnBuilderTest : public ::testing::Test {
std::shared_ptr<ChunkedArray> expected) {
std::shared_ptr<ColumnBuilder> builder;
std::shared_ptr<ChunkedArray> actual;
ASSERT_OK_AND_ASSIGN(
builder, ColumnBuilder::Make(default_memory_pool(), type, 0, options, tg));
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), type, 0,
MakeOptions(options), tg));
AssertBuilding(builder, csv_data, &actual);
AssertChunkedEqual(*actual, *expected);
}
Expand Down Expand Up @@ -219,8 +223,8 @@ TEST_F(TypedColumnBuilderTest, Empty) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK_AND_ASSIGN(
builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg));
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0,
MakeOptions(options), tg));

std::shared_ptr<ChunkedArray> actual;
AssertBuilding(builder, {}, &actual);
Expand All @@ -242,8 +246,8 @@ TEST_F(TypedColumnBuilderTest, Insert) {
auto options = ConvertOptions::Defaults();
auto tg = TaskGroup::MakeSerial();
std::shared_ptr<ColumnBuilder> builder;
ASSERT_OK_AND_ASSIGN(
builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg));
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0,
MakeOptions(options), tg));

std::shared_ptr<BlockParser> parser;
std::shared_ptr<ChunkedArray> actual, expected;
Expand Down Expand Up @@ -298,8 +302,8 @@ class InferringColumnBuilderTest : public ColumnBuilderTest {
bool validate_full = true) {
std::shared_ptr<ColumnBuilder> builder;
std::shared_ptr<ChunkedArray> actual;
ASSERT_OK_AND_ASSIGN(builder,
ColumnBuilder::Make(default_memory_pool(), 0, options, tg));
ASSERT_OK_AND_ASSIGN(
builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg));
AssertBuilding(builder, csv_data, validate_full, &actual);
ASSERT_EQ(actual->num_chunks(), static_cast<int>(csv_data.size()));
for (int i = 0; i < actual->num_chunks(); ++i) {
Expand Down
24 changes: 13 additions & 11 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
protected:
// Make column builders from conversion schema
Status MakeColumnBuilders() {
// This is making a single copy of ConvertOptions, which is reasonable even if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is n shared_ptr copies vs. n copies of ConvertOptions? (Which makes sense.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right.

// it holds a slightly large container, rather than a distinct copy for each
// ColumnBuilder.
auto owned_options = std::make_shared<ConvertOptions>(convert_options_);
for (const auto& column : conversion_schema_.columns) {
std::shared_ptr<ColumnBuilder> builder;
if (column.is_missing) {
Expand All @@ -777,11 +781,11 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
} else if (column.type != nullptr) {
ARROW_ASSIGN_OR_RAISE(
builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index,
convert_options_, task_group_));
owned_options, task_group_));
} else {
ARROW_ASSIGN_OR_RAISE(builder,
ColumnBuilder::Make(io_context_.pool(), column.index,
convert_options_, task_group_));
ARROW_ASSIGN_OR_RAISE(
builder, ColumnBuilder::Make(io_context_.pool(), column.index, owned_options,
task_group_));
}
column_builders_.push_back(std::move(builder));
}
Expand Down Expand Up @@ -1021,13 +1025,9 @@ class AsyncThreadedTableReader
convert_options, /*count_rows=*/false),
cpu_executor_(cpu_executor) {}

~AsyncThreadedTableReader() override {
if (task_group_) {
// In case of error, make sure all pending tasks are finished before
// we start destroying BaseTableReader members
ARROW_UNUSED(task_group_->Finish());
}
}
// XXX Ideally we could create a child StopToken for the tasks spawned here, and
// request cancellation in our destructor, to avoid running superfluous tasks
// in case of an early error.

Status Init() override {
ARROW_ASSIGN_OR_RAISE(auto istream_it,
Expand All @@ -1048,6 +1048,8 @@ class AsyncThreadedTableReader
// Returns the result of the future produced by ReadAsync().
Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }

Future<std::shared_ptr<Table>> ReadAsync() override {
// Note that this task group doesn't survive our destruction, so it's essential
// that async callbacks own a strong ref to us.
task_group_ = TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());

auto self = shared_from_this();
Expand Down
Loading
Loading