diff --git a/cpp/src/arrow/csv/chunker.cc b/cpp/src/arrow/csv/chunker.cc index 705451fccf5f..2ba097717691 100644 --- a/cpp/src/arrow/csv/chunker.cc +++ b/cpp/src/arrow/csv/chunker.cc @@ -89,21 +89,21 @@ class Lexer { case AT_ESCAPE: // will never reach here if escaping = false // just to hint the compiler to remove dead code - if (!SpecializedOptions::escaping) return nullptr; + if constexpr (!SpecializedOptions::escaping) return nullptr; goto AtEscape; case IN_QUOTED_FIELD: - if (!SpecializedOptions::quoting) return nullptr; + if constexpr (!SpecializedOptions::quoting) return nullptr; goto InQuotedField; case AT_QUOTED_QUOTE: - if (!SpecializedOptions::quoting) return nullptr; + if constexpr (!SpecializedOptions::quoting) return nullptr; goto AtQuotedQuote; case AT_QUOTED_ESCAPE: - if (!SpecializedOptions::quoting) return nullptr; + if constexpr (!SpecializedOptions::quoting) return nullptr; goto AtQuotedEscape; } FieldStart: - if (!SpecializedOptions::quoting) { + if constexpr (!SpecializedOptions::quoting) { goto InField; } else { // At the start of a field diff --git a/cpp/src/arrow/csv/fuzz.cc b/cpp/src/arrow/csv/fuzz.cc index 721673ae0d53..39148f1aacd6 100644 --- a/cpp/src/arrow/csv/fuzz.cc +++ b/cpp/src/arrow/csv/fuzz.cc @@ -16,6 +16,7 @@ // under the License. #include +#include #include #include @@ -25,17 +26,21 @@ #include "arrow/io/memory.h" #include "arrow/status.h" #include "arrow/table.h" +#include "arrow/util/fuzz_internal.h" #include "arrow/util/macros.h" #include "arrow/util/thread_pool.h" namespace arrow::csv { +using ::arrow::internal::GetCpuThreadPool; +using ::arrow::io::InputStream; + Status FuzzCsvReader(const uint8_t* data, int64_t size) { // Since the Fuzz-allocated data is not owned, any task that outlives the TableReader // may try to read memory that has been deallocated. Hence we wait for all pending // tasks to end before leaving. struct TaskGuard { - ~TaskGuard() { ::arrow::internal::GetCpuThreadPool()->WaitForIdle(); } + ~TaskGuard() { GetCpuThreadPool()->WaitForIdle(); } }; auto io_context = arrow::io::default_io_context(); @@ -51,25 +56,71 @@ Status FuzzCsvReader(const uint8_t* data, int64_t size) { // mix of dict-encoded and non-dict-encoded columns when reading. convert_options.auto_dict_max_cardinality = 50; - auto input_stream = - std::make_shared<::arrow::io::BufferReader>(std::make_shared(data, size)); + // TODO should we also test non-inferring table read? - // TODO test other reader types - { + auto read_table_serial = [=](std::shared_ptr input) mutable -> Status { + read_options.use_threads = false; ARROW_ASSIGN_OR_RAISE(auto table_reader, - TableReader::Make(io_context, input_stream, read_options, + TableReader::Make(io_context, input, read_options, parse_options, convert_options)); - TaskGuard task_guard; ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read()); - RETURN_NOT_OK(table->ValidateFull()); + return table->ValidateFull(); + }; + + auto read_table_threaded = [=](std::shared_ptr input) mutable -> Status { + read_options.use_threads = true; + ARROW_ASSIGN_OR_RAISE(auto table_reader, + TableReader::Make(io_context, input, read_options, + parse_options, convert_options)); + ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read()); + return table->ValidateFull(); + }; + + auto read_streaming = [=](std::shared_ptr input) mutable -> Status { + read_options.use_threads = true; + ARROW_ASSIGN_OR_RAISE( + auto reader, StreamingReader::Make(io_context, input, read_options, parse_options, + convert_options)); + ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable()); + return table->ValidateFull(); + }; + + auto count_rows = [=](std::shared_ptr input) mutable -> Status { + read_options.use_threads = true; + parse_options.newlines_in_values = false; + auto fut = CountRowsAsync(io_context, input, GetCpuThreadPool(), read_options, + parse_options); + return fut.status(); + }; + + auto count_rows_allow_newlines = + [=](std::shared_ptr input) mutable -> Status { + read_options.use_threads = true; + parse_options.newlines_in_values = true; + auto fut = CountRowsAsync(io_context, input, GetCpuThreadPool(), read_options, + parse_options); + return fut.status(); + }; + + using ReadFunc = decltype(std::function(read_table_serial)); + + // Test all reader types regardless of outcome + Status st; + for (auto read_func : {ReadFunc(read_table_serial), ReadFunc(read_table_threaded), + ReadFunc(read_streaming), ReadFunc(count_rows), + ReadFunc(count_rows_allow_newlines)}) { + auto input = + std::make_shared<::arrow::io::BufferReader>(std::make_shared(data, size)); + TaskGuard task_guard; + st &= read_func(input); } - return Status::OK(); + return st; } } // namespace arrow::csv extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { auto status = arrow::csv::FuzzCsvReader(data, static_cast(size)); - ARROW_UNUSED(status); + arrow::internal::LogFuzzStatus(status, data, size); return 0; }