Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cpp/src/arrow/csv/chunker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,21 @@ class Lexer {
case AT_ESCAPE:
// will never reach here if escaping = false
// just to hint the compiler to remove dead code
if (!SpecializedOptions::escaping) return nullptr;
if constexpr (!SpecializedOptions::escaping) return nullptr;
goto AtEscape;
case IN_QUOTED_FIELD:
if (!SpecializedOptions::quoting) return nullptr;
if constexpr (!SpecializedOptions::quoting) return nullptr;
goto InQuotedField;
case AT_QUOTED_QUOTE:
if (!SpecializedOptions::quoting) return nullptr;
if constexpr (!SpecializedOptions::quoting) return nullptr;
goto AtQuotedQuote;
case AT_QUOTED_ESCAPE:
if (!SpecializedOptions::quoting) return nullptr;
if constexpr (!SpecializedOptions::quoting) return nullptr;
goto AtQuotedEscape;
}

FieldStart:
if (!SpecializedOptions::quoting) {
if constexpr (!SpecializedOptions::quoting) {
goto InField;
} else {
// At the start of a field
Expand Down
71 changes: 61 additions & 10 deletions cpp/src/arrow/csv/fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>

Expand All @@ -25,17 +26,21 @@
#include "arrow/io/memory.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/util/fuzz_internal.h"
#include "arrow/util/macros.h"
#include "arrow/util/thread_pool.h"

namespace arrow::csv {

using ::arrow::internal::GetCpuThreadPool;
using ::arrow::io::InputStream;

Status FuzzCsvReader(const uint8_t* data, int64_t size) {
// Since the Fuzz-allocated data is not owned, any task that outlives the TableReader
// may try to read memory that has been deallocated. Hence we wait for all pending
// tasks to end before leaving.
struct TaskGuard {
~TaskGuard() { ::arrow::internal::GetCpuThreadPool()->WaitForIdle(); }
~TaskGuard() { GetCpuThreadPool()->WaitForIdle(); }
};

auto io_context = arrow::io::default_io_context();
Expand All @@ -51,25 +56,71 @@ Status FuzzCsvReader(const uint8_t* data, int64_t size) {
// mix of dict-encoded and non-dict-encoded columns when reading.
convert_options.auto_dict_max_cardinality = 50;

auto input_stream =
std::make_shared<::arrow::io::BufferReader>(std::make_shared<Buffer>(data, size));
// TODO should we also test non-inferring table read?

// TODO test other reader types
{
auto read_table_serial = [=](std::shared_ptr<InputStream> input) mutable -> Status {
read_options.use_threads = false;
ARROW_ASSIGN_OR_RAISE(auto table_reader,
TableReader::Make(io_context, input_stream, read_options,
TableReader::Make(io_context, input, read_options,
parse_options, convert_options));
TaskGuard task_guard;
ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read());
RETURN_NOT_OK(table->ValidateFull());
return table->ValidateFull();
};

auto read_table_threaded = [=](std::shared_ptr<InputStream> input) mutable -> Status {
read_options.use_threads = true;
ARROW_ASSIGN_OR_RAISE(auto table_reader,
TableReader::Make(io_context, input, read_options,
parse_options, convert_options));
ARROW_ASSIGN_OR_RAISE(auto table, table_reader->Read());
return table->ValidateFull();
};

auto read_streaming = [=](std::shared_ptr<InputStream> input) mutable -> Status {
read_options.use_threads = true;
ARROW_ASSIGN_OR_RAISE(
auto reader, StreamingReader::Make(io_context, input, read_options, parse_options,
convert_options));
ARROW_ASSIGN_OR_RAISE(auto table, reader->ToTable());
return table->ValidateFull();
};

auto count_rows = [=](std::shared_ptr<InputStream> input) mutable -> Status {
read_options.use_threads = true;
parse_options.newlines_in_values = false;
auto fut = CountRowsAsync(io_context, input, GetCpuThreadPool(), read_options,
parse_options);
return fut.status();
};

auto count_rows_allow_newlines =
[=](std::shared_ptr<InputStream> input) mutable -> Status {
read_options.use_threads = true;
parse_options.newlines_in_values = true;
auto fut = CountRowsAsync(io_context, input, GetCpuThreadPool(), read_options,
parse_options);
return fut.status();
};

using ReadFunc = decltype(std::function(read_table_serial));

// Test all reader types regardless of outcome
Status st;
for (auto read_func : {ReadFunc(read_table_serial), ReadFunc(read_table_threaded),
ReadFunc(read_streaming), ReadFunc(count_rows),
ReadFunc(count_rows_allow_newlines)}) {
auto input =
std::make_shared<::arrow::io::BufferReader>(std::make_shared<Buffer>(data, size));
TaskGuard task_guard;
st &= read_func(input);
}
return Status::OK();
return st;
}

} // namespace arrow::csv

extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
auto status = arrow::csv::FuzzCsvReader(data, static_cast<int64_t>(size));
ARROW_UNUSED(status);
arrow::internal::LogFuzzStatus(status, data, size);
return 0;
}
Loading