Skip to content

Commit 932821c

Browse files
committed
GH-48741: [C++] Fix deadlock in CSV AsyncThreadedTableReader destructor
1 parent 7ba1a7b commit 932821c

File tree

2 files changed

+24
-19
lines changed

2 files changed

+24
-19
lines changed

cpp/src/arrow/csv/column_builder.cc

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,21 @@
3636
#include "arrow/status.h"
3737
#include "arrow/type.h"
3838
#include "arrow/type_fwd.h"
39+
#include "arrow/util/checked_cast.h"
3940
#include "arrow/util/logging_internal.h"
4041
#include "arrow/util/task_group.h"
4142

4243
namespace arrow {
4344

45+
using internal::checked_pointer_cast;
4446
using internal::TaskGroup;
4547

4648
namespace csv {
4749

4850
class BlockParser;
4951

50-
class ConcreteColumnBuilder : public ColumnBuilder {
52+
class ConcreteColumnBuilder : public ColumnBuilder,
53+
public std::enable_shared_from_this<ConcreteColumnBuilder> {
5154
public:
5255
explicit ConcreteColumnBuilder(MemoryPool* pool, std::shared_ptr<TaskGroup> task_group,
5356
int32_t col_index = -1)
@@ -152,15 +155,17 @@ void NullColumnBuilder::Insert(int64_t block_index,
152155
const int32_t num_rows = parser->num_rows();
153156
DCHECK_GE(num_rows, 0);
154157

155-
task_group_->Append([this, block_index, num_rows]() -> Status {
156-
std::unique_ptr<ArrayBuilder> builder;
157-
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
158-
std::shared_ptr<Array> res;
159-
RETURN_NOT_OK(builder->AppendNulls(num_rows));
160-
RETURN_NOT_OK(builder->Finish(&res));
161-
162-
return SetChunk(block_index, res);
163-
});
158+
// `self` keeps us alive; `this` allows easy access to derived / protected member vars
159+
task_group_->Append(
160+
[self = shared_from_this(), this, block_index, num_rows]() -> Status {
161+
std::unique_ptr<ArrayBuilder> builder;
162+
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
163+
std::shared_ptr<Array> res;
164+
RETURN_NOT_OK(builder->AppendNulls(num_rows));
165+
RETURN_NOT_OK(builder->Finish(&res));
166+
167+
return SetChunk(block_index, res);
168+
});
164169
}
165170

166171
//////////////////////////////////////////////////////////////////////////
@@ -202,7 +207,7 @@ void TypedColumnBuilder::Insert(int64_t block_index,
202207
ReserveChunks(block_index);
203208

204209
// We're careful that all references in the closure outlive the Append() call
205-
task_group_->Append([this, parser, block_index]() -> Status {
210+
task_group_->Append([self = shared_from_this(), this, parser, block_index]() -> Status {
206211
return SetChunk(block_index, converter_->Convert(*parser, col_index_));
207212
});
208213
}
@@ -257,7 +262,9 @@ Status InferringColumnBuilder::UpdateType() {
257262
}
258263

259264
void InferringColumnBuilder::ScheduleConvertChunk(int64_t chunk_index) {
260-
task_group_->Append([this, chunk_index]() { return TryConvertChunk(chunk_index); });
265+
task_group_->Append([self = shared_from_this(), this, chunk_index]() {
266+
return TryConvertChunk(chunk_index);
267+
});
261268
}
262269

263270
Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) {

cpp/src/arrow/csv/reader.cc

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,13 +1021,9 @@ class AsyncThreadedTableReader
10211021
convert_options, /*count_rows=*/false),
10221022
cpu_executor_(cpu_executor) {}
10231023

1024-
~AsyncThreadedTableReader() override {
1025-
if (task_group_) {
1026-
// In case of error, make sure all pending tasks are finished before
1027-
// we start destroying BaseTableReader members
1028-
ARROW_UNUSED(task_group_->Finish());
1029-
}
1030-
}
1024+
// XXX Ideally we would create a child StopToken for the tasks spawned here, and
1025+
// request cancellation in our destructor, to avoid running superfluous tasks
1026+
// in case of early error.
10311027

10321028
Status Init() override {
10331029
ARROW_ASSIGN_OR_RAISE(auto istream_it,
@@ -1048,6 +1044,8 @@ class AsyncThreadedTableReader
10481044
Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
10491045

10501046
Future<std::shared_ptr<Table>> ReadAsync() override {
1047+
// Note that this task group doesn't survive our destruction, so it's essential
1048+
// that async callbacks own a strong ref to us.
10511049
task_group_ = TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token());
10521050

10531051
auto self = shared_from_this();

0 commit comments

Comments
 (0)