Browse Source

feat: add workflow parallel depth limit configuration (#11460)

Signed-off-by: -LAN- <laipz8200@outlook.com>
Co-authored-by: zxhlyh <jasonapring2015@outlook.com>
-LAN- 4 tháng trước cách đây
mục cha
commit
dacd457478

+ 1 - 0
api/.env.example

@@ -399,6 +399,7 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
 WORKFLOW_MAX_EXECUTION_STEPS=500
 WORKFLOW_MAX_EXECUTION_TIME=1200
 WORKFLOW_CALL_MAX_DEPTH=5
+WORKFLOW_PARALLEL_DEPTH_LIMIT=3
 MAX_VARIABLE_SIZE=204800
 
 # App configuration

+ 5 - 0
api/configs/feature/__init__.py

@@ -433,6 +433,11 @@ class WorkflowConfig(BaseSettings):
         default=5,
     )
 
+    WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field(
+        description="Maximum allowed depth for nested parallel executions",
+        default=3,
+    )
+
     MAX_VARIABLE_SIZE: PositiveInt = Field(
         description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.",
         default=200 * 1024,

+ 15 - 0
api/controllers/console/app/workflow.py

@@ -6,6 +6,7 @@ from flask_restful import Resource, marshal_with, reqparse
 from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
 
 import services
+from configs import dify_config
 from controllers.console import api
 from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
 from controllers.console.app.wraps import get_app_model
@@ -426,7 +427,21 @@ class ConvertToWorkflowApi(Resource):
         }
 
 
+class WorkflowConfigApi(Resource):
+    """Resource for workflow configuration."""
+
+    @setup_required
+    @login_required
+    @account_initialization_required
+    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
+    def get(self, app_model: App):
+        return {
+            "parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
+        }
+
+
 api.add_resource(DraftWorkflowApi, "/apps/<uuid:app_id>/workflows/draft")
+api.add_resource(WorkflowConfigApi, "/apps/<uuid:app_id>/workflows/draft/config")
 api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run")
 api.add_resource(DraftWorkflowRunApi, "/apps/<uuid:app_id>/workflows/draft/run")
 api.add_resource(WorkflowTaskStopApi, "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop")

+ 4 - 1
api/core/workflow/graph_engine/entities/graph.py

@@ -4,6 +4,7 @@ from typing import Any, Optional, cast
 
 from pydantic import BaseModel, Field
 
+from configs import dify_config
 from core.workflow.graph_engine.entities.run_condition import RunCondition
 from core.workflow.nodes import NodeType
 from core.workflow.nodes.answer.answer_stream_generate_router import AnswerStreamGeneratorRouter
@@ -170,7 +171,9 @@ class Graph(BaseModel):
         for parallel in parallel_mapping.values():
             if parallel.parent_parallel_id:
                 cls._check_exceed_parallel_limit(
-                    parallel_mapping=parallel_mapping, level_limit=3, parent_parallel_id=parallel.parent_parallel_id
+                    parallel_mapping=parallel_mapping,
+                    level_limit=dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
+                    parent_parallel_id=parallel.parent_parallel_id,
                 )
 
         # init answer stream generate routes

+ 2 - 0
api/tests/unit_tests/configs/test_dify_config.py

@@ -59,6 +59,8 @@ def test_dify_config(example_env_file):
     # annotated field with configured value
     assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30
 
+    assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3
+
 
 # NOTE: If there is a `.env` file in your Workspace, this test might not succeed as expected.
 # This is due to `pymilvus` loading all the variables from the `.env` file into `os.environ`.

+ 1 - 0
docker/.env.example

@@ -699,6 +699,7 @@ WORKFLOW_MAX_EXECUTION_STEPS=500
 WORKFLOW_MAX_EXECUTION_TIME=1200
 WORKFLOW_CALL_MAX_DEPTH=5
 MAX_VARIABLE_SIZE=204800
+WORKFLOW_PARALLEL_DEPTH_LIMIT=3
 WORKFLOW_FILE_UPLOAD_LIMIT=10
 
 # HTTP request node in workflow configuration

+ 2 - 1
docker/docker-compose.yaml

