5 changes: 0 additions & 5 deletions api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -73,7 +73,6 @@
 from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
 from core.workflow.system_variable import SystemVariable
 from core.workflow.workflow_cycle_manager import CycleManagerWorkflowInfo, WorkflowCycleManager
-from events.message_event import message_was_created
 from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models import Conversation, EndUser, Message, MessageFile
@@ -938,10 +937,6 @@ def _save_message(self, *, session: Session, graph_runtime_state: Optional[Graph
             self._task_state.metadata.usage = usage
         else:
             self._task_state.metadata.usage = LLMUsage.empty_usage()
-        message_was_created.send(
-            message,
-            application_generate_entity=self._application_generate_entity,
-        )

     def _message_end_to_stream_response(self) -> MessageEndStreamResponse:
         """
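Review note: this deletes the point where the advanced-chat streaming pipeline emitted the message_was_created signal. For context, a minimal standalone sketch of how such a signal works, assuming message_was_created is an ordinary blinker signal (the events.message_event import above suggests as much); the receiver and payload names below are illustrative only, not Dify code.

# Hypothetical sketch: blinker-style signal send/receive.
from blinker import signal

message_was_created = signal("message-was-created")

@message_was_created.connect
def on_message_created(sender, **kwargs):
    # sender is the first positional argument to send(); extra context such
    # as application_generate_entity arrives as keyword arguments.
    print(sender, kwargs.get("application_generate_entity"))

# The shape of the call this diff removes from the pipeline:
message_was_created.send("a-message-object", application_generate_entity=None)

Receivers registered elsewhere, such as the provider-update handler later in this diff, fire only when some caller still invokes send.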
@@ -130,13 +130,16 @@ def clean(self, dataset: Dataset, node_ids: Optional[list[str]], with_keywords:
             if delete_child_chunks:
                 db.session.query(ChildChunk).where(
                     ChildChunk.dataset_id == dataset.id, ChildChunk.index_node_id.in_(child_node_ids)
-                ).delete()
+                ).delete(synchronize_session=False)
                 db.session.commit()
         else:
             vector.delete()

             if delete_child_chunks:
-                db.session.query(ChildChunk).where(ChildChunk.dataset_id == dataset.id).delete()
+                # Use existing compound index: (tenant_id, dataset_id, ...)
+                db.session.query(ChildChunk).where(
+                    ChildChunk.tenant_id == dataset.tenant_id, ChildChunk.dataset_id == dataset.id
+                ).delete(synchronize_session=False)
                 db.session.commit()

     def retrieve(
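Two behavioral changes here: the added tenant_id predicate lets the delete hit the existing (tenant_id, dataset_id, ...) compound index, and synchronize_session=False skips Session reconciliation. A standalone sketch of the latter's semantics, using an illustrative model rather than Dify's ChildChunk:

# Hypothetical standalone demo of delete(synchronize_session=False).
from sqlalchemy import create_engine, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session

class Base(DeclarativeBase):
    pass

class Chunk(Base):
    __tablename__ = "chunks"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    dataset_id: Mapped[str] = mapped_column(String)

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Chunk(id=1, dataset_id="a"), Chunk(id=2, dataset_id="b")])
    session.commit()
    # Emits exactly one DELETE and skips reconciling rows that may also be
    # loaded as objects in the Session; cheapest option for bulk cleanup.
    session.query(Chunk).where(Chunk.dataset_id == "a").delete(synchronize_session=False)
    session.commit()

With the default strategy, SQLAlchemy also tries to keep already-loaded objects consistent with the bulk DELETE, which can mean evaluating the criteria in Python or issuing an extra SELECT first. Skipping that is safe here because the session commits immediately afterwards, expiring any stale instances.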
@@ -13,12 +13,45 @@
 from core.plugin.entities.plugin import ModelProviderID
 from events.message_event import message_was_created
 from extensions.ext_database import db
+from extensions.ext_redis import redis_client, redis_fallback
+from libs import datetime_utils
 from models.model import Message
 from models.provider import Provider, ProviderType

 logger = logging.getLogger(__name__)

+# Redis cache key prefix for provider last used timestamps
+_PROVIDER_LAST_USED_CACHE_PREFIX = "provider:last_used"
+# Default TTL for cache entries (10 minutes)
+_CACHE_TTL_SECONDS = 600
+
+
+def _get_provider_cache_key(tenant_id: str, provider_name: str) -> str:
+    """Generate Redis cache key for provider last used timestamp."""
+    return f"{_PROVIDER_LAST_USED_CACHE_PREFIX}:{tenant_id}:{provider_name}"
+
+
+@redis_fallback(default_return=None)
+def _get_last_update_timestamp(cache_key: str) -> Optional[datetime]:
+    """Get last update timestamp from Redis cache."""
+    try:
+        timestamp_str = redis_client.get(cache_key)
+        if timestamp_str:
+            return datetime.fromisoformat(timestamp_str.decode("utf-8"))
+        return None
+    except Exception as e:
+        logger.warning("Failed to get last update timestamp from Redis: %s", e)
+        return None
+
+
+@redis_fallback()
+def _set_last_update_timestamp(cache_key: str, timestamp: datetime) -> None:
+    """Set last update timestamp in Redis cache with TTL."""
+    try:
+        redis_client.setex(cache_key, _CACHE_TTL_SECONDS, timestamp.isoformat())
+    except Exception as e:
+        logger.warning("Failed to set last update timestamp in Redis: %s", e)
+
+
 class _ProviderUpdateFilters(BaseModel):
     """Filters for identifying Provider records to update."""
@@ -139,7 +172,7 @@ def handle(sender: Message, **kwargs):
                     provider_name,
                 )

-    except Exception as e:
+    except Exception:
         # Log failure with timing and context
         duration = time_module.perf_counter() - start_time

@@ -215,8 +248,18 @@ def _execute_provider_updates(updates_to_perform: list[_ProviderUpdateOperation]

         # Prepare values dict for SQLAlchemy update
         update_values = {}
-        # updateing to `last_used` is removed due to performance reason.
-        # ref: https://github.com/langgenius/dify/issues/24526
+
+        # Time-window based update for last_used to avoid hot row contention
+        if values.last_used is not None:
+            cache_key = _get_provider_cache_key(filters.tenant_id, filters.provider_name)
+            now = datetime_utils.naive_utc_now()
+            last_update = _get_last_update_timestamp(cache_key)
+
+            update_window_seconds = 60 * 5
+            if last_update is None or (now - last_update).total_seconds() > update_window_seconds:
+                update_values["last_used"] = values.last_used
+                _set_last_update_timestamp(cache_key, now)

         if values.quota_used is not None:
             update_values["quota_used"] = values.quota_used
         # Skip the current update operation if no updates are required.
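This replaces the earlier blanket removal of last_used updates (the deleted comment referencing issue 24526) with write coalescing: the hot providers row is updated at most once per five-minute window per (tenant_id, provider_name), with Redis holding the last-write timestamp. The gate distilled into a standalone predicate, behavior-equivalent to the code above:

# Distilled sketch of the time-window gating; standalone, not from the PR.
from datetime import datetime, timedelta
from typing import Optional

UPDATE_WINDOW = timedelta(minutes=5)

def should_write_last_used(now: datetime, last_write: Optional[datetime]) -> bool:
    # last_write is None on a cache miss, after the 600-second TTL lapses,
    # or when Redis is down (redis_fallback's default_return). In all three
    # cases the UPDATE proceeds, so the mechanism fails open: a Redis outage
    # degrades to the old write-per-message behavior rather than letting
    # last_used go stale.
    return last_write is None or now - last_write > UPDATE_WINDOW

Note that the cache TTL (600 s) exceeds the update window (300 s), so a cached timestamp survives long enough to suppress writes for the whole window.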
@@ -0,0 +1,32 @@
+"""chore: add workflow app log run id index
+
+Revision ID: b95962a3885c
+Revises: 0e154742a5fa
+Create Date: 2025-08-29 15:34:09.838623
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = 'b95962a3885c'
+down_revision = '8d289573e1da'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op:
+        batch_op.create_index('workflow_app_log_workflow_run_id_idx', ['workflow_run_id'], unique=False)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('workflow_app_logs', schema=None) as batch_op:
+        batch_op.drop_index('workflow_app_log_workflow_run_id_idx')
+    # ### end Alembic commands ###
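Assuming the repository's usual Flask-Migrate workflow, this lands via flask db upgrade. A quick sanity check that the index exists afterwards, run inside an app context; the snippet is illustrative, not part of the PR:

# Hypothetical verification using SQLAlchemy's runtime inspector.
from sqlalchemy import inspect

from extensions.ext_database import db

index_names = {ix["name"] for ix in inspect(db.engine).get_indexes("workflow_app_logs")}
assert "workflow_app_log_workflow_run_id_idx" in index_names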
1 change: 1 addition & 0 deletions api/models/workflow.py
@@ -835,6 +835,7 @@ class WorkflowAppLog(Base):
     __table_args__ = (
         sa.PrimaryKeyConstraint("id", name="workflow_app_log_pkey"),
         sa.Index("workflow_app_log_app_idx", "tenant_id", "app_id"),
+        sa.Index("workflow_app_log_workflow_run_id_idx", "workflow_run_id"),
     )

     id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
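Declaring the sa.Index on the model keeps the ORM metadata in sync with the migration. The query shape the index presumably targets is a point lookup from a run id to its app log; a hypothetical helper:

# Illustrative helper, not from the PR.
from typing import Optional

from extensions.ext_database import db
from models.workflow import WorkflowAppLog

def get_app_log_by_run_id(run_id: str) -> Optional[WorkflowAppLog]:
    # With workflow_app_log_workflow_run_id_idx in place this becomes an
    # index lookup instead of a sequential scan over workflow_app_logs.
    return db.session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id == run_id).first()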