Skip to content

Commit a6508dd

Browse files
committed
Merge remote-tracking branch 'refs/remotes/upstream/master' into master-paimon-list.version
# Conflicts: # fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
2 parents 239be30 + 9177047 commit a6508dd

File tree

192 files changed

+4050
-518
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

192 files changed

+4050
-518
lines changed

be/src/common/config.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,6 +1177,9 @@ DEFINE_Int32(inverted_index_query_cache_shards, "256");
11771177
// inverted index match bitmap cache size
11781178
DEFINE_String(inverted_index_query_cache_limit, "10%");
11791179

1180+
// condition cache limit
1181+
DEFINE_Int16(condition_cache_limit, "512");
1182+
11801183
// inverted index
11811184
DEFINE_mDouble(inverted_index_ram_buffer_size, "512");
11821185
// -1 indicates not working.

be/src/common/config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,6 +1202,9 @@ DECLARE_Int32(inverted_index_query_cache_shards);
12021202
// inverted index match bitmap cache size
12031203
DECLARE_String(inverted_index_query_cache_limit);
12041204

1205+
// condition cache limit
1206+
DECLARE_Int16(condition_cache_limit);
1207+
12051208
// inverted index
12061209
DECLARE_mDouble(inverted_index_ram_buffer_size);
12071210
DECLARE_mInt32(inverted_index_max_buffered_docs);

be/src/io/file_factory.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ class FileFactory {
113113
return TFileType::FILE_LOCAL;
114114
case TStorageBackendType::S3:
115115
return TFileType::FILE_S3;
116+
case TStorageBackendType::AZURE:
117+
return TFileType::FILE_S3;
116118
case TStorageBackendType::BROKER:
117119
return TFileType::FILE_BROKER;
118120
case TStorageBackendType::HDFS:

be/src/io/fs/azure_obj_storage_client.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "common/status.h"
4444
#include "io/fs/obj_storage_client.h"
4545
#include "util/bvar_helper.h"
46+
#include "util/coding.h"
4647
#include "util/s3_util.h"
4748

4849
using namespace Azure::Storage::Blobs;
@@ -54,8 +55,9 @@ std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptio
5455
}
5556

5657
auto base64_encode_part_num(int part_num) {
57-
return Aws::Utils::HashingUtils::Base64Encode(
58-
{reinterpret_cast<unsigned char*>(&part_num), sizeof(part_num)});
58+
uint8_t buf[4];
59+
encode_fixed32_le(buf, static_cast<uint32_t>(part_num));
60+
return Aws::Utils::HashingUtils::Base64Encode({buf, sizeof(buf)});
5961
}
6062

6163
template <typename Func>

be/src/olap/iterators.h

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "olap/block_column_predicate.h"
2626
#include "olap/column_predicate.h"
2727
#include "olap/olap_common.h"
28+
#include "olap/row_cursor.h"
2829
#include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
2930
#include "olap/rowset/segment_v2/row_ranges.h"
3031
#include "olap/tablet_schema.h"
@@ -35,7 +36,6 @@
3536

