diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 5f78c7d9294a30..d69c8e06b8b9d2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -310,7 +310,7 @@ DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1"); // Enable task executor in internal table scan. DEFINE_Bool(enable_task_executor_in_internal_table, "false"); // Enable task executor in external table scan. -DEFINE_Bool(enable_task_executor_in_external_table, "true"); +DEFINE_Bool(enable_task_executor_in_external_table, "false"); // number of scanner thread pool size for olap table // and the min thread num of remote scanner thread pool diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index a17380c245fdcf..9b9ff1758dd34f 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -328,6 +328,7 @@ void memory_gc() { } void Daemon::memory_maintenance_thread() { + doris::enable_profile_counter_check = false; while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) { // step 1. Refresh process memory metrics. 
diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h index 10aff6c0d04fab..261bc943a00d09 100644 --- a/be/src/runtime/cache/result_node.h +++ b/be/src/runtime/cache/result_node.h @@ -93,7 +93,7 @@ class PartitionRowBatch { private: int64_t _partition_key; PCacheValue* _cache_value = nullptr; - size_t _data_size; + int64_t _data_size; CacheStat _cache_stat; }; @@ -175,7 +175,7 @@ class ResultNode { UniqueId _sql_key; ResultNode* _prev = nullptr; ResultNode* _next = nullptr; - size_t _data_size; + int64_t _data_size; PartitionRowBatchList _partition_list; PartitionRowBatchMap _partition_map; }; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index a9ccb0910b66c6..0c4314f8ca2763 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -50,7 +50,7 @@ namespace doris { class TRuntimeProfileNode; class TRuntimeProfileTree; class RuntimeProfileCounterTreeNode; - +inline thread_local bool enable_profile_counter_check = true; // Some macro magic to generate unique ids using __COUNTER__ #define CONCAT_IMPL(x, y) x##y #define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y) @@ -190,15 +190,56 @@ class RuntimeProfile { virtual Counter* clone() const { return new Counter(type(), value(), _level); } - virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); } + virtual void update(int64_t delta) { +#ifndef NDEBUG + int64_t prev_value = _value.load(std::memory_order_seq_cst); + // Using memory_order_seq_cst to make sure no concurrency issues, it may affect + // performance. So that only in debug mode, we check the counter value. 
+ _value.fetch_add(delta, std::memory_order_seq_cst); +#else + _value.fetch_add(delta, std::memory_order_relaxed); +#endif +#ifndef NDEBUG + if (enable_profile_counter_check) { + if (delta < 0) { + DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L) + << " delta: " << delta << " prev_value: " << prev_value; + } + } +#endif + } void bit_or(int64_t delta) { _value.fetch_or(delta, std::memory_order_relaxed); } - virtual void set(int64_t value) { _value.store(value, std::memory_order_relaxed); } + virtual void set(int64_t value) { +#ifndef NDEBUG + int64_t prev_value = _value.load(std::memory_order_seq_cst); + _value.store(value, std::memory_order_seq_cst); +#else + _value.store(value, std::memory_order_relaxed); +#endif +#ifndef NDEBUG + if (enable_profile_counter_check) { + DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L) + << " new value: " << value << " prev_value: " << prev_value; + } +#endif + } virtual void set(double value) { DCHECK_EQ(sizeof(value), sizeof(int64_t)); +#ifndef NDEBUG + int64_t prev_value = _value.load(std::memory_order_seq_cst); + _value.store(binary_cast(value), std::memory_order_seq_cst); +#else _value.store(binary_cast(value), std::memory_order_relaxed); +#endif +#ifndef NDEBUG + if (enable_profile_counter_check) { + DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L) + << " new value: " << value << " prev_value: " << prev_value; + } +#endif } virtual int64_t value() const { return _value.load(std::memory_order_relaxed); } @@ -270,6 +311,7 @@ class RuntimeProfile { if (delta > 0) { UpdateMax(current_value_); } + DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L); } virtual void update(int64_t delta) override { add(delta); } @@ -329,8 +371,23 @@ class RuntimeProfile { } void set(int64_t v) override { +#ifndef NDEBUG + int64_t prev_value = current_value_.load(std::memory_order_seq_cst); + int64_t prev_max_value = _value.load(std::memory_order_seq_cst); + current_value_.store(v, std::memory_order_seq_cst); +#else 
current_value_.store(v, std::memory_order_relaxed); +#endif UpdateMax(v); +#ifndef NDEBUG + + if (enable_profile_counter_check) { + DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L) + << " prev_value: " << prev_value; + DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L) + << " prev_max_value: " << prev_max_value << " prev_value: " << prev_value; + } +#endif } int64_t current_value() const { return current_value_.load(std::memory_order_relaxed); } diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp index f0f9442bcf58c1..1c2e79888b5136 100644 --- a/be/src/util/stopwatch.hpp +++ b/be/src/util/stopwatch.hpp @@ -22,6 +22,8 @@ #include +#include + namespace doris { // Stop watch for reporting elapsed time in nanosec based on CLOCK_MONOTONIC. @@ -47,6 +49,7 @@ class CustomStopWatch { if (!_running) { clock_gettime(Clock, &_start); _running = true; + _tid = std::this_thread::get_id(); } } @@ -63,6 +66,7 @@ class CustomStopWatch { if (_running) { clock_gettime(Clock, &_start); + _tid = std::this_thread::get_id(); } return ret; @@ -76,8 +80,13 @@ class CustomStopWatch { timespec end; clock_gettime(Clock, &end); - return (end.tv_sec - _start.tv_sec) * 1000L * 1000L * 1000L + - (end.tv_nsec - _start.tv_nsec); + int64_t nsec = end.tv_nsec - _start.tv_nsec; + int64_t sec = end.tv_sec - _start.tv_sec; + if (nsec < 0) { + nsec += 1000000000L; + sec -= 1; + } + return sec * 1000000000L + nsec; } // Return time in microseconds @@ -94,10 +103,11 @@ class CustomStopWatch { return end.tv_sec - _start.tv_sec; } -private: +public: timespec _start; uint64_t _total_time; // in nanosec bool _running; + std::thread::id _tid; }; // Stop watch for reporting elapsed time in nanosec based on CLOCK_MONOTONIC. 
diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h index a163afa379aabf..f2e0ff89feb0b7 100644 --- a/be/src/vec/common/sort/heap_sorter.h +++ b/be/src/vec/common/sort/heap_sorter.h @@ -50,7 +50,7 @@ class HeapSorter final : public Sorter { private: void _do_filter(MergeSortCursorImpl& block, size_t num_rows); - size_t _data_size = 0; + int64_t _data_size = 0; size_t _heap_size = 0; size_t _queue_row_num = 0; MergeSorterQueue _queue; diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp index 5dced63feb6507..711c58d5442c17 100644 --- a/be/src/vec/exec/scan/scanner.cpp +++ b/be/src/vec/exec/scan/scanner.cpp @@ -241,11 +241,33 @@ void Scanner::_collect_profile_before_close() { _state->update_num_rows_load_unselected(_counter.num_rows_unselected); } -void Scanner::update_scan_cpu_timer() { - int64_t cpu_time = _cpu_watch.elapsed_time(); - _scan_cpu_timer += cpu_time; +void Scanner::stop_and_update_scan_cpu_timer() { + //uint64_t cpu_time = _cpu_watch.elapsed_time(); + int64_t cpu_time = 0L; + if (!_cpu_watch._running) { + LOG(FATAL) << "cpu_watch is not running"; + } + + timespec end; + std::thread::id tid = std::this_thread::get_id(); + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end); + int64_t nsec = end.tv_nsec - _cpu_watch._start.tv_nsec; + int64_t sec = end.tv_sec - _cpu_watch._start.tv_sec; + if (nsec < 0) { + nsec += 1000000000L; + sec -= 1; + } + cpu_time = sec * 1000000000L + nsec; + if (cpu_time < 0) { + LOG(FATAL) << "cpu_time < 0, cpu_time: " << cpu_time << " sec: " << sec << " nsec: " << nsec + << " start: " << _cpu_watch._start.tv_nsec << " end: " << end.tv_nsec + << " start_sec: " << _cpu_watch._start.tv_sec << " end_sec: " << end.tv_sec + << " start_tid: " << _cpu_watch._tid << " cur_tid: " << tid << ";"; + } + _scan_cpu_timer += (int64_t)cpu_time; if (_state && _state->get_query_ctx()) { - _state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(cpu_time); + 
_state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms( + (int64_t)cpu_time); } } diff --git a/be/src/vec/exec/scan/scanner.h b/be/src/vec/exec/scan/scanner.h index 26023c58b3b076..8b8f5a54a3a5c7 100644 --- a/be/src/vec/exec/scan/scanner.h +++ b/be/src/vec/exec/scan/scanner.h @@ -142,7 +142,7 @@ class Scanner { int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } - void update_scan_cpu_timer(); + void stop_and_update_scan_cpu_timer(); // Some counters need to be updated realtime, for example, workload group policy need // scan bytes to cancel the query exceed limit. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index b2dc4981003a03..1f23f5a367fdd6 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -163,7 +163,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, if (scanner->has_prepared()) { // Counter update need prepare successfully, or it maybe core. For example, olap scanner // will open tablet reader during prepare, if not prepare successfully, tablet reader == nullptr. - scanner->update_scan_cpu_timer(); + scanner->stop_and_update_scan_cpu_timer(); scanner->update_realtime_counters(); scanner->start_wait_worker_timer(); }