task.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from datetime import UTC, datetime
  2. from celery import states # type: ignore
  3. from models.base import Base
  4. from .engine import db
  5. class CeleryTask(Base):
  6. """Task result/status."""
  7. __tablename__ = "celery_taskmeta"
  8. id = db.Column(db.Integer, db.Sequence("task_id_sequence"), primary_key=True, autoincrement=True)
  9. task_id = db.Column(db.String(155), unique=True)
  10. status = db.Column(db.String(50), default=states.PENDING)
  11. result = db.Column(db.PickleType, nullable=True)
  12. date_done = db.Column(
  13. db.DateTime,
  14. default=lambda: datetime.now(UTC).replace(tzinfo=None),
  15. onupdate=lambda: datetime.now(UTC).replace(tzinfo=None),
  16. nullable=True,
  17. )
  18. traceback = db.Column(db.Text, nullable=True)
  19. name = db.Column(db.String(155), nullable=True)
  20. args = db.Column(db.LargeBinary, nullable=True)
  21. kwargs = db.Column(db.LargeBinary, nullable=True)
  22. worker = db.Column(db.String(155), nullable=True)
  23. retries = db.Column(db.Integer, nullable=True)
  24. queue = db.Column(db.String(155), nullable=True)
  25. class CeleryTaskSet(Base):
  26. """TaskSet result."""
  27. __tablename__ = "celery_tasksetmeta"
  28. id = db.Column(db.Integer, db.Sequence("taskset_id_sequence"), autoincrement=True, primary_key=True)
  29. taskset_id = db.Column(db.String(155), unique=True)
  30. result = db.Column(db.PickleType, nullable=True)
  31. date_done = db.Column(db.DateTime, default=lambda: datetime.now(UTC).replace(tzinfo=None), nullable=True)