Skip to content

Commit f3a6ae0

Browse files
authored
Merge branch 'master' into fix_coalesce_statistics
2 parents 001f7dd + 59fabff commit f3a6ae0

File tree

15 files changed

+707
-168
lines changed

15 files changed

+707
-168
lines changed

be/src/vec/core/field.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,32 @@ std::string Field::get_type_name() const {
647647
return type_to_string(type);
648648
}
649649

650+
template <PrimitiveType T>
651+
typename PrimitiveTypeTraits<T>::CppType& Field::get() {
652+
DCHECK(T == type || (is_string_type(type) && is_string_type(T)) || type == TYPE_NULL)
653+
<< "Type mismatch: requested " << type_to_string(T) << ", actual " << get_type_name();
654+
auto* MAY_ALIAS ptr = reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(&storage);
655+
return *ptr;
656+
}
657+
658+
template <PrimitiveType T>
659+
const typename PrimitiveTypeTraits<T>::CppType& Field::get() const {
660+
DCHECK(T == type || (is_string_type(type) && is_string_type(T)) || type == TYPE_NULL)
661+
<< "Type mismatch: requested " << type_to_string(T) << ", actual " << get_type_name();
662+
const auto* MAY_ALIAS ptr =
663+
reinterpret_cast<const typename PrimitiveTypeTraits<T>::CppType*>(&storage);
664+
return *ptr;
665+
}
666+
667+
template <PrimitiveType T>
668+
void Field::destroy() {
669+
using TargetType = typename PrimitiveTypeTraits<T>::CppType;
670+
DCHECK(T == type || ((is_string_type(type) && is_string_type(T))))
671+
<< "Type mismatch: requested " << type_to_string(T) << ", actual " << get_type_name();
672+
auto* MAY_ALIAS ptr = reinterpret_cast<TargetType*>(&storage);
673+
ptr->~TargetType();
674+
}
675+
650676
std::strong_ordering Field::operator<=>(const Field& rhs) const {
651677
if (type == PrimitiveType::TYPE_NULL || rhs == PrimitiveType::TYPE_NULL) {
652678
return type <=> rhs.type;
@@ -947,6 +973,49 @@ std::string_view Field::as_string_view() const {
947973
typename PrimitiveTypeTraits<TYPE_UINT64>::CppType && rhs);
948974
DECLARE_FUNCTION(create_concrete)
949975
DECLARE_FUNCTION(assign_concrete)
976+
#undef DECLARE_FUNCTION
950977

978+
#define DECLARE_FUNCTION(TYPE_NAME) \
979+
template typename PrimitiveTypeTraits<TYPE_NAME>::CppType& Field::get<TYPE_NAME>(); \
980+
template const typename PrimitiveTypeTraits<TYPE_NAME>::CppType& Field::get<TYPE_NAME>() \
981+
const; \
982+
template void Field::destroy<TYPE_NAME>();
983+
DECLARE_FUNCTION(TYPE_NULL)
984+
DECLARE_FUNCTION(TYPE_TINYINT)
985+
DECLARE_FUNCTION(TYPE_SMALLINT)
986+
DECLARE_FUNCTION(TYPE_INT)
987+
DECLARE_FUNCTION(TYPE_BIGINT)
988+
DECLARE_FUNCTION(TYPE_LARGEINT)
989+
DECLARE_FUNCTION(TYPE_DATE)
990+
DECLARE_FUNCTION(TYPE_DATETIME)
991+
DECLARE_FUNCTION(TYPE_DATEV2)
992+
DECLARE_FUNCTION(TYPE_DATETIMEV2)
993+
DECLARE_FUNCTION(TYPE_TIMESTAMPTZ)
994+
DECLARE_FUNCTION(TYPE_DECIMAL32)
995+
DECLARE_FUNCTION(TYPE_DECIMAL64)
996+
DECLARE_FUNCTION(TYPE_DECIMALV2)
997+
DECLARE_FUNCTION(TYPE_DECIMAL128I)
998+
DECLARE_FUNCTION(TYPE_DECIMAL256)
999+
DECLARE_FUNCTION(TYPE_CHAR)
1000+
DECLARE_FUNCTION(TYPE_VARCHAR)
1001+
DECLARE_FUNCTION(TYPE_STRING)
1002+
DECLARE_FUNCTION(TYPE_VARBINARY)
1003+
DECLARE_FUNCTION(TYPE_HLL)
1004+
DECLARE_FUNCTION(TYPE_VARIANT)
1005+
DECLARE_FUNCTION(TYPE_QUANTILE_STATE)
1006+
DECLARE_FUNCTION(TYPE_ARRAY)
1007+
DECLARE_FUNCTION(TYPE_TIME)
1008+
DECLARE_FUNCTION(TYPE_IPV4)
1009+
DECLARE_FUNCTION(TYPE_IPV6)
1010+
DECLARE_FUNCTION(TYPE_BOOLEAN)
1011+
DECLARE_FUNCTION(TYPE_FLOAT)
1012+
DECLARE_FUNCTION(TYPE_DOUBLE)
1013+
DECLARE_FUNCTION(TYPE_JSONB)
1014+
DECLARE_FUNCTION(TYPE_STRUCT)
1015+
DECLARE_FUNCTION(TYPE_MAP)
1016+
DECLARE_FUNCTION(TYPE_BITMAP)
1017+
DECLARE_FUNCTION(TYPE_TIMEV2)
1018+
DECLARE_FUNCTION(TYPE_UINT32)
1019+
DECLARE_FUNCTION(TYPE_UINT64)
9511020
#undef DECLARE_FUNCTION
9521021
} // namespace doris::vectorized

be/src/vec/core/field.h

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -244,28 +244,10 @@ class Field {
244244
// If not, use NearestFieldType<> externally.
245245
// Maybe modify this in the future, reference: https://github.com/ClickHouse/ClickHouse/pull/22003
246246
template <PrimitiveType T>
247-
typename PrimitiveTypeTraits<T>::CppType& get() {
248-
DCHECK(T == type ||
249-
((type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING) &&
250-
(T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING)) ||
251-
type == TYPE_NULL)
252-
<< "Type mismatch: requested " << int(T) << ", actual " << get_type_name();
253-
auto* MAY_ALIAS ptr = reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(&storage);
254-
return *ptr;
255-
}
247+
typename PrimitiveTypeTraits<T>::CppType& get();
256248

257249
template <PrimitiveType T>
258-
const typename PrimitiveTypeTraits<T>::CppType& get() const {
259-
// TODO(gabriel): Is it safe for null type?
260-
DCHECK(T == type ||
261-
((type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING) &&
262-
(T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING)) ||
263-
type == TYPE_NULL)
264-
<< "Type mismatch: requested " << int(T) << ", actual " << get_type_name();
265-
const auto* MAY_ALIAS ptr =
266-
reinterpret_cast<const typename PrimitiveTypeTraits<T>::CppType*>(&storage);
267-
return *ptr;
268-
}
250+
const typename PrimitiveTypeTraits<T>::CppType& get() const;
269251

270252
bool operator==(const Field& rhs) const {
271253
return operator<=>(rhs) == std::strong_ordering::equal;
@@ -304,14 +286,7 @@ class Field {
304286
void destroy();
305287

306288
template <PrimitiveType T>
307-
void destroy() {
308-
using TargetType = typename PrimitiveTypeTraits<T>::CppType;
309-
DCHECK(T == type || ((type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING) &&
310-
(T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING)))
311-
<< "Type mismatch: requested " << int(T) << ", actual " << get_type_name();
312-
auto* MAY_ALIAS ptr = reinterpret_cast<TargetType*>(&storage);
313-
ptr->~TargetType();
314-
}
289+
void destroy();
315290
};
316291

317292
struct FieldWithDataType {

be/src/vec/exprs/vectorized_fn_call.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,12 @@ void VectorizedFnCall::prepare_ann_range_search(
418418
suitable_for_ann_index = false;
419419
return;
420420
}
421-
421+
#ifndef NDEBUG
422+
if (right_literal->is_nullable()) {
423+
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
424+
"ANN range search with nullable literal is not supported");
425+
}
426+
#endif
422427
auto right_col = right_literal->get_column_ptr()->convert_to_full_column_if_const();
423428
auto right_type = right_literal->get_data_type();
424429

be/src/vec/sink/writer/vtablet_writer.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -748,15 +748,28 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
748748
// But there is still some unfinished things, we do mem limit here temporarily.
749749
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
750750
// It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
751-
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
752-
auto current_load_mem_value = MemoryProfile::load_current_usage();
753-
bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
754-
current_load_mem_value > _load_mem_limit ||
755-
_pending_batches_bytes > _max_pending_batches_bytes;
756-
while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 &&
757-
mem_limit_exceeded) {
751+
constexpr int64_t kBackPressureSleepMs = 10;
752+
auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
753+
while (true) {
754+
bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
755+
int64_t memtable_mem =
756+
(memtable_limiter != nullptr && memtable_limiter->mem_tracker() != nullptr)
757+
? memtable_limiter->mem_tracker()->consumption()
758+
: 0;
759+
// Note: Memtable memory is not included in load memory statistics (MemoryProfile::load_current_usage())
760+
// for performance and memory control complexity reasons. Therefore, we explicitly add memtable memory
761+
// consumption here to ensure accurate back pressure decisions and prevent OOM during heavy loads.
762+
auto current_load_mem_value = MemoryProfile::load_current_usage() + memtable_mem;
763+
bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
764+
current_load_mem_value > _load_mem_limit ||
765+
_pending_batches_bytes > _max_pending_batches_bytes;
766+
bool need_back_pressure = !_cancelled && !_state->is_cancelled() &&
767+
_pending_batches_num > 0 && mem_limit_exceeded;
768+
if (!need_back_pressure) {
769+
break;
770+
}
758771
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
759-
std::this_thread::sleep_for(std::chrono::milliseconds(10));
772+
std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs));
760773
}
761774

762775
if (UNLIKELY(!_cur_mutable_block)) {

fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Lon
221221
List<Replica> deadPathReplica = Lists.newArrayListWithCapacity(replicaNum);
222222
List<Replica> mayMissingVersionReplica = Lists.newArrayListWithCapacity(replicaNum);
223223
List<Replica> notCatchupReplica = Lists.newArrayListWithCapacity(replicaNum);
224+
List<Replica> userDropReplica = Lists.newArrayListWithCapacity(replicaNum);
224225

225226
for (Replica replica : replicas) {
226227
if (replica.isBad()) {
@@ -230,6 +231,10 @@ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Lon
230231
notCatchupReplica.add(replica);
231232
continue;
232233
}
234+
if (replica.isUserDrop()) {
235+
userDropReplica.add(replica);
236+
continue;
237+
}
233238
if (replica.getLastFailedVersion() > 0) {
234239
mayMissingVersionReplica.add(replica);
235240
continue;
@@ -253,6 +258,7 @@ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Lon
253258
if (allQueryableReplica.isEmpty()) {
254259
allQueryableReplica = auxiliaryReplica;
255260
}
261+
256262
if (allQueryableReplica.isEmpty()) {
257263
allQueryableReplica = deadPathReplica;
258264
}
@@ -266,6 +272,10 @@ public List<Replica> getQueryableReplicas(long visibleVersion, Map<Long, Set<Lon
266272
allQueryableReplica = notCatchupReplica;
267273
}
268274

275+
if (allQueryableReplica.isEmpty()) {
276+
allQueryableReplica = userDropReplica;
277+
}
278+
269279
if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) {
270280
long minVersionCount = Long.MAX_VALUE;
271281
for (Replica replica : allQueryableReplica) {

0 commit comments

Comments
 (0)