-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbackup_network_devices.py
More file actions
210 lines (179 loc) · 6.68 KB
/
backup_network_devices.py
File metadata and controls
210 lines (179 loc) · 6.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import paramiko
import os
import json
from datetime import datetime, timedelta
import time
import re
import logging
import urllib.request
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed
# Logging setup: root logger at DEBUG, each record prefixed with time and level.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Where backup files are written (overridable via the BACKUP_DIR env variable).
BACKUP_DIR = os.environ.get("BACKUP_DIR", "/app/backups")
RETENTION_DAYS = 90  # keep backups for three months
MAX_WORKERS = 10  # maximum number of concurrent backup threads

# Make sure the backup directory exists before any backup is attempted.
if not os.path.exists(BACKUP_DIR):
    os.makedirs(BACKUP_DIR, exist_ok=True)
# Support // comments in devices.json.
# The first alternative matches a complete JSON string literal (including
# backslash escapes) so that a "//" inside a value such as "http://host" or
# "see //docs" is never mistaken for a comment; the second alternative matches
# a // comment running to the end of the line.
_def_comment_pattern = re.compile(r'"(?:\\.|[^"\\])*"|//.*$')


def _strip_line_comment(match):
    """Keep matched string literals as-is and replace // comments with nothing."""
    token = match.group(0)
    return token if token.startswith('"') else ""


def _load_devices_with_comments(path: str):
    """Parse a JSON file that may contain ``//`` line comments.

    Each line has any ``//`` comment removed (text inside double-quoted JSON
    strings is left untouched), lines that end up blank are dropped, and the
    remaining text is handed to ``json.loads``.

    Args:
        path: Path of the UTF-8 encoded file to read.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the comment-stripped text is not valid JSON.
    """
    kept_lines = []
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = _def_comment_pattern.sub(_strip_line_comment, raw.rstrip("\n"))
            if line.strip():
                kept_lines.append(line)
    return json.loads("\n".join(kept_lines))
# DingTalk robot webhook URL (overridable via the DINGTALK_WEBHOOK env variable).
# NOTE(review): the fallback below embeds an access token in the source file;
# prefer supplying DINGTALK_WEBHOOK from the environment, then rotate the token
# and drop the literal.
dingtalk_webhook = os.environ.get(
    "DINGTALK_WEBHOOK",
    "https://oapi.dingtalk.com/robot/send?access_token=33b46719c52b41ca4e50f9f20db89af7d6342d6e2e570e7d65b77ff5a018c5f9",
)
# Mobile number of the user to @-mention (overridable via DINGTALK_AT_MOBILE).
user_mobile = os.environ.get("DINGTALK_AT_MOBILE", "15320488098")

# Load the device list from the file named by DEVICE_FILE (default devices.json).
# A load failure is logged and leaves an empty list so the module still imports.
DEVICE_FILE = os.environ.get("DEVICE_FILE", "devices.json")
try:
    devices = _load_devices_with_comments(DEVICE_FILE)["devices"]
    logger.info(f"Loaded {len(devices)} devices from {DEVICE_FILE}")
except Exception as e:
    logger.error(f"Load {DEVICE_FILE} failed: {e}")
    devices = []
def _discard_pending_output(shell):
    """Read and throw away whatever the channel currently has buffered."""
    # The bytes are not decoded: they are discarded anyway, and decoding a
    # banner in an unexpected encoding could otherwise fail the backup.
    while shell.recv_ready():
        shell.recv(65535)


def _read_huawei_saved_configuration(ssh, hostname):
    """Run ``display saved-configuration`` in an interactive shell and return its output."""
    import codecs  # local import: only this helper needs the incremental decoder

    logger.debug(f"Executing 'display saved-configuration' on {hostname} with pagination handling")
    # An interactive shell is used so paging can be switched off first.
    shell = ssh.invoke_shell()
    time.sleep(2)
    _discard_pending_output(shell)  # drop the initial output

    logger.debug("Disabling pagination...")
    shell.send('screen-length 0 temporary\n')
    time.sleep(1)
    _discard_pending_output(shell)  # drop the echo of the paging command

    logger.debug("Sending command: display saved-configuration")
    shell.send('display saved-configuration\n')

    # Decode incrementally so a multibyte character split across two recv()
    # calls does not raise; undecodable bytes become U+FFFD instead of
    # aborting the backup.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    parts = []
    tail = ""  # last 10 characters received, used for the prompt check
    start_time = time.time()
    while time.time() - start_time < 180:  # give up after 180 s without data
        if shell.recv_ready():
            text = decoder.decode(shell.recv(65535))
            parts.append(text)
            tail = (tail + text)[-10:]
            # Data arrived: restart the idle timer.
            start_time = time.time()
        time.sleep(0.5)
        # Treat the output as complete once nothing more is pending and a
        # '>' or '#' appears near the end.
        # NOTE(review): '#' may also end an ordinary configuration line, so
        # this heuristic could stop early on a slow link — confirm on real
        # devices.
        if not shell.recv_ready() and ('>' in tail or '#' in tail):
            break
    parts.append(decoder.decode(b"", final=True))
    return "".join(parts)


