workflow_app_service.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import uuid
  2. from datetime import datetime
  3. from sqlalchemy import and_, func, or_, select
  4. from sqlalchemy.orm import Session
  5. from models import App, EndUser, WorkflowAppLog, WorkflowRun
  6. from models.enums import CreatedByRole
  7. from models.workflow import WorkflowRunStatus
  8. class WorkflowAppService:
  9. def get_paginate_workflow_app_logs(
  10. self,
  11. *,
  12. session: Session,
  13. app_model: App,
  14. keyword: str | None = None,
  15. status: WorkflowRunStatus | None = None,
  16. created_at_before: datetime | None = None,
  17. created_at_after: datetime | None = None,
  18. page: int = 1,
  19. limit: int = 20,
  20. ) -> dict:
  21. """
  22. Get paginate workflow app logs using SQLAlchemy 2.0 style
  23. :param session: SQLAlchemy session
  24. :param app_model: app model
  25. :param keyword: search keyword
  26. :param status: filter by status
  27. :param created_at_before: filter logs created before this timestamp
  28. :param created_at_after: filter logs created after this timestamp
  29. :param page: page number
  30. :param limit: items per page
  31. :return: Pagination object
  32. """
  33. # Build base statement using SQLAlchemy 2.0 style
  34. stmt = select(WorkflowAppLog).where(
  35. WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
  36. )
  37. if keyword or status:
  38. stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
  39. if keyword:
  40. keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
  41. keyword_conditions = [
  42. WorkflowRun.inputs.ilike(keyword_like_val),
  43. WorkflowRun.outputs.ilike(keyword_like_val),
  44. # filter keyword by end user session id if created by end user role
  45. and_(WorkflowRun.created_by_role == "end_user", EndUser.session_id.ilike(keyword_like_val)),
  46. ]
  47. # filter keyword by workflow run id
  48. keyword_uuid = self._safe_parse_uuid(keyword)
  49. if keyword_uuid:
  50. keyword_conditions.append(WorkflowRun.id == keyword_uuid)
  51. stmt = stmt.outerjoin(
  52. EndUser,
  53. and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER),
  54. ).where(or_(*keyword_conditions))
  55. if status:
  56. stmt = stmt.where(WorkflowRun.status == status)
  57. # Add time-based filtering
  58. if created_at_before:
  59. stmt = stmt.where(WorkflowAppLog.created_at <= created_at_before)
  60. if created_at_after:
  61. stmt = stmt.where(WorkflowAppLog.created_at >= created_at_after)
  62. stmt = stmt.order_by(WorkflowAppLog.created_at.desc())
  63. # Get total count using the same filters
  64. count_stmt = select(func.count()).select_from(stmt.subquery())
  65. total = session.scalar(count_stmt) or 0
  66. # Apply pagination limits
  67. offset_stmt = stmt.offset((page - 1) * limit).limit(limit)
  68. # Execute query and get items
  69. items = list(session.scalars(offset_stmt).all())
  70. return {
  71. "page": page,
  72. "limit": limit,
  73. "total": total,
  74. "has_more": total > page * limit,
  75. "data": items,
  76. }
  77. @staticmethod
  78. def _safe_parse_uuid(value: str):
  79. # fast check
  80. if len(value) < 32:
  81. return None
  82. try:
  83. return uuid.UUID(value)
  84. except ValueError:
  85. return None