forked from awsl-project/maxx
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuse-streaming.ts
More file actions
223 lines (195 loc) · 7.12 KB
/
use-streaming.ts
File metadata and controls
223 lines (195 loc) · 7.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/**
* Streaming Requests Hook
* 追踪实时活动请求状态
*/
import { useState, useEffect, useCallback, useRef, useMemo } from 'react';
import { getTransport, type ProxyRequest, type ClientType } from '@/lib/transport';
export interface StreamingState {
/** 当前活动请求总数 */
total: number;
/** 活动请求列表 */
requests: ProxyRequest[];
/** 按 clientType 统计的活动请求数 */
countsByClient: Map<ClientType, number>;
/** 按 providerID 统计的活动请求数 */
countsByProvider: Map<number, number>;
/** 按 providerID + clientType 组合统计的活动请求数 (key: `${providerID}:${clientType}`) */
countsByProviderAndClient: Map<string, number>;
/** 按 routeID 统计的活动请求数 */
countsByRoute: Map<number, number>;
}
export interface StreamingOptions {
/** 事件更新节流间隔(毫秒),0 表示不节流 */
throttleMs?: number;
}
/**
* 判断请求是否为活跃状态
*/
function isActiveRequest(request: ProxyRequest): boolean {
return request.status === 'PENDING' || request.status === 'IN_PROGRESS';
}
/**
* 追踪实时活动的 streaming 请求
* 通过 WebSocket 事件更新状态
* 注意:React Query 缓存更新由 useProxyRequestUpdates 处理
*/
export function useStreamingRequests(options: StreamingOptions = {}): StreamingState {
const [activeRequests, setActiveRequests] = useState<Map<string, ProxyRequest>>(new Map());
const activeRequestsRef = useRef<Map<string, ProxyRequest>>(new Map());
const flushTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const isInitialized = useRef(false);
const throttleMs = options.throttleMs ?? 0;
const scheduleFlush = useCallback(() => {
if (throttleMs <= 0) {
return;
}
if (flushTimerRef.current) {
return;
}
flushTimerRef.current = setTimeout(() => {
flushTimerRef.current = null;
setActiveRequests(new Map(activeRequestsRef.current));
}, throttleMs);
}, [throttleMs]);
const applyState = useCallback(
(next: Map<string, ProxyRequest>) => {
activeRequestsRef.current = next;
if (throttleMs <= 0) {
setActiveRequests(next);
return;
}
scheduleFlush();
},
[scheduleFlush, throttleMs],
);
// 从 API 加载当前活跃请求
const loadActiveRequests = useCallback(async () => {
try {
const transport = getTransport();
const activeList = await transport.getActiveProxyRequests();
const activeMap = new Map<string, ProxyRequest>();
// Ensure activeList is an array before iterating
if (Array.isArray(activeList)) {
for (const request of activeList) {
activeMap.set(request.requestID, request);
}
} else {
console.warn('getActiveProxyRequests returned non-array:', activeList);
}
applyState(activeMap);
} catch (error) {
console.error('Failed to load active requests:', error);
}
}, [applyState]);
// 处理请求更新
const handleRequestUpdate = useCallback((request: ProxyRequest) => {
const next = new Map(activeRequestsRef.current);
if (isActiveRequest(request)) {
// PENDING 或 IN_PROGRESS 的请求添加到活动列表
next.set(request.requestID, request);
} else {
// 已完成、失败、取消或拒绝的请求从活动列表中移除
next.delete(request.requestID);
}
applyState(next);
// 注意:不要在这里调用 invalidateQueries,会导致重复请求
// React Query 缓存更新由 useProxyRequestUpdates 处理
}, [applyState]);
useEffect(() => {
const transport = getTransport();
// 初始化时加载当前活跃请求
if (!isInitialized.current) {
isInitialized.current = true;
loadActiveRequests();
}
// 订阅请求更新事件 (连接由 main.tsx 统一管理)
const unsubscribe = transport.subscribe<ProxyRequest>(
'proxy_request_update',
handleRequestUpdate,
);
// 订阅 WebSocket 重连事件,重新加载活跃请求
// 因为断开期间可能有请求完成或新增
const unsubscribeReconnect = transport.subscribe('_ws_reconnected', () => {
loadActiveRequests();
});
return () => {
unsubscribe();
unsubscribeReconnect();
if (flushTimerRef.current) {
clearTimeout(flushTimerRef.current);
flushTimerRef.current = null;
}
};
}, [handleRequestUpdate, loadActiveRequests]);
useEffect(() => {
if (throttleMs <= 0) {
if (flushTimerRef.current) {
clearTimeout(flushTimerRef.current);
flushTimerRef.current = null;
}
// 关闭节流时立即刷出缓冲状态,避免清理定时器后丢失一次更新。
setActiveRequests(new Map(activeRequestsRef.current));
}
// 让 effect 与最新的 handleRequestUpdate 逻辑保持一致(依赖变更触发重新刷出)。
void handleRequestUpdate;
}, [throttleMs, handleRequestUpdate]);
return useMemo((): StreamingState => {
// 计算按 clientType 和 providerID 的统计
const countsByClient = new Map<ClientType, number>();
const countsByProvider = new Map<number, number>();
const countsByProviderAndClient = new Map<string, number>();
const countsByRoute = new Map<number, number>();
const requests = Array.from(activeRequests.values());
for (const request of requests) {
// 按 clientType 统计
const clientCount = countsByClient.get(request.clientType) || 0;
countsByClient.set(request.clientType, clientCount + 1);
// 按 routeID 统计
if (request.routeID > 0) {
const routeCount = countsByRoute.get(request.routeID) || 0;
countsByRoute.set(request.routeID, routeCount + 1);
}
// 按 providerID 统计
if (request.providerID > 0) {
const providerCount = countsByProvider.get(request.providerID) || 0;
countsByProvider.set(request.providerID, providerCount + 1);
// 按 providerID + clientType 组合统计
const key = `${request.providerID}:${request.clientType}`;
const combinedCount = countsByProviderAndClient.get(key) || 0;
countsByProviderAndClient.set(key, combinedCount + 1);
}
}
return {
total: activeRequests.size,
requests,
countsByClient,
countsByProvider,
countsByProviderAndClient,
countsByRoute,
};
}, [activeRequests]);
}
/**
* 获取特定客户端的 streaming 请求数
*/
export function useClientStreamingCount(clientType: ClientType): number {
const { countsByClient } = useStreamingRequests();
return countsByClient.get(clientType) || 0;
}
/**
* 获取特定 Provider 的 streaming 请求数
*/
export function useProviderStreamingCount(providerId: number): number {
const { countsByProvider } = useStreamingRequests();
return countsByProvider.get(providerId) || 0;
}
/**
* 获取特定 Provider 在特定 ClientType 下的 streaming 请求数
*/
export function useProviderClientStreamingCount(
providerId: number,
clientType: ClientType,
): number {
const { countsByProviderAndClient } = useStreamingRequests();
return countsByProviderAndClient.get(`${providerId}:${clientType}`) || 0;
}