Skip to content

Commit 3cbb145

Browse files
committed
feat: 文档状态90%
1 parent 1acbe8d commit 3cbb145

File tree

9 files changed

+258
-29
lines changed

9 files changed

+258
-29
lines changed

apps/common/event/listener_manage.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def embedding_paragraph_apply(paragraph_list):
159159
if is_the_task_interrupted():
160160
break
161161
ListenerManagement.embedding_by_paragraph(str(paragraph.get('id')), embedding_model)
162-
post_apply()
162+
post_apply()
163163

164164
return embedding_paragraph_apply
165165

@@ -186,8 +186,8 @@ def post_update_document_status(document_id, task_type: TaskType):
186186
agg_count = item.get('count')
187187
if Status(agg_status)[task_type] == State.FAILURE and agg_count > 0:
188188
status[task_type] = State.FAILURE
189-
_document.status = status.__str__()
190-
_document.save()
189+
ListenerManagement.update_status(QuerySet(Document).filter(id=document_id), task_type, status[task_type])
190+
191191
ListenerManagement.update_status(QuerySet(Paragraph).annotate(
192192
reversed_status=Reverse('status'),
193193
task_type_status=Substr('reversed_status', task_type.value,
@@ -203,10 +203,11 @@ def update_status(query_set: QuerySet, taskType: TaskType, state: State):
203203
bit_number = len(TaskType)
204204
up_index = taskType.value - 1
205205
next_index = taskType.value + 1
206+
current_index = taskType.value
206207
status_number = state.value
207208
params_dict = {'${bit_number}': bit_number, '${up_index}': up_index,
208209
'${status_number}': status_number, '${next_index}': next_index,
209-
'${table_name}': query_set.model._meta.db_table}
210+
'${table_name}': query_set.model._meta.db_table, '${current_index}': current_index}
210211
for key in params_dict:
211212
_value_ = params_dict[key]
212213
exec_sql = exec_sql.replace(key, str(_value_))
@@ -236,7 +237,7 @@ def is_the_task_interrupted():
236237
return False
237238

238239
# 根据段落进行向量化处理
239-
page(QuerySet(Paragraph).filter(document_id=document_id).values('id'), 10,
240+
page(QuerySet(Paragraph).filter(document_id=document_id).values('id'), 5,
240241
ListenerManagement.get_embedding_paragraph_apply(embedding_model, is_the_task_interrupted,
241242
ListenerManagement.get_aggregation_document_status(
242243
document_id)),
@@ -245,6 +246,7 @@ def is_the_task_interrupted():
245246
max_kb_error.error(f'向量化文档:{document_id}出现错误{str(e)}{traceback.format_exc()}')
246247
finally:
247248
ListenerManagement.post_update_document_status(document_id, TaskType.EMBEDDING)
249+
ListenerManagement.get_aggregation_document_status(document_id)()
248250
max_kb.info(f"结束--->向量化文档:{document_id}")
249251
un_lock('embedding' + str(document_id))
250252

apps/dataset/models/data_set.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class State(Enum):
4141
REVOKE = '4'
4242
# 取消成功
4343
REVOKED = '5'
44+
# 忽略
45+
IGNORED = 'n'
4446

4547

4648
class Status:
@@ -52,7 +54,7 @@ def __init__(self, status: str = None):
5254
status_list = list(status[::-1] if status is not None else '')
5355
for _type in self.type_cls:
5456
index = _type.value - 1
55-
_state = self.state_cls(status_list[index] if len(status_list) > index else '2')
57+
_state = self.state_cls(status_list[index] if len(status_list) > index else 'n')
5658
self.task_status[_type] = _state
5759

5860
@staticmethod

apps/dataset/serializers/document_serializers.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ def refresh(self, with_valid=True):
612612
State.PENDING)
613613
ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id=document_id), TaskType.EMBEDDING,
614614
State.PENDING)
615+
ListenerManagement.get_aggregation_document_status(document_id)()
615616
embedding_model_id = get_embedding_model_id_by_dataset_id(dataset_id=self.data.get('dataset_id'))
616617
try:
617618
embedding_by_document.delay(document_id, embedding_model_id)
@@ -1023,7 +1024,10 @@ def generate_related(self, model_id, prompt, with_valid=True):
10231024
if with_valid:
10241025
self.is_valid(raise_exception=True)
10251026
document_id = self.data.get('document_id')
1026-
ListenerManagement.update_status(QuerySet(Document).filter(dataset_id=self.data.get('id')),
1027+
ListenerManagement.update_status(QuerySet(Document).filter(id=self.data.get('id')),
1028+
TaskType.GENERATE_PROBLEM,
1029+
State.PENDING)
1030+
ListenerManagement.update_status(QuerySet(Paragraph).filter(document_id=document_id),
10271031
TaskType.GENERATE_PROBLEM,
10281032
State.PENDING)
10291033
try:

apps/dataset/sql/list_document.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
SELECT
22
"document".* ,
33
to_json("document"."meta") as meta,
4+
to_json("document"."status_meta") as status_meta,
45
(SELECT "count"("id") FROM "paragraph" WHERE document_id="document"."id") as "paragraph_count"
56
FROM
67
"document" "document"

apps/dataset/sql/update_paragraph_status.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
UPDATE "${table_name}"
22
SET status = reverse (
3-
SUBSTRING ( reverse ( LPAD( status, ${bit_number}, '0' ) ) :: TEXT FROM 1 FOR ${up_index} ) || ${status_number} || SUBSTRING ( reverse ( LPAD( status, ${bit_number}, '0' ) ) :: TEXT FROM ${next_index} )
3+
SUBSTRING ( reverse ( LPAD( status, ${bit_number}, 'n' ) ) :: TEXT FROM 1 FOR ${up_index} ) || ${status_number} || SUBSTRING ( reverse ( LPAD( status, ${bit_number}, 'n' ) ) :: TEXT FROM ${next_index} )
44
),
55
status_meta = jsonb_set (
66
"${table_name}".status_meta,
7-
'{state_time,${status_number}}',
7+
'{state_time,${current_index}}',
88
jsonb_set (
9-
COALESCE ( "${table_name}".status_meta #> '{state_time,${status_number}}', jsonb_build_object ( '${status_number}', now( ) ) ),
9+
COALESCE ( "${table_name}".status_meta #> '{state_time,${current_index}}', jsonb_build_object ( '${status_number}', now( ) ) ),
1010
'{${status_number}}',
1111
CONCAT ( '"', now( ), '"' ) :: JSONB
1212
)

apps/dataset/task/generate.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ def get_llm_model(model_id):
2525

2626
def generate_problem_by_paragraph(paragraph, llm_model, prompt):
    """Generate related problems for one paragraph and track the task state.

    The paragraph's GENERATE_PROBLEM state is set to STARTED before the model
    is invoked, to SUCCESS once every generated problem has been saved (or the
    model returned nothing to save), and to FAILURE if any step raises.

    :param paragraph: paragraph record; ``id``, ``content``, ``dataset_id``
        and ``document_id`` are read from it
    :param llm_model: model object exposing ``invoke(messages)``
    :param prompt: prompt template; ``{data}`` is replaced by the paragraph content
    """
    import logging

    def _mark(state):
        # Build a fresh queryset for every transition of this paragraph.
        ListenerManagement.update_status(QuerySet(Paragraph).filter(id=paragraph.id),
                                         TaskType.GENERATE_PROBLEM, state)

    try:
        _mark(State.STARTED)
        res = llm_model.invoke([HumanMessage(content=prompt.replace('{data}', paragraph.content))])
        # An empty answer means there is nothing to save. The task is still
        # finished, so it must not be left in STARTED.
        if res.content:
            for problem in res.content.split('\n'):
                save_problem(paragraph.dataset_id, paragraph.document_id, paragraph.id, problem)
        _mark(State.SUCCESS)
    except Exception:
        # Best effort: record the failure on the paragraph and keep the
        # traceback in the log instead of discarding it.
        logging.getLogger(__name__).exception(
            'Failed to generate problems for paragraph %s', paragraph.id)
        _mark(State.FAILURE)
4141

4242

ui/src/utils/status.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { type Dict } from '@/api/type/common'
2+
/** Numeric identifiers of the background tasks a status string can describe. */
interface TaskTypeInterface {
  /** Embedding (vectorization) task. */
  EMBEDDING: number
  /** Problem-generation task. */
  GENERATE_PROBLEM: number
  /** Synchronization task. */
  SYNC: number
}
10+
/** Single-character state codes stored per task inside a status string. */
interface StateInterface {
  /** Waiting to run. */
  PENDING: '0'
  /** Currently running. */
  STARTED: '1'
  /** Finished successfully. */
  SUCCESS: '2'
  /** Finished with an error. */
  FAILURE: '3'
  /** Cancellation has been requested. */
  REVOKE: '4'
  /** Cancellation has completed. */
  REVOKED: '5'
  /** Ignored: placeholder code for a task that has no recorded state. */
  IGNORED: 'n'
}
25+
/**
 * Task type values. `Status` reads the state of task type N from the N-th
 * character of a status string, counted from the right.
 */
const TaskType: TaskTypeInterface = {
  EMBEDDING: 1,
  GENERATE_PROBLEM: 2,
  SYNC: 3
}
30+
/** Runtime values of the state codes declared by `StateInterface`. */
const State: StateInterface = {
  /** Waiting to run. */
  PENDING: '0',
  /** Currently running. */
  STARTED: '1',
  /** Finished successfully. */
  SUCCESS: '2',
  /** Finished with an error. */
  FAILURE: '3',
  /** Cancellation has been requested. */
  REVOKE: '4',
  /** Cancellation has completed. */
  REVOKED: '5',
  /** Ignored: placeholder code for a task that has no recorded state. */
  IGNORED: 'n'
}
45+
/**
 * Parsed form of a packed status string.
 *
 * The string holds one state character per task type and is read from the
 * right: the last character is task type 1, the one before it task type 2,
 * and so on.
 */
class Status {
  /** State code of every task type, keyed by the task type's numeric value. */
  task_status: Dict<any>

  /**
   * @param status packed status string; a missing or empty value produces
   *   'n' for every task type
   */
  constructor(status?: string) {
    // Reverse once so the character of task type N sits at index N - 1.
    const reversedCodes = (status || '').split('').reverse()
    this.task_status = {}
    Object.keys(TaskType).forEach((name) => {
      const taskValue = TaskType[name as keyof TaskTypeInterface]
      const code = reversedCodes[taskValue - 1]
      this.task_status[taskValue] = code ? code : 'n'
    })
  }

  /** Pack the per-task codes back into the right-to-left string form. */
  toString() {
    return Object.keys(TaskType)
      .map((name) => this.task_status[TaskType[name as keyof TaskTypeInterface]])
      .reverse()
      .join('')
  }
}
68+
export { Status, State, TaskType, TaskTypeInterface, StateInterface }
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
<template>
  <el-popover placement="top" :width="450" trigger="hover">
    <!-- Popover body: one row per task type that has a recorded state. -->
    <template #default>
      <el-row :gutter="3" v-for="status in statusTable" :key="status.type">
        <el-col :span="4">{{ taskTypeMap[status.type] }} </el-col>
        <el-col :span="4">
          <el-text v-if="status.state === State.SUCCESS || status.state === State.REVOKED">
            <el-icon class="success"><SuccessFilled /></el-icon>
            {{ stateMap[status.state](status.type) }}
          </el-text>
          <el-text v-else-if="status.state === State.FAILURE">
            <el-icon class="danger"><CircleCloseFilled /></el-icon>
            {{ stateMap[status.state](status.type) }}
          </el-text>
          <!-- Every in-progress state, including a pending cancellation, shows the spinner. -->
          <el-text
            v-else-if="
              status.state === State.STARTED ||
              status.state === State.PENDING ||
              status.state === State.REVOKE
            "
          >
            <el-icon class="is-loading primary"><Loading /></el-icon>
            {{ stateMap[status.state](status.type) }}
          </el-text>
        </el-col>
        <!-- Successful item count / total item count for this task type. -->
        <el-col :span="5">
          完成
          {{
            Object.keys(status.aggs ? status.aggs : {})
              .filter((k) => k == State.SUCCESS)
              .map((k) => status.aggs[k])
              .reduce((x: any, y: any) => x + y, 0)
          }}/{{
            Object.values(status.aggs ? status.aggs : {}).reduce((x: any, y: any) => x + y, 0)
          }}
        </el-col>
        <!-- Timestamp of the REVOKED transition when revoked, otherwise of the PENDING transition. -->
        <el-col :span="9">
          {{
            status.time
              ? status.time[
                  status.state == State.REVOKED ? State.REVOKED : State.PENDING
                ]?.substring(0, 19)
              : undefined
          }}
        </el-col>
      </el-row>
    </template>
    <!-- Trigger element: the aggregated state of the whole record. -->
    <template #reference>
      <el-text v-if="aggStatus?.value === State.SUCCESS || aggStatus?.value === State.REVOKED">
        <el-icon class="success"><SuccessFilled /></el-icon>
        {{ stateMap[aggStatus.value](aggStatus.key) }}
      </el-text>
      <el-text v-else-if="aggStatus?.value === State.FAILURE">
        <el-icon class="danger"><CircleCloseFilled /></el-icon>
        {{ stateMap[aggStatus.value](aggStatus.key) }}
      </el-text>
      <!-- REVOKE is first in checkList, so it needs a branch or the cell is empty while cancelling. -->
      <el-text
        v-else-if="
          aggStatus?.value === State.STARTED ||
          aggStatus?.value === State.PENDING ||
          aggStatus?.value === State.REVOKE
        "
      >
        <el-icon class="is-loading primary"><Loading /></el-icon>
        {{ stateMap[aggStatus.value](aggStatus.key) }}
      </el-text>
    </template>
  </el-popover>
</template>
66+
<script setup lang="ts">
67+
import { computed } from 'vue'
68+
import {
69+
Status,
70+
TaskType,
71+
State,
72+
type TaskTypeInterface,
73+
type StateInterface
74+
} from '@/utils/status'
75+
import { merge, mergeWith } from 'lodash'
76+
const props = defineProps<{ status: string; statusMeta: any }>()
77+
78+
// States in the order they are searched for when picking the aggregated
// state: the first entry that occurs anywhere in the status string wins.
const checkList: Array<string> = [
  State.REVOKE, // cancellation requested
  State.STARTED, // running
  State.PENDING, // queued
  State.REVOKED, // cancelled
  State.FAILURE, // failed
  State.SUCCESS // succeeded
]
86+
/**
 * Aggregated state of the record: the first state from `checkList` that
 * occurs in `props.status`, with the task type it belongs to.
 * Evaluates to `undefined` when none of the listed states occurs.
 */
const aggStatus = computed(() => {
  const packed = props.status ?? ''
  for (const state of checkList) {
    const index = packed.indexOf(state)
    if (index > -1) {
      // The status string is read from the right (last character = task type 1),
      // so a position counted from the left has to be converted.
      return { key: packed.length - index, value: state }
    }
  }
  return undefined
})
95+
// Label shown while a task of the given type is running.
const startedMap = {
  [TaskType.EMBEDDING]: '索引中',
  [TaskType.GENERATE_PROBLEM]: '生成中',
  [TaskType.SYNC]: '同步中'
}
// Display name of each task type.
const taskTypeMap = {
  [TaskType.EMBEDDING]: '向量化',
  [TaskType.GENERATE_PROBLEM]: '生成问题',
  [TaskType.SYNC]: '同步'
}
// State code -> function(task type) producing the label for that state.
// Only the STARTED label depends on the task type.
const stateMap: any = {
  [State.PENDING]: () => '排队中',
  [State.STARTED]: (type: number) => startedMap[type],
  [State.REVOKE]: () => '取消中',
  [State.REVOKED]: () => '成功',
  [State.FAILURE]: () => '失败',
  [State.SUCCESS]: () => '成功'
}
113+
114+
/**
 * Expand one aggregation entry into `{ taskType: { stateCode: count } }`.
 *
 * `agg.status` is a packed status string and `agg.count` the number attached
 * to it; the count is attributed to the state code of every task type.
 */
const parseAgg = (agg: { count: number; status: string }) => {
  const parsed = new Status(agg.status)
  const perTask: { [taskType: number]: { [state: string]: number } } = {}
  for (const name of Object.keys(TaskType)) {
    const taskValue = TaskType[name as keyof TaskTypeInterface]
    perTask[taskValue] = { [parsed.task_status[taskValue]]: agg.count }
  }
  return perTask
}
123+
124+
/**
 * `mergeWith` customizer that adds up leaf values instead of overwriting them.
 *
 * - one side missing (null/undefined): the other side is used as is
 * - both sides objects: merged recursively with this same customizer
 * - anything else: the two values are added
 */
const customizer: (x: any, y: any) => any = (objValue: any, srcValue: any) => {
  // Test for "missing" explicitly. A truthiness test would let a legitimate 0
  // fall through to the addition below and produce NaN (undefined + 0).
  if (objValue == undefined) {
    return srcValue
  }
  if (srcValue == undefined) {
    return objValue
  }
  if (typeof objValue === 'object' && typeof srcValue === 'object') {
    // Nested objects: merge their entries recursively.
    return mergeWith(objValue, srcValue, customizer)
  }
  // Leaf values: accumulate.
  return objValue + srcValue
}
140+
/**
 * Counts per task type and state code, obtained by expanding every entry of
 * `props.statusMeta.aggs` with `parseAgg` and summing the results.
 * A missing `aggs` list yields an empty object.
 */
const aggs = computed(() => {
  const entries: Array<any> = props.statusMeta.aggs ? props.statusMeta.aggs : []
  return entries.reduce(
    (merged: any, agg: any) => mergeWith(merged, parseAgg(agg), customizer),
    {}
  )
})
149+
150+
/**
 * Rows of the popover table: one entry per task type with its state code,
 * its aggregated counts and its recorded state timestamps. Task types whose
 * state is IGNORED are left out.
 */
const statusTable = computed(() => {
  // Parse once; the status string is the same for every task type.
  const parsedStatus = new Status(props.status)
  // `state_time` may be absent from the metadata.
  const stateTime = props.statusMeta?.state_time ?? {}
  return Object.keys(TaskType)
    .map((key) => {
      const value = TaskType[key as keyof TaskTypeInterface]
      return {
        type: value,
        state: parsedStatus.task_status[value],
        aggs: aggs.value[value],
        time: stateTime[value]
      }
    })
    .filter((item) => item.state !== State.IGNORED)
})
165+
</script>
166+
<style lang="scss" scoped></style>

0 commit comments

Comments
 (0)