Skip to content

Commit cb6ba74

Browse files
committed
refactor(task-queue): add scope isolation and abstract base task controller
Introduce scope dimension to TaskQueueService so AI and cron tasks operate independently via Redis indexes. Extract ScopedTaskService as scope-aware wrapper with strict access verification, and BaseTaskController for shared CRUD routes.
1 parent d9fc9e2 commit cb6ba74

16 files changed

+342
-353
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { Delete, Get, Param, Post, Query } from '@nestjs/common'
2+
import { Auth } from '~/common/decorators/auth.decorator'
3+
import { BizException } from '~/common/exceptions/biz.exception'
4+
import { ErrorCodeEnum } from '~/constants/error-code.constant'
5+
import type { ScopedTaskService } from '~/processors/task-queue/scoped-task.service'
6+
import { StringIdDto } from '~/shared/dto/id.dto'
7+
import { BaseDeleteTasksQueryDto, BaseGetTasksQueryDto } from './base-task.dto'
8+
9+
export abstract class BaseTaskController {
10+
protected abstract get taskCrudService(): ScopedTaskService
11+
12+
@Get('/:id')
13+
@Auth()
14+
async getTask(@Param() params: StringIdDto) {
15+
const task = await this.taskCrudService.getTask(params.id)
16+
if (!task) {
17+
throw new BizException(ErrorCodeEnum.AITaskNotFound)
18+
}
19+
return task
20+
}
21+
22+
@Get('/')
23+
@Auth()
24+
async getTasks(@Query() query: BaseGetTasksQueryDto) {
25+
return this.taskCrudService.getTasks(query)
26+
}
27+
28+
@Post('/:id/cancel')
29+
@Auth()
30+
async cancelTask(@Param() params: StringIdDto) {
31+
await this.taskCrudService.cancelTask(params.id)
32+
return { success: true }
33+
}
34+
35+
@Post('/:id/retry')
36+
@Auth()
37+
async retryTask(@Param() params: StringIdDto) {
38+
return this.taskCrudService.retryTask(params.id)
39+
}
40+
41+
@Delete('/:id')
42+
@Auth()
43+
async deleteTask(@Param() params: StringIdDto) {
44+
await this.taskCrudService.deleteTask(params.id)
45+
return { success: true }
46+
}
47+
48+
@Delete('/')
49+
@Auth()
50+
async deleteTasks(@Query() query: BaseDeleteTasksQueryDto) {
51+
const deleted = await this.taskCrudService.deleteTasks(query)
52+
return { deleted }
53+
}
54+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { TaskStatus } from '~/processors/task-queue'
2+
import { createZodDto } from 'nestjs-zod'
3+
import { z } from 'zod'
4+
5+
export const BaseGetTasksQuerySchema = z.object({
6+
status: z
7+
.enum([
8+
TaskStatus.Pending,
9+
TaskStatus.Running,
10+
TaskStatus.Completed,
11+
TaskStatus.Failed,
12+
TaskStatus.Cancelled,
13+
])
14+
.optional(),
15+
type: z.string().optional(),
16+
page: z.coerce.number().int().min(1).default(1),
17+
size: z.coerce.number().int().min(1).max(50).default(20),
18+
})
19+
20+
export class BaseGetTasksQueryDto extends createZodDto(
21+
BaseGetTasksQuerySchema,
22+
) {}
23+
24+
export const BaseDeleteTasksQuerySchema = z.object({
25+
status: z
26+
.enum([TaskStatus.Completed, TaskStatus.Failed, TaskStatus.Cancelled])
27+
.optional(),
28+
type: z.string().optional(),
29+
before: z.coerce.number().int().positive(),
30+
})
31+
32+
export class BaseDeleteTasksQueryDto extends createZodDto(
33+
BaseDeleteTasksQuerySchema,
34+
) {}
Lines changed: 13 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,31 @@
1-
import { Delete, Get, Param, Post, Query } from '@nestjs/common'
1+
import { Delete, Get, Param } from '@nestjs/common'
2+
import { BaseTaskController } from '~/common/controllers/base-task.controller'
23
import { ApiController } from '~/common/decorators/api-controller.decorator'
34
import { Auth } from '~/common/decorators/auth.decorator'
5+
import type { ScopedTaskService } from '~/processors/task-queue'
46
import { StringIdDto } from '~/shared/dto/id.dto'
5-
import { DeleteTasksQueryDto, GetTasksQueryDto } from './ai-task.dto'
67
import { AiTaskService } from './ai-task.service'
78

89
@ApiController('ai/tasks')
9-
export class AiTaskController {
10-
constructor(private readonly service: AiTaskService) {}
10+
export class AiTaskController extends BaseTaskController {
11+
constructor(private readonly service: AiTaskService) {
12+
super()
13+
}
14+
15+
protected get taskCrudService(): ScopedTaskService {
16+
return this.service.crud
17+
}
1118

1219
@Get('/group/:id')
1320
@Auth()
1421
async getTasksByGroupId(@Param() params: StringIdDto) {
15-
return this.service.getTasksByGroupId(params.id)
22+
return this.service.crud.getTasksByGroupId(params.id)
1623
}
1724

1825
@Delete('/group/:id')
1926
@Auth()
2027
async cancelTasksByGroupId(@Param() params: StringIdDto) {
21-
const cancelled = await this.service.cancelTasksByGroupId(params.id)
28+
const cancelled = await this.service.crud.cancelTasksByGroupId(params.id)
2229
return { cancelled }
2330
}
24-
25-
@Get('/:id')
26-
@Auth()
27-
async getTask(@Param() params: StringIdDto) {
28-
return this.service.getTask(params.id)
29-
}
30-
31-
@Get('/')
32-
@Auth()
33-
async getTasks(@Query() query: GetTasksQueryDto) {
34-
return this.service.getTasks(query)
35-
}
36-
37-
@Post('/:id/retry')
38-
@Auth()
39-
async retryTask(@Param() params: StringIdDto) {
40-
return this.service.retryTask(params.id)
41-
}
42-
43-
@Post('/:id/cancel')
44-
@Auth()
45-
async cancelTask(@Param() params: StringIdDto) {
46-
await this.service.cancelTask(params.id)
47-
return { success: true }
48-
}
49-
50-
@Delete('/:id')
51-
@Auth()
52-
async deleteTask(@Param() params: StringIdDto) {
53-
await this.service.deleteTask(params.id)
54-
return { success: true }
55-
}
56-
57-
@Delete('/')
58-
@Auth()
59-
async deleteTasks(@Query() query: DeleteTasksQueryDto) {
60-
const deleted = await this.service.deleteTasks(query)
61-
return { deleted }
62-
}
6331
}
Lines changed: 23 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
import { Injectable } from '@nestjs/common'
2-
import { BizException } from '~/common/exceptions/biz.exception'
32
import { CollectionRefTypes } from '~/constants/db.constant'
4-
import { ErrorCodeEnum } from '~/constants/error-code.constant'
53
import { DatabaseService } from '~/processors/database/database.service'
6-
import {
7-
TaskQueueService,
8-
TaskStatus,
9-
type Task,
10-
} from '~/processors/task-queue'
4+
import { ScopedTaskService, TaskQueueService } from '~/processors/task-queue'
115
import {
126
AITaskType,
137
computeAITaskDedupKey,
@@ -20,15 +14,18 @@ import {
2014

2115
@Injectable()
2216
export class AiTaskService {
17+
readonly crud: ScopedTaskService
18+
2319
constructor(
24-
private readonly taskQueueService: TaskQueueService,
20+
taskQueueService: TaskQueueService,
2521
private readonly databaseService: DatabaseService,
26-
) {}
22+
) {
23+
this.crud = new ScopedTaskService(taskQueueService, 'ai')
24+
}
2725

2826
async createSummaryTask(
2927
payload: SummaryTaskPayload,
3028
): Promise<{ taskId: string; created: boolean }> {
31-
// Auto-fill title if not provided
3229
if (!payload.title && payload.refId) {
3330
const articleInfo = await this.getArticleInfo(payload.refId)
3431
if (articleInfo) {
@@ -42,7 +39,6 @@ export class AiTaskService {
4239
async createTranslationTask(
4340
payload: TranslationTaskPayload,
4441
): Promise<{ taskId: string; created: boolean }> {
45-
// Auto-fill title if not provided
4642
if (!payload.title && payload.refId) {
4743
const articleInfo = await this.getArticleInfo(payload.refId)
4844
if (articleInfo) {
@@ -53,27 +49,6 @@ export class AiTaskService {
5349
return this.createTask(AITaskType.Translation, payload)
5450
}
5551

56-
private async getArticleInfo(
57-
refId: string,
58-
): Promise<{ title: string; type: string } | null> {
59-
const article = await this.databaseService.findGlobalById(refId)
60-
if (!article || !article.document) {
61-
return null
62-
}
63-
64-
const typeMap: Record<CollectionRefTypes, string> = {
65-
[CollectionRefTypes.Post]: 'Post',
66-
[CollectionRefTypes.Note]: 'Note',
67-
[CollectionRefTypes.Page]: 'Page',
68-
[CollectionRefTypes.Recently]: 'Recently',
69-
}
70-
71-
return {
72-
title: (article.document as { title?: string }).title || refId,
73-
type: typeMap[article.type] || 'Unknown',
74-
}
75-
}
76-
7752
async createTranslationBatchTask(
7853
payload: TranslationBatchTaskPayload,
7954
): Promise<{ taskId: string; created: boolean }> {
@@ -91,91 +66,31 @@ export class AiTaskService {
9166
payload: AITaskPayload,
9267
): Promise<{ taskId: string; created: boolean }> {
9368
const dedupKey = computeAITaskDedupKey(type, payload)
94-
return this.taskQueueService.createTask({
69+
return this.crud.createTask({
9570
type,
9671
payload: payload as Record<string, unknown>,
9772
dedupKey,
9873
})
9974
}
10075

101-
async getTask(taskId: string): Promise<Task | null> {
102-
return this.taskQueueService.getTask(taskId)
103-
}
104-
105-
async getTasks(options: {
106-
status?: TaskStatus
107-
type?: AITaskType
108-
page: number
109-
size: number
110-
}): Promise<{ data: Task[]; total: number }> {
111-
return this.taskQueueService.getTasks({
112-
status: options.status,
113-
type: options.type,
114-
page: options.page,
115-
size: options.size,
116-
})
117-
}
118-
119-
async cancelTask(taskId: string): Promise<boolean> {
120-
return this.taskQueueService.cancelTask(taskId)
121-
}
122-
123-
async deleteTask(taskId: string): Promise<void> {
124-
return this.taskQueueService.deleteTask(taskId)
125-
}
126-
127-
async deleteTasks(options: {
128-
status?: TaskStatus
129-
type?: AITaskType
130-
before: number
131-
}): Promise<number> {
132-
return this.taskQueueService.deleteTasks({
133-
status: options.status,
134-
type: options.type,
135-
before: options.before,
136-
})
137-
}
138-
139-
async getTasksByGroupId(groupId: string): Promise<Task[]> {
140-
return this.taskQueueService.getTasksByGroupId(groupId)
141-
}
142-
143-
async cancelTasksByGroupId(groupId: string): Promise<number> {
144-
return this.taskQueueService.cancelTasksByGroupId(groupId)
145-
}
146-
147-
async retryTask(
148-
taskId: string,
149-
): Promise<{ taskId: string; created: boolean }> {
150-
const task = await this.taskQueueService.getTask(taskId)
151-
if (!task) {
152-
throw new BizException(ErrorCodeEnum.AITaskNotFound)
76+
private async getArticleInfo(
77+
refId: string,
78+
): Promise<{ title: string; type: string } | null> {
79+
const article = await this.databaseService.findGlobalById(refId)
80+
if (!article || !article.document) {
81+
return null
15382
}
15483

155-
// Only allow retry for failed, partial_failed, or cancelled tasks
156-
if (
157-
task.status !== TaskStatus.Failed &&
158-
task.status !== TaskStatus.PartialFailed &&
159-
task.status !== TaskStatus.Cancelled
160-
) {
161-
throw new BizException(
162-
ErrorCodeEnum.AITaskCannotRetry,
163-
'Only failed, partial_failed, or cancelled tasks can be retried',
164-
)
84+
const typeMap: Record<CollectionRefTypes, string> = {
85+
[CollectionRefTypes.Post]: 'Post',
86+
[CollectionRefTypes.Note]: 'Note',
87+
[CollectionRefTypes.Page]: 'Page',
88+
[CollectionRefTypes.Recently]: 'Recently',
16589
}
16690

167-
const type = task.type as AITaskType
168-
const payload = task.payload as AITaskPayload
169-
170-
// Create a new task with the same type and payload
171-
// Use a unique dedupKey to avoid deduplication
172-
const dedupKey = `${computeAITaskDedupKey(type, payload)}:retry:${Date.now()}`
173-
174-
return this.taskQueueService.createTask({
175-
type,
176-
payload: payload as Record<string, unknown>,
177-
dedupKey,
178-
groupId: task.groupId,
179-
})
91+
return {
92+
title: (article.document as { title?: string }).title || refId,
93+
type: typeMap[article.type] || 'Unknown',
94+
}
18095
}
18196
}

apps/core/src/modules/ai/ai-translation/ai-translation.service.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import { EventManagerService } from '~/processors/helper/helper.event.service'
99
import { LexicalService } from '~/processors/helper/helper.lexical.service'
1010
import {
1111
TaskQueueProcessor,
12-
TaskQueueService,
1312
TaskStatus,
1413
type TaskExecuteContext,
1514
} from '~/processors/task-queue'
@@ -89,7 +88,6 @@ export class AiTranslationService implements OnModuleInit {
8988
private readonly aiInFlightService: AiInFlightService,
9089
private readonly eventManager: EventManagerService,
9190
private readonly taskProcessor: TaskQueueProcessor,
92-
private readonly taskQueueService: TaskQueueService,
9391
private readonly lexicalService: LexicalService,
9492
private readonly aiTaskService: AiTaskService,
9593
) {}
@@ -255,7 +253,7 @@ export class AiTranslationService implements OnModuleInit {
255253
AITaskType.Translation,
256254
taskPayload,
257255
)
258-
const result = await this.taskQueueService.createTask({
256+
const result = await this.aiTaskService.crud.createTask({
259257
type: AITaskType.Translation,
260258
payload: taskPayload as unknown as Record<string, unknown>,
261259
dedupKey,
@@ -384,7 +382,7 @@ export class AiTranslationService implements OnModuleInit {
384382
AITaskType.Translation,
385383
taskPayload,
386384
)
387-
const result = await this.taskQueueService.createTask({
385+
const result = await this.aiTaskService.crud.createTask({
388386
type: AITaskType.Translation,
389387
payload: taskPayload as unknown as Record<string, unknown>,
390388
dedupKey,

0 commit comments

Comments
 (0)