Skip to content

Commit f3acf1a

Browse files
committed
Support for nested types non-key fields
1 parent ed354c0 commit f3acf1a

File tree

3 files changed

+103
-17
lines changed

3 files changed

+103
-17
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,6 +1229,9 @@ class AsofJoinNode : public ExecNode {
12291229
case Type::LARGE_STRING:
12301230
case Type::BINARY:
12311231
case Type::LARGE_BINARY:
1232+
case Type::LIST:
1233+
case Type::FIXED_SIZE_LIST:
1234+
case Type::STRUCT:
12321235
return Status::OK();
12331236
default:
12341237
return Status::Invalid("Unsupported type for data field ", field->name(), " : ",

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,14 +1294,6 @@ TRACED_TEST(AsofJoinTest, TestUnsupportedByType, {
12941294
field("r0_v0", float32())}));
12951295
})
12961296

1297-
TRACED_TEST(AsofJoinTest, TestUnsupportedDatatype, {
1298-
// List is unsupported
1299-
DoRunInvalidTypeTest(
1300-
schema({field("time", int64()), field("key", int32()), field("l_v0", float64())}),
1301-
schema({field("time", int64()), field("key", int32()),
1302-
field("r0_v0", list(int32()))}));
1303-
})
1304-
13051297
TRACED_TEST(AsofJoinTest, TestMissingKeys, {
13061298
DoRunMissingKeysTest(
13071299
schema({field("time1", int64()), field("key", int32()), field("l_v0", float64())}),
@@ -1824,5 +1816,101 @@ TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) {
18241816
}
18251817
}
18261818

1819+
// GH-44729: Testing nested data type for non-key fields
1820+
TEST(AsofJoinTest, FixedListDataType) {
1821+
const int32_t list_size = 3;
1822+
auto list_type = arrow::fixed_size_list(arrow::int32(), list_size);
1823+
1824+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1825+
auto right_batch = ExecBatchFromJSON({list_type, int64()}, R"([
1826+
[[0, 1, 2], 2],
1827+
[[3, 4, 5], 3],
1828+
[[6, 7, 8], 4]
1829+
])");
1830+
1831+
Declaration left{"exec_batch_source",
1832+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1833+
{std::move(left_batch)})};
1834+
Declaration right{"exec_batch_source",
1835+
ExecBatchSourceNodeOptions(
1836+
schema({field("colVals", list_type), field("on", int64())}),
1837+
{std::move(right_batch)})};
1838+
1839+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1840+
Declaration asof_join{
1841+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1842+
1843+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1844+
1845+
auto exp_batch = ExecBatchFromJSON({int64(), list_type}, R"([
1846+
[1, [0, 1, 2]],
1847+
[2, [0, 1, 2]],
1848+
[3, [3, 4, 5]]
1849+
])");
1850+
1851+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
1852+
}
1853+
1854+
TEST(AsofJoinTest, ListDataType) {
1855+
auto list_type = list(int32());
1856+
1857+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1858+
auto right_batch = ExecBatchFromJSON({list_type, int64()}, R"([
1859+
[[0, 1, 2, 9], 2],
1860+
[[3, 4, 5, 7], 3],
1861+
[[6, 7, 8], 4]
1862+
])");
1863+
1864+
Declaration left{"exec_batch_source",
1865+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1866+
{std::move(left_batch)})};
1867+
Declaration right{"exec_batch_source",
1868+
ExecBatchSourceNodeOptions(
1869+
schema({field("colVals", list_type), field("on", int64())}),
1870+
{std::move(right_batch)})};
1871+
1872+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1873+
Declaration asof_join{
1874+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1875+
1876+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1877+
auto exp_batch = ExecBatchFromJSON({int64(), list_type}, R"([
1878+
[1, [0, 1, 2, 9]],
1879+
[2, [0, 1, 2, 9]],
1880+
[3, [3, 4, 5, 7]]
1881+
])");
1882+
1883+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
1884+
}
1885+
1886+
TEST(AsofJoinTest, StructTestDataType) {
1887+
auto struct_type = struct_({field("key", utf8()), field("value", int64())});
1888+
1889+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1890+
auto right_batch = ExecBatchFromJSON({struct_type, int64()}, R"([
1891+
[{"key": "a", "value": 1}, 2],
1892+
[{"key": "b", "value": 3}, 3],
1893+
[{"key": "c", "value": 5}, 4]
1894+
])");
1895+
1896+
Declaration left{"exec_batch_source",
1897+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1898+
{std::move(left_batch)})};
1899+
Declaration right{"exec_batch_source",
1900+
ExecBatchSourceNodeOptions(
1901+
schema({field("col", struct_type), field("on", int64())}),
1902+
{std::move(right_batch)})};
1903+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1904+
Declaration asof_join{
1905+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1906+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1907+
1908+
auto exp_batch = ExecBatchFromJSON({int64(), struct_type}, R"([
1909+
[1, {"key": "a", "value": 1}],
1910+
[2, {"key": "a", "value": 1}],
1911+
[3, {"key": "b", "value": 3}]
1912+
])");
1913+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
1914+
}
18271915
} // namespace acero
18281916
} // namespace arrow

cpp/src/arrow/acero/unmaterialized_table_internal.h

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class UnmaterializedCompositeTable {
114114
MATERIALIZE_CASE(BINARY)
115115
MATERIALIZE_CASE(LARGE_BINARY)
116116
MATERIALIZE_CASE(FIXED_SIZE_LIST)
117+
MATERIALIZE_CASE(LIST)
118+
MATERIALIZE_CASE(STRUCT)
117119
default:
118120
return arrow::Status::Invalid("Unsupported data type ",
119121
field->type()->ToString(), " for field ",
@@ -167,8 +169,6 @@ class UnmaterializedCompositeTable {
167169
num_rows += slice.Size();
168170
}
169171

170-
171-
172172
template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
173173
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
174174
const std::shared_ptr<arrow::DataType>& type, int i_col) {
@@ -181,13 +181,8 @@ class UnmaterializedCompositeTable {
181181
for (const auto& unmaterialized_slice : slices) {
182182
const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
183183
if (batch) {
184-
ARROW_RETURN_NOT_OK(builder.AppendArraySlice(*batch->column_data(column_index),start,end-start));
185-
186-
// for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
187-
// arrow::Status st = BuilderAppend<Type, Builder>(
188-
// builder, batch->column_data(column_index), rowNum);
189-
// ARROW_RETURN_NOT_OK(st);
190-
// }
184+
ARROW_RETURN_NOT_OK(builder.AppendArraySlice(*batch->column_data(column_index),
185+
start, end - start));
191186
} else {
192187
for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
193188
ARROW_RETURN_NOT_OK(builder.AppendNull());

0 commit comments

Comments
 (0)