Skip to content

Commit 303eec0

Browse files
committed
feat: implement ConnectionManager for handling P2P connections in group chat services
1 parent 208eebb commit 303eec0

File tree

7 files changed

+381
-165
lines changed

7 files changed

+381
-165
lines changed

src/services/connectionManager.ts

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
import { DataConnection } from 'peerjs';
2+
import { EventEmitter } from '@/utils/eventEmitter';
3+
4+
// 连接状态枚举
5+
export enum ConnectionState {
6+
DISCONNECTED = 'disconnected',
7+
CONNECTING = 'connecting',
8+
CONNECTED = 'connected',
9+
DISCONNECTING = 'disconnecting',
10+
ERROR = 'error'
11+
}
12+
13+
// 连接信息接口
14+
export interface ConnectionInfo {
15+
id: string;
16+
connection: DataConnection;
17+
state: ConnectionState;
18+
createdAt: number;
19+
lastActivity: number;
20+
retryCount: number;
21+
metadata?: any;
22+
}
23+
24+
/**
25+
* 统一的连接管理器
26+
* 职责:管理所有 P2P 连接的生命周期
27+
*/
28+
export class ConnectionManager extends EventEmitter {
29+
private connections = new Map<string, ConnectionInfo>();
30+
31+
// 获取所有连接
32+
getAllConnections(): Map<string, ConnectionInfo> {
33+
return new Map(this.connections);
34+
}
35+
36+
// 获取特定连接
37+
getConnection(peerId: string): ConnectionInfo | undefined {
38+
return this.connections.get(peerId);
39+
}
40+
41+
// 获取连接状态
42+
getConnectionState(peerId: string): ConnectionState {
43+
const conn = this.connections.get(peerId);
44+
return conn?.state || ConnectionState.DISCONNECTED;
45+
}
46+
47+
// 注册新连接
48+
addConnection(peerId: string, connection: DataConnection, metadata?: any): void {
49+
const connectionInfo: ConnectionInfo = {
50+
id: peerId,
51+
connection,
52+
state: ConnectionState.CONNECTING,
53+
createdAt: Date.now(),
54+
lastActivity: Date.now(),
55+
retryCount: 0,
56+
metadata
57+
};
58+
59+
this.connections.set(peerId, connectionInfo);
60+
this.setupConnectionListeners(connectionInfo);
61+
62+
this.emit('connection:added', { peerId, connectionInfo });
63+
}
64+
65+
// 设置连接监听器
66+
private setupConnectionListeners(connInfo: ConnectionInfo): void {
67+
const { id: peerId, connection } = connInfo;
68+
69+
connection.on('open', () => {
70+
this.updateConnectionState(peerId, ConnectionState.CONNECTED);
71+
this.emit('connection:opened', { peerId });
72+
});
73+
74+
connection.on('data', (data) => {
75+
this.updateLastActivity(peerId);
76+
this.emit('connection:data', { peerId, data });
77+
});
78+
79+
connection.on('close', () => {
80+
this.updateConnectionState(peerId, ConnectionState.DISCONNECTED);
81+
this.emit('connection:closed', { peerId });
82+
});
83+
84+
connection.on('error', (error) => {
85+
this.updateConnectionState(peerId, ConnectionState.ERROR);
86+
this.emit('connection:error', { peerId, error });
87+
});
88+
}
89+
90+
// 更新连接状态
91+
private updateConnectionState(peerId: string, state: ConnectionState): void {
92+
const conn = this.connections.get(peerId);
93+
if (conn) {
94+
conn.state = state;
95+
this.emit('connection:state_changed', { peerId, state });
96+
}
97+
}
98+
99+
// 更新最后活动时间
100+
private updateLastActivity(peerId: string): void {
101+
const conn = this.connections.get(peerId);
102+
if (conn) {
103+
conn.lastActivity = Date.now();
104+
}
105+
}
106+
107+
// 发送数据到指定连接
108+
sendData(peerId: string, data: any): boolean {
109+
const conn = this.connections.get(peerId);
110+
if (!conn || conn.state !== ConnectionState.CONNECTED) {
111+
return false;
112+
}
113+
114+
try {
115+
conn.connection.send(data);
116+
this.updateLastActivity(peerId);
117+
return true;
118+
} catch (error) {
119+
console.error(`Failed to send data to ${peerId}:`, error);
120+
this.updateConnectionState(peerId, ConnectionState.ERROR);
121+
return false;
122+
}
123+
}
124+
125+
// 广播数据到所有连接
126+
broadcast(data: any, excludePeerIds?: string[]): number {
127+
let sentCount = 0;
128+
const exclude = new Set(excludePeerIds || []);
129+
130+
for (const [peerId, conn] of this.connections) {
131+
if (exclude.has(peerId) || conn.state !== ConnectionState.CONNECTED) {
132+
continue;
133+
}
134+
135+
if (this.sendData(peerId, data)) {
136+
sentCount++;
137+
}
138+
}
139+
140+
return sentCount;
141+
}
142+
143+
// 关闭特定连接
144+
closeConnection(peerId: string): void {
145+
const conn = this.connections.get(peerId);
146+
if (!conn) return;
147+
148+
this.updateConnectionState(peerId, ConnectionState.DISCONNECTING);
149+
150+
try {
151+
conn.connection.close();
152+
} catch (error) {
153+
console.error(`Error closing connection to ${peerId}:`, error);
154+
}
155+
156+
this.connections.delete(peerId);
157+
this.emit('connection:removed', { peerId });
158+
}
159+
160+
// 关闭所有连接
161+
closeAllConnections(): void {
162+
const peerIds = Array.from(this.connections.keys());
163+
for (const peerId of peerIds) {
164+
this.closeConnection(peerId);
165+
}
166+
}
167+
168+
// 清理不活跃的连接
169+
cleanupInactiveConnections(timeoutMs: number = 300000): number {
170+
// 5分钟
171+
const now = Date.now();
172+
const toRemove: string[] = [];
173+
174+
for (const [peerId, conn] of this.connections) {
175+
if (now - conn.lastActivity > timeoutMs) {
176+
toRemove.push(peerId);
177+
}
178+
}
179+
180+
for (const peerId of toRemove) {
181+
console.log(`Cleaning up inactive connection: ${peerId}`);
182+
this.closeConnection(peerId);
183+
}
184+
185+
return toRemove.length;
186+
}
187+
188+
// 获取连接统计
189+
getStats() {
190+
const stats = {
191+
total: this.connections.size,
192+
connected: 0,
193+
connecting: 0,
194+
disconnected: 0,
195+
error: 0
196+
};
197+
198+
for (const conn of this.connections.values()) {
199+
switch (conn.state) {
200+
case ConnectionState.CONNECTED:
201+
stats.connected++;
202+
break;
203+
case ConnectionState.CONNECTING:
204+
stats.connecting++;
205+
break;
206+
case ConnectionState.DISCONNECTED:
207+
stats.disconnected++;
208+
break;
209+
case ConnectionState.ERROR:
210+
stats.error++;
211+
break;
212+
}
213+
}
214+
215+
return stats;
216+
}
217+
218+
// 销毁管理器
219+
destroy(): void {
220+
this.closeAllConnections();
221+
// 清空所有事件监听器 - 需要逐个移除
222+
const eventNames = [
223+
'connection:added',
224+
'connection:opened',
225+
'connection:data',
226+
'connection:closed',
227+
'connection:error',
228+
'connection:state_changed',
229+
'connection:removed'
230+
];
231+
eventNames.forEach((event) => {
232+
// 移除所有此事件的监听器
233+
(this as any).events[event] = [];
234+
});
235+
}
236+
}

0 commit comments

Comments
 (0)