Преглед на файлове

fix: workflow log run time error (#7130)

Joe преди 8 месеца
родител
ревизия
7201b56a6d
променени са 2 файла, в които са добавени 25 реда и са изтрити 9 реда
  1. 3 9
      api/core/app/task_pipeline/workflow_cycle_manage.py
  2. 22 0
      api/services/workflow_service.py

+ 3 - 9
api/core/app/task_pipeline/workflow_cycle_manage.py

@@ -40,6 +40,7 @@ from models.workflow import (
     WorkflowRunStatus,
     WorkflowRunTriggeredFrom,
 )
+from services.workflow_service import WorkflowService
 
 
 class WorkflowCycleManage(WorkflowIterationCycleManage):
@@ -97,7 +98,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
 
     def _workflow_run_success(
         self, workflow_run: WorkflowRun,
-        start_at: float,
         total_tokens: int,
         total_steps: int,
         outputs: Optional[str] = None,
@@ -107,7 +107,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         """
         Workflow run success
         :param workflow_run: workflow run
-        :param start_at: start time
         :param total_tokens: total tokens
         :param total_steps: total steps
         :param outputs: outputs
@@ -116,7 +115,7 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         """
         workflow_run.status = WorkflowRunStatus.SUCCEEDED.value
         workflow_run.outputs = outputs
-        workflow_run.elapsed_time = time.perf_counter() - start_at
+        workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id)
         workflow_run.total_tokens = total_tokens
         workflow_run.total_steps = total_steps
         workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
@@ -139,7 +138,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
 
     def _workflow_run_failed(
         self, workflow_run: WorkflowRun,
-        start_at: float,
         total_tokens: int,
         total_steps: int,
         status: WorkflowRunStatus,
@@ -150,7 +148,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         """
         Workflow run failed
         :param workflow_run: workflow run
-        :param start_at: start time
         :param total_tokens: total tokens
         :param total_steps: total steps
         :param status: status
@@ -159,7 +156,7 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         """
         workflow_run.status = status.value
         workflow_run.error = error
-        workflow_run.elapsed_time = time.perf_counter() - start_at
+        workflow_run.elapsed_time = WorkflowService.get_elapsed_time(workflow_run_id=workflow_run.id)
         workflow_run.total_tokens = total_tokens
         workflow_run.total_steps = total_steps
         workflow_run.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
@@ -542,7 +539,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         if isinstance(event, QueueStopEvent):
             workflow_run = self._workflow_run_failed(
                 workflow_run=workflow_run,
-                start_at=self._task_state.start_at,
                 total_tokens=self._task_state.total_tokens,
                 total_steps=self._task_state.total_steps,
                 status=WorkflowRunStatus.STOPPED,
@@ -565,7 +561,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
         elif isinstance(event, QueueWorkflowFailedEvent):
             workflow_run = self._workflow_run_failed(
                 workflow_run=workflow_run,
-                start_at=self._task_state.start_at,
                 total_tokens=self._task_state.total_tokens,
                 total_steps=self._task_state.total_steps,
                 status=WorkflowRunStatus.FAILED,
@@ -583,7 +578,6 @@ class WorkflowCycleManage(WorkflowIterationCycleManage):
 
             workflow_run = self._workflow_run_success(
                 workflow_run=workflow_run,
-                start_at=self._task_state.start_at,
                 total_tokens=self._task_state.total_tokens,
                 total_steps=self._task_state.total_steps,
                 outputs=outputs,

+ 22 - 0
api/services/workflow_service.py

@@ -319,3 +319,25 @@ class WorkflowService:
             )
         else:
             raise ValueError(f"Invalid app mode: {app_model.mode}")
+
+    @classmethod
+    def get_elapsed_time(cls, workflow_run_id: str) -> float:
+        """
+        Get elapsed time
+        """
+        elapsed_time = 0.0
+
+        # fetch workflow node execution by workflow_run_id
+        workflow_nodes = (
+            db.session.query(WorkflowNodeExecution)
+            .filter(WorkflowNodeExecution.workflow_run_id == workflow_run_id)
+            .order_by(WorkflowNodeExecution.created_at.asc())
+            .all()
+        )
+        if not workflow_nodes:
+            return elapsed_time
+        
+        for node in workflow_nodes:
+            elapsed_time += node.elapsed_time
+
+        return elapsed_time