diff --git a/cpp/src/arrow/csv/column_builder.cc b/cpp/src/arrow/csv/column_builder.cc index 393df18ec4c..5d54b3445ec 100644 --- a/cpp/src/arrow/csv/column_builder.cc +++ b/cpp/src/arrow/csv/column_builder.cc @@ -47,7 +47,8 @@ namespace csv { class BlockParser; -class ConcreteColumnBuilder : public ColumnBuilder { +class ConcreteColumnBuilder : public ColumnBuilder, + public std::enable_shared_from_this { public: explicit ConcreteColumnBuilder(MemoryPool* pool, std::shared_ptr task_group, int32_t col_index = -1) @@ -133,8 +134,8 @@ class ConcreteColumnBuilder : public ColumnBuilder { class NullColumnBuilder : public ConcreteColumnBuilder { public: explicit NullColumnBuilder(const std::shared_ptr& type, MemoryPool* pool, - const std::shared_ptr& task_group) - : ConcreteColumnBuilder(pool, task_group), type_(type) {} + std::shared_ptr task_group) + : ConcreteColumnBuilder(pool, std::move(task_group)), type_(type) {} void Insert(int64_t block_index, const std::shared_ptr& parser) override; @@ -152,15 +153,17 @@ void NullColumnBuilder::Insert(int64_t block_index, const int32_t num_rows = parser->num_rows(); DCHECK_GE(num_rows, 0); - task_group_->Append([this, block_index, num_rows]() -> Status { - std::unique_ptr builder; - RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder)); - std::shared_ptr res; - RETURN_NOT_OK(builder->AppendNulls(num_rows)); - RETURN_NOT_OK(builder->Finish(&res)); - - return SetChunk(block_index, res); - }); + // `self` keeps us alive, `this` allows easy access to derived / protected member vars + task_group_->Append( + [self = shared_from_this(), this, block_index, num_rows]() -> Status { + std::unique_ptr builder; + RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder)); + std::shared_ptr res; + RETURN_NOT_OK(builder->AppendNulls(num_rows)); + RETURN_NOT_OK(builder->Finish(&res)); + + return SetChunk(block_index, res); + }); } ////////////////////////////////////////////////////////////////////////// @@ -169,11 +172,11 @@ void NullColumnBuilder::Insert(int64_t block_index, class TypedColumnBuilder : public ConcreteColumnBuilder { public: TypedColumnBuilder(const std::shared_ptr& type, int32_t col_index, - const ConvertOptions& options, MemoryPool* pool, - const std::shared_ptr& task_group) - : ConcreteColumnBuilder(pool, task_group, col_index), + std::shared_ptr options, MemoryPool* pool, + std::shared_ptr task_group) + : ConcreteColumnBuilder(pool, std::move(task_group), col_index), type_(type), - options_(options) {} + options_(std::move(options)) {} Status Init(); @@ -185,13 +188,13 @@ class TypedColumnBuilder : public ConcreteColumnBuilder { std::shared_ptr type_; // CAUTION: ConvertOptions can grow large (if it customizes hundreds or // thousands of columns), so avoid copying it in each TypedColumnBuilder. - const ConvertOptions& options_; + std::shared_ptr options_; std::shared_ptr converter_; }; Status TypedColumnBuilder::Init() { - ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, options_, pool_)); + ARROW_ASSIGN_OR_RAISE(converter_, Converter::Make(type_, *options_, pool_)); return Status::OK(); } @@ -202,7 +205,7 @@ void TypedColumnBuilder::Insert(int64_t block_index, ReserveChunks(block_index); // We're careful that all references in the closure outlive the Append() call - task_group_->Append([this, parser, block_index]() -> Status { + task_group_->Append([self = shared_from_this(), this, parser, block_index]() -> Status { return SetChunk(block_index, converter_->Convert(*parser, col_index_)); }); } @@ -212,11 +215,11 @@ void TypedColumnBuilder::Insert(int64_t block_index, class InferringColumnBuilder : public ConcreteColumnBuilder { public: - InferringColumnBuilder(int32_t col_index, const ConvertOptions& options, - MemoryPool* pool, const std::shared_ptr& task_group) - : ConcreteColumnBuilder(pool, task_group, col_index), - options_(options), - infer_status_(options) {} + InferringColumnBuilder(int32_t col_index, std::shared_ptr options, + MemoryPool* pool, std::shared_ptr task_group) + : ConcreteColumnBuilder(pool, std::move(task_group), col_index), + options_(std::move(options)), + infer_status_(*options_) {} Status Init(); @@ -237,7 +240,8 @@ class InferringColumnBuilder : public ConcreteColumnBuilder { // CAUTION: ConvertOptions can grow large (if it customizes hundreds or // thousands of columns), so avoid copying it in each InferringColumnBuilder. - const ConvertOptions& options_; + // However, it needs to be owned because of async task execution, hence shared_ptr. + std::shared_ptr options_; // Current inference status InferStatus infer_status_; @@ -257,7 +261,9 @@ Status InferringColumnBuilder::UpdateType() { } void InferringColumnBuilder::ScheduleConvertChunk(int64_t chunk_index) { - task_group_->Append([this, chunk_index]() { return TryConvertChunk(chunk_index); }); + task_group_->Append([self = shared_from_this(), this, chunk_index]() { + return TryConvertChunk(chunk_index); + }); } Status InferringColumnBuilder::TryConvertChunk(int64_t chunk_index) { @@ -361,26 +367,26 @@ Result> InferringColumnBuilder::Finish() { Result> ColumnBuilder::Make( MemoryPool* pool, const std::shared_ptr& type, int32_t col_index, - const ConvertOptions& options, const std::shared_ptr& task_group) { - auto ptr = - std::make_shared(type, col_index, options, pool, task_group); + std::shared_ptr options, std::shared_ptr task_group) { + auto ptr = std::make_shared(type, col_index, std::move(options), + pool, std::move(task_group)); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnBuilder::Make( - MemoryPool* pool, int32_t col_index, const ConvertOptions& options, - const std::shared_ptr& task_group) { - auto ptr = - std::make_shared(col_index, options, pool, task_group); + MemoryPool* pool, int32_t col_index, std::shared_ptr options, + std::shared_ptr task_group) { + auto ptr = std::make_shared(col_index, std::move(options), pool, + std::move(task_group)); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnBuilder::MakeNull( MemoryPool* pool, const std::shared_ptr& type, - const std::shared_ptr& task_group) { - return std::make_shared(type, pool, task_group); + std::shared_ptr task_group) { + return std::make_shared(type, pool, std::move(task_group)); } } // namespace csv diff --git a/cpp/src/arrow/csv/column_builder.h b/cpp/src/arrow/csv/column_builder.h index 07279db313e..9fc4643d9d4 100644 --- a/cpp/src/arrow/csv/column_builder.h +++ b/cpp/src/arrow/csv/column_builder.h @@ -53,19 +53,19 @@ class ARROW_EXPORT ColumnBuilder { /// Construct a strictly-typed ColumnBuilder. static Result> Make( MemoryPool* pool, const std::shared_ptr& type, int32_t col_index, - const ConvertOptions& options, - const std::shared_ptr& task_group); + std::shared_ptr options, + std::shared_ptr task_group); /// Construct a type-inferring ColumnBuilder. static Result> Make( - MemoryPool* pool, int32_t col_index, const ConvertOptions& options, - const std::shared_ptr& task_group); + MemoryPool* pool, int32_t col_index, std::shared_ptr options, + std::shared_ptr task_group); /// Construct a ColumnBuilder for a column of nulls /// (i.e. not present in the CSV file). static Result> MakeNull( MemoryPool* pool, const std::shared_ptr& type, - const std::shared_ptr& task_group); + std::shared_ptr task_group); protected: explicit ColumnBuilder(std::shared_ptr task_group) diff --git a/cpp/src/arrow/csv/column_builder_test.cc b/cpp/src/arrow/csv/column_builder_test.cc index cb178c1d2b3..dddfb922e72 100644 --- a/cpp/src/arrow/csv/column_builder_test.cc +++ b/cpp/src/arrow/csv/column_builder_test.cc @@ -49,6 +49,10 @@ using ChunkData = std::vector>; class ColumnBuilderTest : public ::testing::Test { public: + std::shared_ptr MakeOptions(ConvertOptions options) { + return std::make_shared(std::move(options)); + } + void AssertBuilding(const std::shared_ptr& builder, const ChunkData& chunks, bool validate_full, std::shared_ptr* out) { @@ -76,8 +80,8 @@ class ColumnBuilderTest : public ::testing::Test { std::shared_ptr expected, bool validate_full = true) { std::shared_ptr builder; std::shared_ptr actual; - ASSERT_OK_AND_ASSIGN(builder, - ColumnBuilder::Make(default_memory_pool(), 0, options, tg)); + ASSERT_OK_AND_ASSIGN( + builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg)); AssertBuilding(builder, csv_data, validate_full, &actual); AssertChunkedEqual(*actual, *expected); } @@ -96,8 +100,8 @@ class ColumnBuilderTest : public ::testing::Test { std::shared_ptr expected) { std::shared_ptr builder; std::shared_ptr actual; - ASSERT_OK_AND_ASSIGN( - builder, ColumnBuilder::Make(default_memory_pool(), type, 0, options, tg)); + ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), type, 0, + MakeOptions(options), tg)); AssertBuilding(builder, csv_data, &actual); AssertChunkedEqual(*actual, *expected); } @@ -219,8 +223,8 @@ TEST_F(TypedColumnBuilderTest, Empty) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK_AND_ASSIGN( - builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg)); + ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, + MakeOptions(options), tg)); std::shared_ptr actual; AssertBuilding(builder, {}, &actual); @@ -242,8 +246,8 @@ TEST_F(TypedColumnBuilderTest, Insert) { auto options = ConvertOptions::Defaults(); auto tg = TaskGroup::MakeSerial(); std::shared_ptr builder; - ASSERT_OK_AND_ASSIGN( - builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, options, tg)); + ASSERT_OK_AND_ASSIGN(builder, ColumnBuilder::Make(default_memory_pool(), int32(), 0, + MakeOptions(options), tg)); std::shared_ptr parser; std::shared_ptr actual, expected; @@ -298,8 +302,8 @@ class InferringColumnBuilderTest : public ColumnBuilderTest { bool validate_full = true) { std::shared_ptr builder; std::shared_ptr actual; - ASSERT_OK_AND_ASSIGN(builder, - ColumnBuilder::Make(default_memory_pool(), 0, options, tg)); + ASSERT_OK_AND_ASSIGN( + builder, ColumnBuilder::Make(default_memory_pool(), 0, MakeOptions(options), tg)); AssertBuilding(builder, csv_data, validate_full, &actual); ASSERT_EQ(actual->num_chunks(), static_cast(csv_data.size())); for (int i = 0; i < actual->num_chunks(); ++i) { diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 3c4e7e3da0c..8720331965e 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -769,6 +769,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { protected: // Make column builders from conversion schema Status MakeColumnBuilders() { + // This is making a single copy of ConvertOptions, which is reasonable even if + // it holds a slightly large container, rather than a distinct copy for each + // ColumnBuilder. + auto owned_options = std::make_shared(convert_options_); for (const auto& column : conversion_schema_.columns) { std::shared_ptr builder; if (column.is_missing) { @@ -777,11 +781,11 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { } else if (column.type != nullptr) { ARROW_ASSIGN_OR_RAISE( builder, ColumnBuilder::Make(io_context_.pool(), column.type, column.index, - convert_options_, task_group_)); + owned_options, task_group_)); } else { - ARROW_ASSIGN_OR_RAISE(builder, - ColumnBuilder::Make(io_context_.pool(), column.index, - convert_options_, task_group_)); + ARROW_ASSIGN_OR_RAISE( + builder, ColumnBuilder::Make(io_context_.pool(), column.index, owned_options, + task_group_)); } column_builders_.push_back(std::move(builder)); } @@ -1021,13 +1025,9 @@ class AsyncThreadedTableReader convert_options, /*count_rows=*/false), cpu_executor_(cpu_executor) {} - ~AsyncThreadedTableReader() override { - if (task_group_) { - // In case of error, make sure all pending tasks are finished before - // we start destroying BaseTableReader members - ARROW_UNUSED(task_group_->Finish()); - } - } + // XXX Ideally we can create a child StopToken for the tasks spawned here, and + // request cancellation in our destructor, to avoid running superfluous tasks + // in case of early error. Status Init() override { ARROW_ASSIGN_OR_RAISE(auto istream_it, @@ -1048,6 +1048,8 @@ class AsyncThreadedTableReader Result> Read() override { return ReadAsync().result(); } Future> ReadAsync() override { + // Note that this task group doesn't survive our destruction, so it's essential + // that async callbacks own a strong ref to us. task_group_ = TaskGroup::MakeThreaded(cpu_executor_, io_context_.stop_token()); auto self = shared_from_this(); diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index 57cc7d8efa5..23206717a1c 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -21,7 +21,10 @@ #include #include +#include +#include #include +#include #include #include #include @@ -67,14 +70,15 @@ class StreamingReaderAsTableReader : public TableReader { }; using TableReaderFactory = std::function>( - std::shared_ptr, ParseOptions)>; + std::shared_ptr, ParseOptions, std::optional block_size)>; using StreamingReaderFactory = std::function>( std::shared_ptr)>; void TestEmptyTable(TableReaderFactory reader_factory) { auto empty_buffer = std::make_shared(""); auto empty_input = std::make_shared(empty_buffer); - auto maybe_reader = reader_factory(empty_input, ParseOptions::Defaults()); + auto maybe_reader = + reader_factory(empty_input, ParseOptions::Defaults(), /*block_size=*/{}); // Streaming reader fails on open, table readers fail on first read if (maybe_reader.ok()) { ASSERT_FINISHES_AND_RAISES(Invalid, (*maybe_reader)->ReadAsync()); @@ -86,7 +90,8 @@ void TestEmptyTable(TableReaderFactory reader_factory) { void TestHeaderOnly(TableReaderFactory reader_factory) { auto header_only_buffer = std::make_shared("a,b,c\n"); auto input = std::make_shared(header_only_buffer); - ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input, ParseOptions::Defaults())); + ASSERT_OK_AND_ASSIGN( + auto reader, reader_factory(input, ParseOptions::Defaults(), /*block_size=*/{})); ASSERT_FINISHES_OK_AND_ASSIGN(auto table, reader->ReadAsync()); ASSERT_EQ(table->schema()->num_fields(), 3); ASSERT_EQ(table->num_rows(), 0); @@ -101,6 +106,33 @@ void TestHeaderOnlyStreaming(StreamingReaderFactory reader_factory) { ASSERT_EQ(next_batch, nullptr); } +void TestStraddling(TableReaderFactory reader_factory) { + // The largest line is 25 bytes, it is surrounded by many valid lines + constexpr std::string_view kData = + "a,b\n1,2\n3,4\n5,6\n7,8\n111111111111,22222222222\n1,2\n3,4\n5,6\n7,8\n"; + constexpr int kHeaderLineLen = 4; + constexpr int kLargestLineLen = 25; + + auto data = std::make_shared(kData); + auto input = std::make_shared(data); + // Sanity check that a large enough block size reads the data successfully + ASSERT_OK_AND_ASSIGN( + auto reader, reader_factory(input, ParseOptions::Defaults(), /*block_size=*/{})); + ASSERT_FINISHES_OK(reader->ReadAsync()); + + for (int block_size = kHeaderLineLen; block_size < kLargestLineLen / 2; ++block_size) { + input = std::make_shared(data); + ASSERT_OK_AND_ASSIGN(auto reader, + reader_factory(input, ParseOptions::Defaults(), block_size)); + ASSERT_FINISHES_AND_RAISES(Invalid, reader->ReadAsync()); + reader.reset(); + } + // This could deadlock in the TableReader destructor if it is not able to recover + // gracefully after an error (GH-48741). Alternatively, the destructor could + // deadlock at process shutdown when the thread pool is reaped. + internal::GetCpuThreadPool()->WaitForIdle(); +} + void StressTableReader(TableReaderFactory reader_factory) { #ifdef ARROW_VALGRIND const int NTASKS = 10; @@ -114,7 +146,8 @@ void StressTableReader(TableReaderFactory reader_factory) { std::vector>> task_futures(NTASKS); for (int i = 0; i < NTASKS; i++) { auto input = std::make_shared(table_buffer); - ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input, ParseOptions::Defaults())); + ASSERT_OK_AND_ASSIGN( + auto reader, reader_factory(input, ParseOptions::Defaults(), /*block_size=*/{})); task_futures[i] = reader->ReadAsync(); } auto combined_future = All(task_futures); @@ -143,7 +176,8 @@ void StressInvalidTableReader(TableReaderFactory reader_factory) { std::vector>> task_futures(NTASKS); for (int i = 0; i < NTASKS; i++) { auto input = std::make_shared(table_buffer); - ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input, ParseOptions::Defaults())); + ASSERT_OK_AND_ASSIGN( + auto reader, reader_factory(input, ParseOptions::Defaults(), /*block_size=*/{})); task_futures[i] = reader->ReadAsync(); } auto combined_future = All(task_futures); @@ -161,7 +195,8 @@ void TestNestedParallelism(std::shared_ptr thread_pool, const int NROWS = 1000; ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS)); auto input = std::make_shared(table_buffer); - ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input, ParseOptions::Defaults())); + ASSERT_OK_AND_ASSIGN( + auto reader, reader_factory(input, ParseOptions::Defaults(), /*block_size=*/{})); Future> table_future; @@ -200,16 +235,18 @@ void TestInvalidRowsSkipped(TableReaderFactory reader_factory, bool async) { return (row_num + 1) % static_cast(INVALID_EVERY) != 0; })); auto input = std::make_shared(table_buffer); - ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input, std::move(opts))); + ASSERT_OK_AND_ASSIGN(auto reader, + reader_factory(input, std::move(opts), /*block_size=*/{})); ASSERT_OK_AND_ASSIGN(auto table, reader->Read()); ASSERT_EQ(NROWS - NINVALID, table->num_rows()); ASSERT_EQ(NINVALID, num_invalid_rows); } TableReaderFactory MakeSerialFactory() { - return [](std::shared_ptr input_stream, ParseOptions parse_options) { + return [](std::shared_ptr input_stream, ParseOptions parse_options, + std::optional block_size) { auto read_options = ReadOptions::Defaults(); - read_options.block_size = 1 << 10; + read_options.block_size = block_size.value_or(1 << 10); read_options.use_threads = false; return TableReader::Make(io::default_io_context(), input_stream, read_options, std::move(parse_options), ConvertOptions::Defaults()); @@ -218,6 +255,7 @@ TableReaderFactory MakeSerialFactory() { TEST(SerialReaderTests, Empty) { TestEmptyTable(MakeSerialFactory()); } TEST(SerialReaderTests, HeaderOnly) { TestHeaderOnly(MakeSerialFactory()); } +TEST(SerialReaderTests, Straddling) { TestStraddling(MakeSerialFactory()); } TEST(SerialReaderTests, Stress) { StressTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, StressInvalid) { StressInvalidTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, NestedParallelism) { @@ -234,11 +272,11 @@ Result MakeAsyncFactory( ARROW_ASSIGN_OR_RAISE(thread_pool, internal::ThreadPool::Make(1)); } return [thread_pool]( - std::shared_ptr input_stream, - ParseOptions parse_options) -> Result> { + std::shared_ptr input_stream, ParseOptions parse_options, + std::optional block_size) -> Result> { ReadOptions read_options = ReadOptions::Defaults(); read_options.use_threads = true; - read_options.block_size = 1 << 10; + read_options.block_size = block_size.value_or(1 << 10); auto table_reader = TableReader::Make(io::IOContext(thread_pool.get()), input_stream, read_options, std::move(parse_options), ConvertOptions::Defaults()); @@ -254,6 +292,10 @@ TEST(AsyncReaderTests, HeaderOnly) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); TestHeaderOnly(table_factory); } +TEST(AsyncReaderTests, Straddling) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + TestStraddling(table_factory); +} TEST(AsyncReaderTests, Stress) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); StressTableReader(table_factory); @@ -274,10 +316,10 @@ TEST(AsyncReaderTests, InvalidRowsSkipped) { TableReaderFactory MakeStreamingFactory(bool use_threads = true) { return [use_threads]( - std::shared_ptr input_stream, - ParseOptions parse_options) -> Result> { + std::shared_ptr input_stream, ParseOptions parse_options, + std::optional block_size) -> Result> { auto read_options = ReadOptions::Defaults(); - read_options.block_size = 1 << 10; + read_options.block_size = block_size.value_or(1 << 10); read_options.use_threads = use_threads; ARROW_ASSIGN_OR_RAISE( auto streaming_reader, @@ -303,6 +345,7 @@ TEST(StreamingReaderTests, HeaderOnly) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingReaderFactory()); TestHeaderOnlyStreaming(table_factory); } +TEST(StreamingReaderTests, Straddling) { TestStraddling(MakeStreamingFactory()); } TEST(StreamingReaderTests, Stress) { StressTableReader(MakeStreamingFactory()); } TEST(StreamingReaderTests, StressInvalid) { StressInvalidTableReader(MakeStreamingFactory()); diff --git a/testing b/testing index da559b54125..19dda67f485 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit da559b5412551c490a97c67f6779fcefd4352aff +Subproject commit 19dda67f485ffb3ffa92f4c6fa083576ef052d58