From b9108e952774721b118e52e520a453860bb25f63 Mon Sep 17 00:00:00 2001 From: Novice Date: Fri, 29 Aug 2025 16:50:53 +0800 Subject: [PATCH 1/4] chore: optimize SQL queries that perform partial full table scans --- .../advanced_chat/generate_task_pipeline.py | 5 -- .../processor/parent_child_index_processor.py | 7 ++- .../update_provider_when_message_created.py | 49 +++++++++++++++++-- ...3885c_add_workflow_app_log_run_id_index.py | 32 ++++++++++++ api/models/workflow.py | 1 + 5 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index a61bba512f51b8..8e82522db0e14e 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -73,7 +73,6 @@ from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.system_variable import SystemVariable from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager -from events.message_event import message_was_created from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from models import Conversation, EndUser, Message, MessageFile @@ -938,10 +937,6 @@ def _save_message(self, *, session: Session, graph_runtime_state: Optional[Graph self._task_state.metadata.usage = usage else: self._task_state.metadata.usage = LLMUsage.empty_usage() - message_was_created.send( - message, - application_generate_entity=self._application_generate_entity, - ) def _message_end_to_stream_response(self) -> MessageEndStreamResponse: """ diff --git a/api/core/rag/index_processor/processor/parent_child_index_processor.py b/api/core/rag/index_processor/processor/parent_child_index_processor.py index 52756fbacd8b52..01bd30f7b249ba 100644 --- a/api/core/rag/index_processor/processor/parent_child_index_processor.py +++ b/api/core/rag/index_processor/processor/parent_child_index_processor.py @@ -130,13 +130,16 @@ def clean(self, dataset: Dataset, node_ids: Optional[list[str]], with_keywords: if delete_child_chunks: db.session.query(ChildChunk).where( ChildChunk.dataset_id == dataset.id, ChildChunk.index_node_id.in_(child_node_ids) - ).delete() + ).delete(synchronize_session=False) db.session.commit() else: vector.delete() if delete_child_chunks: - db.session.query(ChildChunk).where(ChildChunk.dataset_id == dataset.id).delete() + # Use existing compound index: (tenant_id, dataset_id, ...) 
+ db.session.query(ChildChunk).where( + ChildChunk.tenant_id == dataset.tenant_id, ChildChunk.dataset_id == dataset.id + ).delete(synchronize_session=False) db.session.commit() def retrieve( diff --git a/api/events/event_handlers/update_provider_when_message_created.py b/api/events/event_handlers/update_provider_when_message_created.py index 90eb524c9306a3..2e11c424cf1bc9 100644 --- a/api/events/event_handlers/update_provider_when_message_created.py +++ b/api/events/event_handlers/update_provider_when_message_created.py @@ -13,12 +13,45 @@ from core.plugin.entities.plugin import ModelProviderID from events.message_event import message_was_created from extensions.ext_database import db +from extensions.ext_redis import redis_client, redis_fallback from libs import datetime_utils from models.model import Message from models.provider import Provider, ProviderType logger = logging.getLogger(__name__) +# Redis cache key prefix for provider last used timestamps +_PROVIDER_LAST_USED_CACHE_PREFIX = "provider:last_used" +# Default TTL for cache entries (10 minutes) +_CACHE_TTL_SECONDS = 600 + + +def _get_provider_cache_key(tenant_id: str, provider_name: str) -> str: + """Generate Redis cache key for provider last used timestamp.""" + return f"{_PROVIDER_LAST_USED_CACHE_PREFIX}:{tenant_id}:{provider_name}" + + +@redis_fallback(default_return=None) +def _get_last_update_timestamp(cache_key: str) -> Optional[datetime]: + """Get last update timestamp from Redis cache.""" + try: + timestamp_str = redis_client.get(cache_key) + if timestamp_str: + return datetime.fromisoformat(timestamp_str.decode("utf-8")) + return None + except Exception as e: + logger.warning("Failed to get last update timestamp from Redis: %s", e) + return None + + +@redis_fallback() +def _set_last_update_timestamp(cache_key: str, timestamp: datetime) -> None: + """Set last update timestamp in Redis cache with TTL.""" + try: + redis_client.setex(cache_key, _CACHE_TTL_SECONDS, timestamp.isoformat()) + except Exception as e: + logger.warning("Failed to set last update timestamp in Redis: %s", e) + class _ProviderUpdateFilters(BaseModel): """Filters for identifying Provider records to update.""" @@ -139,7 +172,7 @@ def handle(sender: Message, **kwargs): provider_name, ) - except Exception as e: + except Exception: # Log failure with timing and context duration = time_module.perf_counter() - start_time @@ -215,8 +248,18 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation] # Prepare values dict for SQLAlchemy update update_values = {} - # updateing to `last_used` is removed due to performance reason. - # ref: https://github.com/langgenius/dify/issues/24526 + + # Time-window based update for last_used to avoid hot row contention + if values.last_used is not None: + cache_key = _get_provider_cache_key(filters.tenant_id, filters.provider_name) + now = datetime_utils.naive_utc_now() + last_update = _get_last_update_timestamp(cache_key) + + update_window_seconds = 60 * 5 + if last_update is None or (now - last_update).total_seconds() > update_window_seconds: + update_values["last_used"] = values.last_used + _set_last_update_timestamp(cache_key, now) + if values.quota_used is not None: update_values["quota_used"] = values.quota_used # Skip the current update operation if no updates are required. 
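
The heart of the handler change above is a Redis-backed write gate: `last_used` is flushed to Postgres only when the cached timestamp is older than the update window, so the hot provider row is no longer rewritten on every message (the `quota_used` counter still updates unconditionally). A minimal sketch of the pattern under this patch's encoding — assuming a plain `redis` client at its defaults, and skipping the `redis_fallback` wrapper the real handler goes through:

    from datetime import datetime
    from typing import Optional

    import redis

    UPDATE_WINDOW_SECONDS = 60 * 5  # flush last_used at most once per window
    CACHE_TTL_SECONDS = 600         # stale gate entries expire on their own

    r = redis.Redis()  # assumption: reachable Redis, no fallback handling

    def should_flush_last_used(cache_key: str, now: datetime) -> bool:
        """Return True (and stamp the gate) if no flush happened inside the window."""
        raw: Optional[bytes] = r.get(cache_key)
        if raw is not None:
            last = datetime.fromisoformat(raw.decode("utf-8"))
            if (now - last).total_seconds() <= UPDATE_WINDOW_SECONDS:
                return False  # a recent event already refreshed last_used
        r.setex(cache_key, CACHE_TTL_SECONDS, now.isoformat())
        return True

On the deletion side of this patch, `delete(synchronize_session=False)` tells SQLAlchemy not to reconcile the bulk DELETE with objects already loaded into the session; that is safe here because `db.session.commit()` follows immediately and expires that state anyway.
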
diff --git a/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py new file mode 100644 index 00000000000000..c0aec5f5d55fae --- /dev/null +++ b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py @@ -0,0 +1,32 @@ +"""chore: add workflow app log run id index + +Revision ID: b95962a3885c +Revises: 0e154742a5fa +Create Date: 2025-08-29 15:34:09.838623 + +""" +from alembic import op +import models as models +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'b95962a3885c' +down_revision = '0e154742a5fa' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op: + batch_op.create_index('workflow_app_log_workflow_run_id_idx', ['workflow_run_id'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op: + batch_op.drop_index('workflow_app_log_workflow_run_id_idx') + # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index 4d0089fa4e0d26..28bf683fb861e0 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -835,6 +835,7 @@ class WorkflowAppLog(Base): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"), sa.Index("workflow_app_log_app_idx", "tenant_id", "app_id"), + sa.Index("workflow_app_log_workflow_run_id_idx", "workflow_run_id"), ) id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()")) From ffd6fd137909ff5d0053a7d05ee7d377da120ea7 Mon Sep 17 00:00:00 2001 From: Novice Date: Fri, 29 Aug 2025 17:11:40 +0800 Subject: [PATCH 2/4] chore: handle migrations --- ...08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py index c0aec5f5d55fae..465f8664a55a92 100644 --- a/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py +++ b/api/migrations/versions/2025_08_29_1534-b95962a3885c_add_workflow_app_log_run_id_index.py @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. 
revision = 'b95962a3885c' -down_revision = '0e154742a5fa' +down_revision = '8d289573e1da' branch_labels = None depends_on = None From 8d0117a0d588010f2c0e1fc2afa3a82335844efd Mon Sep 17 00:00:00 2001 From: Novice Date: Fri, 29 Aug 2025 18:06:42 +0800 Subject: [PATCH 3/4] chore: remove the magic number --- .../event_handlers/update_provider_when_message_created.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/events/event_handlers/update_provider_when_message_created.py b/api/events/event_handlers/update_provider_when_message_created.py index 2e11c424cf1bc9..cb80a90d3ec76d 100644 --- a/api/events/event_handlers/update_provider_when_message_created.py +++ b/api/events/event_handlers/update_provider_when_message_created.py @@ -24,6 +24,7 @@ _PROVIDER_LAST_USED_CACHE_PREFIX = "provider:last_used" # Default TTL for cache entries (10 minutes) _CACHE_TTL_SECONDS = 600 +LAST_USED_UPDATE_WINDOW_SECONDS = 60 * 5 def _get_provider_cache_key(tenant_id: str, provider_name: str) -> str: @@ -255,8 +256,7 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation] now = datetime_utils.naive_utc_now() last_update = _get_last_update_timestamp(cache_key) - update_window_seconds = 60 * 5 - if last_update is None or (now - last_update).total_seconds() > update_window_seconds: + if last_update is None or (now - last_update).total_seconds() > LAST_USED_UPDATE_WINDOW_SECONDS: update_values["last_used"] = values.last_used _set_last_update_timestamp(cache_key, now) From bc6e0204485ed47e15b333dfb33f192dba095c1d Mon Sep 17 00:00:00 2001 From: Novice Date: Tue, 2 Sep 2025 09:26:30 +0800 Subject: [PATCH 4/4] chore: apply code review comments --- .../update_provider_when_message_created.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/api/events/event_handlers/update_provider_when_message_created.py b/api/events/event_handlers/update_provider_when_message_created.py index cb80a90d3ec76d..96ff6fcf5ef091 100644 --- a/api/events/event_handlers/update_provider_when_message_created.py +++ b/api/events/event_handlers/update_provider_when_message_created.py @@ -35,23 +35,16 @@ def _get_provider_cache_key(tenant_id: str, provider_name: str) -> str: @redis_fallback(default_return=None) def _get_last_update_timestamp(cache_key: str) -> Optional[datetime]: """Get last update timestamp from Redis cache.""" - try: - timestamp_str = redis_client.get(cache_key) - if timestamp_str: - return datetime.fromisoformat(timestamp_str.decode("utf-8")) - return None - except Exception as e: - logger.warning("Failed to get last update timestamp from Redis: %s", e) - return None + timestamp_str = redis_client.get(cache_key) + if timestamp_str: + return datetime.fromtimestamp(float(timestamp_str.decode("utf-8"))) + return None @redis_fallback() def _set_last_update_timestamp(cache_key: str, timestamp: datetime) -> None: """Set last update timestamp in Redis cache with TTL.""" - try: - redis_client.setex(cache_key, _CACHE_TTL_SECONDS, timestamp.isoformat()) - except Exception as e: - logger.warning("Failed to set last update timestamp in Redis: %s", e) + redis_client.setex(cache_key, _CACHE_TTL_SECONDS, str(timestamp.timestamp())) class _ProviderUpdateFilters(BaseModel): @@ -250,7 +243,13 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation] # Prepare values dict for SQLAlchemy update update_values = {} - # Time-window based update for last_used to avoid hot row contention + # NOTE: For frequently used providers under high 
load, this implementation may experience
+        # race conditions or update contention despite the time-window optimization:
+        # 1. Multiple concurrent requests might check the same cache key simultaneously
+        # 2. Redis cache operations are not atomic with database updates
+        # 3. Heavy providers could still face database lock contention during peak usage
+        # The current implementation is acceptable for most scenarios; future optimizations
+        # could include batched updates or async processing.
         if values.last_used is not None:
             cache_key = _get_provider_cache_key(filters.tenant_id, filters.provider_name)
             now = datetime_utils.naive_utc_now()
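
One detail worth calling out in this last round: the gate value moved from an ISO-8601 string to epoch seconds, and that encoding round-trips cleanly for the naive datetimes `naive_utc_now()` returns. A standalone illustration with hypothetical values, not Dify code:

    from datetime import datetime

    now = datetime(2025, 9, 2, 9, 26, 30)             # naive, like naive_utc_now()
    encoded = str(now.timestamp())                    # what _set_last_update_timestamp stores
    decoded = datetime.fromtimestamp(float(encoded))  # what _get_last_update_timestamp reads back

    # timestamp() and fromtimestamp() both interpret naive values in the local
    # zone, so the round trip is the identity (modulo DST-fold edge cases).
    assert decoded == now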