
Feat/clean message records (#10588)

Jyong committed 5 months ago
commit 128efc3193

+ 5 - 0
api/configs/feature/__init__.py

@@ -616,6 +616,11 @@ class DataSetConfig(BaseSettings):
         default=False,
     )
 
+    PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING: PositiveInt = Field(
+        description="Interval in days for message cleanup operations - plan: sandbox",
+        default=30,
+    )
+
 
 class WorkspaceConfig(BaseSettings):
     """

+ 5 - 0
api/extensions/ext_celery.py

@@ -68,6 +68,7 @@ def init_app(app: Flask) -> Celery:
         "schedule.clean_unused_datasets_task",
         "schedule.create_tidb_serverless_task",
         "schedule.update_tidb_serverless_status_task",
+        "schedule.clean_messages",
     ]
     day = dify_config.CELERY_BEAT_SCHEDULER_TIME
     beat_schedule = {
@@ -87,6 +88,10 @@ def init_app(app: Flask) -> Celery:
             "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
             "schedule": crontab(minute="30", hour="*"),
         },
+        "clean_messages": {
+            "task": "schedule.clean_messages.clean_messages",
+            "schedule": timedelta(days=day),
+        },
     }
     celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
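
The new entry reuses the CELERY_BEAT_SCHEDULER_TIME value already read into day, and assumes from datetime import timedelta at module scope, which the file's other interval-based entries rely on. A toy sketch of the resulting beat configuration, with stand-ins for the real app and config:

    from datetime import timedelta

    from celery import Celery

    celery_app = Celery("dify")  # stand-in for the app built in init_app
    day = 1  # stand-in for dify_config.CELERY_BEAT_SCHEDULER_TIME

    celery_app.conf.update(
        imports=["schedule.clean_messages"],
        beat_schedule={
            "clean_messages": {
                "task": "schedule.clean_messages.clean_messages",
                "schedule": timedelta(days=day),  # beat enqueues the task every `day` day(s)
            },
        },
    )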
 

+ 31 - 0
api/migrations/versions/2024_11_12_0925-01d6889832f7_add_created_at_index_for_messages.py

@@ -0,0 +1,31 @@
+"""add_created_at_index_for_messages
+
+Revision ID: 01d6889832f7
+Revises: 09a8d1878d9b
+Create Date: 2024-11-12 09:25:05.527827
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '01d6889832f7'
+down_revision = '09a8d1878d9b'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.create_index('message_created_at_idx', ['created_at'], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.drop_index('message_created_at_idx')
+    # ### end Alembic commands ###
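
The same index is declared on the Message model in the next file, keeping the ORM metadata and the schema in sync. As a sanity check, the DDL the index compiles to can be previewed without a database; a sketch using SQLAlchemy's generic dialect (the exact statement depends on the configured backend):

    import sqlalchemy as sa
    from sqlalchemy.schema import CreateIndex

    metadata = sa.MetaData()
    messages = sa.Table(
        "messages",
        metadata,
        sa.Column("created_at", sa.DateTime),
    )
    idx = sa.Index("message_created_at_idx", messages.c.created_at)

    print(CreateIndex(idx))
    # CREATE INDEX message_created_at_idx ON messages (created_at)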

+ 1 - 0
api/models/model.py

@@ -719,6 +719,7 @@ class Message(db.Model):
         db.Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
         db.Index("message_account_idx", "app_id", "from_source", "from_account_id"),
         db.Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
+        db.Index("message_created_at_idx", "created_at"),
     )
 
     id = db.Column(StringUUID, server_default=db.text("uuid_generate_v4()"))

+ 79 - 0
api/schedule/clean_messages.py

@@ -0,0 +1,79 @@
+import datetime
+import time
+
+import click
+from werkzeug.exceptions import NotFound
+
+import app
+from configs import dify_config
+from extensions.ext_database import db
+from extensions.ext_redis import redis_client
+from models.model import (
+    App,
+    Message,
+    MessageAgentThought,
+    MessageAnnotation,
+    MessageChain,
+    MessageFeedback,
+    MessageFile,
+)
+from models.web import SavedMessage
+from services.feature_service import FeatureService
+
+
+@app.celery.task(queue="dataset")
+def clean_messages():
+    click.echo(click.style("Start clean messages.", fg="green"))
+    start_at = time.perf_counter()
+    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
+        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
+    )
+    page = 1
+    while True:
+        try:
+            # Main query with join and filter
+            messages = (
+                db.session.query(Message)
+                .filter(Message.created_at < plan_sandbox_clean_message_day)
+                .order_by(Message.created_at.desc())
+                .paginate(page=page, per_page=100)
+            )
+
+        except NotFound:
+            break
+        if messages.items is None or len(messages.items) == 0:
+            break
+        for message in messages.items:
+            app = App.query.filter_by(id=message.app_id).first()
+            features_cache_key = f"features:{app.tenant_id}"
+            plan_cache = redis_client.get(features_cache_key)
+            if plan_cache is None:
+                features = FeatureService.get_features(app.tenant_id)
+                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
+                plan = features.billing.subscription.plan
+            else:
+                plan = plan_cache.decode()
+            if plan == "sandbox":
+                # clean related message
+                db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
+                    synchronize_session=False
+                )
+                db.session.query(Message).filter(Message.id == message.id).delete()
+                db.session.commit()
+    end_at = time.perf_counter()
+    click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))

+ 6 - 8
api/schedule/clean_unused_datasets_task.py

@@ -22,7 +22,6 @@ def clean_unused_datasets_task():
     start_at = time.perf_counter()
     plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
     plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
-    page = 1
     while True:
         try:
             # Subquery for counting new documents
@@ -62,14 +61,13 @@ def clean_unused_datasets_task():
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
                 .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
             )
 
         except NotFound:
             break
         if datasets.items is None or len(datasets.items) == 0:
             break
-        page += 1
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
@@ -92,7 +90,6 @@ def clean_unused_datasets_task():
                     click.echo(
                         click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                     )
-    page = 1
     while True:
         try:
             # Subquery for counting new documents
@@ -132,14 +129,13 @@ def clean_unused_datasets_task():
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
                 .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
+                .paginate(page=1, per_page=50)
             )
 
         except NotFound:
             break
         if datasets.items is None or len(datasets.items) == 0:
             break
-        page += 1
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
@@ -149,11 +145,13 @@ def clean_unused_datasets_task():
             if not dataset_query or len(dataset_query) == 0:
                 try:
                     features_cache_key = f"features:{dataset.tenant_id}"
-                    plan = redis_client.get(features_cache_key)
-                    if plan is None:
+                    plan_cache = redis_client.get(features_cache_key)
+                    if plan_cache is None:
                         features = FeatureService.get_features(dataset.tenant_id)
                         redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                         plan = features.billing.subscription.plan
+                    else:
+                        plan = plan_cache.decode()
                     if plan == "sandbox":
                         # remove index
                         index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
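
The switch from an incrementing page to a fixed page=1 matters when each pass removes the fetched rows from the result set: advancing the page over a shrinking result set skips records. A toy illustration of the pitfall, with a plain list standing in for the query (no database involved):

    rows = list(range(10))  # stand-in for rows matching the cleanup filter

    def paginate(seq, page, per_page):
        return seq[(page - 1) * per_page : page * per_page]

    # Buggy: advance the page while rows are being deleted.
    page, processed = 1, []
    while True:
        batch = paginate(rows, page, 3)
        if not batch:
            break
        processed += batch
        rows = [r for r in rows if r not in batch]  # the task removes these rows
        page += 1

    print(processed)  # [0, 1, 2, 6, 7, 8] -- rows 3, 4, 5 and 9 were never visited

With the page fixed at 1, every pass re-queries from the top, so no matching row is skipped; termination then relies on each pass shrinking the result set, since rows a pass leaves in place are simply re-fetched on the next iteration.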