Skip to content

Commit e758f01

Browse files
committed
feat: knowledge workflow
1 parent 8baa8e8 commit e758f01

File tree

12 files changed

+250
-15
lines changed

12 files changed

+250
-15
lines changed

apps/application/flow/i_step_node.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from application.models import ApplicationChatUserStats
2222
from application.models import ChatRecord, ChatUserType
2323
from common.field.common import InstanceField
24+
from knowledge.models.knowledge_action import KnowledgeAction, State
2425

2526
chat_cache = cache
2627

@@ -97,11 +98,13 @@ def handler(self, workflow):
9798

9899

99100
class KnowledgeWorkflowPostHandler(WorkFlowPostHandler):
100-
def __init__(self, chat_info):
101+
def __init__(self, chat_info, knowledge_action_id):
101102
super().__init__(chat_info)
103+
self.knowledge_action_id = knowledge_action_id
102104

103105
def handler(self, workflow):
104-
pass
106+
QuerySet(KnowledgeAction).filter(id=self.knowledge_action_id).update(
107+
state=State.SUCCESS)
105108

106109

107110
class NodeResult:
@@ -161,7 +164,8 @@ class FlowParamsSerializer(serializers.Serializer):
161164

162165

163166
class KnowledgeFlowParamsSerializer(serializers.Serializer):
164-
knowledge_id = serializers.CharField(required=True, label="知识库id")
167+
knowledge_id = serializers.UUIDField(required=True, label="知识库id")
168+
knowledge_action_id = serializers.UUIDField(required=True, label="知识库任务执行器id")
165169
data_source = serializers.DictField(required=True, label="数据源")
166170
knowledge_base = serializers.DictField(required=False, label="知识库设置")
167171

@@ -241,7 +245,8 @@ def get_write_error_context(self, e):
241245
self.status = 500
242246
self.answer_text = str(e)
243247
self.err_message = str(e)
244-
self.context['run_time'] = time.time() - self.context['start_time']
248+
current_time = time.time()
249+
self.context['run_time'] = current_time - (self.context.get('start_time') or current_time)
245250

246251
def write_error_context(answer, status=200):
247252
pass

