|
4 | 4 |
|
5 | 5 | import click |
6 | 6 | from celery import shared_task # type: ignore |
7 | | -from sqlalchemy import delete, select |
| 7 | +from sqlalchemy import delete |
8 | 8 | from sqlalchemy.exc import SQLAlchemyError |
9 | | -from sqlalchemy.orm import Session |
10 | 9 |
|
11 | | -from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository |
12 | 10 | from extensions.ext_database import db |
13 | 11 | from models import ( |
14 | | - Account, |
15 | 12 | ApiToken, |
16 | | - App, |
17 | 13 | AppAnnotationHitHistory, |
18 | 14 | AppAnnotationSetting, |
19 | 15 | AppDatasetJoin, |
|
34 | 30 | ) |
35 | 31 | from models.tools import WorkflowToolProvider |
36 | 32 | from models.web import PinnedConversation, SavedMessage |
37 | | -from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowRun |
| 33 | +from models.workflow import ConversationVariable, Workflow, WorkflowAppLog, WorkflowNodeExecution, WorkflowRun |
38 | 34 |
|
39 | 35 |
|
40 | 36 | @shared_task(queue="app_deletion", bind=True, max_retries=3) |
@@ -191,30 +187,17 @@ def del_workflow_run(workflow_run_id: str): |
191 | 187 |
|
192 | 188 |
|
193 | 189 | def _delete_app_workflow_node_executions(tenant_id: str, app_id: str): |
194 | | - # Get app's owner |
195 | | - with Session(db.engine, expire_on_commit=False) as session: |
196 | | - stmt = select(Account).where(Account.id == App.created_by).where(App.id == app_id) |
197 | | - user = session.scalar(stmt) |
198 | | - |
199 | | - if user is None: |
200 | | - errmsg = ( |
201 | | - f"Failed to delete workflow node executions for tenant {tenant_id} and app {app_id}, app's owner not found" |
| 190 | + def del_workflow_node_execution(workflow_node_execution_id: str): |
| 191 | + db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution_id).delete( |
| 192 | + synchronize_session=False |
202 | 193 | ) |
203 | | - logging.error(errmsg) |
204 | | - raise ValueError(errmsg) |
205 | | - |
206 | | - # Create a repository instance for WorkflowNodeExecution |
207 | | - repository = SQLAlchemyWorkflowNodeExecutionRepository( |
208 | | - session_factory=db.engine, |
209 | | - user=user, |
210 | | - app_id=app_id, |
211 | | - triggered_from=None, |
212 | | - ) |
213 | | - |
214 | | - # Use the clear method to delete all records for this tenant_id and app_id |
215 | | - repository.clear() |
216 | 194 |
|
217 | | - logging.info(click.style(f"Deleted workflow node executions for tenant {tenant_id} and app {app_id}", fg="green")) |
| 195 | + _delete_records( |
| 196 | + """select id from workflow_node_executions where tenant_id=:tenant_id and app_id=:app_id limit 1000""", |
| 197 | + {"tenant_id": tenant_id, "app_id": app_id}, |
| 198 | + del_workflow_node_execution, |
| 199 | + "workflow node execution", |
| 200 | + ) |
218 | 201 |
|
219 | 202 |
|
220 | 203 | def _delete_app_workflow_app_logs(tenant_id: str, app_id: str): |
|
0 commit comments