Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ void memory_gc() {
}

void Daemon::memory_maintenance_thread() {
doris::enable_profile_counter_check = 0;
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::memory_maintenance_sleep_time_ms))) {
// step 1. Refresh process memory metrics.
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/cache/result_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class PartitionRowBatch {
private:
int64_t _partition_key;
PCacheValue* _cache_value = nullptr;
size_t _data_size;
int64_t _data_size;
CacheStat _cache_stat;
};

Expand Down Expand Up @@ -175,7 +175,7 @@ class ResultNode {
UniqueId _sql_key;
ResultNode* _prev = nullptr;
ResultNode* _next = nullptr;
size_t _data_size;
int64_t _data_size;
PartitionRowBatchList _partition_list;
PartitionRowBatchMap _partition_map;
};
Expand Down
63 changes: 60 additions & 3 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace doris {
class TRuntimeProfileNode;
class TRuntimeProfileTree;
class RuntimeProfileCounterTreeNode;

inline thread_local bool enable_profile_counter_check = true;
// Some macro magic to generate unique ids using __COUNTER__
#define CONCAT_IMPL(x, y) x##y
#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
Expand Down Expand Up @@ -190,15 +190,56 @@ class RuntimeProfile {

// Returns a new Counter allocated with `new`, built from this counter's type(), value() and _level.
virtual Counter* clone() const { return new Counter(type(), value(), _level); }

virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }
// Adds `delta` to the counter.
//
// Release builds (NDEBUG defined) perform a single relaxed fetch_add.
// Debug builds use memory_order_seq_cst, which may cost performance and is
// therefore confined to debug mode. When the thread-local
// enable_profile_counter_check flag is set and `delta` is negative, they also
// assert that the counter has not dropped below zero.
virtual void update(int64_t delta) {
#ifndef NDEBUG
    // fetch_add returns the value held immediately before this addition, so the
    // prev_value reported on failure is exactly the value this update replaced;
    // a separate load() beforehand could observe a stale value if another
    // thread wrote in between.
    const int64_t prev_value = _value.fetch_add(delta, std::memory_order_seq_cst);
    if (enable_profile_counter_check && delta < 0) {
        DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
                << " delta: " << delta << " prev_value: " << prev_value;
    }
#else
    _value.fetch_add(delta, std::memory_order_relaxed);
#endif
}

// Atomically ORs `delta` into the stored value using relaxed memory ordering.
void bit_or(int64_t delta) { _value.fetch_or(delta, std::memory_order_relaxed); }

virtual void set(int64_t value) { _value.store(value, std::memory_order_relaxed); }
// Overwrites the counter with `value`.
//
// Release builds (NDEBUG defined) perform a single relaxed store.
// Debug builds use memory_order_seq_cst and, when the thread-local
// enable_profile_counter_check flag is set, assert that the counter is not
// negative after the write.
virtual void set(int64_t value) {
#ifndef NDEBUG
    // exchange() stores the new value and hands back the one it replaced in a
    // single atomic step, so the prev_value shown on failure cannot be stale
    // the way a separate load() followed by store() could be.
    const int64_t prev_value = _value.exchange(value, std::memory_order_seq_cst);
    if (enable_profile_counter_check) {
        DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
                << " new value: " << value << " prev_value: " << prev_value;
    }
#else
    _value.store(value, std::memory_order_relaxed);
#endif
}

// Stores a double in the counter by writing its bit pattern (via
// binary_cast<double, int64_t>) into the 64-bit atomic.
//
// Release builds (NDEBUG defined) use a relaxed store. Debug builds use
// memory_order_seq_cst and, when enable_profile_counter_check is set, assert
// that the stored pattern read back as a signed 64-bit integer is greater
// than -1. The previous value in the failure message is the raw int64_t
// pattern, not a double.
virtual void set(double value) {
    DCHECK_EQ(sizeof(value), sizeof(int64_t));
#ifdef NDEBUG
    _value.store(binary_cast<double, int64_t>(value), std::memory_order_relaxed);
#else
    const int64_t before = _value.load(std::memory_order_seq_cst);
    _value.store(binary_cast<double, int64_t>(value), std::memory_order_seq_cst);
    if (!enable_profile_counter_check) {
        return;
    }
    DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
            << " new value: " << value << " prev_value: " << before;
#endif
}

// Returns the stored value, read with a relaxed atomic load.
virtual int64_t value() const { return _value.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -270,6 +311,7 @@ class RuntimeProfile {
if (delta > 0) {
UpdateMax(current_value_);
}
DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L);
}
// Overrides the base update() by forwarding `delta` to add().
virtual void update(int64_t delta) override { add(delta); }

Expand Down Expand Up @@ -329,8 +371,23 @@ class RuntimeProfile {
}

void set(int64_t v) override {
#ifndef NDEBUG
int64_t prev_value = current_value_.load(std::memory_order_seq_cst);
int64_t prev_max_value = _value.load(std::memory_order_seq_cst);
current_value_.store(v, std::memory_order_seq_cst);
#else
current_value_.store(v, std::memory_order_relaxed);
#endif
UpdateMax(v);
#ifndef NDEBUG

if (enable_profile_counter_check) {
DCHECK_GT(current_value_.load(std::memory_order_seq_cst), -1L)
<< " prev_value: " << prev_value;
DCHECK_GT(_value.load(std::memory_order_seq_cst), -1L)
<< " prev_max_value: " << prev_max_value << " prev_value: " << prev_value;
}
#endif
}

// Returns current_value_, read with a relaxed atomic load.
int64_t current_value() const { return current_value_.load(std::memory_order_relaxed); }
Expand Down
9 changes: 7 additions & 2 deletions be/src/util/stopwatch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,13 @@ class CustomStopWatch {

timespec end;
clock_gettime(Clock, &end);
return (end.tv_sec - _start.tv_sec) * 1000L * 1000L * 1000L +
(end.tv_nsec - _start.tv_nsec);
int64_t nsec = end.tv_nsec - _start.tv_nsec;
int64_t sec = end.tv_sec - _start.tv_sec;
if (nsec < 0) {
nsec += 1000000000L;
sec -= 1;
}
return sec * 1000000000ULL + nsec;
}

// Return time in microseconds
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/common/sort/heap_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class HeapSorter final : public Sorter {

private:
void _do_filter(MergeSortCursorImpl& block, size_t num_rows);
size_t _data_size = 0;
int64_t _data_size = 0;
size_t _heap_size = 0;
size_t _queue_row_num = 0;
MergeSorterQueue _queue;
Expand Down
13 changes: 9 additions & 4 deletions be/src/vec/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,16 @@ void Scanner::_collect_profile_before_close() {
_state->update_num_rows_load_unselected(_counter.num_rows_unselected);
}

void Scanner::update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
// Stops _cpu_watch, adds its elapsed time to _scan_cpu_timer and, when both
// _state and its query context are non-null, reports the same amount to the
// query's CPU context via update_cpu_cost_ms().
void Scanner::stop_and_update_scan_cpu_timer() {
    _cpu_watch.stop();
    // Convert to a signed value once, before the sanity check. Holding the
    // reading in an unsigned variable makes `cpu_time < 0` always false, so a
    // wrapped-around (negative) elapsed time could never reach the LOG(FATAL).
    const auto cpu_time = static_cast<int64_t>(_cpu_watch.elapsed_time());
    if (cpu_time < 0) {
        LOG(FATAL) << "cpu_time < 0, cpu_time: " << cpu_time;
    }
    _scan_cpu_timer += cpu_time;
    if (_state && _state->get_query_ctx()) {
        _state->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(cpu_time);
    }
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ class Scanner {
}

void start_scan_cpu_timer() {
_cpu_watch.reset();
//_cpu_watch.reset();
_cpu_watch.start();
}

// Adds the time currently elapsed on _watch to _scanner_wait_worker_timer.
void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }

// Returns the accumulated _scanner_wait_worker_timer value.
int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }

void update_scan_cpu_timer();
void stop_and_update_scan_cpu_timer();

// Some counters need to be updated in real time; for example, the workload group policy
// needs the scanned bytes in order to cancel queries that exceed the limit.
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
if (scanner->has_prepared()) {
// Counters may only be updated after prepare has succeeded; otherwise this may crash (core dump).
// For example, the olap scanner opens its tablet reader during prepare, so if prepare did not succeed the tablet reader is nullptr.
scanner->update_scan_cpu_timer();
scanner->stop_and_update_scan_cpu_timer();
scanner->update_realtime_counters();
scanner->start_wait_worker_timer();
}
Expand Down
Loading