|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +""" |
| 3 | +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. |
| 4 | +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. |
| 5 | +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at https://opensource.org/licenses/MIT |
| 7 | +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
| 8 | +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| 9 | +specific language governing permissions and limitations under the License. |
| 10 | +""" |
| 11 | +from collections import defaultdict |
| 12 | +from typing import Any, Dict, List |
| 13 | + |
| 14 | +from backend.components import CCApi |
| 15 | +from backend.db_meta.models import AppCache, ProxyInstance, StorageInstance |
| 16 | +from backend.db_services.dbbase.constants import TCP_ESTABLISHED_CODE, TCP_LISTEN_CODE |
| 17 | +from backend.db_services.ipchooser.handlers.topo_handler import TopoHandler |
| 18 | +from backend.utils.batch_request import batch_request |
| 19 | + |
| 20 | + |
| 21 | +def parse_proc_net_tcp(proc_net_tcp_content: str): |
| 22 | + """ |
| 23 | + 解析proc/net/tcp文件 |
| 24 | + 参考:https://guanjunjian.github.io/2017/11/09/study-8-proc-net-tcp-analysis/ |
| 25 | + 返回聚合后的端口与tpc信息的映射 |
| 26 | + """ |
| 27 | + |
| 28 | + def parse_hex_addr(hex_ip_port): |
| 29 | + hex_ip, hex_port = hex_ip_port.split(":") |
| 30 | + if len(hex_ip) != 8: |
| 31 | + return "", 0 |
| 32 | + # 将小端序转为大端序,解析16进制 |
| 33 | + ip = f"{int(hex_ip[6:8], 16)}.{int(hex_ip[4:6], 16)}.{int(hex_ip[2:4], 16)}.{int(hex_ip[:2], 16)}" |
| 34 | + port = int(hex_port, 16) |
| 35 | + return ip, port |
| 36 | + |
| 37 | + proc_net_tcp_lines = proc_net_tcp_content.strip("\n").split("\n") |
| 38 | + # 如果第一行的头不为sl,说明解析错误 |
| 39 | + if not proc_net_tcp_lines or proc_net_tcp_lines[0].split()[0] != "sl": |
| 40 | + return {}, False |
| 41 | + |
| 42 | + # 解析每行的tcp字符 |
| 43 | + net_tcp_list: List = [] |
| 44 | + for line in proc_net_tcp_lines[1:]: |
| 45 | + line = line.split() |
| 46 | + local_host, local_port = parse_hex_addr(line[1]) |
| 47 | + remote_host, remote_port = parse_hex_addr(line[2]) |
| 48 | + sl, st = line[0], int(line[3], 16) |
| 49 | + tcp_info = { |
| 50 | + "sl": sl, |
| 51 | + "st": st, |
| 52 | + "established": st == TCP_ESTABLISHED_CODE, |
| 53 | + "local_host": local_host, |
| 54 | + "local_port": local_port, |
| 55 | + "remote_host": remote_host, |
| 56 | + "remote_port": remote_port, |
| 57 | + } |
| 58 | + net_tcp_list.append(tcp_info) |
| 59 | + |
| 60 | + # 遍历net_tcp_list,找到所有监听端口的tcp链接 |
| 61 | + # 1. 跳过 local_ip == "127.0.0.1"的情况 |
| 62 | + # 2. 如果是监听端口,记录本机的IP到 localIpList,并生成一个report[local_port] = [] |
| 63 | + local_ip_list: List[str] = [] |
| 64 | + tcp_report: Dict[int, List] = {} |
| 65 | + for tcp_info in net_tcp_list: |
| 66 | + if tcp_info["local_host"] == "127.0.0.1": |
| 67 | + continue |
| 68 | + if tcp_info["st"] == TCP_LISTEN_CODE: |
| 69 | + local_ip_list.append(tcp_info["local_host"]) |
| 70 | + tcp_report[tcp_info["local_port"]] = [] |
| 71 | + |
| 72 | + # 再次遍历net_tcp_list,将tcp信息聚合到report中 |
| 73 | + # 跳过LocalHost == "127.0.0.1"的情况, 跳过remote_ip in local_ip_list的情况 |
| 74 | + for tcp_info in net_tcp_list: |
| 75 | + if tcp_info["local_host"] == "127.0.0.1" or tcp_info["remote_host"] in local_ip_list: |
| 76 | + continue |
| 77 | + if tcp_info["local_port"] not in tcp_report: |
| 78 | + continue |
| 79 | + tcp_report[tcp_info["local_port"]].append(tcp_info) |
| 80 | + return tcp_report, True |
| 81 | + |
| 82 | + |
| 83 | +def format_cc_info(bk_cloud_id: int, remote_ip__report_map: dict): |
| 84 | + if not remote_ip__report_map: |
| 85 | + return {} |
| 86 | + |
| 87 | + # 查询remote cc信息 |
| 88 | + remote_ips = list(remote_ip__report_map.keys()) |
| 89 | + params = { |
| 90 | + "fields": ["bk_host_id", "bk_host_innerip", "operator", "bk_bak_operator"], |
| 91 | + "host_property_filter": { |
| 92 | + "condition": "AND", |
| 93 | + "rules": [ |
| 94 | + {"field": "bk_host_innerip", "operator": "in", "value": remote_ips}, |
| 95 | + {"field": "bk_cloud_id", "operator": "equal", "value": bk_cloud_id}, |
| 96 | + ], |
| 97 | + }, |
| 98 | + } |
| 99 | + hosts = batch_request(CCApi.list_hosts_without_biz, params, get_data=lambda x: x["info"], use_admin=True) |
| 100 | + |
| 101 | + if not hosts: |
| 102 | + return {} |
| 103 | + |
| 104 | + cc_map = defaultdict(dict) |
| 105 | + # 补充主备负责人信息 |
| 106 | + for host in hosts: |
| 107 | + cc_map[host["bk_host_innerip"]].update(operator=host["operator"], bak_operator=host["bk_bak_operator"]) |
| 108 | + |
| 109 | + # 查询主机与业务的映射关系,这里要分批请求 |
| 110 | + remote_host_ids = [host["bk_host_id"] for host in hosts] |
| 111 | + resp = CCApi.batch_find_host_biz_relations({"bk_host_id": remote_host_ids}) |
| 112 | + biz__host_ids = defaultdict(list) |
| 113 | + for host in resp: |
| 114 | + biz__host_ids[host["bk_biz_id"]].append(host["bk_host_id"]) |
| 115 | + |
| 116 | + # 查询主机的业务模块信息 |
| 117 | + app_dict = AppCache.get_appcache("appcache_dict") |
| 118 | + for bk_biz_id, bk_host_ids in biz__host_ids.items(): |
| 119 | + app = app_dict.get(str(bk_biz_id), "unknown") |
| 120 | + # 分批请求cc的拓扑信息 |
| 121 | + batch = 500 |
| 122 | + for i in range(0, len(bk_host_ids), batch): |
| 123 | + filter_conditions = {"bk_host_id": bk_host_ids[i : i + batch]} |
| 124 | + topos = TopoHandler.query_host_topo_infos(int(bk_biz_id), filter_conditions, 0, batch) |
| 125 | + for topo in topos["hosts_topo_info"]: |
| 126 | + topo["topo"] = [f"{app['bk_biz_name']}/{info}" for info in topo["topo"]] |
| 127 | + cc_map[topo["ip"]].update(topo=topo["topo"]) |
| 128 | + |
| 129 | + return cc_map |
| 130 | + |
| 131 | + |
| 132 | +def generate_cluster_query_report( |
| 133 | + job_log_resp, |
| 134 | + cluster_domain, |
| 135 | + cluster_all_ip: List, |
| 136 | +): |
| 137 | + bk_cloud_id = job_log_resp[0]["bk_cloud_id"] |
| 138 | + # 获取主机和tcp解析信息的映射,并收集错误主机 |
| 139 | + host_id__tcp_info_map: Dict[int, Dict] = {} |
| 140 | + host_id__ip_map: Dict[int, str] = {} |
| 141 | + success_hosts: List = [] |
| 142 | + err_hosts: List = [] |
| 143 | + |
| 144 | + for info in job_log_resp: |
| 145 | + try: |
| 146 | + parse_info, is_success = parse_proc_net_tcp(info["log_content"]) |
| 147 | + except (Exception, IndexError): |
| 148 | + parse_info, is_success = {}, False |
| 149 | + host_id__tcp_info_map[info["host_id"]] = parse_info |
| 150 | + host_id__ip_map[info["host_id"]] = info["ip"] |
| 151 | + |
| 152 | + if not is_success: |
| 153 | + err_hosts.append(info["host_id"]) |
| 154 | + else: |
| 155 | + success_hosts.append(info["host_id"]) |
| 156 | + |
| 157 | + # 获取执行主机关联的实例信息 |
| 158 | + fields = ["cluster__immute_domain", "machine__bk_host_id", "machine__ip", "port"] |
| 159 | + host_ids = host_id__tcp_info_map.keys() |
| 160 | + instances = ( |
| 161 | + StorageInstance.objects.filter(machine__bk_host_id__in=host_ids) |
| 162 | + .values(*fields) |
| 163 | + .union(ProxyInstance.objects.filter(machine__bk_host_id__in=host_ids).values(*fields)) |
| 164 | + ) |
| 165 | + |
| 166 | + # 生成汇总报告,按照remote ip + domain进行聚合 |
| 167 | + host_id__domain_map: Dict[int, str] = {} |
| 168 | + remote_ip__report_map: Dict[str, Dict[str, Any]] = defaultdict( |
| 169 | + lambda: defaultdict(lambda: {"all_connections": 0, "establish": 0}) |
| 170 | + ) |
| 171 | + for inst in instances: |
| 172 | + host_id, port, domain = inst["machine__bk_host_id"], inst["port"], inst["cluster__immute_domain"] |
| 173 | + host_id__domain_map[host_id] = domain |
| 174 | + tcp_infos = host_id__tcp_info_map.get(host_id, {}).get(port, []) |
| 175 | + # 统计总连接数、建立连接数、连接集群 |
| 176 | + for tcp_info in tcp_infos: |
| 177 | + ip = tcp_info["remote_host"] |
| 178 | + # 跳过集群内部IP |
| 179 | + if ip in cluster_all_ip: |
| 180 | + continue |
| 181 | + remote_ip__report_map[ip][domain]["all_connections"] += 1 |
| 182 | + remote_ip__report_map[ip][domain]["establish"] += tcp_info["st"] == TCP_ESTABLISHED_CODE # 生成tcp链接报告 |
| 183 | + cluster_domain__tcp_report: Dict[str, Any] = defaultdict(lambda: {"success": [], "error": [], "report": []}) |
| 184 | + cc_map = format_cc_info(bk_cloud_id, remote_ip__report_map) |
| 185 | + for remote_ip, domain_map in remote_ip__report_map.items(): |
| 186 | + for domain, tcp_info in domain_map.items(): |
| 187 | + # 主从多实例,可能会统计到其他实例的信息 |
| 188 | + if domain == cluster_domain: |
| 189 | + data = {"remote_ip": remote_ip, "cluster_domain": domain, **tcp_info, **cc_map.get(remote_ip, {})} |
| 190 | + cluster_domain__tcp_report[domain]["report"].append(data) |
| 191 | + |
| 192 | + # 统计集群正确和错误主机信息 |
| 193 | + for host in err_hosts: |
| 194 | + domain = host_id__domain_map[host] |
| 195 | + cluster_domain__tcp_report[domain]["error"].append(host_id__ip_map[host]) |
| 196 | + for host in success_hosts: |
| 197 | + domain = host_id__domain_map[host] |
| 198 | + cluster_domain__tcp_report[domain]["success"].append(host_id__ip_map[host]) |
| 199 | + |
| 200 | + # 补充连接数为0的集群报告 |
| 201 | + tcp_report = [{"cluster_domain": domain, **report} for domain, report in cluster_domain__tcp_report.items()] |
| 202 | + return tcp_report |
0 commit comments