
Add data clean schedule (#1859)

Co-authored-by: jyong <jyong@dify.ai>
Jyong 1 year ago
parent
commit
595e9b25ba

+ 2 - 0
api/docker/entrypoint.sh

@@ -10,6 +10,8 @@ fi
 if [[ "${MODE}" == "worker" ]]; then
   celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} -c ${CELERY_WORKER_AMOUNT:-1} --loglevel INFO \
     -Q ${CELERY_QUEUES:-dataset,generation,mail}
+elif [[ "${MODE}" == "beat" ]]; then
+  celery -A app.celery beat --loglevel INFO
 else
   if [[ "${DEBUG}" == "true" ]]; then
     flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug

+ 23 - 0
api/extensions/ext_celery.py

@@ -1,3 +1,5 @@
+from datetime import timedelta
+
 from celery import Task, Celery
 from flask import Flask
 
@@ -35,4 +37,25 @@ def init_app(app: Flask) -> Celery:
         
     celery_app.set_default()
     app.extensions["celery"] = celery_app
+
+    imports = [
+        "schedule.clean_embedding_cache_task",
+        "schedule.clean_unused_datasets_task",
+    ]
+
+    beat_schedule = {
+        'clean_embedding_cache_task': {
+            'task': 'schedule.clean_embedding_cache_task.clean_embedding_cache_task',
+            'schedule': timedelta(minutes=1),
+        },
+        'clean_unused_datasets_task': {
+            'task': 'schedule.clean_unused_datasets_task.clean_unused_datasets_task',
+            'schedule': timedelta(minutes=10),
+        }
+    }
+    celery_app.conf.update(
+        beat_schedule=beat_schedule,
+        imports=imports
+    )
+
     return celery_app
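
The two entries above use Celery's standard beat_schedule format: a schedule name mapped to the task's dotted registration path and an interval. The beat process started by the new MODE=beat branch in the entrypoint only enqueues these tasks on schedule; a worker consuming the dataset queue must also be running to execute them. A minimal self-contained sketch of the same mechanism, with an illustrative app, broker URL, and task that are not part of this commit:

from datetime import timedelta

from celery import Celery

# standalone illustration; Dify wires this up inside init_app() instead
celery_app = Celery('example', broker='redis://localhost:6379/0')


@celery_app.task
def heartbeat():
    print('tick')


celery_app.conf.beat_schedule = {
    'heartbeat-every-minute': {
        # dotted path <module>.<function>, as the task is registered
        'task': 'example.heartbeat',
        'schedule': timedelta(minutes=1),
    },
}

# run `celery -A example beat` for the scheduler and `celery -A example worker`
# for an executor; beat only enqueues, workers do the work

Celery also accepts crontab objects in place of timedelta where calendar-based schedules are needed.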

+ 1 - 1
api/requirements.txt

@@ -57,4 +57,4 @@ cohere~=4.32
 unstructured~=0.10.27
 unstructured[docx,pptx,msg,md,ppt]~=0.10.27
 bs4~=0.0.1
-markdown~=3.5.1
\ No newline at end of file
+markdown~=3.5.1

+ 29 - 0
api/schedule/clean_embedding_cache_task.py

@@ -0,0 +1,29 @@
+import datetime
+import time
+
+import click
+from flask import current_app
+
+import app
+from extensions.ext_database import db
+from models.dataset import Embedding
+
+
+@app.celery.task(queue='dataset')
+def clean_embedding_cache_task():
+    click.echo(click.style('Start clean embedding cache.', fg='green'))
+    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
+    start_at = time.perf_counter()
+    clean_before = datetime.datetime.now() - datetime.timedelta(days=clean_days)
+    while True:
+        # committed deletes shrink the result set, so fetch the first batch
+        # each pass instead of advancing a page (which would skip rows)
+        embeddings = db.session.query(Embedding).filter(Embedding.created_at < clean_before) \
+            .order_by(Embedding.created_at.desc()).limit(100).all()
+        if not embeddings:
+            break
+        for embedding in embeddings:
+            db.session.delete(embedding)
+        db.session.commit()
+    end_at = time.perf_counter()
+    click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green'))
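
The loop above deletes in batches: each commit removes the fetched rows, so the task re-reads from the top with limit() rather than advancing a page number, which would skip surviving rows. A standalone sketch of the same pattern in plain SQLAlchemy, using an in-memory database and an illustrative model in place of Dify's Embedding:

import datetime

from sqlalchemy import Column, DateTime, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


# illustrative stand-in for models.dataset.Embedding
class CacheRow(Base):
    __tablename__ = 'cache_rows'
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime, default=datetime.datetime.now)


def delete_in_batches(session, cutoff, batch_size=100):
    while True:
        rows = (session.query(CacheRow)
                .filter(CacheRow.created_at < cutoff)
                .limit(batch_size).all())
        if not rows:
            break
        for row in rows:
            session.delete(row)
        session.commit()  # small per-batch transactions keep locks short


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
with Session(engine) as session:
    delete_in_batches(session, datetime.datetime.now() - datetime.timedelta(days=30))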

+ 69 - 0
api/schedule/clean_unused_datasets_task.py

@@ -0,0 +1,69 @@
+import datetime
+import time
+
+import click
+from flask import current_app
+from werkzeug.exceptions import NotFound
+
+import app
+from core.index.index import IndexBuilder
+from extensions.ext_database import db
+from models.dataset import Dataset, DatasetQuery, Document
+
+
+@app.celery.task(queue='dataset')
+def clean_unused_datasets_task():
+    click.echo(click.style('Start clean unused datasets indexes.', fg='green'))
+    clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
+    start_at = time.perf_counter()
+    clean_before = datetime.datetime.now() - datetime.timedelta(days=clean_days)
+    page = 1
+    while True:
+        try:
+            # unlike the embedding task, nothing is deleted from this result
+            # set, so advancing the page number is safe here
+            datasets = db.session.query(Dataset).filter(Dataset.created_at < clean_before) \
+                .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
+        except NotFound:
+            break
+        page += 1
+        for dataset in datasets:
+            dataset_query = db.session.query(DatasetQuery).filter(
+                DatasetQuery.created_at > clean_before,
+                DatasetQuery.dataset_id == dataset.id
+            ).all()
+            if not dataset_query:
+                documents = db.session.query(Document).filter(
+                    Document.dataset_id == dataset.id,
+                    Document.indexing_status == 'completed',
+                    Document.enabled == True,
+                    Document.archived == False,
+                    Document.updated_at > clean_before
+                ).all()
+                if not documents:
+                    try:
+                        # remove index
+                        vector_index = IndexBuilder.get_index(dataset, 'high_quality')
+                        kw_index = IndexBuilder.get_index(dataset, 'economy')
+                        # delete from vector index
+                        if vector_index:
+                            if dataset.collection_binding_id:
+                                vector_index.delete_by_group_id(dataset.id)
+                            else:
+                                vector_index.delete()
+                        kw_index.delete()
+                        # disable the dataset's documents
+                        update_params = {
+                            Document.enabled: False
+                        }
+
+                        Document.query.filter_by(dataset_id=dataset.id).update(update_params)
+                        db.session.commit()
+                        click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
+                                               fg='green'))
+                    except Exception as e:
+                        click.echo(
+                            click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
+                                        fg='red'))
+    end_at = time.perf_counter()
+    click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))
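
For a one-off run outside the beat schedule, both tasks can also be enqueued by hand; a minimal sketch, assuming the Dify API environment and a worker consuming the dataset queue:

from schedule.clean_embedding_cache_task import clean_embedding_cache_task
from schedule.clean_unused_datasets_task import clean_unused_datasets_task

# delay() is shorthand for apply_async() with default options; routing to the
# 'dataset' queue comes from the queue='dataset' option on the task decorator
clean_embedding_cache_task.delay()
clean_unused_datasets_task.delay()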