Skip to content

Commit 65cd055

Browse files
authored
Merge pull request #17 from actiontech/feat/loop-getAsyncTaskInfo
feat: loop getAsyncTaskInfo
2 parents 2e0af75 + b6f9e12 commit 65cd055

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

webapp/packages/core-root/src/AsyncTask/AsyncTask.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ export class AsyncTask {
4444
private readonly init: () => Promise<AsyncTaskInfo>;
4545
private readonly cancel: (id: string) => Promise<void>;
4646
private initPromise: Promise<void> | null;
47+
private pollingTimer: ReturnType<typeof setInterval> | null;
48+
private pollingGetter: ((taskId: string) => Promise<AsyncTaskInfo>) | null;
4749

4850
constructor(init: () => Promise<AsyncTaskInfo>, cancel: (id: string) => Promise<void>) {
4951
this._id = uuid();
@@ -53,6 +55,8 @@ export class AsyncTask {
5355
this.updatingAsync = false;
5456
this.taskInfo = null;
5557
this.initPromise = null;
58+
this.pollingTimer = null;
59+
this.pollingGetter = null;
5660
this.onStatusChange = new SyncExecutor();
5761

5862
this.innerPromise = new Promise((resolve, reject) => {
@@ -118,6 +122,8 @@ export class AsyncTask {
118122
}
119123

120124
this._cancelled = true;
125+
// 停止轮询
126+
this.stopPolling();
121127
try {
122128
await this.cancelTask();
123129
} catch (exception: any) {
@@ -137,6 +143,8 @@ export class AsyncTask {
137143
this.taskInfo = info;
138144

139145
if (!info.running) {
146+
// 任务完成,停止轮询
147+
this.stopPolling();
140148
if (info.error) {
141149
this.reject(new ServerInternalError(info.error));
142150
} else {
@@ -151,4 +159,64 @@ export class AsyncTask {
151159
await this.cancel(this.info.id);
152160
}
153161
}
162+
163+
/**
164+
* 启动轮询机制,作为 WebSocket 通知的降级方案
165+
* @param getter 用于获取任务信息的函数
166+
* @param interval 轮询间隔(毫秒),默认 1000ms
167+
*/
168+
startPolling(getter: (taskId: string) => Promise<AsyncTaskInfo>, interval: number = 1000): void {
169+
// 如果任务已经完成或已取消,不启动轮询
170+
if (!this.pending || this._cancelled) {
171+
return;
172+
}
173+
174+
// 如果已经有轮询在运行,先停止
175+
this.stopPolling();
176+
177+
this.pollingGetter = getter;
178+
179+
// 立即执行一次检查
180+
this.pollOnce();
181+
182+
// 设置定时轮询
183+
this.pollingTimer = setInterval(() => {
184+
this.pollOnce();
185+
}, interval);
186+
}
187+
188+
/**
189+
* 停止轮询
190+
*/
191+
stopPolling(): void {
192+
if (this.pollingTimer) {
193+
clearInterval(this.pollingTimer);
194+
this.pollingTimer = null;
195+
}
196+
this.pollingGetter = null;
197+
}
198+
199+
/**
200+
* 执行一次轮询检查
201+
*/
202+
private async pollOnce(): Promise<void> {
203+
// 如果任务已完成、已取消或没有轮询 getter,不执行
204+
if (!this.pending || this._cancelled || !this.pollingGetter || !this.taskInfo) {
205+
return;
206+
}
207+
208+
// 如果正在更新,跳过本次轮询
209+
if (this.updatingAsync) {
210+
return;
211+
}
212+
213+
try {
214+
const info = await this.pollingGetter(this.taskInfo.id);
215+
await this.updateInfoAsync(() => Promise.resolve(info));
216+
} catch {
217+
// 轮询失败不影响主流程,静默处理
218+
// 如果 WebSocket 正常工作,会通过 WebSocket 通知完成任务
219+
// 如果 WebSocket 连接中断,下次轮询会继续尝试
220+
}
221+
}
154222
}

webapp/packages/core-root/src/AsyncTask/AsyncTaskInfoService.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,21 @@ export class AsyncTaskInfoService extends Disposable {
112112
await task.run();
113113
}
114114

115+
// 如果任务还在运行,启动轮询作为 WebSocket 的降级方案
116+
// 这样可以确保即使 WebSocket 连接中断,也能通过轮询获取结果
117+
if (task.pending && task.info) {
118+
task.startPolling(
119+
async (taskId: string) => {
120+
const { taskInfo } = await this.graphQLService.sdk.getAsyncTaskInfo({
121+
taskId,
122+
removeOnFinish: false,
123+
});
124+
return taskInfo;
125+
},
126+
1000, // 轮询间隔 1 秒
127+
);
128+
}
129+
115130
return task.promise;
116131
}
117132

@@ -125,6 +140,10 @@ export class AsyncTaskInfoService extends Disposable {
125140
if (task.pending) {
126141
throw new Error('Cant remove unfinished task');
127142
}
143+
144+
// 停止轮询
145+
task.stopPolling();
146+
128147
this.tasks.delete(task.id);
129148
if (task.info) {
130149
this.taskIdAliases.delete(task.info.id);

0 commit comments

Comments
 (0)