Browse Source

Make max_submit_count configurable via Config (#11673)

Kazuhisa Wada 4 months ago
parent
commit
63f1dd7877

+ 2 - 0
api/.env.example

@@ -433,3 +433,5 @@ RESET_PASSWORD_TOKEN_EXPIRY_MINUTES=5
 
 CREATE_TIDB_SERVICE_JOB_ENABLED=false
 
+# Maximum number of submitted thread count in a ThreadPool for parallel node execution
+MAX_SUBMIT_COUNT=100

+ 12 - 0
api/configs/feature/__init__.py

@@ -439,6 +439,17 @@ class WorkflowConfig(BaseSettings):
     )
 
 
+class WorkflowNodeExecutionConfig(BaseSettings):
+    """
+    Configuration for workflow node execution
+    """
+
+    MAX_SUBMIT_COUNT: PositiveInt = Field(
+        description="Maximum number of submitted thread count in a ThreadPool for parallel node execution",
+        default=100,
+    )
+
+
 class AuthConfig(BaseSettings):
     """
     Configuration for authentication and OAuth
@@ -775,6 +786,7 @@ class FeatureConfig(
     ToolConfig,
     UpdateConfig,
     WorkflowConfig,
+    WorkflowNodeExecutionConfig,
     WorkspaceConfig,
     LoginConfig,
     # hosted services config

+ 8 - 2
api/core/workflow/graph_engine/graph_engine.py

@@ -9,6 +9,7 @@ from typing import Any, Optional, cast
 
 from flask import Flask, current_app
 
+from configs import dify_config
 from core.app.apps.base_app_queue_manager import GenerateTaskStoppedError
 from core.app.entities.app_invoke_entities import InvokeFrom
 from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
@@ -52,7 +53,12 @@ logger = logging.getLogger(__name__)
 
 class GraphEngineThreadPool(ThreadPoolExecutor):
     def __init__(
-        self, max_workers=None, thread_name_prefix="", initializer=None, initargs=(), max_submit_count=100
+        self,
+        max_workers=None,
+        thread_name_prefix="",
+        initializer=None,
+        initargs=(),
+        max_submit_count=dify_config.MAX_SUBMIT_COUNT,
     ) -> None:
         super().__init__(max_workers, thread_name_prefix, initializer, initargs)
         self.max_submit_count = max_submit_count
@@ -92,7 +98,7 @@ class GraphEngine:
         max_execution_time: int,
         thread_pool_id: Optional[str] = None,
     ) -> None:
-        thread_pool_max_submit_count = 100
+        thread_pool_max_submit_count = dify_config.MAX_SUBMIT_COUNT
         thread_pool_max_workers = 10
 
         # init thread pool

+ 3 - 1
api/core/workflow/nodes/iteration/iteration_node.py

@@ -163,7 +163,9 @@ class IterationNode(BaseNode[IterationNodeData]):
             if self.node_data.is_parallel:
                 futures: list[Future] = []
                 q: Queue = Queue()
-                thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100)
+                thread_pool = GraphEngineThreadPool(
+                    max_workers=self.node_data.parallel_nums, max_submit_count=dify_config.MAX_SUBMIT_COUNT
+                )
                 for index, item in enumerate(iterator_list_value):
                     future: Future = thread_pool.submit(
                         self._run_single_iter_parallel,