Skip to content

Commit 128efc3

Browse files
authored
Feat/clean message records (langgenius#10588)
1 parent c49efc0 commit 128efc3

File tree

6 files changed

+127
-8
lines changed

6 files changed

+127
-8
lines changed

api/configs/feature/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,11 @@ class DataSetConfig(BaseSettings):
616616
default=False,
617617
)
618618

619+
PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING: PositiveInt = Field(
620+
description="Interval in days for message cleanup operations - plan: sandbox",
621+
default=30,
622+
)
623+
619624

620625
class WorkspaceConfig(BaseSettings):
621626
"""

api/extensions/ext_celery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def __call__(self, *args: object, **kwargs: object) -> object:
6868
"schedule.clean_unused_datasets_task",
6969
"schedule.create_tidb_serverless_task",
7070
"schedule.update_tidb_serverless_status_task",
71+
"schedule.clean_messages",
7172
]
7273
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
7374
beat_schedule = {
@@ -87,6 +88,10 @@ def __call__(self, *args: object, **kwargs: object) -> object:
8788
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
8889
"schedule": crontab(minute="30", hour="*"),
8990
},
91+
"clean_messages": {
92+
"task": "schedule.clean_messages.clean_messages",
93+
"schedule": timedelta(days=day),
94+
},
9095
}
9196
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
9297

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""add_created_at_index_for_messages
2+
3+
Revision ID: 01d6889832f7
4+
Revises: 09a8d1878d9b
5+
Create Date: 2024-11-12 09:25:05.527827
6+
7+
"""
8+
from alembic import op
9+
import models as models
10+
import sqlalchemy as sa
11+
12+
13+
# revision identifiers, used by Alembic.
14+
revision = '01d6889832f7'
15+
down_revision = '09a8d1878d9b'
16+
branch_labels = None
17+
depends_on = None
18+
19+
20+
def upgrade():
    """Create an index on ``messages.created_at``.

    Supports the scheduled message-cleanup job, which filters and orders
    messages by creation time.
    """
    index_name = 'message_created_at_idx'
    # batch_alter_table keeps the migration portable (e.g. SQLite needs
    # table-rebuild semantics for some ALTERs).
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.create_index(index_name, ['created_at'], unique=False)
25+
26+
27+
def downgrade():
    """Drop the ``messages.created_at`` index added in :func:`upgrade`."""
    index_name = 'message_created_at_idx'
    with op.batch_alter_table('messages', schema=None) as batch_op:
        batch_op.drop_index(index_name)

api/models/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,7 @@ class Message(db.Model):
719719
db.Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
720720
db.Index("message_account_idx", "app_id", "from_source", "from_account_id"),
721721
db.Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
722+
db.Index("message_created_at_idx", "created_at"),
722723
)
723724

724725
id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))

