diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 0d93ed5ce75ea1..061b5420dc00f5 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -30,6 +30,7 @@ #include "exec/schema_scanner/schema_backend_active_tasks.h" #include "exec/schema_scanner/schema_backend_configuration_scanner.h" #include "exec/schema_scanner/schema_backend_kerberos_ticket_cache.h" +#include "exec/schema_scanner/schema_backend_metrics_scanner.h" #include "exec/schema_scanner/schema_catalog_meta_cache_stats_scanner.h" #include "exec/schema_scanner/schema_charsets_scanner.h" #include "exec/schema_scanner/schema_cluster_snapshot_properties_scanner.h" @@ -43,6 +44,7 @@ #include "exec/schema_scanner/schema_file_cache_info_scanner.h" #include "exec/schema_scanner/schema_file_cache_statistics.h" #include "exec/schema_scanner/schema_files_scanner.h" +#include "exec/schema_scanner/schema_frontend_metrics_scanner.h" #include "exec/schema_scanner/schema_load_job_scanner.h" #include "exec/schema_scanner/schema_metadata_name_ids_scanner.h" #include "exec/schema_scanner/schema_partitions_scanner.h" @@ -262,6 +264,10 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaColumnDataSizesScanner::create_unique(); case TSchemaTableType::SCH_FILE_CACHE_INFO: return SchemaFileCacheInfoScanner::create_unique(); + case TSchemaTableType::SCH_BE_METRICS: + return SchemaBackendMetricsScanner::create_unique(); + case TSchemaTableType::SCH_FE_METRICS: + return SchemaFrontendMetricsScanner::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_backend_metrics_scanner.cpp b/be/src/exec/schema_scanner/schema_backend_metrics_scanner.cpp new file mode 100644 index 00000000000000..ab378ee73e45df --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backend_metrics_scanner.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_backend_metrics_scanner.h" + +#include "runtime/exec_env.h" +#include "runtime/runtime_query_statistics_mgr.h" +#include "runtime/runtime_state.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +#include "common/compile_check_begin.h" + +std::vector SchemaBackendMetricsScanner::_s_tbls_columns = { + // name, type, size, is_null + {"BE_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"BE_IP", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_VALUE", TYPE_DOUBLE, sizeof(double), true}, + {"TAG", TYPE_VARCHAR, sizeof(StringRef), true}}; + +SchemaBackendMetricsScanner::SchemaBackendMetricsScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_BE_METRICS) {} + +SchemaBackendMetricsScanner::~SchemaBackendMetricsScanner() {} + +Status SchemaBackendMetricsScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + return Status::OK(); +} + +Status SchemaBackendMetricsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_backend_metrics_block == nullptr) { + _backend_metrics_block = vectorized::Block::create_unique(); + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + _s_tbls_columns[i].type, true); + _backend_metrics_block->insert(vectorized::ColumnWithTypeAndName( + data_type->create_column(), data_type, _s_tbls_columns[i].name)); + } + _backend_metrics_block->reserve(_block_rows_limit); + DorisMetrics::instance()->metric_registry()->get_be_metrics_block( + _backend_metrics_block.get()); + _total_rows = (int)_backend_metrics_block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + RETURN_IF_ERROR(mblock.add_rows(_backend_metrics_block.get(), _row_idx, current_batch_rows)); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_backend_metrics_scanner.h b/be/src/exec/schema_scanner/schema_backend_metrics_scanner.h new file mode 100644 index 00000000000000..6a322139b9c7ea --- /dev/null +++ b/be/src/exec/schema_scanner/schema_backend_metrics_scanner.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaBackendMetricsScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaBackendMetricsScanner); + +public: + SchemaBackendMetricsScanner(); + ~SchemaBackendMetricsScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; + + static std::vector _s_tbls_columns; + +private: + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr _backend_metrics_block = nullptr; +}; +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.cpp b/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.cpp new file mode 100644 index 00000000000000..79951f813ed475 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.cpp @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/schema_scanner/schema_frontend_metrics_scanner.h" + +#include + +#include +#include + +#include "exec/schema_scanner/schema_helper.h" +#include "runtime/define_primitive_type.h" +#include "runtime/runtime_state.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +#include "common/compile_check_begin.h" + +std::vector SchemaFrontendMetricsScanner::_s_frontend_metric_columns = { + // name, type, size, is_null + {"FE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"METRIC_VALUE", TYPE_DOUBLE, sizeof(double), true}, + {"TAG", TYPE_VARCHAR, sizeof(StringRef), true}}; + +SchemaFrontendMetricsScanner::SchemaFrontendMetricsScanner() + : SchemaScanner(_s_frontend_metric_columns, TSchemaTableType::SCH_FE_METRICS) {} + +SchemaFrontendMetricsScanner::~SchemaFrontendMetricsScanner() = default; + +Status SchemaFrontendMetricsScanner::start(RuntimeState* state) { + TFetchFeMetricsRequest request; + + for (const auto& fe_addr : _param->common_param->fe_addr_list) { + TFetchFeMetricsResult tmp_ret; + RETURN_IF_ERROR(SchemaHelper::fetch_frontend_metrics(fe_addr.hostname, fe_addr.port, + request, &tmp_ret)); + + _metrics_list_result.metrics_list.insert(_metrics_list_result.metrics_list.end(), + tmp_ret.metrics_list.begin(), + tmp_ret.metrics_list.end()); + } + + return Status::OK(); +} + +Status SchemaFrontendMetricsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("call this before initial."); + } + if (block == nullptr || eos == nullptr) { + return Status::InternalError("invalid parameter."); + } + + *eos = true; + if (_metrics_list_result.metrics_list.empty()) { + return Status::OK(); + } + + return _fill_block_impl(block); +} + +Status SchemaFrontendMetricsScanner::_fill_block_impl(vectorized::Block* block) { + SCOPED_TIMER(_fill_block_timer); + + const auto& metrics_list = _metrics_list_result.metrics_list; + size_t row_num = metrics_list.size(); + if (row_num == 0) { + return Status::OK(); + } + + for (size_t col_idx = 0; col_idx < _s_frontend_metric_columns.size(); ++col_idx) { + std::vector str_refs(row_num); + std::vector double_vals(row_num); + std::vector datas(row_num); + std::vector column_values( + row_num); // Store the strings to ensure their lifetime + + for (size_t row_idx = 0; row_idx < row_num; ++row_idx) { + const auto& row = metrics_list[row_idx]; + if (row.size() != _s_frontend_metric_columns.size()) { + return Status::InternalError( + "process list meet invalid schema, schema_size={}, input_data_size={}", + _s_frontend_metric_columns.size(), row.size()); + } + + // Fetch and store the column value based on its index + std::string& column_value = + column_values[row_idx]; // Reference to the actual string in the vector + column_value = row[col_idx]; + + if (_s_frontend_metric_columns[col_idx].type == TYPE_DOUBLE) { + try { + double val = !column_value.empty() ? std::stod(column_value) : 0; + double_vals[row_idx] = val; + } catch (const std::exception& e) { + return Status::InternalError( + "process list meet invalid data, column={}, data={}, reason={}", + _s_frontend_metric_columns[col_idx].name, column_value, e.what()); + } + datas[row_idx] = &double_vals[row_idx]; + } else { + str_refs[row_idx] = + StringRef(column_values[row_idx].data(), column_values[row_idx].size()); + datas[row_idx] = &str_refs[row_idx]; + } + } + + RETURN_IF_ERROR(fill_dest_column_for_range(block, col_idx, datas)); + } + + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.h b/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.h new file mode 100644 index 00000000000000..de67bb1b0c4210 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_frontend_metrics_scanner.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" +#include "gen_cpp/FrontendService_types.h" + +namespace doris { + +class RuntimeState; + +namespace vectorized { +class Block; +} + +class SchemaFrontendMetricsScanner : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaFrontendMetricsScanner); + +public: + SchemaFrontendMetricsScanner(); + ~SchemaFrontendMetricsScanner() override; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; + + static std::vector _s_tbls_columns; + +private: + Status _fill_block_impl(vectorized::Block* block); + + TFetchFeMetricsResult _metrics_list_result; +}; + +} // namespace doris diff --git a/be/src/exec/schema_scanner/schema_helper.cpp b/be/src/exec/schema_scanner/schema_helper.cpp index 33516e01726447..567621187ed001 100644 --- a/be/src/exec/schema_scanner/schema_helper.cpp +++ b/be/src/exec/schema_scanner/schema_helper.cpp @@ -178,4 +178,13 @@ Status SchemaHelper::get_master_keys(const std::string& ip, const int32_t port, }); } +Status SchemaHelper::fetch_frontend_metrics(const std::string& ip, const int32_t port, + const TFetchFeMetricsRequest& request, + TFetchFeMetricsResult* result) { + return ThriftRpcHelper::rpc( + ip, port, [&request, &result](FrontendServiceConnection& client) { + client->fetchFrontendMetrics(*result, request); + }); +} + } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_helper.h b/be/src/exec/schema_scanner/schema_helper.h index 76bc7eaea46b8d..27c48916d9f29d 100644 --- a/be/src/exec/schema_scanner/schema_helper.h +++ b/be/src/exec/schema_scanner/schema_helper.h @@ -46,6 +46,8 @@ class TShowProcessListRequest; class TShowProcessListResult; class TShowUserRequest; class TShowUserResult; +class TFetchFeMetricsRequest; +class TFetchFeMetricsResult; // this class is a helper for getting schema info from FE class SchemaHelper { @@ -106,6 +108,10 @@ class SchemaHelper { static Status get_master_keys(const std::string& ip, const int32_t port, const TGetEncryptionKeysRequest& request, TGetEncryptionKeysResult* result); + + static Status fetch_frontend_metrics(const std::string& ip, const int32_t port, + const TFetchFeMetricsRequest& request, + TFetchFeMetricsResult* result); }; } // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 6b47cb9ddc20c5..f488903f6917cc 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -25,6 +25,10 @@ #include #include "common/config.h" +#include "exec/schema_scanner/schema_scanner_helper.h" +#include "runtime/exec_env.h" +#include "service/backend_options.h" +#include "vec/core/block.h" namespace doris { @@ -422,4 +426,65 @@ std::string MetricRegistry::to_core_string() const { return ss.str(); } +void MetricRegistry::get_be_metrics_block(vectorized::Block* block) const { + int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id; + std::string be_ip = BackendOptions::get_localhost(); + + std::lock_guard l(_lock); + for (const auto& entity : _entities) { + std::lock_guard entity_lock(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + + for (const auto& metric : entity.first->_metrics) { + rj::Document tag_doc(rj::kObjectType); + rj::Document::AllocatorType& allocator = tag_doc.GetAllocator(); + + for (auto& label : metric.first->labels) { + tag_doc.AddMember(rj::Value(label.first.c_str(), allocator), + rj::Value(label.second.c_str(), allocator), allocator); + } + + for (auto& label : entity.first->_labels) { + tag_doc.AddMember(rj::Value(label.first.c_str(), allocator), + rj::Value(label.second.c_str(), allocator), allocator); + } + + rj::StringBuffer tag_buf; + rj::Writer tag_writer(tag_buf); + tag_doc.Accept(tag_writer); + std::string tag_str = tag_buf.GetString(); + + std::string metric_type; + switch (metric.first->type) { + case MetricType::COUNTER: + metric_type = "counter"; + break; + case MetricType::GAUGE: + metric_type = "gauge"; + break; + case MetricType::HISTOGRAM: + metric_type = "histogram"; + break; + default: + metric_type = "unknown"; + } + + double metric_value = 0.0; + try { + std::string value_str = metric.second->to_string(); + metric_value = std::stod(value_str); // convert string to double + } catch (...) { + } + + // BE_ID(0), BE_IP(1), METRIC_NAME(2), METRIC_TYPE(3), METRIC_VALUE(4), TAG(5) + SchemaScannerHelper::insert_int64_value(0, be_id, block); + SchemaScannerHelper::insert_string_value(1, be_ip, block); + SchemaScannerHelper::insert_string_value(2, metric.first->simple_name(), block); + SchemaScannerHelper::insert_string_value(3, metric_type, block); + SchemaScannerHelper::insert_double_value(4, metric_value, block); + SchemaScannerHelper::insert_string_value(5, tag_str, block); + } + } +} + } // namespace doris diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index cb49884fefb60b..662db63fc0ee7b 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -34,6 +34,10 @@ namespace doris { +namespace vectorized { +class Block; +} // namespace vectorized + namespace rj = RAPIDJSON_NAMESPACE; enum class MetricType { COUNTER, GAUGE, HISTOGRAM, SUMMARY, UNTYPED }; @@ -322,6 +326,7 @@ class MetricRegistry { std::string to_prometheus(bool with_tablet_metrics = false) const; std::string to_json(bool with_tablet_metrics = false) const; std::string to_core_string() const; + void get_be_metrics_block(vectorized::Block* block) const; private: const std::string _name; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index 02ea8a610a4eec..0fb0d982ce7ac2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -114,7 +114,9 @@ public enum SchemaTableType { SCH_COLUMN_DATA_SIZES("COLUMN_DATA_SIZES", "COLUMN_DATA_SIZES", TSchemaTableType.SCH_COLUMN_DATA_SIZES), SCH_DATABASE_PROPERTIES("DATABASE_PROPERTIES", "DATABASE_PROPERTIES", - TSchemaTableType.SCH_DATABASE_PROPERTIES); + TSchemaTableType.SCH_DATABASE_PROPERTIES), + SCH_BACKEND_METRICS("BACKEND_METRICS", "BACKEND_METRICS", TSchemaTableType.SCH_BE_METRICS), + SCH_FRONTEND_METRICS("FRONTEND_METRICS", "FRONTEND_METRICS", TSchemaTableType.SCH_FE_METRICS); private static final String dbName = "INFORMATION_SCHEMA"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 92356740c50ea0..e32bf653584f52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -814,6 +814,23 @@ public class SchemaTable extends Table { .column("MAX_RESERVED_SNAPSHOTS", ScalarType.createType(PrimitiveType.BIGINT)) .column("SNAPSHOT_INTERVAL_SECONDS", ScalarType.createType(PrimitiveType.BIGINT)) .build())) + .put("backend_metrics", + new SchemaTable(SystemIdGenerator.getNextId(), "backend_metrics", TableType.SCHEMA, + builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("BE_IP", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_VALUE", ScalarType.createType(PrimitiveType.DOUBLE)) + .column("TAG", ScalarType.createVarchar(NAME_CHAR_LEN)) + .build())) + .put("frontend_metrics", + new SchemaTable(SystemIdGenerator.getNextId(), "frontend_metrics", TableType.SCHEMA, + builder().column("FE", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_NAME", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_TYPE", ScalarType.createVarchar(NAME_CHAR_LEN)) + .column("METRIC_VALUE", ScalarType.createType(PrimitiveType.DOUBLE)) + .column("TAG", ScalarType.createVarchar(NAME_CHAR_LEN)) + .build(), true)) .build(); private boolean fetchAllFe = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/ListMetricVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/metric/ListMetricVisitor.java new file mode 100644 index 00000000000000..215f788b28494b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/ListMetricVisitor.java @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.metric; + +import org.apache.doris.catalog.Env; +import org.apache.doris.monitor.jvm.JvmStats; +import org.apache.doris.monitor.jvm.JvmStats.GarbageCollector; +import org.apache.doris.monitor.jvm.JvmStats.MemoryPool; +import org.apache.doris.monitor.jvm.JvmStats.Threads; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Snapshot; +import com.google.common.base.Joiner; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/* + * Like this: + * [ + * ['FE', 'METRIC_NAME', 'METRIC_TYPE', 'METRIC_VALUE', 'TAG'], + * [] + * ] + */ +public class ListMetricVisitor extends MetricVisitor { + private static final Logger logger = LogManager.getLogger(ListMetricVisitor.class); + + private List> metricsList; + private String localHostAddr; + private StringBuilder strConcat; + + // jvm + private static final String JVM_HEAP_SIZE_BYTES = "jvm_heap_size_bytes"; + private static final String JVM_NON_HEAP_SIZE_BYTES = "jvm_non_heap_size_bytes"; + private static final String JVM_YOUNG_SIZE_BYTES = "jvm_young_size_bytes"; + private static final String JVM_OLD_SIZE_BYTES = "jvm_old_size_bytes"; + private static final String JVM_THREAD = "jvm_thread"; + + private static final String JVM_GC = "jvm_gc"; + private static final String TYPE_GAUGE = "gauge"; + private static final String TYPE_COUNTER = "counter"; + private static final String TYPE_HISTOGRAM = "histogram"; + + public ListMetricVisitor(List> metricsList, String localHostAddr) { + super(); + // List[ List[FE, METRIC_NAME, METRIC_TYPE, METRIC_VALUE, TAG] ] + this.metricsList = metricsList; + this.localHostAddr = localHostAddr; + this.strConcat = new StringBuilder(); + } + + @Override + public void visitJvm(JvmStats jvmStats) { + // heap + this.metricsList.add(newGaugeSubList(JVM_HEAP_SIZE_BYTES, + jvmStats.getMem().getHeapMax().getBytes(), "{\"type\"=\"max\"}")); + this.metricsList.add(newGaugeSubList(JVM_HEAP_SIZE_BYTES, + jvmStats.getMem().getHeapCommitted().getBytes(), "{\"type\"=\"committed\"}")); + this.metricsList.add(newGaugeSubList(JVM_HEAP_SIZE_BYTES, + jvmStats.getMem().getHeapUsed().getBytes(), "{\"type\"=\"used\"}")); + + // non heap + this.metricsList.add(newGaugeSubList(JVM_NON_HEAP_SIZE_BYTES, + jvmStats.getMem().getNonHeapCommitted().getBytes(), "{\"type\"=\"committed\"}")); + this.metricsList.add(newGaugeSubList(JVM_NON_HEAP_SIZE_BYTES, + jvmStats.getMem().getNonHeapUsed().getBytes(), "{\"type\"=\"used\"}")); + + // mem pool + Iterator memIter = jvmStats.getMem().iterator(); + while (memIter.hasNext()) { + MemoryPool memPool = memIter.next(); + if (memPool.getName().equalsIgnoreCase("young")) { + this.metricsList.add(newGaugeSubList(JVM_YOUNG_SIZE_BYTES, + memPool.getUsed().getBytes(), "{\"type\"=\"used\"}")); + this.metricsList.add(newGaugeSubList(JVM_YOUNG_SIZE_BYTES, + memPool.getPeakUsed().getBytes(), "{\"type\"=\"peak_used\"}")); + this.metricsList.add(newGaugeSubList(JVM_YOUNG_SIZE_BYTES, + memPool.getMax().getBytes(), "{\"type\"=\"max\"}")); + } else if (memPool.getName().equalsIgnoreCase("old")) { + this.metricsList.add(newGaugeSubList(JVM_OLD_SIZE_BYTES, + memPool.getUsed().getBytes(), "{\"type\"=\"used\"}")); + this.metricsList.add(newGaugeSubList(JVM_OLD_SIZE_BYTES, + memPool.getPeakUsed().getBytes(), "{\"type\"=\"peak_used\"}")); + this.metricsList.add(newGaugeSubList(JVM_OLD_SIZE_BYTES, + memPool.getMax().getBytes(), "{\"type\"=\"max\"}")); + } + } + + // gc + for (GarbageCollector gc : jvmStats.getGc()) { + this.metricsList.add(newGaugeSubList(JVM_GC, + gc.getCollectionCount(), "{\"name\"=\"" + gc.getName() + "\",\"type\"=\"count\"}")); + this.metricsList.add(newGaugeSubList(JVM_GC, + gc.getCollectionTime().getMillis(), "{\"name\"=\"" + gc.getName() + "\",\"type\"=\"time\"}")); + } + + // threads + Threads threads = jvmStats.getThreads(); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getCount(), "{\"type\"=\"count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getPeakCount(), "{\"type\"=\"peak_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsNewCount(), "{\"type\"=\"new_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsRunnableCount(), "{\"type\"=\"runnable_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsBlockedCount(), "{\"type\"=\"blocked_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsWaitingCount(), "{\"type\"=\"waiting_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsTimedWaitingCount(), "{\"type\"=\"timed_waiting_count\"}")); + this.metricsList.add(newGaugeSubList(JVM_THREAD, + threads.getThreadsTerminatedCount(), "{\"type\"=\"terminated_count\"}")); + } + + @Override + public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) { + // title + final String fullName = prefix + metric.getName(); + // name + strConcat.setLength(0); + List labels = metric.getLabels(); + if (!labels.isEmpty()) { + strConcat.append("{"); + List labelStrs = labels.stream().map(l -> "\"" + l.getKey() + "\"" + "=\"" + l.getValue() + + "\"").collect(Collectors.toList()); + strConcat.append(Joiner.on(", ").join(labelStrs)); + strConcat.append("}"); + } + // value + List metricStr = + newSubListByType(fullName, metric.getValue().toString(), strConcat.toString(), metric.getType()); + if (metricStr != null) { + this.metricsList.add(metricStr); + } + } + + @Override + public void visitHistogram(String prefix, String name, Histogram histogram) { + // part.part.part.k1=v1.k2=v2 + List names = new ArrayList<>(); + List tags = new ArrayList<>(); + for (String part : name.split("\\.")) { + String[] kv = part.split("="); + if (kv.length == 1) { + names.add(kv[0]); + } else if (kv.length == 2) { + tags.add(String.format("\"%s\"=\"%s\"", kv[0], kv[1])); + } + } + final String fullName = prefix + String.join("_", names); + final String fullTag = String.join(",", tags); + String delimiter = tags.isEmpty() ? "" : ","; + Snapshot snapshot = histogram.getSnapshot(); + + this.metricsList.add(newHistogramSubList(fullName, + snapshot.get75thPercentile(), "{\"quantile\"=\"0.75\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName, + snapshot.get95thPercentile(), "{\"quantile\"=\"0.95\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName, + snapshot.get98thPercentile(), "{\"quantile\"=\"0.98\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName, + snapshot.get99thPercentile(), "{\"quantile\"=\"0.99\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName, + snapshot.get999thPercentile(), "{\"quantile\"=\"0.999\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName + "_sum", + histogram.getCount() * snapshot.getMean(), + "{\"quantile\"=\"0.75\"" + delimiter + fullTag + "}")); + this.metricsList.add(newHistogramSubList(fullName + "_count", + histogram.getCount(), "{\"quantile\"=\"0.75\"" + delimiter + fullTag + "}")); + } + + @Override + public void visitNodeInfo() { + final String NODE_INFO = "node_info"; + + this.metricsList.add(newGaugeSubList(NODE_INFO, Env.getCurrentEnv().getFrontends(null).size(), + "{\"type\"=\"fe_node_num\", \"state\"=\"total\"}")); + this.metricsList.add(newGaugeSubList(NODE_INFO, Env.getCurrentSystemInfo().getAllBackendIds(false).size(), + "{\"type\"=\"be_node_num\", \"state\"=\"total\"}")); + this.metricsList.add(newGaugeSubList(NODE_INFO, Env.getCurrentSystemInfo().getAllBackendIds(true).size(), + "{\"type\"=\"be_node_num\", \"state\"=\"alive\"}")); + this.metricsList.add(newGaugeSubList(NODE_INFO, Env.getCurrentSystemInfo().getDecommissionedBackendIds().size(), + "{\"type\"=\"be_node_num\", \"state\"=\"decommissioned\"}")); + this.metricsList.add(newGaugeSubList(NODE_INFO, + Env.getCurrentEnv().getBrokerMgr().getAllBrokers().stream().filter(b -> !b.isAlive).count(), + "{\"type\"=\"broker_node_num\", \"state\"=\"dead\"}")); + + // only master FE has this metrics + if (Env.getCurrentEnv().isMaster()) { + this.metricsList.add(newGaugeSubList(NODE_INFO, 1, + "{\"type\"=\"is_master\"}")); + } + } + + @Override + public void visitCloudTableStats() { + } + + @Override + public void visitWorkloadGroup() { + try { + String counterTitle = "doris_workload_group_query_detail"; + Map> workloadGroupMap = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroupQueryDetail(); + for (Map.Entry> entry : workloadGroupMap.entrySet()) { + String name = entry.getKey(); + List valList = entry.getValue(); + this.metricsList.add(newCounterSubList(counterTitle, valList.get(0), + String.format("{\"name\"=\"%s\", \"type\"=\"%s\"}", name, "running_query_num"))); + this.metricsList.add(newCounterSubList(counterTitle, valList.get(1), + String.format("{\"name\"=\"%s\", \"type\"=\"%s\"}", name, "waiting_query_num"))); + } + } catch (Exception e) { + logger.warn("error happens when get workload group query detail ", e); + } + } + + private List newGaugeSubList(String name, Object value, String tag) { + return newSubList(name, TYPE_GAUGE, String.valueOf(value), tag); + } + + private List newCounterSubList(String name, Object value, String tag) { + return newSubList(name, TYPE_COUNTER, String.valueOf(value), tag); + } + + private List newHistogramSubList(String name, Object value, String tag) { + return newSubList(name, TYPE_HISTOGRAM, String.valueOf(value), tag); + } + + private List newSubListByType(String name, Object value, String tag, Metric.MetricType metricType) { + if (Metric.MetricType.GAUGE.equals(metricType)) { + return newGaugeSubList(name, value, tag); + } else if (Metric.MetricType.COUNTER.equals(metricType)) { + return newCounterSubList(name, value, tag); + } else { + return null; + } + } + + private List newSubList(String name, String type, String value, String tag) { + // List[FE, METRIC_NAME, METRIC_TYPE, METRIC_VALUE, TAG] + List subList = new ArrayList<>(Collections.nCopies(5, "")); + subList.set(0, this.localHostAddr); + subList.set(1, name); + subList.set(2, type); + subList.set(3, value); + subList.set(4, tag); + return subList; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index f7610480efa587..9a9310c6df09de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -75,6 +75,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { BACKEND_TABLE.add("backend_tablets"); BACKEND_TABLE.add("backend_configuration"); + BACKEND_TABLE.add("backend_metrics"); } public static boolean isBackendPartitionedSchemaTable(String tableName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 1720a27001a377..2e59ca93c45e63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -93,6 +93,8 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.master.MasterImpl; import org.apache.doris.meta.MetaContext; +import org.apache.doris.metric.ListMetricVisitor; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.trees.plans.PlanNodeAndHash; @@ -151,6 +153,9 @@ import org.apache.doris.thrift.TDescribeTablesResult; import org.apache.doris.thrift.TEncryptionAlgorithm; import org.apache.doris.thrift.TEncryptionKey; +import org.apache.doris.thrift.TFeResult; +import org.apache.doris.thrift.TFetchFeMetricsRequest; +import org.apache.doris.thrift.TFetchFeMetricsResult; import org.apache.doris.thrift.TFetchLoadJobRequest; import org.apache.doris.thrift.TFetchLoadJobResult; import org.apache.doris.thrift.TFetchResourceResult; @@ -307,6 +312,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -4589,6 +4595,15 @@ public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest request } } + @Override + public TFetchFeMetricsResult fetchFrontendMetrics(TFetchFeMetricsRequest request) throws TException { + TFetchFeMetricsResult result = new TFetchFeMetricsResult(); + List> metricsList = new LinkedList<>(); + MetricRepo.getMetric(new ListMetricVisitor(metricsList, FrontendOptions.getLocalHostAddress())); + result.setMetricsList(metricsList); + return result; + } + private TStatus checkMaster() { TStatus status = new TStatus(TStatusCode.OK); if (!Env.getCurrentEnv().isMaster()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java b/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java index 48857bd6d4c34b..27ceab6004910e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java @@ -32,6 +32,7 @@ import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -69,8 +70,8 @@ public void testTcpMetrics() { @Test public void testUserQueryMetrics() { - MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd("test_user").increase(1L); - MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd("test_user").increase(1L); + MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd("test_user").update(1L); + MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd("test_user").update(1L); MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd("test_user").update(10L); MetricVisitor visitor = new PrometheusMetricVisitor(); MetricRepo.DORIS_METRIC_REGISTER.accept(visitor); @@ -270,4 +271,48 @@ public void testPlanMetrics() { Assert.assertTrue(metricResult.contains("# TYPE doris_fe_plan_cloud_meta_duration_ms summary")); Assert.assertTrue(metricResult.contains("# TYPE doris_fe_plan_materialized_view_rewrite_duration_ms summary")); } + + @Test + public void testListQueryMetrics() { + MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd("test_user").update(1L); + MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd("test_user").update(1L); + MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd("test_user").update(10L); + List> metricsList = new LinkedList<>(); + MetricVisitor visitor = new ListMetricVisitor(metricsList, "127.0.0.1"); + MetricRepo.DORIS_METRIC_REGISTER.accept(visitor); + SortedMap histograms = MetricRepo.METRIC_REGISTER.getHistograms(); + for (Map.Entry entry : histograms.entrySet()) { + visitor.visitHistogram(MetricVisitor.FE_PREFIX, entry.getKey(), entry.getValue()); + } + + boolean isTotal = false; + boolean isErr = false; + boolean isLatencyAll = false; + boolean isLatencyUser = false; + for (List metric : metricsList) { + String name = metric.get(1); + String type = metric.get(2); + String value = metric.get(3); + String tag = metric.get(4); + if (!isTotal && "doris_fe_query_total".equals(name)) { + isTotal = "counter".equals(type) && "1".equals(value) && tag.contains("test_user"); + } else if (!isErr && "doris_fe_query_err".equals(name)) { + isErr = "counter".equals(type) && "1".equals(value) && tag.contains("test_user"); + } else if ("doris_fe_query_latency_ms".equals(name)) { + if (tag.contains("quantile") && "histogram".equals(type)) { + if (tag.contains("test_user")) { + if (!isLatencyUser) { + isLatencyUser = "10.0".equals(value); + } + } else if (!isLatencyAll) { + isLatencyAll = "0.0".equals(value); + } + } + } + } + Assert.assertTrue(isTotal); + Assert.assertTrue(isErr); + Assert.assertTrue(isLatencyAll); + Assert.assertTrue(isLatencyUser); + } } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 4f5f10bbc04a3c..39c615224b73cf 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -210,6 +210,8 @@ enum TSchemaTableType { SCH_LOAD_JOBS = 64; SCH_FILE_CACHE_INFO = 65; SCH_DATABASE_PROPERTIES = 66; + SCH_BE_METRICS = 67; + SCH_FE_METRICS = 68; } enum THdfsCompression { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 863a38610f90a8..cc99ab7568d24a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1653,6 +1653,13 @@ struct TGetOlapTableMetaResult { 4: optional list removed_partitions } +struct TFetchFeMetricsRequest { +} + +struct TFetchFeMetricsResult { + 1: optional list> metrics_list +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1756,4 +1763,6 @@ service FrontendService { TGetTableTDEInfoResult getTableTDEInfo(1: TGetTableTDEInfoRequest request) TGetOlapTableMetaResult getOlapTableMeta(1: TGetOlapTableMetaRequest request) + + TFetchFeMetricsResult fetchFrontendMetrics(1: TFetchFeMetricsRequest request) }