document_indexing_task.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import datetime
  2. import logging
  3. import time
  4. import click
  5. from celery import shared_task
  6. from werkzeug.exceptions import NotFound
  7. from core.indexing_runner import IndexingRunner, DocumentIsPausedException
  8. from core.llm.error import ProviderTokenNotInitError
  9. from extensions.ext_database import db
  10. from models.dataset import Document
  @shared_task
  def document_indexing_task(dataset_id: str, document_ids: list):
      """
      Async process a batch of documents that belong to one dataset.

      Every id is looked up within the dataset, the matching document is
      flagged as 'parsing' with its processing start time, and the collected
      batch is then handed to ``IndexingRunner.run`` in a single call.

      :param dataset_id: id of the dataset the documents must belong to
      :param document_ids: ids of the documents to index
      :raises NotFound: if an id does not match a document in the dataset

      Usage: document_indexing_task.delay(dataset_id, document_ids)
      """
      # Taken once, before the loop, so the reported latency covers the whole
      # task instead of being reset on every iteration.
      start_at = time.perf_counter()

      documents = []
      for document_id in document_ids:
          logging.info(click.style('Start process document: {}'.format(document_id), fg='green'))

          document = db.session.query(Document).filter(
              Document.id == document_id,
              Document.dataset_id == dataset_id
          ).first()

          if not document:
              raise NotFound('Document not found')

          document.indexing_status = 'parsing'
          document.processing_started_at = datetime.datetime.utcnow()
          documents.append(document)
          db.session.add(document)

      if not documents:
          # Empty batch: there is nothing to commit or index, and the code
          # below needs at least one loaded document to report on.
          return

      db.session.commit()

      # Logging and failure bookkeeping refer to the document loaded last,
      # exactly as before.
      # NOTE(review): when the run fails, the other documents of the batch keep
      # the 'parsing' status set above -- confirm whether IndexingRunner.run
      # records per-document failures itself before widening this.
      last_document = documents[-1]

      try:
          IndexingRunner().run(documents)
      except DocumentIsPausedException:
          logging.info(click.style('Document paused, document id: {}'.format(last_document.id), fg='yellow'))
      except ProviderTokenNotInitError as e:
          _mark_indexing_failed(last_document, str(e.description))
      except Exception as e:
          # Boundary of the background task: log the traceback and persist the
          # failure on the document instead of letting the exception escape.
          logging.exception("consume document failed")
          _mark_indexing_failed(last_document, str(e))
      else:
          end_at = time.perf_counter()
          logging.info(click.style('Processed document: {} latency: {}'.format(last_document.id, end_at - start_at), fg='green'))


  def _mark_indexing_failed(document, message: str) -> None:
      """Set the error status, message and stop time on *document* and commit."""
      document.indexing_status = 'error'
      document.error = message
      document.stopped_at = datetime.datetime.utcnow()
      db.session.commit()