diff --git a/apps/application/flow/i_step_node.py b/apps/application/flow/i_step_node.py index f60f208c226..e8caf162d73 100644 --- a/apps/application/flow/i_step_node.py +++ b/apps/application/flow/i_step_node.py @@ -123,6 +123,8 @@ def get_loop_workflow_node(node_list): def get_workflow_state(workflow): + if workflow.is_the_task_interrupted(): + return State.REVOKED details = workflow.get_runtime_details() node_list = details.values() all_node = [*node_list, *get_loop_workflow_node(node_list)] diff --git a/apps/application/flow/knowledge_workflow_manage.py b/apps/application/flow/knowledge_workflow_manage.py index ab739932b74..1a696675cbc 100644 --- a/apps/application/flow/knowledge_workflow_manage.py +++ b/apps/application/flow/knowledge_workflow_manage.py @@ -30,10 +30,10 @@ def __init__(self, flow: Workflow, work_flow_post_handler: WorkFlowPostHandler, base_to_response: BaseToResponse = SystemToResponse(), start_node_id=None, - start_node_data=None, chat_record=None, child_node=None): + start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False): super().__init__(flow, params, work_flow_post_handler, base_to_response, None, None, None, None, - None, None, start_node_id, start_node_data, chat_record, child_node) + None, None, start_node_id, start_node_data, chat_record, child_node, is_the_task_interrupted) def get_params_serializer_class(self): return KnowledgeFlowParamsSerializer @@ -91,6 +91,9 @@ def hand_node_result(self, current_node, node_result_future): list(result) if current_node.status == 500: return None + if self.is_the_task_interrupted(): + current_node.status = 201 + return None return current_result except Exception as e: traceback.print_exc() diff --git a/apps/application/flow/loop_workflow_manage.py b/apps/application/flow/loop_workflow_manage.py index bf01e7606db..27c84f4dc66 100644 --- a/apps/application/flow/loop_workflow_manage.py +++ b/apps/application/flow/loop_workflow_manage.py @@ -92,14 +92,14 @@ def __init__(self, flow: Workflow, get_loop_context, base_to_response: BaseToResponse = SystemToResponse(), start_node_id=None, - start_node_data=None, chat_record=None, child_node=None): + start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False): self.parentWorkflowManage = parentWorkflowManage self.loop_params = loop_params self.get_loop_context = get_loop_context self.loop_field_list = [] super().__init__(flow, params, work_flow_post_handler, base_to_response, None, None, None, None, - None, None, start_node_id, start_node_data, chat_record, child_node) + None, None, start_node_id, start_node_data, chat_record, child_node, is_the_task_interrupted) def get_node_cls_by_id(self, node_id, up_node_id_list=None, get_node_params=lambda node: node.properties.get('node_data')): diff --git a/apps/application/flow/step_node/ai_chat_step_node/impl/base_chat_node.py b/apps/application/flow/step_node/ai_chat_step_node/impl/base_chat_node.py index 85d046b2e59..6e0b32df631 100644 --- a/apps/application/flow/step_node/ai_chat_step_node/impl/base_chat_node.py +++ b/apps/application/flow/step_node/ai_chat_step_node/impl/base_chat_node.py @@ -63,6 +63,8 @@ def write_context_stream(node_variable: Dict, workflow_variable: Dict, node: INo response_reasoning_content = False for chunk in response: + if workflow.is_the_task_interrupted(): + break reasoning_chunk = reasoning.get_reasoning_content(chunk) content_chunk = reasoning_chunk.get('content') if 'reasoning_content' in chunk.additional_kwargs: @@ -110,7 +112,8 @@ def write_context(node_variable: Dict, workflow_variable: Dict, node: INode, wor if 'reasoning_content' in meta: reasoning_content = (meta.get('reasoning_content', '') or '') else: - reasoning_content = (reasoning_result.get('reasoning_content') or '') + (reasoning_result_end.get('reasoning_content') or '') + reasoning_content = (reasoning_result.get('reasoning_content') or '') + ( + reasoning_result_end.get('reasoning_content') or '') _write_context(node_variable, workflow_variable, node, workflow, content, reasoning_content) diff --git a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py index 2b17851a84d..cd75272e264 100644 --- a/apps/application/flow/step_node/loop_node/impl/base_loop_node.py +++ b/apps/application/flow/step_node/loop_node/impl/base_loop_node.py @@ -268,7 +268,8 @@ def workflow_manage_new_instance(loop_data, global_data, start_node_id=None, start_node_id=start_node_id, start_node_data=start_node_data, chat_record=chat_record, - child_node=child_node + child_node=child_node, + is_the_task_interrupted=self.workflow_manage.is_the_task_interrupted ) return workflow_manage diff --git a/apps/application/flow/workflow_manage.py b/apps/application/flow/workflow_manage.py index 7f23391ff5e..839794d54c4 100644 --- a/apps/application/flow/workflow_manage.py +++ b/apps/application/flow/workflow_manage.py @@ -97,7 +97,7 @@ def __init__(self, flow: Workflow, params, work_flow_post_handler: WorkFlowPostH video_list=None, other_list=None, start_node_id=None, - start_node_data=None, chat_record=None, child_node=None): + start_node_data=None, chat_record=None, child_node=None, is_the_task_interrupted=lambda: False): if form_data is None: form_data = {} if image_list is None: @@ -138,6 +138,7 @@ def __init__(self, flow: Workflow, params, work_flow_post_handler: WorkFlowPostH self.global_field_list = [] self.chat_field_list = [] self.init_fields() + self.is_the_task_interrupted = is_the_task_interrupted if start_node_id is not None: self.load_node(chat_record, start_node_id, start_node_data) else: diff --git a/apps/common/constants/cache_version.py b/apps/common/constants/cache_version.py index 6664acb5623..1b202dc8e07 100644 --- a/apps/common/constants/cache_version.py +++ b/apps/common/constants/cache_version.py @@ -26,7 +26,7 @@ class Cache_Version(Enum): SYSTEM = "SYSTEM", lambda key: key # 应用对接三方应用的缓存 APPLICATION_THIRD_PARTY = "APPLICATION:THIRD_PARTY", lambda key: key - + KNOWLEDGE_WORKFLOW_INTERRUPTED = "KNOWLEDGE_WORKFLOW_INTERRUPTED", lambda action_id: action_id # 对话 CHAT = "CHAT", lambda key: key diff --git a/apps/common/utils/tool_code.py b/apps/common/utils/tool_code.py index a5b29af1ca5..233bafbe55c 100644 --- a/apps/common/utils/tool_code.py +++ b/apps/common/utils/tool_code.py @@ -7,36 +7,41 @@ import socket import subprocess import sys -import tempfile -import pwd -import resource -import getpass -import random +import signal import time import uuid_utils.compat as uuid -from contextlib import contextmanager from common.utils.logger import maxkb_logger from django.utils.translation import gettext_lazy as _ from maxkb.const import BASE_DIR, CONFIG from maxkb.const import PROJECT_DIR from textwrap import dedent -_enable_sandbox = bool(CONFIG.get('SANDBOX', 0)) -_run_user = 'sandbox' if _enable_sandbox else getpass.getuser() -_sandbox_path = CONFIG.get("SANDBOX_HOME", '/opt/maxkb-app/sandbox') if _enable_sandbox else os.path.join(PROJECT_DIR, 'data', 'sandbox') -_process_limit_timeout_seconds = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_TIMEOUT_SECONDS", '3600')) -_process_limit_cpu_cores = min(max(int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_CPU_CORES", '1')), 1), len(os.sched_getaffinity(0))) if sys.platform.startswith("linux") else os.cpu_count() # 只支持linux,window和mac不支持 -_process_limit_mem_mb = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_LIMIT_MEM_MB", '256')) +python_directory = sys.executable + class ToolExecutor: - def __init__(self): - pass + def __init__(self, sandbox=False): + self.sandbox = sandbox + if sandbox: + self.sandbox_path = CONFIG.get("SANDBOX_HOME", '/opt/maxkb-app/sandbox') + self.user = 'sandbox' + else: + self.sandbox_path = os.path.join(PROJECT_DIR, 'data', 'sandbox') + self.user = None + self.sandbox_so_path = f'{self.sandbox_path}/lib/sandbox.so' + self.process_timeout_seconds = int(CONFIG.get("SANDBOX_PYTHON_PROCESS_TIMEOUT_SECONDS", '3600')) + try: + self._init_sandbox_dir() + except Exception as e: + # 本机忽略异常,容器内不忽略 + maxkb_logger.error(f'Exception: {e}', exc_info=True) + if self.sandbox: + raise e - @staticmethod - def init_sandbox_dir(): - if not _enable_sandbox: - # 不启用sandbox就不初始化目录 + def _init_sandbox_dir(self): + if not self.sandbox: + # 不是sandbox就不初始化目录 return try: # 只初始化一次 @@ -46,7 +51,7 @@ def init_sandbox_dir(): except FileExistsError: # 文件已存在 → 已初始化过 return - maxkb_logger.info("Init sandbox dir.") + maxkb_logger.debug("init dir") try: os.system("chmod -R g-rwx /dev/shm /dev/mqueue") os.system("chmod o-rwx /run/postgresql") @@ -56,7 +61,7 @@ def init_sandbox_dir(): if CONFIG.get("SANDBOX_TMP_DIR_ENABLED", '0') == "1": os.system("chmod g+rwx /tmp") # 初始化sandbox配置文件 - sandbox_lib_path = os.path.dirname(f'{_sandbox_path}/lib/sandbox.so') + sandbox_lib_path = os.path.dirname(self.sandbox_so_path) sandbox_conf_file_path = f'{sandbox_lib_path}/.sandbox.conf' if os.path.exists(sandbox_conf_file_path): os.remove(sandbox_conf_file_path) @@ -69,49 +74,37 @@ def init_sandbox_dir(): with open(sandbox_conf_file_path, "w") as f: f.write(f"SANDBOX_PYTHON_BANNED_HOSTS={banned_hosts}\n") f.write(f"SANDBOX_PYTHON_ALLOW_SUBPROCESS={allow_subprocess}\n") - os.system(f"chmod -R 550 {_sandbox_path}") - - try: - init_sandbox_dir() - except Exception as e: - maxkb_logger.error(f'Exception: {e}', exc_info=True) + os.system(f"chmod -R 550 {self.sandbox_path}") def exec_code(self, code_str, keywords, function_name=None): _id = str(uuid.uuid7()) + success = '{"code":200,"msg":"成功","data":exec_result}' + err = '{"code":500,"msg":str(e),"data":None}' action_function = f'({function_name !a}, locals_v.get({function_name !a}))' if function_name else 'locals_v.popitem()' python_paths = CONFIG.get_sandbox_python_package_paths().split(',') - set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else '' _exec_code = f""" try: - import os, sys, json - from contextlib import redirect_stdout + import os, sys, json, base64, builtins path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps'] sys.path = [p for p in sys.path if p not in path_to_exclude] sys.path += {python_paths} - locals_v={{}} + locals_v={'{}'} keywords={keywords} - globals_v={{}} - {set_run_user} + globals_v={'{}'} os.environ.clear() - with redirect_stdout(open(os.devnull, 'w')): - exec({dedent(code_str)!a}, globals_v, locals_v) - f_name, f = {action_function} - globals_v.update(locals_v) - exec_result=f(**keywords) - sys.stdout.write("\\n{_id}:") - json.dump({{'code':200,'msg':'success','data':exec_result}}, sys.stdout, default=str) + exec({dedent(code_str)!a}, globals_v, locals_v) + f_name, f = {action_function} + for local in locals_v: + globals_v[local] = locals_v[local] + exec_result=f(**keywords) + builtins.print("\\n{_id}:"+base64.b64encode(json.dumps({success}, default=str).encode()).decode()) except Exception as e: - if isinstance(e, MemoryError): e = Exception("Cannot allocate more memory: exceeded the limit of {_process_limit_mem_mb} MB.") - sys.stdout.write("\\n{_id}:") - json.dump({{'code':500,'msg':str(e),'data':None}}, sys.stdout, default=str) -sys.stdout.flush() + builtins.print("\\n{_id}:"+base64.b64encode(json.dumps({err}, default=str).encode()).decode()) """ - maxkb_logger.debug(f"Sandbox execute code: {_exec_code}") - with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=True) as f: - f.write(_exec_code) - f.flush() - with execution_timer(_id): - subprocess_result = self._exec(f.name) + if self.sandbox: + subprocess_result = self._exec_sandbox(_exec_code) + else: + subprocess_result = self._exec(_exec_code) if subprocess_result.returncode != 0: raise Exception(subprocess_result.stderr or subprocess_result.stdout or "Unknown exception occurred") lines = subprocess_result.stdout.splitlines() @@ -119,10 +112,10 @@ def exec_code(self, code_str, keywords, function_name=None): if not result_line: maxkb_logger.error("\n".join(lines)) raise Exception("No result found.") - result = json.loads(result_line[-1].split(":", 1)[1]) + result = json.loads(base64.b64decode(result_line[-1].split(":", 1)[1]).decode()) if result.get('code') == 200: return result.get('data') - raise Exception(result.get('msg') + (f'\n{subprocess_result.stderr}' if subprocess_result.stderr else '')) + raise Exception(result.get('msg')) def _generate_mcp_server_code(self, _code, params): # 解析代码,提取导入语句和函数定义 @@ -190,7 +183,6 @@ def _generate_mcp_server_code(self, _code, params): def generate_mcp_server_code(self, code_str, params): python_paths = CONFIG.get_sandbox_python_package_paths().split(',') code = self._generate_mcp_server_code(code_str, params) - set_run_user = f'os.setgid({pwd.getpwnam(_run_user).pw_gid});os.setuid({pwd.getpwnam(_run_user).pw_uid});' if _enable_sandbox else '' return f""" import os, sys, logging logging.basicConfig(level=logging.WARNING) @@ -199,7 +191,6 @@ def generate_mcp_server_code(self, code_str, params): path_to_exclude = ['/opt/py3/lib/python3.11/site-packages', '/opt/maxkb-app/apps'] sys.path = [p for p in sys.path if p not in path_to_exclude] sys.path += {python_paths} -{set_run_user} os.environ.clear() exec({dedent(code)!a}) """ @@ -208,39 +199,67 @@ def get_tool_mcp_config(self, code, params): _code = self.generate_mcp_server_code(code, params) maxkb_logger.debug(f"Python code of mcp tool: {_code}") compressed_and_base64_encoded_code_str = base64.b64encode(gzip.compress(_code.encode())).decode() - tool_config = { - 'command': sys.executable, - 'args': [ - '-c', - f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())', - ], - 'cwd': _sandbox_path, - 'env': { - 'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so', - }, - 'transport': 'stdio', - } + if self.sandbox: + tool_config = { + 'command': 'su', + 'args': [ + '-s', sys.executable, + '-c', + f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())', + self.user, + ], + 'cwd': self.sandbox_path, + 'env': { + 'LD_PRELOAD': self.sandbox_so_path, + }, + 'transport': 'stdio', + } + else: + tool_config = { + 'command': sys.executable, + 'args': [ + '-c', + f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())', + ], + 'transport': 'stdio', + } return tool_config - def _exec(self, execute_file): + def _exec_sandbox(self, _code): kwargs = {'cwd': BASE_DIR, 'env': { - 'LD_PRELOAD': f'{_sandbox_path}/lib/sandbox.so', + 'LD_PRELOAD': self.sandbox_so_path, }} + maxkb_logger.debug(f"Sandbox execute code: {_code}") + compressed_and_base64_encoded_code_str = base64.b64encode(gzip.compress(_code.encode())).decode() + cmd = [ + 'su', '-s', python_directory, '-c', + f'import base64,gzip; exec(gzip.decompress(base64.b64decode(\'{compressed_and_base64_encoded_code_str}\')).decode())', + self.user + ] try: - subprocess_result = subprocess.run( - [sys.executable, execute_file], - timeout=_process_limit_timeout_seconds, + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True, - capture_output=True, **kwargs, - preexec_fn=(lambda: None if (not _enable_sandbox or not sys.platform.startswith("linux")) else ( - resource.setrlimit(resource.RLIMIT_AS, (_process_limit_mem_mb * 1024 * 1024,) * 2), - os.sched_setaffinity(0, set(random.sample(list(os.sched_getaffinity(0)), _process_limit_cpu_cores))) - )) + start_new_session=True + ) + proc.wait(timeout=self.process_timeout_seconds) + return subprocess.CompletedProcess( + proc.args, + proc.returncode, + proc.stdout.read(), + proc.stderr.read() ) - return subprocess_result except subprocess.TimeoutExpired: - raise Exception(_(f"Process execution timed out after {_process_limit_timeout_seconds} seconds.")) + pgid = os.getpgid(proc.pid) + os.killpg(pgid, signal.SIGTERM) #温和终止 + time.sleep(1) #留出短暂时间让进程清理 + if proc.poll() is None: #如果仍未终止,强制终止 + os.killpg(pgid, signal.SIGKILL) + proc.wait() + raise Exception(_(f"Process execution timed out after {self.process_timeout_seconds} seconds.")) def validate_mcp_transport(self, code_str): servers = json.loads(code_str) @@ -248,11 +267,6 @@ def validate_mcp_transport(self, code_str): if config.get('transport') not in ['sse', 'streamable_http']: raise Exception(_('Only support transport=sse or transport=streamable_http')) - -@contextmanager -def execution_timer(id=""): - start = time.perf_counter() - try: - yield - finally: - maxkb_logger.debug(f"Tool execution({id}) takes {time.perf_counter() - start:.6f} seconds.") \ No newline at end of file + @staticmethod + def _exec(_code): + return subprocess.run([python_directory, '-c', _code], text=True, capture_output=True) diff --git a/apps/knowledge/serializers/knowledge_workflow.py b/apps/knowledge/serializers/knowledge_workflow.py index 1138bfcf402..245869b085f 100644 --- a/apps/knowledge/serializers/knowledge_workflow.py +++ b/apps/knowledge/serializers/knowledge_workflow.py @@ -15,6 +15,7 @@ from application.flow.knowledge_workflow_manage import KnowledgeWorkflowManage from application.flow.step_node import get_node from application.serializers.application import get_mcp_tools +from common.constants.cache_version import Cache_Version from common.db.search import page_search from common.exception.app_exception import AppApiException from common.utils.rsa_util import rsa_long_decrypt @@ -22,7 +23,7 @@ from knowledge.models import KnowledgeScope, Knowledge, KnowledgeType, KnowledgeWorkflow, KnowledgeWorkflowVersion from knowledge.models.knowledge_action import KnowledgeAction, State from knowledge.serializers.knowledge import KnowledgeModelSerializer -from maxkb.const import CONFIG +from django.core.cache import cache from system_manage.models import AuthTargetType from system_manage.serializers.user_resource_permission import UserResourcePermissionSerializer from tools.models import Tool @@ -52,7 +53,11 @@ class KnowledgeWorkflowActionSerializer(serializers.Serializer): knowledge_id = serializers.UUIDField(required=True, label=_('knowledge id')) def get_query_set(self, instance: Dict): - query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id','knowledge_id',"state",'meta','run_time',"create_time") + query_set = QuerySet(KnowledgeAction).filter(knowledge_id=self.data.get('knowledge_id')).values('id', + 'knowledge_id', + "state", 'meta', + 'run_time', + "create_time") if instance.get("user_name"): query_set = query_set.filter(meta__user_name__icontains=instance.get('user_name')) if instance.get('state'): @@ -73,7 +78,8 @@ def page(self, current_page, page_size, instance: Dict, is_valid=True): KnowledgeWorkflowActionListQuerySerializer(data=instance).is_valid(raise_exception=True) return page_search(current_page, page_size, self.get_query_set(instance), lambda a: {'id': a.get("id"), 'knowledge_id': a.get("knowledge_id"), 'state': a.get("state"), - 'meta': a.get("meta"), 'run_time': a.get("run_time"), 'create_time': a.get("create_time")}) + 'meta': a.get("meta"), 'run_time': a.get("run_time"), + 'create_time': a.get("create_time")}) def action(self, instance: Dict, user, with_valid=True): if with_valid: @@ -91,7 +97,10 @@ def action(self, instance: Dict, user, with_valid=True): {'knowledge_id': self.data.get("knowledge_id"), 'knowledge_action_id': knowledge_action_id, 'stream': True, 'workspace_id': self.data.get("workspace_id"), **instance}, - KnowledgeWorkflowPostHandler(None, knowledge_action_id)) + KnowledgeWorkflowPostHandler(None, knowledge_action_id), + is_the_task_interrupted=lambda: cache.get( + Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id), + version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) or False) work_flow_manage.run() return {'id': knowledge_action_id, 'knowledge_id': self.data.get("knowledge_id"), 'state': State.STARTED, 'details': {}, 'meta': meta} @@ -135,6 +144,15 @@ def one(self, is_valid=True): 'details': knowledge_action.details, 'meta': knowledge_action.meta} + def cancel(self, is_valid=True): + if is_valid: + self.is_valid(raise_exception=True) + knowledge_action_id = self.data.get("id") + cache.set(Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_key(action_id=knowledge_action_id), True, + version=Cache_Version.KNOWLEDGE_WORKFLOW_INTERRUPTED.get_version()) + QuerySet(KnowledgeAction).filter(id=knowledge_action_id).update(state=State.REVOKE) + return True + class KnowledgeWorkflowSerializer(serializers.Serializer): class Datasource(serializers.Serializer): diff --git a/apps/knowledge/urls.py b/apps/knowledge/urls.py index 3e27fbda6b5..9f3007306bb 100644 --- a/apps/knowledge/urls.py +++ b/apps/knowledge/urls.py @@ -76,6 +76,7 @@ path('workspace//knowledge//action//', views.KnowledgeWorkflowActionView.Page.as_view()), path('workspace//knowledge//upload_document', views.KnowledgeWorkflowUploadDocumentView.as_view()), path('workspace//knowledge//action/', views.KnowledgeWorkflowActionView.Operate.as_view()), + path('workspace//knowledge//action//cancel', views.KnowledgeWorkflowActionView.Cancel.as_view()), path('workspace//knowledge//mcp_tools', views.McpServers.as_view()), path('workspace//knowledge//knowledge_version', views.KnowledgeWorkflowVersionView.as_view()), path('workspace//knowledge//knowledge_version//', views.KnowledgeWorkflowVersionView.Page.as_view()), diff --git a/apps/knowledge/views/knowledge_workflow.py b/apps/knowledge/views/knowledge_workflow.py index 074218e9d4d..2eb0175fe28 100644 --- a/apps/knowledge/views/knowledge_workflow.py +++ b/apps/knowledge/views/knowledge_workflow.py @@ -168,6 +168,33 @@ def get(self, request, workspace_id: str, knowledge_id: str, knowledge_action_id data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id}) .one()) + class Cancel(APIView): + authentication_classes = [TokenAuth] + + @extend_schema( + methods=['POST'], + description=_('Cancel knowledge workflow action'), + summary=_('Cancel knowledge workflow action'), + operation_id=_('Cancel knowledge workflow action'), # type: ignore + parameters=KnowledgeWorkflowActionApi.get_parameters(), + responses=DefaultResultSerializer(), + tags=[_('Knowledge Base')] # type: ignore + ) + @has_permissions( + PermissionConstants.KNOWLEDGE_WORKFLOW_EDIT.get_workspace_knowledge_permission(), + PermissionConstants.KNOWLEDGE_WORKFLOW_EDIT.get_workspace_permission_workspace_manage_role(), + RoleConstants.WORKSPACE_MANAGE.get_workspace_role(), + ViewPermission( + [RoleConstants.USER.get_workspace_role()], + [PermissionConstants.KNOWLEDGE.get_workspace_knowledge_permission()], + CompareConstants.AND + ), + ) + def post(self, request, workspace_id: str, knowledge_id: str, knowledge_action_id: str): + return result.success(KnowledgeWorkflowActionSerializer.Operate( + data={'workspace_id': workspace_id, 'knowledge_id': knowledge_id, 'id': knowledge_action_id}) + .cancel()) + class KnowledgeWorkflowView(APIView): authentication_classes = [TokenAuth] diff --git a/ui/src/api/knowledge/knowledge.ts b/ui/src/api/knowledge/knowledge.ts index e3ed013e530..06f151526cc 100644 --- a/ui/src/api/knowledge/knowledge.ts +++ b/ui/src/api/knowledge/knowledge.ts @@ -433,7 +433,13 @@ const getWorkflowAction: ( ) => Promise> = (knowledge_id: string, knowledge_action_id, loading) => { return get(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}`, {}, loading) } - +const cancelWorkflowAction: ( + knowledge_id: string, + knowledge_action_id: string, + loading?: Ref, +) => Promise> = (knowledge_id: string, knowledge_action_id, loading) => { + return post(`${prefix.value}/${knowledge_id}/action/${knowledge_action_id}/cancel`, {}, loading) +} /** * mcp 节点 */ @@ -480,4 +486,5 @@ export default { putKnowledgeWorkflow, workflowUpload, getWorkflowActionPage, + cancelWorkflowAction, } diff --git a/ui/src/views/knowledge-workflow/component/execution-record/ExecutionDetailDrawer.vue b/ui/src/views/knowledge-workflow/component/execution-record/ExecutionDetailDrawer.vue index 9b2e9e36091..ee8b7995781 100644 --- a/ui/src/views/knowledge-workflow/component/execution-record/ExecutionDetailDrawer.vue +++ b/ui/src/views/knowledge-workflow/component/execution-record/ExecutionDetailDrawer.vue @@ -47,6 +47,20 @@ {{ $t('common.status.fail') }} + + + {{ $t('common.status.REVOKED', '已取消') }} + + + + {{ $t('views.document.fileStatus.REVOKE', '取消中') }} + {{ $t('common.status.padding') }} diff --git a/ui/src/views/knowledge-workflow/component/execution-record/ExecutionRecordDrawer.vue b/ui/src/views/knowledge-workflow/component/execution-record/ExecutionRecordDrawer.vue index d4685240ef8..e236fcdbf86 100644 --- a/ui/src/views/knowledge-workflow/component/execution-record/ExecutionRecordDrawer.vue +++ b/ui/src/views/knowledge-workflow/component/execution-record/ExecutionRecordDrawer.vue @@ -64,6 +64,14 @@ {{ $t('common.status.fail') }} + + + {{ $t('common.status.REVOKED', '已取消') }} + + + + {{ $t('views.document.fileStatus.REVOKE', '取消中') }} + {{ $t('common.status.padding') }} @@ -87,11 +95,22 @@ @@ -157,6 +176,11 @@ const toDetails = (row: any) => { ExecutionDetailDrawerRef.value?.open() } +const cancel = (row: any) => { + loadSharedApi({ type: 'knowledge', systemType: apiType.value }) + .cancelWorkflowAction(active_knowledge_id.value, row.id, loading) + .then((ok: any) => {}) +} const changeFilterHandle = () => { query.value = { user_name: '', status: '' } }