# clean_embedding_cache_task.py (1.4 KB)

  1. import datetime
  2. import time
  3. import click
  4. from sqlalchemy import text
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from models.dataset import Embedding
  10. @app.celery.task(queue="dataset")
  11. def clean_embedding_cache_task():
  12. click.echo(click.style("Start clean embedding cache.", fg="green"))
  13. clean_days = int(dify_config.PLAN_SANDBOX_CLEAN_DAY_SETTING)
  14. start_at = time.perf_counter()
  15. thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
  16. while True:
  17. try:
  18. embedding_ids = (
  19. db.session.query(Embedding.id)
  20. .filter(Embedding.created_at < thirty_days_ago)
  21. .order_by(Embedding.created_at.desc())
  22. .limit(100)
  23. .all()
  24. )
  25. embedding_ids = [embedding_id[0] for embedding_id in embedding_ids]
  26. except NotFound:
  27. break
  28. if embedding_ids:
  29. for embedding_id in embedding_ids:
  30. db.session.execute(
  31. text("DELETE FROM embeddings WHERE id = :embedding_id"), {"embedding_id": embedding_id}
  32. )
  33. db.session.commit()
  34. else:
  35. break
  36. end_at = time.perf_counter()
  37. click.echo(click.style("Cleaned embedding cache from db success latency: {}".format(end_at - start_at), fg="green"))