Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 658eb83

Browse files
committed
Support varlen data fetch in ResultSetRegistry.
Signed-off-by: ienkovich <[email protected]>
1 parent cf63bb3 commit 658eb83

14 files changed

+543
-169
lines changed

omniscidb/ArrowStorage/ArrowStorage.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ std::unique_ptr<AbstractDataToken> ArrowStorage::getZeroCopyBufferMemory(
156156
const int8_t* ptr =
157157
chunk->data()->GetValues<int8_t>(1, chunk->data()->offset * arrow_elem_size);
158158
size_t chunk_size = chunk->length() * arrow_elem_size;
159-
return std::make_unique<ArrowChunkDataToken>(std::move(chunk), ptr, chunk_size);
159+
return std::make_unique<ArrowChunkDataToken>(
160+
std::move(chunk), col_type, ptr, chunk_size);
160161
}
161162
}
162163

omniscidb/ArrowStorage/ArrowStorage.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,15 +149,18 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
149149
class ArrowChunkDataToken : public Data_Namespace::AbstractDataToken {
150150
public:
151151
ArrowChunkDataToken(std::shared_ptr<arrow::Array> chunk,
152+
const hdk::ir::Type* type,
152153
const int8_t* ptr,
153154
size_t size)
154-
: chunk_(std::move(chunk)), ptr_(ptr), size_(size) {}
155+
: chunk_(std::move(chunk)), type_(type), ptr_(ptr), size_(size) {}
155156

156157
const int8_t* getMemoryPtr() const override { return ptr_; }
157158
size_t getSize() const override { return size_; }
159+
const hdk::ir::Type* getType() const override { return type_; }
158160

159161
private:
160162
std::shared_ptr<arrow::Array> chunk_;
163+
const hdk::ir::Type* type_;
161164
const int8_t* ptr_;
162165
size_t size_;
163166
};

omniscidb/DataMgr/AbstractBuffer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class AbstractDataToken {
5151

5252
virtual const int8_t* getMemoryPtr() const = 0;
5353
virtual size_t getSize() const = 0;
54+
virtual const hdk::ir::Type* getType() const = 0;
5455
};
5556