def backup_device(device):
    """Fetch one device's saved configuration over SSH and write it to BACKUP_DIR.

    The configuration is stored as ``<hostname>_<YYYYmmdd_HHMMSS>.cfg``.

    Args:
        device: Mapping with the keys ``hostname``, ``ip``, ``username``,
            ``password`` and ``device_type``. Only ``"huawei"`` is supported.

    Raises:
        ValueError: If ``device_type`` is not supported. Raised (rather than
            returning silently) so the caller does not count the device as
            successfully backed up.
        RuntimeError: If the device returned an empty configuration.
        Exception: Any connection, channel or file error is logged and
            re-raised for the caller to record.
    """
    ssh = None
    try:
        logger.info(f"Starting backup for {device['hostname']} ({device['ip']})")

        # Reject unsupported devices before opening a connection.
        if device["device_type"] != "huawei":
            raise ValueError(f"Unsupported device type for {device['hostname']}")

        # Open the SSH connection.
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification.
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Connection timeout of 60 seconds.
        ssh.connect(device["ip"], username=device["username"], password=device["password"], timeout=60)

        config = _read_huawei_saved_configuration(ssh, device["hostname"])
        if not config.strip():
            raise RuntimeError(f"Empty configuration received from {device['hostname']}")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = os.path.join(BACKUP_DIR, f"{device['hostname']}_{timestamp}.cfg")
        # Save the configuration to disk.
        with open(backup_file, "w", encoding="utf-8") as f:
            f.write(config)
        logger.info(f"Backup successful for {device['hostname']} at {backup_file}")
    except Exception as e:
        logger.error(f"Backup failed for {device['hostname']} ({device['ip']}): {str(e)}")
        raise  # re-raise so the submitting thread can record the failure
    finally:
        # Close the connection on every path, not only after a successful write.
        if ssh is not None:
            ssh.close()
def send_dingtalk_message(success_count, failure_count, failed_devices):
    """Post a backup summary to the DingTalk robot webhook.

    Sending is best-effort: any error is logged and swallowed so that a
    notification problem never interrupts the rest of the run.

    Args:
        success_count: Number of devices backed up successfully.
        failure_count: Number of devices whose backup failed.
        failed_devices: Device mappings (with ``hostname`` and ``ip`` keys)
            to list in the message; may be empty.
    """
    try:
        # Build the message text; it deliberately contains the keyword "日志"
        # (presumably required by the robot's keyword filter — confirm).
        message = f"德阳基地设备备份日志\n成功: {success_count}台\n失败: {failure_count}台"
        # Append one line per failed device, if any.
        if failed_devices:
            failed_lines = "".join(
                f"\n- {device['hostname']} ({device['ip']})" for device in failed_devices
            )
            message += "\n\n失败设备列表:" + failed_lines

        # DingTalk text-message payload, @-mentioning the configured mobile.
        data = {
            "msgtype": "text",
            "text": {
                "content": message
            },
            "at": {
                "atMobiles": [user_mobile],
                "isAtAll": False
            }
        }

        json_data = json.dumps(data).encode('utf-8')
        req = urllib.request.Request(dingtalk_webhook, data=json_data, headers={'Content-Type': 'application/json'})
        # A timeout keeps an unresponsive endpoint from blocking the run
        # indefinitely; the context manager closes the HTTP response.
        with urllib.request.urlopen(req, timeout=10) as response:
            result = response.read().decode('utf-8')

        result_data = json.loads(result)
        if result_data.get('errcode') == 0:
            logger.info("钉钉告警消息发送成功")
        else:
            logger.error(f"钉钉告警消息发送失败: {result_data.get('errmsg')}")
    except Exception as e:
        logger.error(f"发送钉钉告警消息时出错: {str(e)}")
def delete_old_backups():
    """Delete regular files in BACKUP_DIR last modified more than RETENTION_DAYS days ago.

    A file that cannot be inspected or removed (for example because it
    vanished or is not writable) is logged and skipped, so one bad entry does
    not stop the cleanup of the remaining files.

    Raises:
        OSError: If BACKUP_DIR itself cannot be listed.
    """
    cutoff_date = datetime.now() - timedelta(days=RETENTION_DAYS)
    deleted_count = 0
    for filename in os.listdir(BACKUP_DIR):
        file_path = os.path.join(BACKUP_DIR, filename)
        if not os.path.isfile(file_path):
            continue  # leave sub-directories and other non-files alone
        try:
            file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_mtime >= cutoff_date:
                continue  # still within the retention window
            os.remove(file_path)
        except OSError as e:
            logger.warning(f"Could not delete old backup {file_path}: {e}")
            continue
        logger.info(f"Deleted old backup: {file_path}")
        deleted_count += 1
    logger.info(f"Deleted {deleted_count} old backup files")
def main():
    """Back up all configured devices concurrently, report the outcome, then prune.

    Every entry of ``devices`` is submitted to a thread pool of at most
    MAX_WORKERS workers. A device counts as failed when ``backup_device``
    raised for it. After all tasks finish, the summary is sent through
    ``send_dingtalk_message`` and expired files are removed with
    ``delete_old_backups``.
    """
    logger.info(f"Starting backup process for {len(devices)} devices with up to {MAX_WORKERS} concurrent threads")

    success_count = 0
    failed_devices = []

    # Run the backups on a thread pool.
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # Map each submitted task back to the device it belongs to.
        pending = {}
        for dev in devices:
            pending[pool.submit(backup_device, dev)] = dev

        # Collect outcomes in completion order.
        for finished in as_completed(pending):
            dev = pending[finished]
            try:
                finished.result()  # re-raises whatever the worker raised
            except Exception as exc:
                logger.error(f"Failed to backup {dev['hostname']} ({dev['ip']}): {str(exc)}")
                failed_devices.append(dev)
            else:
                success_count += 1
                logger.info(f"Successfully backed up {dev['hostname']}")

    failure_count = len(failed_devices)
    logger.info(f"Backup process completed. Success: {success_count}, Failures: {failure_count}")

    # Send the DingTalk notification.
    send_dingtalk_message(success_count, failure_count, failed_devices)

    # Remove expired backups.
    delete_old_backups()


if __name__ == "__main__":
    main()