@@ -18,7 +18,6 @@ x-shared-env: &shared-api-worker-env
   LOG_DATEFORMAT: ${LOG_DATEFORMAT:-"%Y-%m-%d %H:%M:%S"}
   LOG_TZ: ${LOG_TZ:-UTC}
   DEBUG: ${DEBUG:-false}
-  SENTRY_DSN: ${SENTRY_DSN:-}
   FLASK_DEBUG: ${FLASK_DEBUG:-false}
   SECRET_KEY: ${SECRET_KEY:-sk-9f73s3ljTXVcMT3Blb3ljTqtsKiGHXVcMT3BlbkFJLK7U}
   INIT_PASSWORD: ${INIT_PASSWORD:-}
@@ -260,6 +259,7 @@ x-shared-env: &shared-api-worker-env
   UPLOAD_IMAGE_FILE_SIZE_LIMIT: ${UPLOAD_IMAGE_FILE_SIZE_LIMIT:-10}
   UPLOAD_VIDEO_FILE_SIZE_LIMIT: ${UPLOAD_VIDEO_FILE_SIZE_LIMIT:-100}
   UPLOAD_AUDIO_FILE_SIZE_LIMIT: ${UPLOAD_AUDIO_FILE_SIZE_LIMIT:-50}
+  SENTRY_DSN: ${SENTRY_DSN:-}
   API_SENTRY_DSN: ${API_SENTRY_DSN:-}
   API_SENTRY_TRACES_SAMPLE_RATE: ${API_SENTRY_TRACES_SAMPLE_RATE:-1.0}
   API_SENTRY_PROFILES_SAMPLE_RATE: ${API_SENTRY_PROFILES_SAMPLE_RATE:-1.0}
@@ -299,6 +299,7 @@ x-shared-env: &shared-api-worker-env
   WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200}
   WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5}
   MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
+  WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
   WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
   HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
   HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}

+ 6 - 3
web/app/components/workflow/hooks/use-workflow.ts

@@ -57,6 +57,7 @@ import {
 import I18n from '@/context/i18n'
 import { CollectionType } from '@/app/components/tools/types'
 import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants'
+import { useWorkflowConfig } from '@/service/use-workflow'
 
 export const useIsChatMode = () => {
   const appDetail = useAppStore(s => s.appDetail)
@@ -69,7 +70,9 @@ export const useWorkflow = () => {
   const { locale } = useContext(I18n)
   const store = useStoreApi()
   const workflowStore = useWorkflowStore()
+  const appId = useStore(s => s.appId)
   const nodesExtraData = useNodesExtraData()
+  const { data: workflowConfig } = useWorkflowConfig(appId)
   const setPanelWidth = useCallback((width: number) => {
     localStorage.setItem('workflow-node-panel-width', `${width}`)
     workflowStore.setState({ panelWidth: width })
@@ -336,15 +339,15 @@ export const useWorkflow = () => {
     for (let i = 0; i < parallelList.length; i++) {
       const parallel = parallelList[i]
 
-      if (parallel.depth > PARALLEL_DEPTH_LIMIT) {
+      if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) {
         const { setShowTips } = workflowStore.getState()
-        setShowTips(t('workflow.common.parallelTip.depthLimit', { num: PARALLEL_DEPTH_LIMIT }))
+        setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) }))
         return false
       }
     }
 
     return true
-  }, [t, workflowStore])
+  }, [t, workflowStore, workflowConfig?.parallel_depth_limit])
 
   const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => {
     const {

+ 12 - 0
web/service/use-workflow.ts

@@ -0,0 +1,12 @@
+import { useQuery } from '@tanstack/react-query'
+import { get } from './base'
+import type { WorkflowConfigResponse } from '@/types/workflow'
+
+const NAME_SPACE = 'workflow'
+
+export const useWorkflowConfig = (appId: string) => {
+  return useQuery({
+    queryKey: [NAME_SPACE, 'config', appId],
+    queryFn: () => get<WorkflowConfigResponse>(`/apps/${appId}/workflows/draft/config`),
+  })
+}

+ 4 - 0
web/types/workflow.ts

@@ -333,3 +333,7 @@ export type ConversationVariableResponse = {
 }
 
 export type IterationDurationMap = Record<string, number>
+
+export type WorkflowConfigResponse = {
+  parallel_depth_limit: number
+}