1616// under the License.
1717
1818#include < cstdint>
19+ #include < functional>
1920#include < memory>
2021#include < optional>
2122
2526#include " arrow/io/memory.h"
2627#include " arrow/status.h"
2728#include " arrow/table.h"
29+ #include " arrow/util/fuzz_internal.h"
2830#include " arrow/util/macros.h"
2931#include " arrow/util/thread_pool.h"
3032
3133namespace arrow ::csv {
3234
35+ using ::arrow::internal::GetCpuThreadPool;
36+ using ::arrow::io::InputStream;
37+
3338Status FuzzCsvReader (const uint8_t * data, int64_t size) {
3439 // Since the Fuzz-allocated data is not owned, any task that outlives the TableReader
3540 // may try to read memory that has been deallocated. Hence we wait for all pending
3641 // tasks to end before leaving.
3742 struct TaskGuard {
38- ~TaskGuard () { :: arrow::internal:: GetCpuThreadPool ()->WaitForIdle (); }
43+ ~TaskGuard () { GetCpuThreadPool ()->WaitForIdle (); }
3944 };
4045
4146 auto io_context = arrow::io::default_io_context ();
@@ -51,25 +56,71 @@ Status FuzzCsvReader(const uint8_t* data, int64_t size) {
5156 // mix of dict-encoded and non-dict-encoded columns when reading.
5257 convert_options.auto_dict_max_cardinality = 50 ;
5358
54- auto input_stream =
55- std::make_shared<::arrow::io::BufferReader>(std::make_shared<Buffer>(data, size));
59+ // TODO should we also test non-inferring table read?
5660
57- // TODO test other reader types
58- {
61+ auto read_table_serial = [=](std::shared_ptr<InputStream> input) mutable -> Status {
62+ read_options. use_threads = false ;
5963 ARROW_ASSIGN_OR_RAISE (auto table_reader,
60- TableReader::Make (io_context, input_stream , read_options,
64+ TableReader::Make (io_context, input , read_options,
6165 parse_options, convert_options));
62- TaskGuard task_guard;
6366 ARROW_ASSIGN_OR_RAISE (auto table, table_reader->Read ());
64- RETURN_NOT_OK (table->ValidateFull ());
67+ return table->ValidateFull ();
68+ };
69+
70+ auto read_table_threaded = [=](std::shared_ptr<InputStream> input) mutable -> Status {
71+ read_options.use_threads = true ;
72+ ARROW_ASSIGN_OR_RAISE (auto table_reader,
73+ TableReader::Make (io_context, input, read_options,
74+ parse_options, convert_options));
75+ ARROW_ASSIGN_OR_RAISE (auto table, table_reader->Read ());
76+ return table->ValidateFull ();
77+ };
78+
79+ auto read_streaming = [=](std::shared_ptr<InputStream> input) mutable -> Status {
80+ read_options.use_threads = true ;
81+ ARROW_ASSIGN_OR_RAISE (
82+ auto reader, StreamingReader::Make (io_context, input, read_options, parse_options,
83+ convert_options));
84+ ARROW_ASSIGN_OR_RAISE (auto table, reader->ToTable ());
85+ return table->ValidateFull ();
86+ };
87+
88+ auto count_rows = [=](std::shared_ptr<InputStream> input) mutable -> Status {
89+ read_options.use_threads = true ;
90+ parse_options.newlines_in_values = false ;
91+ auto fut = CountRowsAsync (io_context, input, GetCpuThreadPool (), read_options,
92+ parse_options);
93+ return fut.status ();
94+ };
95+
96+ auto count_rows_allow_newlines =
97+ [=](std::shared_ptr<InputStream> input) mutable -> Status {
98+ read_options.use_threads = true ;
99+ parse_options.newlines_in_values = true ;
100+ auto fut = CountRowsAsync (io_context, input, GetCpuThreadPool (), read_options,
101+ parse_options);
102+ return fut.status ();
103+ };
104+
105+ using ReadFunc = decltype (std::function (read_table_serial));
106+
107+ // Test all reader types regardless of outcome
108+ Status st;
109+ for (auto read_func : {ReadFunc (read_table_serial), ReadFunc (read_table_threaded),
110+ ReadFunc (read_streaming), ReadFunc (count_rows),
111+ ReadFunc (count_rows_allow_newlines)}) {
112+ auto input =
113+ std::make_shared<::arrow::io::BufferReader>(std::make_shared<Buffer>(data, size));
114+ TaskGuard task_guard;
115+ st &= read_func (input);
65116 }
66- return Status::OK () ;
117+ return st ;
67118}
68119
69120} // namespace arrow::csv
70121
71122extern " C" int LLVMFuzzerTestOneInput (const uint8_t * data, size_t size) {
72123 auto status = arrow::csv::FuzzCsvReader (data, static_cast <int64_t >(size));
73- ARROW_UNUSED (status);
124+ arrow::internal::LogFuzzStatus (status, data, size );
74125 return 0 ;
75126}
0 commit comments