Skip to content

Commit a8c352d

Browse files
committed
apacheGH-48741: [C++] Fix deadlock in CSV AsyncThreadedTableReader destructor
1 parent de9ff0d commit a8c352d

File tree

5 files changed

+129
-74
lines changed

5 files changed

+129
-74
lines changed

cpp/src/arrow/csv/column_builder.cc

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ namespace csv {
4747

4848
class BlockParser;
4949

50-
class ConcreteColumnBuilder : public ColumnBuilder {
50+
class ConcreteColumnBuilder : public ColumnBuilder,
51+
public std::enable_shared_from_this<ConcreteColumnBuilder> {
5152
public:
5253
explicit ConcreteColumnBuilder(MemoryPool* pool, std::shared_ptr<TaskGroup> task_group,
5354
int32_t col_index = -1)
@@ -152,15 +153,17 @@ void NullColumnBuilder::Insert(int64_t block_index,
152153
const int32_t num_rows = parser->num_rows();
153154
DCHECK_GE(num_rows, 0);
154155

155-
task_group_->Append([this, block_index, num_rows]() -> Status {
156-
std::unique_ptr<ArrayBuilder> builder;
157-
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
158-
std::shared_ptr<Array> res;
159-
RETURN_NOT_OK(builder->AppendNulls(num_rows));
160-
RETURN_NOT_OK(builder->Finish(&res));
161-
162-
return SetChunk(block_index, res);
163-
});
156+
// `self` keeps us alive, `this` allows easy access to derived / protected member vars
157+
task_group_->Append(
158+
[self = shared_from_this(), this, block_index, num_rows]() -> Status {
159+
std::unique_ptr<ArrayBuilder> builder;
160+
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
161+
std::shared_ptr<Array> res;
162+
RETURN_NOT_OK(builder->AppendNulls(num_rows));
163+
RETURN_NOT_OK(builder->Finish(&res));
164+
165+
return SetChunk(block_index, res);
166+
});
164167
}
165168

166169
//////////////////////////////////////////////////////////////////////////
@@ -169,11 +172,11 @@ void NullColumnBuilder::Insert(int64_t block_index,
169172
class TypedColumnBuilder : public ConcreteColumnBuilder {
170173
public:
171174
TypedColumnBuilder(const std::shared_ptr<DataType>& type, int32_t col_index,
172-
const ConvertOptions& options, MemoryPool* pool,
173-
const std::shared_ptr<TaskGroup>& task_group)
174-
: ConcreteColumnBuilder(pool, task_group, col_index),
175+
std::shared_ptr<ConvertOptions> options, MemoryPool* pool,
176+
std::shared_ptr<TaskGroup> task_group)
177+
: ConcreteColumnBuilder(pool, std::move(task_group), col_index),
175178
type_(type),
176-
options_(options) {}
179+
options_(std::move(options)) {}
177180

178181
Status Init();
179182

@@ -185,13 +188,13 @@ class TypedColumnBuilder : public ConcreteColumnBuilder {
185188
std::shared_ptr<DataType> type_;
186189
// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
187190
// thousands of columns), so avoid copying it in each TypedColumnBuilder.
188-
const ConvertOptions& options_;
191+
std::shared_ptr<ConvertOptions> options_;
189192

190193
std::shared_ptr<Converter> converter_;
191194
};
192195

193196
Status TypedColumnBuilder::Init() {
194-
ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, options_, pool_));
197+
ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, *options_, pool_));
195198
return Status::OK();
196199
}
197200

