Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 334db27

Browse files
committed
Support fixed length arrays fetch in ResultSetRegistry.
Signed-off-by: ienkovich <[email protected]>
1 parent 15046f0 commit 334db27

File tree

5 files changed

+163
-17
lines changed

5 files changed

+163
-17
lines changed

omniscidb/ResultSet/ResultSet.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,14 +890,16 @@ bool ResultSet::isDirectColumnarConversionPossible() const {
890890
bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
891891
return query_mem_desc_.didOutputColumnar() &&
892892
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
893-
!colType(column_idx)->isVarLen() && appended_storage_.empty() && storage_ &&
893+
!colType(column_idx)->isVarLen() && !colType(column_idx)->isArray() &&
894+
appended_storage_.empty() && storage_ &&
894895
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
895896
}
896897

897898
bool ResultSet::isChunkedZeroCopyColumnarConversionPossible(size_t column_idx) const {
898899
return query_mem_desc_.didOutputColumnar() &&
899900
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
900-
!colType(column_idx)->isVarLen() && storage_ &&
901+
!colType(column_idx)->isVarLen() && !colType(column_idx)->isArray() &&
902+
storage_ &&
901903
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
902904
}
903905

omniscidb/ResultSetRegistry/ColumnarResults.cpp

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
5959
auto timer = DEBUG_TIMER(__func__);
6060
column_buffers_.resize(num_columns);
6161
for (size_t i = 0; i < num_columns; ++i) {
62-
if (target_types[i]->isFixedLenArray()) {
63-
throw ColumnarConversionNotSupported();
64-
}
6562
if (target_types[i]->isVarLen()) {
6663
// Allocate and fill offsets buffer.
6764
offset_buffers_.resize(num_columns, nullptr);
@@ -270,21 +267,84 @@ inline void ColumnarResults::writeBackCell(const TargetValue& col_val,
270267
}
271268
};
272269

270+
auto write_arr_null_value = [this, column_idx](size_t row_idx,
271+
const hdk::ir::Type* type) {
272+
switch (type->id()) {
273+
case hdk::ir::Type::kBoolean:
274+
((int8_t*)column_buffers_[column_idx])[row_idx] =
275+
inline_null_array_value<int8_t>();
276+
break;
277+
case hdk::ir::Type::kInteger:
278+
case hdk::ir::Type::kDecimal:
279+
case hdk::ir::Type::kTimestamp:
280+
case hdk::ir::Type::kExtDictionary:
281+
case hdk::ir::Type::kTime:
282+
case hdk::ir::Type::kDate:
283+
case hdk::ir::Type::kInterval:
284+
switch (type->size()) {
285+
case 1:
286+
((int8_t*)column_buffers_[column_idx])[row_idx] =
287+
inline_null_array_value<int8_t>();
288+
break;
289+
case 2:
290+
((int16_t*)column_buffers_[column_idx])[row_idx] =
291+
inline_null_array_value<int16_t>();
292+
break;
293+
case 4:
294+
((int32_t*)column_buffers_[column_idx])[row_idx] =
295+
inline_null_array_value<int32_t>();
296+
break;
297+
case 8:
298+
((int64_t*)column_buffers_[column_idx])[row_idx] =
299+
inline_null_array_value<int64_t>();
300+
break;
301+
default:
302+
CHECK(false);
303+
}
304+
break;
305+
case hdk::ir::Type::kFloatingPoint:
306+
switch (type->size()) {
307+
case 4:
308+
((float*)column_buffers_[column_idx])[row_idx] =
309+
inline_null_array_value<float>();
310+
break;
311+
case 8:
312+
((double*)column_buffers_[column_idx])[row_idx] =
313+
inline_null_array_value<double>();
314+
break;
315+
default:
316+
CHECK(false);
317+
}
318+
break;
319+
default:
320+
throw ColumnarConversionNotSupported();
321+
}
322+
};
323+
273324
auto type = target_types_[column_idx];
274325
const auto scalar_col_val = boost::get<ScalarTargetValue>(&col_val);
275326
if (scalar_col_val) {
276327
write_scalar(scalar_col_val, row_idx, type);
277328
} else {
278329
const auto arr_col_val = boost::get<ArrayTargetValue>(&col_val);
279330
CHECK(arr_col_val);
280-
CHECK(type->isVarLenArray());
281331
if (*arr_col_val) {
282332
auto elem_type = type->as<hdk::ir::ArrayBaseType>()->elemType();
283-
auto offsets = reinterpret_cast<int32_t*>(offset_buffers_[column_idx]);
284-
size_t offset = static_cast<size_t>(std::abs(offsets[row_idx])) / elem_type->size();
333+
size_t offset;
334+
if (type->isVarLenArray()) {
335+
auto offsets = reinterpret_cast<int32_t*>(offset_buffers_[column_idx]);
336+
offset = static_cast<size_t>(std::abs(offsets[row_idx])) / elem_type->size();
337+
} else {
338+
CHECK(type->isFixedLenArray());
339+
offset = row_idx * type->as<hdk::ir::FixedLenArrayType>()->numElems();
340+
}
285341
for (auto& elem_val : **arr_col_val) {
286342
write_scalar(&elem_val, offset++, elem_type);
287343
}
344+
// Put NULL sentinel value for fixed length array
345+
if (type->isFixedLenArray() && (*arr_col_val)->size() == 0) {
346+
write_arr_null_value(offset, elem_type);
347+
}
288348
}
289349
}
290350
}
@@ -388,7 +448,8 @@ void ColumnarResults::materializeAllColumnsProjection(const ResultSet& rows,
388448
/*
389449
* For all non-lazy columns, we can directly copy back the results of each column's
390450
* contents from different storages and put them into the corresponding output buffer.
391-
* This is not supported for varlen data, so it's handled as if it's lazily fetched.
451+
* This is not supported for varlen and array data, so it's handled as if it's lazily
452+
* fetched.
392453
*
393454
* This function is parallelized through assigning each column to a CPU thread.
394455
*/
@@ -399,7 +460,7 @@ void ColumnarResults::copyAllNonLazyColumns(
399460
CHECK(isDirectColumnarConversionPossible());
400461
const auto is_column_non_lazily_fetched = [this,
401462
&lazy_fetch_info](const size_t col_idx) {
402-
if (target_types_[col_idx]->isVarLen()) {
463+
if (target_types_[col_idx]->isVarLen() || target_types_[col_idx]->isArray()) {
403464
return false;
404465
}
405466
// Saman: make sure when this lazy_fetch_info is empty
@@ -461,7 +522,10 @@ void ColumnarResults::materializeAllLazyColumns(
461522

462523
// parallelized by assigning a chunk of rows to each thread)
463524
const bool skip_non_lazy_columns = rows.isPermutationBufferEmpty();
464-
if (rows.areAnyColumnsLazyFetched() || !offset_buffers_.empty()) {
525+
bool has_array = std::any_of(target_types_.begin(),
526+
target_types_.end(),
527+
[](const hdk::ir::Type* type) { return type->isArray(); });
528+
if (rows.areAnyColumnsLazyFetched() || !offset_buffers_.empty() || has_array) {
465529
const size_t worker_count =
466530
result_set::use_parallel_algorithms(rows) ? cpu_threads() : 1;
467531
std::vector<std::future<void>> conversion_threads;
@@ -473,7 +537,7 @@ void ColumnarResults::materializeAllLazyColumns(
473537
// we process lazy, varlen and array columns (i.e., skip non-lazy, non-varlen, non-array columns)
474538
targets_to_skip.push_back(
475539
(lazy_fetch_info.empty() || !lazy_fetch_info[i].is_lazily_fetched) &&
476-
!target_types_[i]->isVarLen());
540+
!target_types_[i]->isVarLen() && !target_types_[i]->isArray());
477541
}
478542
}
479543
size_t first = rows.getOffset();

omniscidb/ResultSetRegistry/ResultSetMetadata.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,9 @@ ChunkMetadataMap synthesizeMetadata(const ResultSet* rows) {
9595
dummy_encoders.emplace_back();
9696
for (size_t i = 0; i < rows->colCount(); ++i) {
9797
const auto& col_type = rows->colType(i);
98-
if (col_type->isVarLenArray()) {
99-
// For varlen arrays we create encoders for elem type to avoid creating ArrayDatum
100-
// to update stats. This requires special handling for nulls when we update stats.
98+
if (col_type->isArray()) {
99+
// For arrays we create encoders for elem type to avoid creating ArrayDatum to
100+
// update stats. This requires special handling for nulls when we update stats.
101101
dummy_encoders.back().emplace_back(
102102
Encoder::Create(nullptr, col_type->as<hdk::ir::ArrayBaseType>()->elemType()));
103103
} else {
@@ -134,7 +134,15 @@ ChunkMetadataMap synthesizeMetadata(const ResultSet* rows) {
134134
for (auto& elem_val : **arr_col_val) {
135135
updateStats(&elem_val, dummy_encoders[i].get(), elem_type, true);
136136
}
137-
varlen_lengths[i] += (*arr_col_val)->size() * elem_type->size();
137+
if (col_type->isVarLenArray()) {
138+
varlen_lengths[i] += (*arr_col_val)->size() * elem_type->size();
139+
} else {
140+
// Empty value provided for fixed length array type is considered as NULL.
141+
CHECK(col_type->isFixedLenArray());
142+
if ((*arr_col_val)->empty()) {
143+
dummy_encoders[i]->updateStats((int64_t)0, true);
144+
}
145+
}
138146
} else {
139147
dummy_encoders[i]->updateStats((int64_t)0, true);
140148
}

omniscidb/ResultSetRegistry/ResultSetRegistry.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ ResultSetTableTokenPtr ResultSetRegistry::put(ResultSetTable table) {
107107
table.size());
108108
auto& first_rs = table.result(0);
109109
bool has_varlen = false;
110+
bool has_array = false;
110111
for (size_t col_idx = 0; col_idx < first_rs->colCount(); ++col_idx) {
111112
addColumnInfo(db_id_,
112113
table_id,
@@ -115,6 +116,7 @@ ResultSetTableTokenPtr ResultSetRegistry::put(ResultSetTable table) {
115116
first_rs->colType(col_idx),
116117
false);
117118
has_varlen = has_varlen || first_rs->colType(col_idx)->isVarLen();
119+
has_array = has_array || first_rs->colType(col_idx)->isArray();
118120
}
119121
addRowidColumn(db_id_, table_id);
120122

@@ -142,7 +144,7 @@ ResultSetTableTokenPtr ResultSetRegistry::put(ResultSetTable table) {
142144
table_data->use_columnar_res =
143145
!first_rs->isDirectColumnarConversionPossible() ||
144146
first_rs->getQueryDescriptionType() != QueryDescriptionType::Projection ||
145-
first_rs->areAnyColumnsLazyFetched() || has_varlen;
147+
first_rs->areAnyColumnsLazyFetched() || has_varlen || has_array;
146148

147149
tables_[table_id] = std::move(table_data);
148150

omniscidb/Tests/QueryBuilderTest.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,16 @@ class QueryBuilderTest : public TestSuite {
336336
{"id": 3, "arr1":[], "arr2" : null}
337337
{"id": 4, "arr1":[null, 2, null, 4], "arr2" : [null, 5.0, 6.0]})___");
338338

339+
createTable("test_arr",
340+
{{"id", ctx().int32()},
341+
{"arr1", ctx().arrayFixed(2, ctx().int32())},
342+
{"arr2", ctx().arrayFixed(3, ctx().fp64())}});
343+
insertJsonValues("test_arr",
344+
R"___({"id": 1, "arr1": null, "arr2": [4.0, null, 6.0]}
345+
{"id": 2, "arr1":[null, 2], "arr2" : null}
346+
{"id": 3, "arr1":[1, null], "arr2" : [null, 5.0, null]}
347+
{"id": 4, "arr1":[1, 2], "arr2" : [4.0, 5.0, 6.0]})___");
348+
339349
createTable("sort",
340350
{{"x", ctx().int32()}, {"y", ctx().int32()}, {"z", ctx().int32()}});
341351
insertCsvValues("sort",
@@ -4917,6 +4927,66 @@ TEST_F(QueryBuilderTest, VarlenArrayInRes) {
49174927
}
49184928
}
49194929

4930+
TEST_F(QueryBuilderTest, FixedArrayInRes) {
4931+
for (bool enable_columnar : {true, false}) {
4932+
auto orig_enable_columnar = config().rs.enable_columnar_output;
4933+
ScopeGuard guard([orig_enable_columnar]() {
4934+
config().rs.enable_columnar_output = orig_enable_columnar;
4935+
});
4936+
config().rs.enable_columnar_output = enable_columnar;
4937+
4938+
QueryBuilder builder(ctx(), schema_mgr_, configPtr());
4939+
4940+
auto dag1 = builder.scan("test_arr").proj({0, 1, 2}).finalize();
4941+
auto res1 = runQuery(std::move(dag1));
4942+
compare_res_data(
4943+
res1,
4944+
std::vector<int32_t>({1, 2, 3, 4}),
4945+
std::vector<std::vector<int32_t>>(
4946+
{std::vector<int32_t>({inline_null_array_value<int32_t>()}),
4947+
std::vector<int32_t>({inline_null_value<int32_t>(), 2}),
4948+
std::vector<int32_t>({1, inline_null_value<int32_t>()}),
4949+
std::vector<int32_t>({1, 2})}),
4950+
std::vector<std::vector<double>>(
4951+
{std::vector<double>({4.0, inline_null_value<double>(), 6.0}),
4952+
std::vector<double>({inline_null_array_value<double>()}),
4953+
std::vector<double>(
4954+
{inline_null_value<double>(), 5.0, inline_null_value<double>()}),
4955+
std::vector<double>({4.0, 5.0, 6.0})}));
4956+
4957+
auto dag2 = builder.scan(res1.tableName()).proj({2, 1, 0}).finalize();
4958+
auto res2 = runQuery(std::move(dag2));
4959+
compare_res_data(
4960+
res2,
4961+
std::vector<std::vector<double>>(
4962+
{std::vector<double>({4.0, inline_null_value<double>(), 6.0}),
4963+
std::vector<double>({inline_null_array_value<double>()}),
4964+
std::vector<double>(
4965+
{inline_null_value<double>(), 5.0, inline_null_value<double>()}),
4966+
std::vector<double>({4.0, 5.0, 6.0})}),
4967+
std::vector<std::vector<int32_t>>(
4968+
{std::vector<int32_t>({inline_null_array_value<int32_t>()}),
4969+
std::vector<int32_t>({inline_null_value<int32_t>(), 2}),
4970+
std::vector<int32_t>({1, inline_null_value<int32_t>()}),
4971+
std::vector<int32_t>({1, 2})}),
4972+
std::vector<int32_t>({1, 2, 3, 4}));
4973+
4974+
auto scan = builder.scan(res2.tableName());
4975+
auto dag3 = scan.filter(scan.ref(2) > 2).finalize();
4976+
auto res3 = runQuery(std::move(dag3));
4977+
compare_res_data(
4978+
res3,
4979+
std::vector<std::vector<double>>(
4980+
{std::vector<double>(
4981+
{inline_null_value<double>(), 5.0, inline_null_value<double>()}),
4982+
std::vector<double>({4.0, 5.0, 6.0})}),
4983+
std::vector<std::vector<int32_t>>(
4984+
{std::vector<int32_t>({1, inline_null_value<int32_t>()}),
4985+
std::vector<int32_t>({1, 2})}),
4986+
std::vector<int32_t>({3, 4}));
4987+
}
4988+
}
4989+
49204990
class Taxi : public TestSuite {
49214991
protected:
49224992
static void SetUpTestSuite() {

0 commit comments

Comments
 (0)