123456789101112131415161718192021222324252627282930313233343536373839404142 |
- import datetime
- import time
- import click
- from sqlalchemy import text
- from werkzeug.exceptions import NotFound
- import app
- from configs import dify_config
- from extensions.ext_database import db
- from models.dataset import Embedding
@app.celery.task(queue="dataset")
def clean_embedding_cache_task():
    """Delete expired rows from the ``embeddings`` table in batches.

    Registered as a Celery task on the ``dataset`` queue. Rows whose
    ``created_at`` is older than ``dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING``
    days are selected at most 100 at a time and deleted, with one commit per
    batch, until no matching rows remain. A start message and the total
    elapsed time are printed with ``click.echo``.

    Any database error propagates to the caller; batches that were already
    committed stay deleted.
    """
    click.echo(click.style("Start clean embedding cache.", fg="green"))
    clean_days = int(dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING)
    start_at = time.perf_counter()
    # NOTE(review): this is a naive local timestamp; it presumably matches how
    # Embedding.created_at is stored - confirm against the model definition.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=clean_days)
    # Built once: the statement is the same for every batch.
    delete_stmt = text("DELETE FROM embeddings WHERE id = :embedding_id")

    while True:
        # Query.all() returns an empty list when nothing matches (it never
        # raises NotFound), so the emptiness check below is the only exit.
        rows = (
            db.session.query(Embedding.id)
            .filter(Embedding.created_at < cutoff)
            .order_by(Embedding.created_at.desc())
            .limit(100)
            .all()
        )
        if not rows:
            break

        # A list of parameter dicts makes this a single executemany call
        # instead of one Session.execute round-trip per id.
        db.session.execute(delete_stmt, [{"embedding_id": row[0]} for row in rows])
        db.session.commit()

    end_at = time.perf_counter()
    click.echo(click.style(f"Cleaned embedding cache from db success latency: {end_at - start_at}", fg="green"))
|