Skip to content

Commit c6ecd4c

Browse files
committed
[C++][Dataset] Add ORC predicate pushdown tests for all-null and filesystem sources
1 parent a2f6f94 commit c6ecd4c

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed

cpp/src/arrow/dataset/file_orc_test.cc

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717

1818
#include "arrow/dataset/file_orc.h"
1919

20+
#include <algorithm>
2021
#include <memory>
2122
#include <utility>
2223

2324
#include "arrow/adapters/orc/adapter.h"
25+
#include "arrow/array/builder_primitive.h"
26+
#include "arrow/compute/api.h"
2427
#include "arrow/dataset/dataset_internal.h"
2528
#include "arrow/dataset/discovery.h"
2629
#include "arrow/dataset/file_base.h"
2730
#include "arrow/dataset/partition.h"
31+
#include "arrow/filesystem/mockfs.h"
2832
#include "arrow/dataset/test_util_internal.h"
2933
#include "arrow/io/memory.h"
3034
#include "arrow/record_batch.h"
@@ -88,6 +92,101 @@ TEST_P(TestOrcFileFormatScan, ScanRecordBatchReaderWithDuplicateColumnError) {
8892
TestScanWithDuplicateColumnError();
8993
}
9094
// Exercises scanning with pushed-down null predicates for the ORC format by
// delegating to TestScanWithPushdownNulls(), which is defined outside this
// excerpt.
TEST_P(TestOrcFileFormatScan, ScanWithPushdownNulls) {
  TestScanWithPushdownNulls();
}
95+
96+
// Checks stripe-level predicate pushdown on a file that mixes all-null stripes
// with stripes holding real values:
//   * `is_null(i64)` must keep every all-null stripe;
//   * `not is_null(i64)` must drop every all-null stripe AND must keep every
//     stripe that contains at least one non-null value (pruning has to stay
//     conservative, otherwise matching rows would silently disappear).
TEST_P(TestOrcFileFormatScan, PredicatePushdownAllNullStripes) {
  auto value_field = field("i64", int64());
  const auto test_schema = schema({value_field});

  // Build one batch made only of nulls and one batch made only of values.
  const int64_t nrows = 2048;
  Int64Builder null_builder;
  ASSERT_OK(null_builder.AppendNulls(nrows));
  ASSERT_OK_AND_ASSIGN(auto all_null_values, null_builder.Finish());

  Int64Builder value_builder;
  for (int64_t i = 0; i < nrows; ++i) {
    ASSERT_OK(value_builder.Append(i));
  }
  ASSERT_OK_AND_ASSIGN(auto non_null_values, value_builder.Finish());

  auto all_null_batch = RecordBatch::Make(test_schema, nrows, {all_null_values});
  auto non_null_batch = RecordBatch::Make(test_schema, nrows, {non_null_values});

  // Write both batches into a single in-memory ORC file. The small stripe size
  // is presumably meant to force the two batches into different stripes; the
  // ASSERT_FALSE checks further down confirm that both kinds of stripe exist.
  ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());
  adapters::orc::WriteOptions write_options;
  write_options.stripe_size = 4096;
  ASSERT_OK_AND_ASSIGN(auto writer,
                       adapters::orc::ORCFileWriter::Open(sink.get(), write_options));
  ASSERT_OK(writer->Write(*all_null_batch));
  ASSERT_OK(writer->Write(*non_null_batch));
  ASSERT_OK(writer->Close());
  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

  FileSource source(buffer);
  ASSERT_OK_AND_ASSIGN(auto fragment_base, format_->MakeFragment(source, literal(true)));
  auto orc_fragment = checked_pointer_cast<OrcFileFragment>(fragment_base);

  // Re-open the file with the raw ORC reader so stripes can be classified from
  // the statistics stored in the file, independently of FilterStripes().
  ASSERT_OK_AND_ASSIGN(auto input, source.Open());
  ASSERT_OK_AND_ASSIGN(auto reader,
                       adapters::orc::ORCFileReader::Open(std::move(input),
                                                          default_memory_pool()));

  std::vector<int> all_null_stripes;
  std::vector<int> non_all_null_stripes;
  const int64_t num_stripes = reader->NumberOfStripes();
  for (int64_t stripe = 0; stripe < num_stripes; ++stripe) {
    ASSERT_OK_AND_ASSIGN(auto stripe_stats, reader->GetStripeStatistics(stripe));
    // NOTE(review): ORC column id 1 is presumably the first top-level field
    // (id 0 being the root struct) -- confirm against the ORC type-id layout.
    const auto* col_stats = stripe_stats->getColumnStatistics(1);
    ASSERT_NE(col_stats, nullptr);

    // A stripe is treated as "all null" when it reports nulls and a value
    // count of zero (assumes getNumberOfValues() counts non-null values only).
    const bool only_nulls = col_stats->hasNull() && col_stats->getNumberOfValues() == 0;
    (only_nulls ? all_null_stripes : non_all_null_stripes)
        .push_back(static_cast<int>(stripe));
  }
  ASSERT_FALSE(all_null_stripes.empty());
  ASSERT_FALSE(non_all_null_stripes.empty());

  // True when `stripe` appears in the stripe list returned by FilterStripes().
  const auto contains = [](const auto& selected, int stripe) {
    return std::find(selected.begin(), selected.end(), stripe) != selected.end();
  };

  ASSERT_OK_AND_ASSIGN(
      auto is_null_selected,
      orc_fragment->FilterStripes(compute::is_null(compute::field_ref("i64"))));
  for (int stripe : all_null_stripes) {
    EXPECT_TRUE(contains(is_null_selected, stripe))
        << "all-null stripe " << stripe << " was pruned by is_null(i64)";
  }

  ASSERT_OK_AND_ASSIGN(
      auto is_not_null_selected,
      orc_fragment->FilterStripes(
          compute::not_(compute::is_null(compute::field_ref("i64")))));
  for (int stripe : all_null_stripes) {
    EXPECT_FALSE(contains(is_not_null_selected, stripe))
        << "all-null stripe " << stripe << " was kept by not(is_null(i64))";
  }
  // Stripes with real values hold rows matching `not is_null`, so dropping
  // any of them would lose data rather than merely miss an optimization.
  for (int stripe : non_all_null_stripes) {
    EXPECT_TRUE(contains(is_not_null_selected, stripe))
        << "stripe " << stripe << " holding non-null values was pruned by "
        << "not(is_null(i64))";
  }
}
167+
168+
// Checks that FilterStripes() works when the fragment's FileSource is a path
// on a filesystem (a MockFileSystem here) instead of an in-memory buffer: the
// predicate `x > 1` over rows {0, 1, 2} must select at least one stripe.
TEST_P(TestOrcFileFormatScan, PredicatePushdownWithFileSystemSource) {
  const auto filesystem = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
  const std::shared_ptr<Schema> file_schema = schema({field("x", int64())});
  const std::shared_ptr<RecordBatch> rows =
      RecordBatchFromJSON(file_schema, "[[0], [1], [2]]");

  // Write the three rows into an ORC file stored on the mock filesystem.
  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::OutputStream> file_sink,
                       filesystem->OpenOutputStream("/foo.orc"));
  ASSERT_OK_AND_ASSIGN(auto orc_writer,
                       adapters::orc::ORCFileWriter::Open(file_sink.get()));
  ASSERT_OK(orc_writer->Write(*rows));
  ASSERT_OK(orc_writer->Close());

  // Build the fragment from the path + filesystem pair.
  FileSource path_source("/foo.orc", filesystem);
  ASSERT_OK_AND_ASSIGN(auto generic_fragment,
                       format_->MakeFragment(path_source, literal(true)));
  auto fragment = checked_pointer_cast<OrcFileFragment>(generic_fragment);

  auto x_greater_than_one =
      compute::greater(compute::field_ref("x"), compute::literal(int64_t{1}));
  ASSERT_OK_AND_ASSIGN(auto selected_stripes,
                       fragment->FilterStripes(x_greater_than_one));
  ASSERT_FALSE(selected_stripes.empty());
}
189+
91190
// Registers the TestOrcFileFormatScan suite under the "TestScan" prefix, with
// one instantiation per entry of TestFormatParams::Values(); test names are
// derived through TestFormatParams::ToTestNameString.
INSTANTIATE_TEST_SUITE_P(
    TestScan, TestOrcFileFormatScan,
    ::testing::ValuesIn(TestFormatParams::Values()),
    TestFormatParams::ToTestNameString);

0 commit comments

Comments
 (0)