@@ -202,7 +205,7 @@ void TypedColumnBuilder::Insert(int64_t block_index,
202205
ReserveChunks(block_index);
203206

204207
// We're careful that all references in the closure outlive the Append() call
205-
task_group_->Append([this, parser, block_index]() -> Status {
208+
task_group_->Append([self = shared_from_this(), this, parser, block_index]() -> Status {
206209
return SetChunk(block_index, converter_->Convert(*parser, col_index_));
207210
});
208211
}
@@ -212,11 +215,11 @@ void TypedColumnBuilder::Insert(int64_t block_index,
212215

213216
class InferringColumnBuilder : public ConcreteColumnBuilder {
214217
public:
215-
InferringColumnBuilder(int32_t col_index, const ConvertOptions& options,
216-
MemoryPool* pool, const std::shared_ptr<TaskGroup>& task_group)
217-
: ConcreteColumnBuilder(pool, task_group, col_index),
218-
options_(options),
219-
infer_status_(options) {}
218+
InferringColumnBuilder(int32_t col_index, std::shared_ptr<ConvertOptions> options,
219+
MemoryPool* pool, std::shared_ptr<TaskGroup> task_group)
220+
: ConcreteColumnBuilder(pool, std::move(task_group), col_index),
221+
options_(std::move(options)),
222+
infer_status_(*options_) {}
220223

221224
Status Init();
222225

@@ -237,7 +240,8 @@ class InferringColumnBuilder : public ConcreteColumnBuilder {
237240

238241
// CAUTION: ConvertOptions can grow large (if it customizes hundreds or
239242
// thousands of columns), so avoid copying it in each InferringColumnBuilder.
240-
const ConvertOptions& options_;
243+
// However, it needs to be owned because of async task execution, hence shared_ptr.
244+
std::shared_ptr<ConvertOptions> options_;
241245

242246
// Current inference status
243247
InferStatus infer_status_;
@@ -257,7 +261,9 @@ Status InferringColumnBuilder::UpdateType() {
257261
}
258262

259263
void InferringColumnBuilder::ScheduleConvertChunk(int64_t chunk_index) {
260-
task_group_->Append([this, chunk_index]() { return TryConvertChunk(chunk_index); });
264+
task_group_->Append([self = shared_from_this(), this, chunk_index]() {
265+
return TryConvertChunk(chunk_index);
266+
});
261267
}
262268

263269
Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {
@@ -361,26 +367,26 @@ Result<std::shared_ptr<ChunkedArray>> InferringColumnBuilder::Finish() {
361367

362368
Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::Make(
363369
MemoryPool* pool, const std::shared_ptr<DataType>& type, int32_t col_index,
364-
const ConvertOptions& options, const std::shared_ptr<TaskGroup>& task_group) {
365-
auto ptr =
366-
std::make_shared<TypedColumnBuilder>(type, col_index, options, pool, task_group);
370+
std::shared_ptr<ConvertOptions> options, std::shared_ptr<TaskGroup> task_group) {
371+
auto ptr = std::make_shared<TypedColumnBuilder>(type, col_index, std::move(options),
372+
pool, std::move(task_group));
367373
RETURN_NOT_OK(ptr->Init());
368374
return ptr;
369375
}
370376

371377
Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::Make(
372-
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
373-
const std::shared_ptr<TaskGroup>& task_group) {
374-
auto ptr =
375-
std::make_shared<InferringColumnBuilder>(col_index, options, pool, task_group);
378+
MemoryPool* pool, int32_t col_index, std::shared_ptr<ConvertOptions> options,
379+
std::shared_ptr<TaskGroup> task_group) {
380+
auto ptr = std::make_shared<InferringColumnBuilder>(col_index, std::move(options), pool,
381+
std::move(task_group));
376382
RETURN_NOT_OK(ptr->Init());
377383
return ptr;
378384
}
379385

380386
Result<std::shared_ptr<ColumnBuilder>> ColumnBuilder::MakeNull(
381387
MemoryPool* pool, const std::shared_ptr<DataType>& type,
382-
const std::shared_ptr<TaskGroup>& task_group) {
383-
return std::make_shared<NullColumnBuilder>(type, pool, task_group);
388+
std::shared_ptr<TaskGroup> task_group) {
389+
return std::make_shared<NullColumnBuilder>(type, pool, std::move(task_group));
384390
}
385391

386392
} // namespace csv

cpp/src/arrow/csv/column_builder.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,19 +53,19 @@ class ARROW_EXPORT ColumnBuilder {
5353
/// Construct a strictly-typed ColumnBuilder.
5454
static Result<std::shared_ptr<ColumnBuilder>> Make(
5555
MemoryPool* pool, const std::shared_ptr<DataType>& type, int32_t col_index,
56-
const ConvertOptions& options,
57-
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
56+
std::shared_ptr<ConvertOptions> options,
57+
std::shared_ptr<arrow::internal::TaskGroup> task_group);
5858

5959
/// Construct a type-inferring ColumnBuilder.
6060
static Result<std::shared_ptr<ColumnBuilder>> Make(
61-
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
62-
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
61+
MemoryPool* pool, int32_t col_index, std::shared_ptr<ConvertOptions> options,
62+
std::shared_ptr<arrow::internal::TaskGroup> task_group);
6363

6464
/// Construct a ColumnBuilder for a column of nulls
6565
/// (i.e. not present in the CSV file).
6666
static Result<std::shared_ptr<ColumnBuilder>> MakeNull(
6767
MemoryPool* pool, const std::shared_ptr<DataType>& type,
68-
const std::shared_ptr<arrow::internal::TaskGroup>& task_group);
68+
std::shared_ptr<arrow::internal::TaskGroup> task_group);
6969

7070
protected:
7171
explicit ColumnBuilder(std::shared_ptr<arrow::internal::TaskGroup> task_group)

cpp/src/arrow/csv/column_builder_test.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ using ChunkData = std::vector<std::vector<std::string>>;
4949

5050
class ColumnBuilderTest : public ::testing::Test {
5151
public:
52+
std::shared_ptr<ConvertOptions> MakeOptions(ConvertOptions options) {
53+
return std::make_shared<ConvertOptions>(std::move(options));
54+
}
55+
5256
void AssertBuilding(const std::shared_ptr<ColumnBuilder>& builder,
5357
const ChunkData& chunks, bool validate_full,
5458
std::shared_ptr<ChunkedArray>* out) {
@@ -76,8 +80,8 @@ class ColumnBuilderTest : public ::testing::Test {
7680
std::shared_ptr<ChunkedArray> expected, bool validate_full = true) {
7781
std::shared_ptr<ColumnBuilder> builder;
7882
std::shared_ptr<ChunkedArray> actual;
79-
ASSERT_OK_AND_ASSIGN(builder,
80-
ColumnBuilder::Make(default_memory_pool(), 0, options, tg));
83+
ASSERT_OK_AND_ASSIGN(
84+
builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg));
8185
AssertBuilding(builder, csv_data, validate_full, &actual);
8286
AssertChunkedEqual(*actual, *expected);
8387
}
@@ -96,8 +100,8 @@ class ColumnBuilderTest : public ::testing::Test {
96100
std::shared_ptr<ChunkedArray> expected) {
97101
std::shared_ptr<ColumnBuilder> builder;
98102
std::shared_ptr<ChunkedArray> actual;
99-
ASSERT_OK_AND_ASSIGN(
100-
builder, ColumnBuilder::Make(default_memory_pool(), type, 0, options, tg));
103+
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), type, 0,
104+
MakeOptions(options), tg));
101105
AssertBuilding(builder, csv_data, &actual);
102106
AssertChunkedEqual(*actual, *expected);
103107
}
@@ -219,8 +223,8 @@ TEST_F(TypedColumnBuilderTest, Empty) {
219223
auto options = ConvertOptions::Defaults();
220224
auto tg = TaskGroup::MakeSerial();
221225
std::shared_ptr<ColumnBuilder> builder;
222-
ASSERT_OK_AND_ASSIGN(
223-
builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg));
226+
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0,
227+
MakeOptions(options), tg));
224228

225229
std::shared_ptr<ChunkedArray> actual;
226230
AssertBuilding(builder, {}, &actual);
@@ -242,8 +246,8 @@ TEST_F(TypedColumnBuilderTest, Insert) {
242246
auto options = ConvertOptions::Defaults();
243247
auto tg = TaskGroup::MakeSerial();
244248
std::shared_ptr<ColumnBuilder> builder;
245-
ASSERT_OK_AND_ASSIGN(
246-
builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg));
249+
ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0,
250+
MakeOptions(options), tg));
247251