apps/application/flow/knowledge_workflow_manage.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,20 @@
66
@date:2025/11/13 19:02
77
@desc:
88
"""
9+
import traceback
10+
from concurrent.futures import ThreadPoolExecutor
11+
12+
from django.db.models import QuerySet
13+
from django.utils.translation import get_language
914

1015
from application.flow.common import Workflow
1116
from application.flow.i_step_node import WorkFlowPostHandler, KnowledgeFlowParamsSerializer
1217
from application.flow.workflow_manage import WorkflowManage
1318
from common.handle.base_to_response import BaseToResponse
1419
from common.handle.impl.response.system_to_response import SystemToResponse
20+
from knowledge.models.knowledge_action import KnowledgeAction, State
21+
22+
executor = ThreadPoolExecutor(max_workers=200)
1523

1624

1725
class KnowledgeWorkflowManage(WorkflowManage):
@@ -33,3 +41,46 @@ def get_start_node(self):
3341
start_node_list = [node for node in self.flow.nodes if
3442
self.params.get('data_source', {}).get('node_id') == node.id]
3543
return start_node_list[0]
44+
45+
def run(self):
46+
executor.submit(self._run)
47+
48+
def _run(self):
49+
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
50+
state=State.STARTED)
51+
language = get_language()
52+
self.run_chain_async(self.start_node, None, language)
53+
while self.is_run():
54+
pass
55+
self.work_flow_post_handler.handler(self)
56+
57+
def run_chain(self, current_node, node_result_future=None):
58+
if node_result_future is None:
59+
node_result_future = self.run_node_future(current_node)
60+
try:
61+
result = self.hand_node_result(current_node, node_result_future)
62+
return result
63+
except Exception as e:
64+
traceback.print_exc()
65+
return None
66+
67+
def hand_node_result(self, current_node, node_result_future):
68+
try:
69+
current_result = node_result_future.result()
70+
result = current_result.write_context(current_node, self)
71+
if result is not None:
72+
# 阻塞获取结果
73+
list(result)
74+
return current_result
75+
except Exception as e:
76+
traceback.print_exc()
77+
self.status = 500
78+
current_node.get_write_error_context(e)
79+
self.answer += str(e)
80+
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
81+
details=self.get_runtime_details(),
82+
state=State.FAILURE)
83+
finally:
84+
current_node.node_chunk.end()
85+
QuerySet(KnowledgeAction).filter(id=self.params.get('knowledge_action_id')).update(
86+
details=self.get_runtime_details())

apps/application/flow/step_node/data_source_local_node/impl/base_data_source_local_node.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,15 @@ def get_form_list(node):
3737
def execute(self, file_type_list, file_size_limit, file_count_limit, **kwargs) -> NodeResult:
3838
return NodeResult({'file_list': self.workflow_manage.params.get('data_source', {}).get('file_list')},
3939
self.workflow_manage.params.get('knowledge_base') or {})
40+
41+
def get_details(self, index: int, **kwargs):
42+
return {
43+
'name': self.node.properties.get('stepName'),
44+
"index": index,
45+
'run_time': self.context.get('run_time'),
46+
'type': self.node.type,
47+
'file_list': self.context.get('file_list'),
48+
'knowledge_base': self.workflow_params.get('knowledge_base'),
49+
'status': self.status,
50+
'err_message': self.err_message
51+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Generated by Django 5.2.8 on 2025-11-19 06:06
2+
3+
import common.encoder.encoder
4+
import django.db.models.deletion
5+
import uuid_utils.compat
6+
from django.db import migrations, models
7+
8+
9+
class Migration(migrations.Migration):
10+
11+
dependencies = [
12+
('knowledge', '0004_alter_document_type_alter_knowledge_type_and_more'),
13+
]
14+
15+
operations = [
16+
migrations.CreateModel(
17+
name='KnowledgeAction',
18+
fields=[
19+
('create_time', models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='创建时间')),
20+
('update_time', models.DateTimeField(auto_now=True, db_index=True, verbose_name='修改时间')),
21+
('id', models.UUIDField(default=uuid_utils.compat.uuid7, editable=False, primary_key=True, serialize=False, verbose_name='主键id')),
22+
('state', models.CharField(choices=[('PENDING', 'Pending'), ('STARTED', 'Started'), ('SUCCESS', 'Success'), ('FAILURE', 'Failure'), ('REVOKE', 'Revoke'), ('REVOKED', 'Revoked')], default='STARTED', max_length=20, verbose_name='状态')),
23+
('details', models.JSONField(default=dict, encoder=common.encoder.encoder.SystemEncoder, verbose_name='执行详情')),
24+
('run_time', models.FloatField(default=0, verbose_name='运行时长')),
25+
('meta', models.JSONField(default=dict, verbose_name='元数据')),
26+
('knowledge', models.ForeignKey(db_constraint=False, on_delete=django.db.models.deletion.DO_NOTHING, to='knowledge.knowledge', verbose_name='知识库')),
27+
],
28+
options={
29+
'db_table': 'knowledge_action',
30+
},
31+
),
32+
]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# coding=utf-8
2+
"""
3+
@project: MaxKB
4+
@Author:虎虎
5+
@file: knowledge_action.py
6+
@date:2025/11/18 17:59
7+
@desc:
8+
"""
9+
import uuid_utils.compat as uuid
10+
11+
from django.db import models
12+
13+
from common.encoder.encoder import SystemEncoder
14+
from common.mixins.app_model_mixin import AppModelMixin
15+
from knowledge.models import Knowledge
16+
17+
18+
class State(models.TextChoices):
19+
# 等待
20+
PENDING = 'PENDING'
21+
# 执行中
22+
STARTED = 'STARTED'
23+
# 成功
24+
SUCCESS = 'SUCCESS'
25+
# 失败
26+
FAILURE = 'FAILURE'
27+
# 取消任务
28+
REVOKE = 'REVOKE'
29+
# 取消成功
30+
REVOKED = 'REVOKED'
31+
32+
33+
class KnowledgeAction(AppModelMixin):
34+
id = models.UUIDField(primary_key=True, max_length=128, default=uuid.uuid7, editable=False, verbose_name="主键id")
35+
36+
knowledge = models.ForeignKey(Knowledge, on_delete=models.DO_NOTHING, verbose_name="知识库", db_constraint=False)
37+
38+
state = models.CharField(verbose_name='状态', max_length=20,
39+
choices=State.choices,
40+
default=State.STARTED)
41+
42+
details = models.JSONField(verbose_name="执行详情", default=dict, encoder=SystemEncoder)
43+
44+
run_time = models.FloatField(verbose_name="运行时长", default=0)
45+
46+
meta = models.JSONField(verbose_name="元数据", default=dict)
47+
48+
class Meta:
49+
db_table = "knowledge_action"

apps/knowledge/serializers/knowledge_workflow.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from application.flow.step_node import get_node
1515
from common.exception.app_exception import AppApiException
1616
from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow
17+
from knowledge.models.knowledge_action import KnowledgeAction, State
1718
from knowledge.serializers.knowledge import KnowledgeModelSerializer
1819
from system_manage.models import AuthTargetType
1920
from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer
@@ -34,13 +35,30 @@ def action(self, instance: Dict, with_valid=True):
3435
if with_valid:
3536
self.is_valid(raise_exception=True)
3637
knowledge_workflow = QuerySet(KnowledgeWorkflow).filter(knowledge_id=self.data.get("knowledge_id")).first()
38+
knowledge_action_id = uuid.uuid7()
39+
KnowledgeAction(id=knowledge_action_id, knowledge_id=self.data.get("knowledge_id"), state=State.STARTED).save()
3740
work_flow_manage = KnowledgeWorkflowManage(
3841
Workflow.new_instance(knowledge_workflow.work_flow, WorkflowMode.KNOWLEDGE),
39-
{'knowledge_id': self.data.get("knowledge_id"), 'stream': True,
42+
{'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True,
4043
**instance},
41-
KnowledgeWorkflowPostHandler(None))
42-
r = work_flow_manage.run()
43-
return r
44+
KnowledgeWorkflowPostHandler(None, knowledge_action_id))
45+
work_flow_manage.run()
46+
return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED,
47+
'details': {}}
48+
49+
class Operate(serializers.Serializer):
50+
workspace_id = serializers.CharField(required=True, label=_('workspace id'))
51+
knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id'))
52+
id = serializers.UUIDField(required=True, label=_('knowledge action id'))
53+
54+
def one(self, is_valid=True):
55+
if is_valid:
56+
self.is_valid(raise_exception=True)
57+
knowledge_action_id = self.data.get("id")
58+
knowledge_action = QuerySet(KnowledgeAction).filter(id=knowledge_action_id).first()
59+
return {'id': knowledge_action_id, 'knowledge_id': knowledge_action.knowledge_id,
60+
'state': knowledge_action.state,
61+
'details': knowledge_action.details}
4462

4563

4664
class KnowledgeWorkflowSerializer(serializers.Serializer):

apps/knowledge/urls.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/document/<int:current_page>/<int:page_size>', views.DocumentView.Page.as_view()),
7171
path('workspace/<str:workspace_id>/knowledge/<int:current_page>/<int:page_size>', views.KnowledgeView.Page.as_view()),
7272
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/form_list/<str:type>/<str:id>', views.KnowledgeWorkflowFormView.as_view()),
73-
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action', views.KnowledgeWorkflowActionView.as_view())
73+
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action', views.KnowledgeWorkflowActionView.as_view()),
74+
path('workspace/<str:workspace_id>/knowledge/<str:knowledge_id>/action/<str:knowledge_action_id>', views.KnowledgeWorkflowActionView.Operate.as_view())
7475

7576
]

apps/knowledge/views/knowledge_workflow.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,16 @@ class KnowledgeWorkflowActionView(APIView):
2727
authentication_classes = [TokenAuth]
2828

2929
def post(self, request: Request, workspace_id: str, knowledge_id: str):
30-
return KnowledgeWorkflowActionSerializer(
31-
data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id}).action(request.data, True)
30+
return result.success(KnowledgeWorkflowActionSerializer(
31+
data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id}).action(request.data, True))
32+
33+
class Operate(APIView):
34+
authentication_classes = [TokenAuth]
35+
36+
def get(self, request, workspace_id: str, knowledge_id: str, knowledge_action_id: str):
37+
return result.success(KnowledgeWorkflowActionSerializer.Operate(
38+
data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id})
39+
.one())
3240

3341

3442
class KnowledgeWorkflowView(APIView):

ui/src/api/knowledge/knowledge.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,13 @@ const workflowAction: (
337337
) => Promise<Result<any>> = (knowledge_id: string, instance, loading) => {
338338
return post(`${prefix.value}/${knowledge_id}/action`, instance, {}, loading)
339339
}
340-
340+
const getWorkflowAction: (
341+
knowledge_id: string,
342+
knowledge_action_id: string,
343+
loading?: Ref<boolean>,
344+
) => Promise<Result<any>> = (knowledge_id: string, knowledge_action_id, loading) => {
345+
return get(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}`, {}, loading)
346+
}
341347
export default {
342348
getKnowledgeList,
343349
getKnowledgeListPage,
@@ -364,4 +370,5 @@ export default {
364370
createWorkflowKnowledge,
365371
getKnowledgeWorkflowFormList,
366372
workflowAction,
373+
getWorkflowAction,
367374
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<template>
2+
<div>
3+
<ExecutionDetailContent :detail="detail" app-type="WORK_FLOW"></ExecutionDetailContent>
4+
</div>
5+
</template>
6+
<script setup lang="ts">
7+
import { onUnmounted, ref, computed } from 'vue'
8+
import knowledgeApi from '@/api/knowledge/knowledge'
9+
const props = defineProps<{ id: string; knowledge_id: string }>()
10+
import ExecutionDetailContent from '@/components/ai-chat/component/knowledge-source-component/ExecutionDetailContent.vue'
11+
const detail = computed(() => {
12+
if (knowledge_action.value) {
13+
return Object.values(knowledge_action.value.details)
14+
}
15+
return []
16+
})
17+
const state = computed(() => {
18+
if (knowledge_action.value) {
19+
return knowledge_action.value.state
20+
}
21+
return 'PADDING'
22+
})
23+
const knowledge_action = ref<any>()
24+
const getKnowledgeWorkflowAction = () => {
25+
knowledgeApi.getWorkflowAction(props.knowledge_id, props.id).then((ok) => {
26+
knowledge_action.value = ok.data
27+
if (['SUCCESS', 'FAILURE', 'REVOKED'].includes(state.value)) {
28+
clearInterval(polling)
29+
}
30+
})
31+
}
32+
const polling = setInterval(getKnowledgeWorkflowAction, 2000)
33+
34+
onUnmounted(() => {
35+
clearInterval(polling)
36+
})
37+
</script>
38+
<style lang="scss"></style>

0 commit comments

Comments
 (0)