clean_embedding_cache_task.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536
  1. import datetime
  2. import time
  3. import click
  4. from sqlalchemy import text
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from configs import dify_config
  8. from extensions.ext_database import db
  9. from models.dataset import Embedding
  10. @app.celery.task(queue='dataset')
  11. def clean_embedding_cache_task():
  12. click.echo(click.style('Start clean embedding cache.', fg='green'))
  13. clean_days = int(dify_config.CLEAN_DAY_SETTING)
  14. start_at = time.perf_counter()
  15. thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
  16. while True:
  17. try:
  18. embedding_ids = db.session.query(Embedding.id).filter(Embedding.created_at < thirty_days_ago) \
  19. .order_by(Embedding.created_at.desc()).limit(100).all()
  20. embedding_ids = [embedding_id[0] for embedding_id in embedding_ids]
  21. except NotFound:
  22. break
  23. if embedding_ids:
  24. db.session.execute(text(
  25. "DELETE FROM embeddings WHERE id in :embedding_ids"
  26. ), {'embedding_ids': tuple(embedding_ids)})
  27. db.session.commit()
  28. else:
  29. break
  30. end_at = time.perf_counter()
  31. click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green'))