248252
std::shared_ptr<BlockParser> parser;
249253
std::shared_ptr<ChunkedArray> actual, expected;
@@ -298,8 +302,8 @@ class InferringColumnBuilderTest : public ColumnBuilderTest {
298302
bool validate_full = true) {
299303
std::shared_ptr<ColumnBuilder> builder;
300304
std::shared_ptr<ChunkedArray> actual;
301-
ASSERT_OK_AND_ASSIGN(builder,
302-
ColumnBuilder::Make(default_memory_pool(), 0, options, tg));
305+
ASSERT_OK_AND_ASSIGN(
306+
builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg));
303307
AssertBuilding(builder, csv_data, validate_full, &actual);
304308
ASSERT_EQ(actual->num_chunks(), static_cast<int>(csv_data.size()));
305309
for (int i = 0; i < actual->num_chunks(); ++i) {

cpp/src/arrow/csv/reader.cc

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
769769
protected:
770770
// Make column builders from conversion schema
771771
Status MakeColumnBuilders() {
772+
// This is making a single copy of ConvertOptions, which is reasonable even if
773+
// it holds a slightly large container, rather than a distinct copy for each
774+
// ColumnBuilder.
775+
auto owned_options = std::make_shared<ConvertOptions>(convert_options_);
772776
for (const auto& column : conversion_schema_.columns) {
773777
std::shared_ptr<ColumnBuilder> builder;
774778
if (column.is_missing) {
@@ -777,11 +781,11 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
777781
} else if (column.type != nullptr) {
778782
ARROW_ASSIGN_OR_RAISE(
779783
builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index,
780-
convert_options_, task_group_));
784+
owned_options, task_group_));
781785
} else {
782-
ARROW_ASSIGN_OR_RAISE(builder,
783-
ColumnBuilder::Make(io_context_.pool(), column.index,
784-
convert_options_, task_group_));
786+
ARROW_ASSIGN_OR_RAISE(
787+
builder, ColumnBuilder::Make(io_context_.pool(), column.index, owned_options,
788+
task_group_));
785789
}
786790
column_builders_.push_back(std::move(builder));
787791
}
@@ -1021,13 +1025,9 @@ class AsyncThreadedTableReader
10211025
convert_options, /*count_rows=*/false),
10221026
cpu_executor_(cpu_executor) {}
10231027

1024-
~AsyncThreadedTableReader() override {
1025-
if (task_group_) {
1026-
// In case of error, make sure all pending tasks are finished before
1027-
// we start destroying BaseTableReader members
1028-
ARROW_UNUSED(task_group_->Finish());
1029-
}
1030-
}
1028+
// XXX Ideally we can create a child StopToken for the tasks spawned here, and
1029+
// request cancellation in our destructor, to avoid running superfluous tasks
1030+
// in case of early error.
10311031

10321032
Status Init() override {
10331033
ARROW_ASSIGN_OR_RAISE(auto istream_it,
@@ -1048,6 +1048,8 @@ class AsyncThreadedTableReader
10481048
Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
10491049

10501050
Future<std::shared_ptr<Table>> ReadAsync() override {
1051+
// Note that this task group doesn't survive our destruction, so it's essential
1052+
// that async callbacks own a strong ref to us.
10511053
task_group_ = TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());
10521054

10531055
auto self = shared_from_this();

0 commit comments

Comments
 (0)