Skip to content

Commit 635c5b4

Browse files
committed
fix
1 parent 7a50033 commit 635c5b4

File tree

97 files changed

+1201
-946
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+1201
-946
lines changed

be/src/olap/rowset/segment_v2/column_writer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
295295
const TabletColumn* column, io::FileWriter* file_writer,
296296
std::unique_ptr<ColumnWriter>* writer) {
297297
if (column->is_extracted_column()) {
298-
if (column->name().find(DOC_SNAPSHOT_COLUMN_PATH) != std::string::npos) {
298+
if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
299299
*writer = std::make_unique<VariantCompactionDocSnapshotWriter>(
300300
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
301301
return Status::OK();
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
_init_container

be/src/olap/rowset/segment_v2/variant/sparse_column_extract_iterator.h renamed to be/src/olap/rowset/segment_v2/variant/binary_column_extract_iterator.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ namespace doris::segment_v2 {
5555
#include "common/compile_check_begin.h"
5656

5757
// Base class for sparse column processors with common functionality
58-
class BaseSparseColumnProcessor : public ColumnIterator {
58+
class BaseBinaryColumnProcessor : public ColumnIterator {
5959
protected:
6060
const StorageReadOptions* _read_opts;
6161
BinaryColumnCacheSPtr _sparse_column_cache;
@@ -68,7 +68,7 @@ class BaseSparseColumnProcessor : public ColumnIterator {
6868
size_t num_rows) = 0;
6969

7070
public:
71-
BaseSparseColumnProcessor(BinaryColumnCacheSPtr sparse_column_cache,
71+
BaseBinaryColumnProcessor(BinaryColumnCacheSPtr sparse_column_cache,
7272
const StorageReadOptions* opts)
7373
: _read_opts(opts), _sparse_column_cache(std::move(sparse_column_cache)) {}
7474

@@ -113,11 +113,11 @@ class BaseSparseColumnProcessor : public ColumnIterator {
113113
};
114114

115115
// Implementation for path extraction processor
116-
class SparseColumnExtractIterator : public BaseSparseColumnProcessor {
116+
class SparseColumnExtractIterator : public BaseBinaryColumnProcessor {
117117
public:
118118
SparseColumnExtractIterator(std::string_view path, BinaryColumnCacheSPtr sparse_column_cache,
119119
const StorageReadOptions* opts)
120-
: BaseSparseColumnProcessor(std::move(sparse_column_cache), opts), _path(path) {}
120+
: BaseBinaryColumnProcessor(std::move(sparse_column_cache), opts), _path(path) {}
121121

122122
// Batch processing using template method
123123
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override {
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "olap/rowset/segment_v2/variant/binary_column_reader.h"
19+
20+
#include <algorithm>
21+
#include <tuple>
22+
23+
#include "vec/columns/column_array.h"
24+
#include "vec/columns/column_map.h"
25+
#include "vec/columns/column_string.h"
26+
#include "vec/columns/column_variant.h"
27+
#include "vec/common/assert_cast.h"
28+
#include "vec/common/schema_util.h"
29+
#include "vec/common/string_ref.h"
30+
31+
namespace doris::segment_v2 {
32+
33+
#include "common/compile_check_begin.h"
34+
35+
Status SingleSparseColumnReader::add_binary_column_reader(std::shared_ptr<ColumnReader> reader,
36+
uint32_t /*index*/) {
37+
if (_single_reader) {
38+
return Status::AlreadyExist("Single sparse column reader already exists");
39+
}
40+
_single_reader = std::move(reader);
41+
return Status::OK();
42+
}
43+
44+
std::pair<std::shared_ptr<ColumnReader>, std::string>
45+
SingleSparseColumnReader::select_reader_and_cache_key(const std::string& /*relative_path*/) const {
46+
return {_single_reader, std::string(SPARSE_COLUMN_PATH)};
47+
}
48+
49+
Status SingleSparseColumnReader::new_binary_column_iterator(ColumnIteratorUPtr* iter) const {
50+
return _single_reader->new_iterator(iter, nullptr);
51+
}
52+
53+
std::shared_ptr<ColumnReader> SingleSparseColumnReader::select_reader(uint32_t /*index*/) const {
54+
return _single_reader;
55+
}
56+
57+
uint32_t SingleSparseColumnReader::num_buckets() const {
58+
return 1;
59+
}
60+
61+
BinaryColumnType SingleSparseColumnReader::get_type() const {
62+
return BinaryColumnType::SINGLE_SPARSE;
63+
}
64+
65+
Status MultipleBinaryColumnReader::new_binary_column_iterator(ColumnIteratorUPtr* iter) const {
66+
std::vector<std::unique_ptr<ColumnIterator>> iters;
67+
iters.reserve(_multiple_column_readers.size());
68+
for (const auto& [index, reader] : _multiple_column_readers) {
69+
if (!reader) {
70+
return Status::NotFound("No column reader available, binary column index is: ", index);
71+
}
72+
ColumnIteratorUPtr it;
73+
RETURN_IF_ERROR(reader->new_iterator(&it, nullptr));
74+
iters.emplace_back(std::move(it));
75+
}
76+
*iter = std::make_unique<CombineMultipleBinaryColumnIterator>(std::move(iters));
77+
return Status::OK();
78+
}
79+
80+
Status MultipleBinaryColumnReader::add_binary_column_reader(std::shared_ptr<ColumnReader> reader,
81+
uint32_t index) {
82+
if (_multiple_column_readers.find(index) != _multiple_column_readers.end()) {
83+
return Status::AlreadyExist(
84+
"Multiple sparse column reader already exists, binary column index is: ", index);
85+
}
86+
_multiple_column_readers.emplace(index, std::move(reader));
87+
return Status::OK();
88+
}
89+
90+
uint32_t MultipleBinaryColumnReader::num_buckets() const {
91+
return static_cast<uint32_t>(_multiple_column_readers.size());
92+
}
93+
94+
std::shared_ptr<ColumnReader> MultipleBinaryColumnReader::select_reader(uint32_t index) const {
95+
auto it = _multiple_column_readers.find(index);
96+
if (it == _multiple_column_readers.end()) {
97+
return nullptr;
98+
}
99+
std::shared_ptr<ColumnReader> reader = it->second;
100+
return reader;
101+
}
102+
103+
uint32_t MultipleBinaryColumnReader::pick_index(const std::string& relative_path) const {
104+
uint32_t N = static_cast<uint32_t>(_multiple_column_readers.size());
105+
uint32_t bucket_index = vectorized::schema_util::variant_binary_shard_of(
106+
StringRef {relative_path.data(), relative_path.size()}, N);
107+
DCHECK(bucket_index < N);
108+
return bucket_index;
109+
}
110+
111+
std::pair<std::shared_ptr<ColumnReader>, std::string>
112+
MultipleSparseColumnReader::select_reader_and_cache_key(const std::string& relative_path) const {
113+
uint32_t bucket_index = pick_index(relative_path);
114+
std::string key = std::string(SPARSE_COLUMN_PATH) + ".b" + std::to_string(bucket_index);
115+
std::shared_ptr<ColumnReader> reader = select_reader(bucket_index);
116+
return {std::move(reader), key};
117+
}
118+
119+
BinaryColumnType MultipleSparseColumnReader::get_type() const {
120+
return BinaryColumnType::MULTIPLE_SPARSE;
121+
}
122+
123+
std::pair<std::shared_ptr<ColumnReader>, std::string>
124+
MultipleDocColumnReader::select_reader_and_cache_key(const std::string& relative_path) const {
125+
uint32_t bucket_index = pick_index(relative_path);
126+
std::string key = std::string(DOC_VALUE_COLUMN_PATH) + ".b" + std::to_string(bucket_index);
127+
std::shared_ptr<ColumnReader> reader = select_reader(bucket_index);
128+
return {std::move(reader), key};
129+
}
130+
131+
BinaryColumnType MultipleDocColumnReader::get_type() const {
132+
return BinaryColumnType::MULTIPLE_DOC_VALUE;
133+
}
134+
135+
Status CombineMultipleBinaryColumnIterator::init(const ColumnIteratorOptions& opts) {
136+
for (auto& it : _iters) {
137+
RETURN_IF_ERROR(it->init(opts));
138+
}
139+
return Status::OK();
140+
}
141+
142+
Status CombineMultipleBinaryColumnIterator::seek_to_ordinal(ordinal_t ord_idx) {
143+
for (auto& it : _iters) {
144+
RETURN_IF_ERROR(it->seek_to_ordinal(ord_idx));
145+
}
146+
return Status::OK();
147+
}
148+
149+
Status CombineMultipleBinaryColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
150+
bool* has_null) {
151+
// Read each bucket into temp maps.
152+
_binary_column_data.clear();
153+
_binary_column_data.reserve(_iters.size());
154+
for (auto& it : _iters) {
155+
vectorized::MutableColumnPtr m = vectorized::ColumnVariant::create_binary_column_fn();
156+
RETURN_IF_ERROR(it->next_batch(n, m, has_null));
157+
_binary_column_data.emplace_back(std::move(m));
158+
}
159+
_collect_sparse_data_from_buckets(*dst);
160+
return Status::OK();
161+
}
162+
163+
Status CombineMultipleBinaryColumnIterator::read_by_rowids(const rowid_t* rowids,
164+
const size_t count,
165+
vectorized::MutableColumnPtr& dst) {
166+
_binary_column_data.clear();
167+
_binary_column_data.reserve(_iters.size());
168+
for (auto& it : _iters) {
169+
vectorized::MutableColumnPtr m = vectorized::ColumnVariant::create_binary_column_fn();
170+
RETURN_IF_ERROR(it->read_by_rowids(rowids, count, m));
171+
_binary_column_data.emplace_back(std::move(m));
172+
}
173+
_collect_sparse_data_from_buckets(*dst);
174+
return Status::OK();
175+
}
176+
177+
ordinal_t CombineMultipleBinaryColumnIterator::get_current_ordinal() const {
178+
return _iters.empty() ? 0 : _iters.front()->get_current_ordinal();
179+
}
180+
181+
void CombineMultipleBinaryColumnIterator::_collect_sparse_data_from_buckets(
182+
vectorized::IColumn& binary_data_column) {
183+
using namespace vectorized;
184+
185+
// Get path, value, offset from all buckets.
186+
auto& column_map = assert_cast<ColumnMap&>(binary_data_column);
187+
auto& dst_paths = assert_cast<ColumnString&>(column_map.get_keys());
188+
auto& dst_values = assert_cast<ColumnString&>(column_map.get_values());
189+
auto& dst_offsets = assert_cast<ColumnArray::Offsets64&>(column_map.get_offsets());
190+
191+
std::vector<const ColumnString*> src_paths(_binary_column_data.size());
192+
std::vector<const ColumnString*> src_values(_binary_column_data.size());
193+
std::vector<const ColumnArray::Offsets64*> src_offsets(_binary_column_data.size());
194+
for (size_t i = 0; i != _binary_column_data.size(); ++i) {
195+
const auto& src_map = assert_cast<const ColumnMap&>(*_binary_column_data[i]);
196+
src_paths[i] = assert_cast<const ColumnString*>(&src_map.get_keys());
197+
src_values[i] = assert_cast<const ColumnString*>(&src_map.get_values());
198+
src_offsets[i] = assert_cast<const ColumnArray::Offsets64*>(&src_map.get_offsets());
199+
}
200+
201+
size_t num_rows = _binary_column_data[0]->size();
202+
for (size_t i = 0; i != num_rows; ++i) {
203+
// Sparse data contains paths in sorted order in each row.
204+
// Collect all paths from all buckets in this row and sort them.
205+
// Save each path bucket and index to be able find corresponding value later.
206+
std::vector<std::tuple<std::string_view, size_t, size_t>> all_paths;
207+
for (size_t bucket = 0; bucket != _binary_column_data.size(); ++bucket) {
208+
size_t offset_start = (*src_offsets[bucket])[ssize_t(i) - 1];
209+
size_t offset_end = (*src_offsets[bucket])[ssize_t(i)];
210+
211+
// collect all paths.
212+
for (size_t j = offset_start; j != offset_end; ++j) {
213+
auto path = src_paths[bucket]->get_data_at(j).to_string_view();
214+
all_paths.emplace_back(path, bucket, j);
215+
}
216+
}
217+
218+
std::sort(all_paths.begin(), all_paths.end());
219+
for (const auto& [path, bucket, offset] : all_paths) {
220+
dst_paths.insert_data(path.data(), path.size());
221+
dst_values.insert_from(*src_values[bucket], offset);
222+
}
223+
224+
dst_offsets.push_back(dst_paths.size());
225+
}
226+
}
227+
228+
#include "common/compile_check_end.h"
229+
} // namespace doris::segment_v2

0 commit comments

Comments
 (0)