clean_unused_datasets_task.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import datetime
  2. import time
  3. import click
  4. from flask import current_app
  5. from werkzeug.exceptions import NotFound
  6. import app
  7. from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
  8. from extensions.ext_database import db
  9. from models.dataset import Dataset, DatasetQuery, Document
  10. @app.celery.task(queue='dataset')
  11. def clean_unused_datasets_task():
  12. click.echo(click.style('Start clean unused datasets indexes.', fg='green'))
  13. clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
  14. start_at = time.perf_counter()
  15. thirty_days_ago = datetime.datetime.now() - datetime.timedelta(days=clean_days)
  16. page = 1
  17. while True:
  18. try:
  19. datasets = db.session.query(Dataset).filter(Dataset.created_at < thirty_days_ago) \
  20. .order_by(Dataset.created_at.desc()).paginate(page=page, per_page=50)
  21. except NotFound:
  22. break
  23. page += 1
  24. for dataset in datasets:
  25. dataset_query = db.session.query(DatasetQuery).filter(
  26. DatasetQuery.created_at > thirty_days_ago,
  27. DatasetQuery.dataset_id == dataset.id
  28. ).all()
  29. if not dataset_query or len(dataset_query) == 0:
  30. documents = db.session.query(Document).filter(
  31. Document.dataset_id == dataset.id,
  32. Document.indexing_status == 'completed',
  33. Document.enabled == True,
  34. Document.archived == False,
  35. Document.updated_at > thirty_days_ago
  36. ).all()
  37. if not documents or len(documents) == 0:
  38. try:
  39. # remove index
  40. index_processor = IndexProcessorFactory(dataset.doc_form).init_index_processor()
  41. index_processor.clean(dataset, None)
  42. # update document
  43. update_params = {
  44. Document.enabled: False
  45. }
  46. Document.query.filter_by(dataset_id=dataset.id).update(update_params)
  47. db.session.commit()
  48. click.echo(click.style('Cleaned unused dataset {} from db success!'.format(dataset.id),
  49. fg='green'))
  50. except Exception as e:
  51. click.echo(
  52. click.style('clean dataset index error: {} {}'.format(e.__class__.__name__, str(e)),
  53. fg='red'))
  54. end_at = time.perf_counter()
  55. click.echo(click.style('Cleaned unused dataset from db success latency: {}'.format(end_at - start_at), fg='green'))