5657
class AbstractBuffer {

omniscidb/DataMgr/BufferMgr/Buffer.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Buffer::Buffer(BufferMgr* bm,
6060
, token_(std::move(token)) {
6161
pin();
6262
setSize(token_->getSize());
63+
initEncoder(token_->getType());
6364
}
6465

6566
Buffer::~Buffer() {}

omniscidb/ResultSet/ResultSet.cpp

Lines changed: 135 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -890,20 +890,21 @@ bool ResultSet::isDirectColumnarConversionPossible() const {
890890
bool ResultSet::isZeroCopyColumnarConversionPossible(size_t column_idx) const {
891891
return query_mem_desc_.didOutputColumnar() &&
892892
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
893-
appended_storage_.empty() && storage_ &&
893+
!colType(column_idx)->isVarLen() && appended_storage_.empty() && storage_ &&
894894
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
895895
}
896896

897897
bool ResultSet::isChunkedZeroCopyColumnarConversionPossible(size_t column_idx) const {
898898
return query_mem_desc_.didOutputColumnar() &&
899899
query_mem_desc_.getQueryDescriptionType() == QueryDescriptionType::Projection &&
900-
storage_ &&
900+
!colType(column_idx)->isVarLen() && storage_ &&
901901
(lazy_fetch_info_.empty() || !lazy_fetch_info_[column_idx].is_lazily_fetched);
902902
}
903903

904904
const int8_t* ResultSet::getColumnarBuffer(size_t column_idx) const {
905905
CHECK(isZeroCopyColumnarConversionPossible(column_idx));
906-
return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(column_idx);
906+
size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);
907+
return storage_->getUnderlyingBuffer() + query_mem_desc_.getColOffInBytes(slot_idx);
907908
}
908909

909910
std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffer(
@@ -917,14 +918,15 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
917918
size_t rows_to_skip = drop_first_;
918919
// RowCount value should be cached and take into account size, limit and offset
919920
size_t rows_to_fetch = rowCount();
921+
size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);
920922

921923
if (current_storage_rows <= rows_to_skip) {
922924
rows_to_skip -= current_storage_rows;
923925
} else {
924926
size_t fetch_from_current_storage =
925927
std::min(current_storage_rows - rows_to_skip, rows_to_fetch);
926928
retval.emplace_back(storage_->getUnderlyingBuffer() +
927-
storage_->getColOffInBytes(column_idx) +
929+
storage_->getColOffInBytes(slot_idx) +
928930
colType(column_idx)->size() * rows_to_skip,
929931
fetch_from_current_storage);
930932
rows_to_fetch -= fetch_from_current_storage;
@@ -936,7 +938,7 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
936938
break;
937939
}
938940
const int8_t* ptr =
939-
storage_uptr->getUnderlyingBuffer() + storage_uptr->getColOffInBytes(column_idx);
941+
storage_uptr->getUnderlyingBuffer() + storage_uptr->getColOffInBytes(slot_idx);
940942
current_storage_rows = storage_uptr->binSearchRowCount();
941943
if (current_storage_rows <= rows_to_skip) {
942944
rows_to_skip -= current_storage_rows;
@@ -952,6 +954,132 @@ std::vector<std::pair<const int8_t*, size_t>> ResultSet::getChunkedColumnarBuffe
952954
return retval;
953955
}
954956

957+
size_t ResultSet::computeVarLenOffsets(size_t col_idx, int32_t* offsets) const {
958+
auto type = colType(col_idx);
959+
CHECK(type->isVarLen());
960+
size_t arr_elem_size =
961+
type->isVarLenArray() ? type->as<hdk::ir::ArrayBaseType>()->elemType()->size() : 1;
962+
bool lazy_fetch =
963+
!lazy_fetch_info_.empty() && lazy_fetch_info_[col_idx].is_lazily_fetched;
964+
965+
size_t data_slot_idx = 0;
966+
size_t data_slot_offs = 0;
967+
size_t size_slot_idx = 0;
968+
size_t size_slot_offs = 0;
969+
// Compute required slot index.
970+
for (size_t i = 0; i < col_idx; ++i) {
971+
// slot offset in a row is computed for rowwise access.
972+
if (!query_mem_desc_.didOutputColumnar()) {
973+
data_slot_offs = advance_target_ptr_row_wise(data_slot_offs,
974+
targets_[i],
975+
data_slot_idx,
976+
query_mem_desc_,
977+
separate_varlen_storage_valid_);
978+
}
979+
data_slot_idx =
980+
advance_slot(data_slot_idx, targets_[i], separate_varlen_storage_valid_);
981+
}
982+
if (!separate_varlen_storage_valid_ && !lazy_fetch) {
983+
size_slot_offs =
984+
data_slot_offs + query_mem_desc_.getPaddedSlotWidthBytes(data_slot_idx);
985+
size_slot_idx = data_slot_idx + 1;
986+
} else {
987+
size_slot_idx = data_slot_idx;
988+
size_slot_offs = data_slot_offs;
989+
}
990+
991+
// Translate varlen value to its length. Return -1 for NULLs.
992+
auto slot_val_to_length = [this, lazy_fetch, col_idx, type](
993+
size_t storage_idx,
994+
int64_t val,
995+
const int8_t* size_slot_ptr,
996+
size_t size_slot_sz) -> int32_t {
997+
if (separate_varlen_storage_valid_ && !targets_[col_idx].is_agg) {
998+
if (val >= 0) {
999+
const auto& varlen_buffer_for_storage = serialized_varlen_buffer_[storage_idx];
1000+
return varlen_buffer_for_storage[val].size();
1001+
}
1002+
return -1;
1003+
}
1004+
1005+
if (lazy_fetch) {
1006+
auto& frag_col_buffers = getColumnFrag(storage_idx, col_idx, val);
1007+
bool is_end{false};
1008+
if (type->isString()) {
1009+
VarlenDatum vd;
1010+
ChunkIter_get_nth(reinterpret_cast<ChunkIter*>(const_cast<int8_t*>(
1011+
frag_col_buffers[lazy_fetch_info_[col_idx].local_col_id])),
1012+
val,
1013+
false,
1014+
&vd,
1015+
&is_end);
1016+
CHECK(!is_end);
1017+
return vd.is_null ? -1 : vd.length;
1018+
} else {
1019+
ArrayDatum ad;
1020+
ChunkIter_get_nth(reinterpret_cast<ChunkIter*>(const_cast<int8_t*>(
1021+
frag_col_buffers[lazy_fetch_info_[col_idx].local_col_id])),
1022+
val,
1023+
&ad,
1024+
&is_end);
1025+
CHECK(!is_end);
1026+
return ad.is_null ? -1 : ad.length;
1027+
}
1028+
}
1029+
1030+
if (val)
1031+
return read_int_from_buff(size_slot_ptr, size_slot_sz);
1032+
return -1;
1033+
};
1034+
1035+
offsets[0] = 0;
1036+
size_t row_idx = 0;
1037+
ResultSetRowIterator iter(this);
1038+
++iter;
1039+
const auto data_elem_size = query_mem_desc_.getPaddedSlotWidthBytes(data_slot_idx);
1040+
const auto size_elem_size = query_mem_desc_.getPaddedSlotWidthBytes(size_slot_idx);
1041+
while (iter.global_entry_idx_valid_) {
1042+
const auto storage_lookup_result = findStorage(iter.global_entry_idx_);
1043+
auto storage = storage_lookup_result.storage_ptr;
1044+
auto local_entry_idx = storage_lookup_result.fixedup_entry_idx;
1045+
1046+
const int8_t* elem_ptr = nullptr;
1047+
const int8_t* size_ptr = nullptr;
1048+
if (query_mem_desc_.didOutputColumnar()) {
1049+
auto col_ptr =
1050+
storage->buff_ + storage->query_mem_desc_.getColOffInBytes(data_slot_idx);
1051+
elem_ptr = col_ptr + data_elem_size * local_entry_idx;
1052+
auto size_col_ptr =
1053+
storage->buff_ + storage->query_mem_desc_.getColOffInBytes(size_slot_idx);
1054+
size_ptr = size_col_ptr + size_elem_size * local_entry_idx;
1055+
} else {
1056+
auto keys_ptr = row_ptr_rowwise(storage->buff_, query_mem_desc_, local_entry_idx);
1057+
const auto key_bytes_with_padding =
1058+
align_to_int64(get_key_bytes_rowwise(query_mem_desc_));
1059+
elem_ptr = keys_ptr + key_bytes_with_padding + data_slot_offs;
1060+
size_ptr = keys_ptr + key_bytes_with_padding + size_slot_offs;
1061+
}
1062+
1063+
auto val = read_int_from_buff(elem_ptr, data_elem_size);
1064+
auto elem_length = slot_val_to_length(
1065+
storage_lookup_result.storage_idx, val, size_ptr, size_elem_size);
1066+
if (elem_length < 0) {
1067+
if (type->isString()) {
1068+
offsets[row_idx + 1] = offsets[row_idx];
1069+
} else {
1070+
offsets[row_idx + 1] = -std::abs(offsets[row_idx]);
1071+
}
1072+
} else {
1073+
offsets[row_idx + 1] = std::abs(offsets[row_idx]) + elem_length * arr_elem_size;
1074+
}
1075+
1076+
++iter;
1077+
++row_idx;
1078+
}
1079+
1080+
return row_idx + 1;
1081+
}
1082+
9551083
// Returns a bitmap (and total number) of all single slot targets
9561084
std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() const {
9571085
std::vector<bool> target_bitmap(targets_.size(), true);
@@ -976,7 +1104,8 @@ std::tuple<std::vector<bool>, size_t> ResultSet::getSingleSlotTargetBitmap() con
9761104
*
9771105
* The final goal is to remove the need for such selection, but at the moment for any
9781106
* target that doesn't qualify for direct columnarization, we use the traditional
979-
* result set's iteration to handle it (e.g., count distinct, approximate count distinct)
1107+
* result set's iteration to handle it (e.g., count distinct, approximate count
1108+
* distinct)
9801109
*/
9811110
std::tuple<std::vector<bool>, size_t> ResultSet::getSupportedSingleSlotTargetBitmap()
9821111
const {

omniscidb/ResultSet/ResultSet.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,12 @@ class ResultSet {
462462
std::vector<std::pair<const int8_t*, size_t>> getChunkedColumnarBuffer(
463463
size_t column_idx) const;
464464

465+
// For columns with varlen data writes element offsets to the output buffer.
466+
// It is 0 for the first element and cumulative length of all previous elements
467+
// for others. The total length is written at the end.
468+
// Returns the number of values written.
469+
size_t computeVarLenOffsets(size_t col_idx, int32_t* offsets) const;
470+
465471
QueryDescriptionType getQueryDescriptionType() const {
466472
return query_mem_desc_.getQueryDescriptionType();
467473
}

omniscidb/ResultSet/ResultSetIteration.cpp

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -892,41 +892,15 @@ const VarlenOutputInfo* ResultSet::getVarlenOutputInfo(const size_t entry_idx) c
892892
void ResultSet::copyColumnIntoBuffer(const size_t column_idx,
893893
int8_t* output_buffer,
894894
const size_t output_buffer_size) const {
895-
CHECK(isDirectColumnarConversionPossible());
896-
CHECK_LT(column_idx, query_mem_desc_.getSlotCount());
897-
CHECK(output_buffer_size > 0);
898-
CHECK(output_buffer);
899-
const auto column_width_size = query_mem_desc_.getPaddedSlotWidthBytes(column_idx);
895+
const size_t slot_idx = query_mem_desc_.getSlotIndexForSingleSlotCol(column_idx);
896+
const auto column_width_size = query_mem_desc_.getPaddedSlotWidthBytes(slot_idx);
897+
auto chunks = getChunkedColumnarBuffer(column_idx);
900898
size_t out_buff_offset = 0;
901-
902-
// the main storage:
903-
const size_t crt_storage_row_count = storage_->query_mem_desc_.getEntryCount();
904-
const size_t crt_buffer_size = crt_storage_row_count * column_width_size;
905-
const size_t column_offset = storage_->query_mem_desc_.getColOffInBytes(column_idx);
906-
const int8_t* storage_buffer = storage_->getUnderlyingBuffer() + column_offset;
907-
CHECK(crt_buffer_size <= output_buffer_size);
908-
std::memcpy(output_buffer, storage_buffer, crt_buffer_size);
909-
910-
out_buff_offset += crt_buffer_size;
911-
912-
// the appended storages:
913-
for (size_t i = 0; i < appended_storage_.size(); i++) {
914-
const size_t crt_storage_row_count =
915-
appended_storage_[i]->query_mem_desc_.getEntryCount();
916-
if (crt_storage_row_count == 0) {
917-
// skip an empty appended storage
918-
continue;
919-
}
920-
CHECK_LT(out_buff_offset, output_buffer_size);
921-
const size_t crt_buffer_size = crt_storage_row_count * column_width_size;
922-
const size_t column_offset =
923-
appended_storage_[i]->query_mem_desc_.getColOffInBytes(column_idx);
924-
const int8_t* storage_buffer =
925-
appended_storage_[i]->getUnderlyingBuffer() + column_offset;
926-
CHECK(out_buff_offset + crt_buffer_size <= output_buffer_size);
927-
std::memcpy(output_buffer + out_buff_offset, storage_buffer, crt_buffer_size);
928-
929-
out_buff_offset += crt_buffer_size;
899+
for (auto& chunk : chunks) {
900+
size_t bytes_to_copy = chunk.second * column_width_size;
901+
CHECK_LE(out_buff_offset + bytes_to_copy, output_buffer_size);
902+
std::memcpy(output_buffer + out_buff_offset, chunk.first, bytes_to_copy);
903+
out_buff_offset += bytes_to_copy;
930904
}
931905
}
932906

0 commit comments

Comments
 (0)