|
@@ -1,3 +1,4 @@
|
|
|
+import logging
|
|
|
from abc import ABC, abstractmethod
|
|
|
from collections.abc import Generator
|
|
|
|
|
@@ -5,6 +6,8 @@ from core.workflow.entities.variable_pool import VariablePool
|
|
|
from core.workflow.graph_engine.entities.event import GraphEngineEvent, NodeRunSucceededEvent
|
|
|
from core.workflow.graph_engine.entities.graph import Graph
|
|
|
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
|
|
|
class StreamProcessor(ABC):
|
|
|
def __init__(self, graph: Graph, variable_pool: VariablePool) -> None:
|
|
@@ -31,6 +34,9 @@ class StreamProcessor(ABC):
|
|
|
if run_result.edge_source_handle:
|
|
|
reachable_node_ids = []
|
|
|
unreachable_first_node_ids = []
|
|
|
+ if finished_node_id not in self.graph.edge_mapping:
|
|
|
+ logger.warning(f"node {finished_node_id} has no edge mapping")
|
|
|
+ return
|
|
|
for edge in self.graph.edge_mapping[finished_node_id]:
|
|
|
if (
|
|
|
edge.run_condition
|