api/schedule/clean_messages.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import datetime
2+
import time
3+
4+
import click
5+
from werkzeug.exceptions import NotFound
6+
7+
import app
8+
from configs import dify_config
9+
from extensions.ext_database import db
10+
from extensions.ext_redis import redis_client
11+
from models.model import (
12+
App,
13+
Message,
14+
MessageAgentThought,
15+
MessageAnnotation,
16+
MessageChain,
17+
MessageFeedback,
18+
MessageFile,
19+
)
20+
from models.web import SavedMessage
21+
from services.feature_service import FeatureService
22+
23+
24+
@app.celery.task(queue="dataset")
def clean_messages():
    """Delete messages (and their dependent records) older than the sandbox
    retention window.

    Only messages belonging to tenants on the ``sandbox`` billing plan are
    removed. The tenant's plan is looked up through :class:`FeatureService`
    and cached in Redis for 10 minutes to avoid repeated billing lookups.
    Runs as a periodic Celery beat task on the ``dataset`` queue.
    """
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()
    # Messages created before this cutoff are candidates for deletion.
    message_cutoff = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )
    page = 1
    while True:
        try:
            messages = (
                db.session.query(Message)
                .filter(Message.created_at < message_cutoff)
                .order_by(Message.created_at.desc())
                .paginate(page=page, per_page=100)
            )
        except NotFound:
            # paginate raises NotFound when the page is out of range.
            break
        if not messages.items:
            break
        deleted_on_page = 0
        for message in messages.items:
            # NOTE: do not name this variable ``app`` — that would shadow the
            # imported ``app`` module used by the task decorator.
            app_record = App.query.filter_by(id=message.app_id).first()
            if app_record is None:
                # Owning app was deleted; skip instead of crashing the task
                # on ``None.tenant_id``.
                continue
            features_cache_key = f"features:{app_record.tenant_id}"
            plan_cache = redis_client.get(features_cache_key)
            if plan_cache is None:
                features = FeatureService.get_features(app_record.tenant_id)
                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                plan = features.billing.subscription.plan
            else:
                plan = plan_cache.decode()
            if plan == "sandbox":
                # Remove dependent records first so no orphaned rows are left
                # behind once the message itself is gone.
                related_models = (
                    MessageFeedback,
                    MessageAnnotation,
                    MessageChain,
                    MessageAgentThought,
                    MessageFile,
                    SavedMessage,
                )
                for related_model in related_models:
                    db.session.query(related_model).filter(
                        related_model.message_id == message.id
                    ).delete(synchronize_session=False)
                db.session.query(Message).filter(Message.id == message.id).delete()
                deleted_on_page += 1
        db.session.commit()
        # Guard against an infinite loop: with a fixed page number, a page
        # containing only non-sandbox messages would be re-fetched forever.
        # Advance the page when nothing was deleted; when rows WERE deleted
        # the result set shrinks, so re-reading the same page yields new rows.
        if deleted_on_page == 0:
            page += 1
    end_at = time.perf_counter()
    click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))

api/schedule/clean_unused_datasets_task.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ def clean_unused_datasets_task():
2222
start_at = time.perf_counter()
2323
plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
2424
plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
25-
page = 1
2625
while True:
2726
try:
2827
# Subquery for counting new documents
@@ -62,14 +61,13 @@ def clean_unused_datasets_task():
6261
func.coalesce(document_subquery_old.c.document_count, 0) > 0,
6362
)
6463
.order_by(Dataset.created_at.desc())
65-
.paginate(page=page, per_page=50)
64+
.paginate(page=1, per_page=50)
6665
)
6766

6867
except NotFound:
6968
break
7069
if datasets.items is None or len(datasets.items) == 0:
7170
break
72-
page += 1
7371
for dataset in datasets:
7472
dataset_query = (
7573
db.session.query(DatasetQuery)
@@ -92,7 +90,6 @@ def clean_unused_datasets_task():
9290
click.echo(
9391
click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
9492
)
95-
page = 1
9693
while True:
9794
try:
9895
# Subquery for counting new documents
@@ -132,14 +129,13 @@ def clean_unused_datasets_task():
132129
func.coalesce(document_subquery_old.c.document_count, 0) > 0,
133130
)
134131
.order_by(Dataset.created_at.desc())
135-
.paginate(page=page, per_page=50)
132+
.paginate(page=1, per_page=50)
136133
)
137134

138135
except NotFound:
139136
break
140137
if datasets.items is None or len(datasets.items) == 0:
141138
break
142-
page += 1
143139
for dataset in datasets:
144140
dataset_query = (
145141
db.session.query(DatasetQuery)
@@ -149,11 +145,13 @@ def clean_unused_datasets_task():
149145
if not dataset_query or len(dataset_query) == 0:
150146
try:
151147
features_cache_key = f"features:{dataset.tenant_id}"
152-
plan = redis_client.get(features_cache_key)
153-
if plan is None:
148+
plan_cache = redis_client.get(features_cache_key)
149+
if plan_cache is None:
154150
features = FeatureService.get_features(dataset.tenant_id)
155151
redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
156152
plan = features.billing.subscription.plan
153+
else:
154+
plan = plan_cache.decode()
157155
if plan == "sandbox":
158156
# remove index
159157
index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()

0 commit comments

Comments
 (0)