Browse Source

fix: _handle_workflow_run_partial_success args is wrong (#11562)

Signed-off-by: yihong0618 <zouzou0208@gmail.com>
yihong 4 months ago
parent
commit
efd5575683
1 changed files with 30 additions and 36 deletions
  1. 30 36
      api/core/app/apps/workflow/generate_task_pipeline.py

+ 30 - 36
api/core/app/apps/workflow/generate_task_pipeline.py

@@ -259,36 +259,36 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
 
                 workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event)
 
-                response = self._workflow_node_start_to_stream_response(
+                node_start_response = self._workflow_node_start_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
                 )
 
-                if response:
-                    yield response
+                if node_start_response:
+                    yield node_start_response
             elif isinstance(event, QueueNodeSucceededEvent):
                 workflow_node_execution = self._handle_workflow_node_execution_success(event)
 
-                response = self._workflow_node_finish_to_stream_response(
+                node_success_response = self._workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
                 )
 
-                if response:
-                    yield response
+                if node_success_response:
+                    yield node_success_response
             elif isinstance(event, QueueNodeFailedEvent | QueueNodeInIterationFailedEvent | QueueNodeExceptionEvent):
                 workflow_node_execution = self._handle_workflow_node_execution_failed(event)
 
-                response = self._workflow_node_finish_to_stream_response(
+                node_failed_response = self._workflow_node_finish_to_stream_response(
                     event=event,
                     task_id=self._application_generate_entity.task_id,
                     workflow_node_execution=workflow_node_execution,
                 )
 
-                if response:
-                    yield response
+                if node_failed_response:
+                    yield node_failed_response
             elif isinstance(event, QueueParallelBranchRunStartedEvent):
                 if not workflow_run:
                     raise Exception("Workflow run not initialized.")
@@ -377,20 +377,19 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
 
                 if not graph_runtime_state:
                     raise Exception("Graph runtime state not initialized.")
-                handle_args = {
-                    "workflow_run": workflow_run,
-                    "start_at": graph_runtime_state.start_at,
-                    "total_tokens": graph_runtime_state.total_tokens,
-                    "total_steps": graph_runtime_state.node_run_steps,
-                    "status": WorkflowRunStatus.FAILED
+                workflow_run = self._handle_workflow_run_failed(
+                    workflow_run=workflow_run,
+                    start_at=graph_runtime_state.start_at,
+                    total_tokens=graph_runtime_state.total_tokens,
+                    total_steps=graph_runtime_state.node_run_steps,
+                    status=WorkflowRunStatus.FAILED
                     if isinstance(event, QueueWorkflowFailedEvent)
                     else WorkflowRunStatus.STOPPED,
-                    "error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
-                    "conversation_id": None,
-                    "trace_manager": trace_manager,
-                    "exceptions_count": event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
-                }
-                workflow_run = self._handle_workflow_run_failed(**handle_args)
+                    error=event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
+                    conversation_id=None,
+                    trace_manager=trace_manager,
+                    exceptions_count=event.exceptions_count if isinstance(event, QueueWorkflowFailedEvent) else 0,
+                )
 
                 # save workflow app log
                 self._save_workflow_app_log(workflow_run)
@@ -404,21 +403,16 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
 
                 if not graph_runtime_state:
                     raise Exception("Graph runtime state not initialized.")
-                handle_args = {
-                    "workflow_run": workflow_run,
-                    "start_at": graph_runtime_state.start_at,
-                    "total_tokens": graph_runtime_state.total_tokens,
-                    "total_steps": graph_runtime_state.node_run_steps,
-                    "status": WorkflowRunStatus.FAILED
-                    if isinstance(event, QueueWorkflowFailedEvent)
-                    else WorkflowRunStatus.STOPPED,
-                    "error": event.error if isinstance(event, QueueWorkflowFailedEvent) else event.get_stop_reason(),
-                    "conversation_id": None,
-                    "trace_manager": trace_manager,
-                    "exceptions_count": event.exceptions_count,
-                }
-                workflow_run = self._handle_workflow_run_partial_success(**handle_args)
-
+                workflow_run = self._handle_workflow_run_partial_success(
+                    workflow_run=workflow_run,
+                    start_at=graph_runtime_state.start_at,
+                    total_tokens=graph_runtime_state.total_tokens,
+                    total_steps=graph_runtime_state.node_run_steps,
+                    outputs=event.outputs,
+                    exceptions_count=event.exceptions_count,
+                    conversation_id=None,
+                    trace_manager=trace_manager,
+                )
                 # save workflow app log
                 self._save_workflow_app_log(workflow_run)