Explorar o código

fix: thread_pool submit count in parallel workflow not releasing (#8549)

takatost hai 7 meses
pai
achega
ffd2f61dd9
Modificáronse 1 ficheiros con 17 adicións e 12 borrados
  1. 17 12
      api/core/workflow/graph_engine/graph_engine.py

+ 17 - 12
api/core/workflow/graph_engine/graph_engine.py

@@ -61,6 +61,9 @@ class GraphEngineThreadPool(ThreadPoolExecutor):
 
         return super().submit(fn, *args, **kwargs)
 
+    def task_done_callback(self, future):
+        self.submit_count -= 1
+
     def check_is_full(self) -> None:
         print(f"submit_count: {self.submit_count}, max_submit_count: {self.max_submit_count}")
         if self.submit_count > self.max_submit_count:
@@ -426,20 +429,22 @@ class GraphEngine:
             ):
                 continue
 
-            futures.append(
-                self.thread_pool.submit(
-                    self._run_parallel_node,
-                    **{
-                        "flask_app": current_app._get_current_object(),  # type: ignore[attr-defined]
-                        "q": q,
-                        "parallel_id": parallel_id,
-                        "parallel_start_node_id": edge.target_node_id,
-                        "parent_parallel_id": in_parallel_id,
-                        "parent_parallel_start_node_id": parallel_start_node_id,
-                    },
-                )
+            future = self.thread_pool.submit(
+                self._run_parallel_node,
+                **{
+                    "flask_app": current_app._get_current_object(),  # type: ignore[attr-defined]
+                    "q": q,
+                    "parallel_id": parallel_id,
+                    "parallel_start_node_id": edge.target_node_id,
+                    "parent_parallel_id": in_parallel_id,
+                    "parent_parallel_start_node_id": parallel_start_node_id,
+                },
             )
 
+            future.add_done_callback(self.thread_pool.task_done_callback)
+
+            futures.append(future)
+
         succeeded_count = 0
         while True:
             try: