浏览代码

update dataset clean rule (#9426)

Jyong 6 月之前
父节点
当前提交
5aabb83f5a
共有 3 个文件被更改,包括 19 次插入105 次删除
  1. 7 2
      api/configs/feature/__init__.py
  2. 12 11
      api/schedule/clean_unused_datasets_task.py
  3. 0 92
      api/schedule/clean_unused_messages_task.py

+ 7 - 2
api/configs/feature/__init__.py

@@ -506,11 +506,16 @@ class DataSetConfig(BaseSettings):
     Configuration for dataset management
     """
 
-    CLEAN_DAY_SETTING: PositiveInt = Field(
-        description="Interval in days for dataset cleanup operations",
+    PLAN_SANDBOX_CLEAN_DAY_SETTING: PositiveInt = Field(
+        description="Interval in days for dataset cleanup operations - plan: sandbox",
         default=30,
     )
 
+    PLAN_PRO_CLEAN_DAY_SETTING: PositiveInt = Field(
+        description="Interval in days for dataset cleanup operations - plan: pro and team",
+        default=7,
+    )
+
     DATASET_OPERATOR_ENABLED: bool = Field(
         description="Enable or disable dataset operator functionality",
         default=False,

+ 12 - 11
api/schedule/clean_unused_datasets_task.py

@@ -17,10 +17,11 @@ from services.feature_service import FeatureService
 @app.celery.task(queue="dataset")
 def clean_unused_datasets_task():
     click.echo(click.style("Start clean unused datasets indexes.", fg="green"))
-    clean_days = dify_config.CLEAN_DAY_SETTING
+    plan_sandbox_clean_day_setting = dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING
+    plan_pro_clean_day_setting = dify_config.PLAN_PRO_CLEAN_DAY_SETTING
     start_at = time.perf_counter()
-    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
-    seven_days_ago = datetime.datetime.now() - datetime.timedelta(days=7)
+    plan_sandbox_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_sandbox_clean_day_setting)
+    plan_pro_clean_day = datetime.datetime.now() - datetime.timedelta(days=plan_pro_clean_day_setting)
     page = 1
     while True:
         try:
@@ -31,7 +32,7 @@ def clean_unused_datasets_task():
                     Document.indexing_status == "completed",
                     Document.enabled == True,
                     Document.archived == False,
-                    Document.updated_at > thirty_days_ago,
+                    Document.updated_at > plan_sandbox_clean_day,
                 )
                 .group_by(Document.dataset_id)
                 .subquery()
@@ -44,7 +45,7 @@ def clean_unused_datasets_task():
                     Document.indexing_status == "completed",
                     Document.enabled == True,
                     Document.archived == False,
-                    Document.updated_at < thirty_days_ago,
+                    Document.updated_at < plan_sandbox_clean_day,
                 )
                 .group_by(Document.dataset_id)
                 .subquery()
@@ -56,7 +57,7 @@ def clean_unused_datasets_task():
                 .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                 .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                 .filter(
-                    Dataset.created_at < thirty_days_ago,
+                    Dataset.created_at < plan_sandbox_clean_day,
                     func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
@@ -72,7 +73,7 @@ def clean_unused_datasets_task():
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
-                .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id)
+                .filter(DatasetQuery.created_at > plan_sandbox_clean_day, DatasetQuery.dataset_id == dataset.id)
                 .all()
             )
             if not dataset_query or len(dataset_query) == 0:
@@ -101,7 +102,7 @@ def clean_unused_datasets_task():
                     Document.indexing_status == "completed",
                     Document.enabled == True,
                     Document.archived == False,
-                    Document.updated_at > seven_days_ago,
+                    Document.updated_at > plan_pro_clean_day,
                 )
                 .group_by(Document.dataset_id)
                 .subquery()
@@ -114,7 +115,7 @@ def clean_unused_datasets_task():
                     Document.indexing_status == "completed",
                     Document.enabled == True,
                     Document.archived == False,
-                    Document.updated_at < seven_days_ago,
+                    Document.updated_at < plan_pro_clean_day,
                 )
                 .group_by(Document.dataset_id)
                 .subquery()
@@ -126,7 +127,7 @@ def clean_unused_datasets_task():
                 .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
                 .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
                 .filter(
-                    Dataset.created_at < seven_days_ago,
+                    Dataset.created_at < plan_pro_clean_day,
                     func.coalesce(document_subquery_new.c.document_count, 0) == 0,
                     func.coalesce(document_subquery_old.c.document_count, 0) > 0,
                 )
@@ -142,7 +143,7 @@ def clean_unused_datasets_task():
         for dataset in datasets:
             dataset_query = (
                 db.session.query(DatasetQuery)
-                .filter(DatasetQuery.created_at > seven_days_ago, DatasetQuery.dataset_id == dataset.id)
+                .filter(DatasetQuery.created_at > plan_pro_clean_day, DatasetQuery.dataset_id == dataset.id)
                 .all()
             )
             if not dataset_query or len(dataset_query) == 0:

+ 0 - 92
api/schedule/clean_unused_messages_task.py

@@ -1,92 +0,0 @@
-import datetime
-import time
-
-import click
-from sqlalchemy import func
-from werkzeug.exceptions import NotFound
-
-import app
-from configs import dify_config
-from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
-from extensions.ext_database import db
-from models.dataset import Dataset, DatasetQuery, Document
-
-
-@app.celery.task(queue="dataset")
-def clean_unused_message_task():
-    click.echo(click.style("Start clean unused messages .", fg="green"))
-    clean_days = int(dify_config.CLEAN_DAY_SETTING)
-    start_at = time.perf_counter()
-    thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
-    page = 1
-    while True:
-        try:
-            # Subquery for counting new documents
-            document_subquery_new = (
-                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
-                .filter(
-                    Document.indexing_status == "completed",
-                    Document.enabled == True,
-                    Document.archived == False,
-                    Document.updated_at > thirty_days_ago,
-                )
-                .group_by(Document.dataset_id)
-                .subquery()
-            )
-
-            # Subquery for counting old documents
-            document_subquery_old = (
-                db.session.query(Document.dataset_id, func.count(Document.id).label("document_count"))
-                .filter(
-                    Document.indexing_status == "completed",
-                    Document.enabled == True,
-                    Document.archived == False,
-                    Document.updated_at < thirty_days_ago,
-                )
-                .group_by(Document.dataset_id)
-                .subquery()
-            )
-
-            # Main query with join and filter
-            datasets = (
-                db.session.query(Dataset)
-                .outerjoin(document_subquery_new, Dataset.id == document_subquery_new.c.dataset_id)
-                .outerjoin(document_subquery_old, Dataset.id == document_subquery_old.c.dataset_id)
-                .filter(
-                    Dataset.created_at < thirty_days_ago,
-                    func.coalesce(document_subquery_new.c.document_count, 0) == 0,
-                    func.coalesce(document_subquery_old.c.document_count, 0) > 0,
-                )
-                .order_by(Dataset.created_at.desc())
-                .paginate(page=page, per_page=50)
-            )
-
-        except NotFound:
-            break
-        if datasets.items is None or len(datasets.items) == 0:
-            break
-        page += 1
-        for dataset in datasets:
-            dataset_query = (
-                db.session.query(DatasetQuery)
-                .filter(DatasetQuery.created_at > thirty_days_ago, DatasetQuery.dataset_id == dataset.id)
-                .all()
-            )
-            if not dataset_query or len(dataset_query) == 0:
-                try:
-                    # remove index
-                    index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
-                    index_processor.clean(dataset, None)
-
-                    # update document
-                    update_params = {Document.enabled: False}
-
-                    Document.query.filter_by(dataset_id=dataset.id).update(update_params)
-                    db.session.commit()
-                    click.echo(click.style("Cleaned unused dataset {} from db success!".format(dataset.id), fg="green"))
-                except Exception as e:
-                    click.echo(
-                        click.style("clean dataset index error: {} {}".format(e.__class__.__name__, str(e)), fg="red")
-                    )
-    end_at = time.perf_counter()
-    click.echo(click.style("Cleaned unused dataset from db success latency: {}".format(end_at - start_at), fg="green"))