3637
namespace doris {
3738

38-
class RowCursor;
3939
class Schema;
4040
class ColumnPredicate;
4141

@@ -71,6 +71,22 @@ class StorageReadOptions {
7171
const RowCursor* upper_key = nullptr;
7272
// whether `upper_key` is included in the range
7373
bool include_upper;
74+
75+
uint64_t get_digest(uint64_t seed) const {
76+
if (lower_key != nullptr) {
77+
auto key_str = lower_key->to_string();
78+
seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
79+
seed = HashUtil::hash64(&include_lower, sizeof(include_lower), seed);
80+
}
81+
82+
if (upper_key != nullptr) {
83+
auto key_str = upper_key->to_string();
84+
seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
85+
seed = HashUtil::hash64(&include_upper, sizeof(include_upper), seed);
86+
}
87+
88+
return seed;
89+
}
7490
};
7591

7692
// reader's key ranges, empty if not existed.
@@ -129,6 +145,12 @@ class StorageReadOptions {
129145

130146
std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
131147
CollectionStatisticsPtr collection_statistics;
148+
149+
// Cache for sparse column data to avoid redundant reads
150+
// col_unique_id -> cached column_ptr
151+
std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
152+
153+
uint64_t condition_cache_digest = 0;
132154
};
133155

134156
struct CompactionSampleInfo {

be/src/olap/olap_common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ struct OlapReaderStatistics {
407407
int64_t output_index_result_column_timer = 0;
408408
// number of segment filtered by column stat when creating seg iterator
409409
int64_t filtered_segment_number = 0;
410+
// number of segments with a condition cache hit
411+
int64_t condition_cache_hit_seg_nums = 0;
412+
// number of rows filtered by condition cache hit
413+
int64_t condition_cache_filtered_rows = 0;
410414
// total number of segment
411415
int64_t total_segment_number = 0;
412416

be/src/olap/parallel_scanner_builder.cpp

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ using namespace vectorized;
3434

3535
Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& scanners) {
3636
RETURN_IF_ERROR(_load());
37-
if (_optimize_index_scan_parallelism) {
37+
if (_scan_parallelism_by_segment) {
3838
return _build_scanners_by_segment(scanners);
3939
} else if (_is_dup_mow_key) {
4040
// Default strategy for DUP/MOW tables: split by rowids within segments
@@ -87,7 +87,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
8787
auto rows_need = _rows_per_scanner - rows_collected;
8888

8989
// 0.9: try to avoid splitting the segments into excessively small parts.
90-
if (rows_need >= remaining_rows * 0.9) {
90+
if (rows_need >= remaining_rows * 9 / 10) {
9191
rows_need = remaining_rows;
9292
}
9393
DCHECK_LE(rows_need, remaining_rows);
@@ -167,6 +167,8 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
167167
// for the involved tablets. It preserves delete predicates and key ranges, and clones
168168
// RowsetReader per scanner to avoid sharing between scanners.
169169
Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>& scanners) {
170+
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
171+
170172
for (auto&& [tablet, version] : _tablets) {
171173
DCHECK(_all_read_sources.contains(tablet->tablet_id()));
172174
auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
@@ -176,8 +178,10 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
176178
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
177179
}
178180

179-
// For each RowSet split in the read source, split by segment id and build
180-
// one scanner per segment. Keep delete predicates shared.
181+
// Collect segments into scanners based on rows count instead of one scanner per segment
182+
TabletReader::ReadSource partitial_read_source;
183+
int64_t rows_collected = 0;
184+
181185
for (auto& rs_split : entire_read_source.rs_splits) {
182186
auto reader = rs_split.rs_reader;
183187
auto rowset = reader->rowset();
@@ -188,21 +192,62 @@ Status ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
188192
continue;
189193
}
190194

191-
// Build scanners for [i, i+1) segment range, without row-range slicing.
192-
for (int64_t i = 0; i < rowset->num_segments(); ++i) {
193-
RowSetSplits split(reader->clone());
194-
split.segment_offsets.first = i;
195-
split.segment_offsets.second = i + 1;
196-
// No row-ranges slicing; scan whole segment i.
197-
DCHECK_GE(split.segment_offsets.second, split.segment_offsets.first + 1);
195+
int64_t segment_start = 0;
196+
auto split = RowSetSplits(reader->clone());
198197

199-
TabletReader::ReadSource partitial_read_source;
198+
for (size_t i = 0; i < segments_rows.size(); ++i) {
199+
const size_t rows_of_segment = segments_rows[i];
200+
201+
// Check if adding this segment would exceed rows_per_scanner
202+
// 0.9: try to avoid splitting the segments into excessively small parts.
203+
if (rows_collected > 0 && (rows_collected + rows_of_segment > _rows_per_scanner &&
204+
rows_collected < _rows_per_scanner * 9 / 10)) {
205+
// Create a new scanner with collected segments
206+
split.segment_offsets.first = segment_start;
207+
split.segment_offsets.second =
208+
i; // Range is [segment_start, i), including all segments from segment_start to i-1
209+
210+
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
211+
212+
partitial_read_source.rs_splits.emplace_back(std::move(split));
213+
214+
scanners.emplace_back(
215+
_build_scanner(tablet, version, _key_ranges,
216+
{std::move(partitial_read_source.rs_splits),
217+
entire_read_source.delete_predicates}));
218+
219+
// Reset for next scanner
220+
partitial_read_source = TabletReader::ReadSource();
221+
split = RowSetSplits(reader->clone());
222+
segment_start = i;
223+
rows_collected = 0;
224+
}
225+
226+
// Add current segment to the current scanner
227+
rows_collected += rows_of_segment;
228+
}
229+
230+
// Add remaining segments in this rowset to a scanner
231+
if (rows_collected > 0) {
232+
split.segment_offsets.first = segment_start;
233+
split.segment_offsets.second = segments_rows.size();
234+
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
200235
partitial_read_source.rs_splits.emplace_back(std::move(split));
236+
}
237+
}
201238

202-
scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
203-
{std::move(partitial_read_source.rs_splits),
204-
entire_read_source.delete_predicates}));
239+
// Add remaining segments across all rowsets to a scanner
240+
if (rows_collected > 0) {
241+
DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
242+
#ifndef NDEBUG
243+
for (auto& split : partitial_read_source.rs_splits) {
244+
DCHECK(split.rs_reader != nullptr);
245+
DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
205246
}
247+
#endif
248+
scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
249+
{std::move(partitial_read_source.rs_splits),
250+
entire_read_source.delete_predicates}));
206251
}
207252
}
208253

be/src/olap/parallel_scanner_builder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ParallelScannerBuilder {
6464

6565
void set_min_rows_per_scanner(int64_t size) { _min_rows_per_scanner = size; }
6666

67-
void set_optimize_index_scan_parallelism(bool v) { _optimize_index_scan_parallelism = v; }
67+
void set_scan_parallelism_by_segment(bool v) { _scan_parallelism_by_segment = v; }
6868

6969
const OlapReaderStatistics* builder_stats() const { return &_builder_stats; }
7070

@@ -95,7 +95,7 @@ class ParallelScannerBuilder {
9595
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;
9696

9797
// When true, build scanners from whole segments (grouped by row count) instead of slicing segments by row ranges.
98-
bool _optimize_index_scan_parallelism {false};
98+
bool _scan_parallelism_by_segment {false};
9999

100100
std::shared_ptr<RuntimeProfile> _scanner_profile;
101101
OlapReaderStatistics _builder_stats;

be/src/olap/predicate_creator.h

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,15 @@
1717

1818
#pragma once
1919

20+
#include <fast_float/fast_float.h>
21+
2022
#include <charconv>
23+
#include <stdexcept>
24+
#include <string>
2125
#include <type_traits>
2226

27+
#include "common/exception.h"
28+
#include "common/status.h"
2329
#include "exec/olap_utils.h"
2430
#include "exprs/create_predicate_function.h"
2531
#include "exprs/hybrid_set.h"
@@ -33,7 +39,10 @@
3339
#include "runtime/primitive_type.h"
3440
#include "util/date_func.h"
3541
#include "util/string_util.h"
42+
#include "vec/common/string_ref.h"
3643
#include "vec/data_types/data_type.h"
44+
#include "vec/functions/cast/cast_parameters.h"
45+
#include "vec/functions/cast/cast_to_basic_number_common.h"
3746

3847
namespace doris {
3948
#include "common/compile_check_begin.h"
@@ -65,15 +74,29 @@ class IntegerPredicateCreator : public PredicateCreator<ConditionType> {
6574
private:
6675
static CppType convert(const std::string& condition) {
6776
CppType value = 0;
68-
// because std::from_chars can't compile on macOS
69-
if constexpr (std::is_same_v<CppType, double>) {
70-
value = std::stod(condition, nullptr);
71-
} else if constexpr (std::is_same_v<CppType, float>) {
72-
value = std::stof(condition, nullptr);
77+
if constexpr (std::is_floating_point_v<CppType>) {
78+
vectorized::CastParameters params;
79+
if (vectorized::CastToFloat::from_string(StringRef {condition.data(), condition.size()},
80+
value, params)) {
81+
return value;
82+
} else {
83+
throw Exception(
84+
ErrorCode::INVALID_ARGUMENT,
85+
fmt::format("convert string to number failed, str: {} to float/double",
86+
condition));
87+
}
7388
} else {
74-
std::from_chars(condition.data(), condition.data() + condition.size(), value);
89+
auto ret =
90+
std::from_chars(condition.data(), condition.data() + condition.size(), value);
91+
if (ret.ptr == condition.data() + condition.size()) {
92+
return value;
93+
} else {
94+
throw Exception(
95+
ErrorCode::INVALID_ARGUMENT,
96+
fmt::format("convert string to number failed, str: {}, error: [{}] {}",
97+
condition, ret.ec, std::make_error_code(ret.ec).message()));
98+
}
7599
}
76-
return value;
77100
}
78101
};
79102

be/src/olap/rowset/beta_rowset_reader.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
142142
read_columns.push_back(cid);
143143
}
144144
}
145+
// Disable the condition cache (reset its digest to 0) when delete conditions are present
146+
_read_context->condition_cache_digest =
147+
delete_columns_set.empty() ? _read_context->condition_cache_digest : 0;
148+
// create segment iterators
145149
VLOG_NOTICE << "read columns size: " << read_columns.size();
146150
_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
147151
if (_read_context->predicates != nullptr) {
@@ -216,6 +220,14 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
216220
_read_context->runtime_state->query_options().disable_file_cache;
217221
}
218222

223+
if (_read_context->condition_cache_digest) {
224+
for (const auto& key_range : _read_options.key_ranges) {
225+
_read_context->condition_cache_digest =
226+
key_range.get_digest(_read_context->condition_cache_digest);
227+
}
228+
_read_options.condition_cache_digest = _read_context->condition_cache_digest;
229+
}
230+
219231
_read_options.io_ctx.expiration_time =
220232
read_context->ttl_seconds > 0 && _rowset->rowset_meta()->newest_write_timestamp() > 0
221233
? _rowset->rowset_meta()->newest_write_timestamp() + read_context->ttl_seconds
@@ -267,6 +279,10 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
267279
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
268280
auto local_options = _read_options;
269281
local_options.row_ranges = _segment_row_ranges[i - seg_start];
282+
if (local_options.condition_cache_digest) {
283+
local_options.condition_cache_digest =
284+
local_options.row_ranges.get_digest(local_options.condition_cache_digest);
285+
}
270286
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
271287
_input_schema, local_options);
272288
}

0 commit comments

Comments
 (0)