workflow_run_service.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import threading
  2. from typing import Optional
  3. import contexts
  4. from extensions.ext_database import db
  5. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  6. from models.enums import WorkflowRunTriggeredFrom
  7. from models.model import App
  8. from models.workflow import (
  9. WorkflowNodeExecution,
  10. WorkflowNodeExecutionTriggeredFrom,
  11. WorkflowRun,
  12. )
  13. class WorkflowRunService:
  14. def get_paginate_advanced_chat_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  15. """
  16. Get advanced chat app workflow run list
  17. Only return triggered_from == advanced_chat
  18. :param app_model: app model
  19. :param args: request args
  20. """
  21. class WorkflowWithMessage:
  22. message_id: str
  23. conversation_id: str
  24. def __init__(self, workflow_run: WorkflowRun):
  25. self._workflow_run = workflow_run
  26. def __getattr__(self, item):
  27. return getattr(self._workflow_run, item)
  28. pagination = self.get_paginate_workflow_runs(app_model, args)
  29. with_message_workflow_runs = []
  30. for workflow_run in pagination.data:
  31. message = workflow_run.message
  32. with_message_workflow_run = WorkflowWithMessage(workflow_run=workflow_run)
  33. if message:
  34. with_message_workflow_run.message_id = message.id
  35. with_message_workflow_run.conversation_id = message.conversation_id
  36. with_message_workflow_runs.append(with_message_workflow_run)
  37. pagination.data = with_message_workflow_runs
  38. return pagination
  39. def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination:
  40. """
  41. Get debug workflow run list
  42. Only return triggered_from == debugging
  43. :param app_model: app model
  44. :param args: request args
  45. """
  46. limit = int(args.get("limit", 20))
  47. base_query = db.session.query(WorkflowRun).filter(
  48. WorkflowRun.tenant_id == app_model.tenant_id,
  49. WorkflowRun.app_id == app_model.id,
  50. WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value,
  51. )
  52. if args.get("last_id"):
  53. last_workflow_run = base_query.filter(
  54. WorkflowRun.id == args.get("last_id"),
  55. ).first()
  56. if not last_workflow_run:
  57. raise ValueError("Last workflow run not exists")
  58. workflow_runs = (
  59. base_query.filter(
  60. WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id
  61. )
  62. .order_by(WorkflowRun.created_at.desc())
  63. .limit(limit)
  64. .all()
  65. )
  66. else:
  67. workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
  68. has_more = False
  69. if len(workflow_runs) == limit:
  70. current_page_first_workflow_run = workflow_runs[-1]
  71. rest_count = base_query.filter(
  72. WorkflowRun.created_at < current_page_first_workflow_run.created_at,
  73. WorkflowRun.id != current_page_first_workflow_run.id,
  74. ).count()
  75. if rest_count > 0:
  76. has_more = True
  77. return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
  78. def get_workflow_run(self, app_model: App, run_id: str) -> Optional[WorkflowRun]:
  79. """
  80. Get workflow run detail
  81. :param app_model: app model
  82. :param run_id: workflow run id
  83. """
  84. workflow_run = (
  85. db.session.query(WorkflowRun)
  86. .filter(
  87. WorkflowRun.tenant_id == app_model.tenant_id,
  88. WorkflowRun.app_id == app_model.id,
  89. WorkflowRun.id == run_id,
  90. )
  91. .first()
  92. )
  93. return workflow_run
  94. def get_workflow_run_node_executions(self, app_model: App, run_id: str) -> list[WorkflowNodeExecution]:
  95. """
  96. Get workflow run node execution list
  97. """
  98. workflow_run = self.get_workflow_run(app_model, run_id)
  99. contexts.plugin_tool_providers.set({})
  100. contexts.plugin_tool_providers_lock.set(threading.Lock())
  101. if not workflow_run:
  102. return []
  103. node_executions = (
  104. db.session.query(WorkflowNodeExecution)
  105. .filter(
  106. WorkflowNodeExecution.tenant_id == app_model.tenant_id,
  107. WorkflowNodeExecution.app_id == app_model.id,
  108. WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
  109. WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
  110. WorkflowNodeExecution.workflow_run_id == run_id,
  111. )
  112. .order_by(WorkflowNodeExecution.index.desc())
  113. .all()
  114. )
  115. return node_executions