# workflow_service.py
  1. import json
  2. import time
  3. from collections.abc import Sequence
  4. from datetime import UTC, datetime
  5. from typing import Optional, cast
  6. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  7. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  8. from core.model_runtime.utils.encoders import jsonable_encoder
  9. from core.variables import Variable
  10. from core.workflow.entities.node_entities import NodeRunResult
  11. from core.workflow.errors import WorkflowNodeRunFailedError
  12. from core.workflow.nodes import NodeType
  13. from core.workflow.nodes.base.entities import BaseNodeData
  14. from core.workflow.nodes.base.node import BaseNode
  15. from core.workflow.nodes.enums import ErrorStrategy
  16. from core.workflow.nodes.event import RunCompletedEvent
  17. from core.workflow.nodes.event.event import SingleStepRetryEvent
  18. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  19. from core.workflow.workflow_entry import WorkflowEntry
  20. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  21. from extensions.ext_database import db
  22. from models.account import Account
  23. from models.enums import CreatedByRole
  24. from models.model import App, AppMode
  25. from models.workflow import (
  26. Workflow,
  27. WorkflowNodeExecution,
  28. WorkflowNodeExecutionStatus,
  29. WorkflowNodeExecutionTriggeredFrom,
  30. WorkflowType,
  31. )
  32. from services.errors.app import WorkflowHashNotEqualError
  33. from services.workflow.workflow_converter import WorkflowConverter
  class WorkflowService:
      """
      Workflow Service

      Operations on an app's workflows: reading and syncing the draft, publishing the draft
      as a new version, exposing node default configs, single-step running a draft node, and
      converting a chat/completion app into a workflow app.
      """

      def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
          """
          Get draft workflow

          :param app_model: App instance; its tenant_id and id scope the query
          :return: the Workflow whose version is "draft", or None when the query finds no row
          """
          return (
              db.session.query(Workflow)
              .filter(
                  Workflow.tenant_id == app_model.tenant_id,
                  Workflow.app_id == app_model.id,
                  Workflow.version == "draft",
              )
              .first()
          )

      def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
          """
          Get published workflow

          :param app_model: App instance; its workflow_id identifies the published workflow
          :return: the matching Workflow, or None when the app has no workflow_id or no row matches
          """
          if not app_model.workflow_id:
              return None

          # fetch published workflow by workflow_id
          return (
              db.session.query(Workflow)
              .filter(
                  Workflow.tenant_id == app_model.tenant_id,
                  Workflow.app_id == app_model.id,
                  Workflow.id == app_model.workflow_id,
              )
              .first()
          )

      def sync_draft_workflow(
          self,
          *,
          app_model: App,
          graph: dict,
          features: dict,
          unique_hash: Optional[str],
          account: Account,
          environment_variables: Sequence[Variable],
          conversation_variables: Sequence[Variable],
      ) -> Workflow:
          """
          Sync draft workflow

          Creates the draft when the app has none, otherwise overwrites the existing draft, then
          commits and sends the app_draft_workflow_was_synced signal.

          :param app_model: App instance
          :param graph: workflow graph, stored as JSON text
          :param features: features config, structure-validated and stored as JSON text
          :param unique_hash: hash the caller last saw; compared with the stored draft's unique_hash
          :param account: Account recorded as creator (new draft) or updater (existing draft)
          :param environment_variables: environment variables to store on the draft
          :param conversation_variables: conversation variables to store on the draft
          :return: the created or updated draft workflow
          :raises WorkflowHashNotEqualError: a draft exists and its unique_hash differs from unique_hash
          :raises ValueError: the app mode is not supported by validate_features_structure
          """
          # fetch draft workflow by app_model
          workflow = self.get_draft_workflow(app_model=app_model)

          # the hash check only applies when a draft already exists
          if workflow and workflow.unique_hash != unique_hash:
              raise WorkflowHashNotEqualError()

          # validate features structure
          self.validate_features_structure(app_model=app_model, features=features)

          if not workflow:
              # create draft workflow if not found
              workflow = Workflow(
                  tenant_id=app_model.tenant_id,
                  app_id=app_model.id,
                  type=WorkflowType.from_app_mode(app_model.mode).value,
                  version="draft",
                  graph=json.dumps(graph),
                  features=json.dumps(features),
                  created_by=account.id,
                  environment_variables=environment_variables,
                  conversation_variables=conversation_variables,
              )
              db.session.add(workflow)
          else:
              # update draft workflow if found
              workflow.graph = json.dumps(graph)
              workflow.features = json.dumps(features)
              workflow.updated_by = account.id
              # stored as a naive datetime holding the current UTC time
              workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
              workflow.environment_variables = environment_variables
              workflow.conversation_variables = conversation_variables

          # commit db session changes
          db.session.commit()

          # trigger app workflow events
          app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)

          return workflow

      def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
          """
          Publish workflow from draft

          Copies the draft into a new Workflow row whose version is the current UTC timestamp,
          points the app at it, commits, and sends the app_published_workflow_was_updated signal.

          :param app_model: App instance
          :param account: Account instance, recorded as creator of the published workflow
          :param draft_workflow: Workflow instance; looked up via get_draft_workflow when omitted
          :return: the newly created published workflow
          :raises ValueError: no draft workflow was passed in and none could be found
          """
          if not draft_workflow:
              # fetch draft workflow by app_model
              draft_workflow = self.get_draft_workflow(app_model=app_model)

          if not draft_workflow:
              raise ValueError("No valid workflow found.")

          # create new workflow
          workflow = Workflow(
              tenant_id=app_model.tenant_id,
              app_id=app_model.id,
              type=draft_workflow.type,
              version=str(datetime.now(UTC).replace(tzinfo=None)),
              graph=draft_workflow.graph,
              features=draft_workflow.features,
              created_by=account.id,
              environment_variables=draft_workflow.environment_variables,
              conversation_variables=draft_workflow.conversation_variables,
          )

          # Flush so the new row gets its id, then link the app and commit once: the new
          # workflow and the app's pointer to it are persisted in the same transaction.
          db.session.add(workflow)
          db.session.flush()
          app_model.workflow_id = workflow.id
          db.session.commit()

          # trigger app workflow events
          app_published_workflow_was_updated.send(app_model, published_workflow=workflow)

          return workflow

      def get_default_block_configs(self) -> list[dict]:
          """
          Get default block configs

          :return: the non-empty default configs of the latest version of every node class
              registered in NODE_TYPE_CLASSES_MAPPING
          """
          return [
              default_config
              for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values()
              if (default_config := node_class_mapping[LATEST_VERSION].get_default_config())
          ]

      def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
          """
          Get default config of node.

          :param node_type: node type
          :param filters: filter by node config parameters.
          :return: the default config, or None when the node type has no registered class or
              its default config is empty
          :raises ValueError: node_type is not a valid NodeType value
          """
          node_type_enum = NodeType(node_type)

          if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
              return None

          node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
          # normalise an empty/falsy config to None
          return node_class.get_default_config(filters=filters) or None

      @staticmethod
      def _special_values_or_none(value):
          """Run value through WorkflowEntry.handle_special_values, or return None when value is falsy."""
          return WorkflowEntry.handle_special_values(value) if value else None

      @staticmethod
      def _apply_error_strategy(node_instance: BaseNode[BaseNodeData], failed_result: NodeRunResult) -> NodeRunResult:
          """Build the EXCEPTION result that replaces failed_result for a continue-on-error node."""
          outputs = {
              "error_message": failed_result.error,
              "error_type": failed_result.error_type,
          }
          if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
              # configured default values come first, the error fields are layered on top
              outputs = {**node_instance.node_data.default_value_dict, **outputs}

          return NodeRunResult(
              status=WorkflowNodeExecutionStatus.EXCEPTION,
              error=failed_result.error,
              inputs=failed_result.inputs,
              metadata={"error_strategy": node_instance.node_data.error_strategy},
              outputs=outputs,
          )

      def run_draft_workflow_node(
          self, app_model: App, node_id: str, user_inputs: dict, account: Account
      ) -> WorkflowNodeExecution:
          """
          Run draft workflow node

          Single-step runs one node of the app's draft workflow, re-running it while it fails
          and the node asks for a retry, then stores the outcome as a WorkflowNodeExecution.

          :param app_model: App instance
          :param node_id: id of the node to run
          :param user_inputs: inputs passed to the single-step run
          :param account: Account running the node, recorded as creator of the execution
          :return: the committed WorkflowNodeExecution, with the collected retry events
              attached as retry_events after the commit
          :raises ValueError: the app has no draft workflow, or a run produced no RunCompletedEvent
          """
          # fetch draft workflow by app_model
          draft_workflow = self.get_draft_workflow(app_model=app_model)
          if not draft_workflow:
              raise ValueError("Workflow not initialized")

          # run draft workflow node
          start_at = time.perf_counter()
          retries = 0
          max_retries = 0
          should_retry = True
          retry_events = []

          try:
              while retries <= max_retries and should_retry:
                  retry_start_at = time.perf_counter()
                  node_instance, generator = WorkflowEntry.single_step_run(
                      workflow=draft_workflow,
                      node_id=node_id,
                      user_inputs=user_inputs,
                      user_id=account.id,
                  )
                  node_instance = cast(BaseNode[BaseNodeData], node_instance)

                  # a node without a retry config gets no retries and no wait
                  retry_config = node_instance.node_data.retry_config
                  max_retries = retry_config.max_retries if retry_config else 0
                  retry_interval = retry_config.retry_interval_seconds if retry_config else 0

                  node_run_result: NodeRunResult | None = None
                  for event in generator:
                      if isinstance(event, RunCompletedEvent):
                          node_run_result = event.run_result
                          # sign output files
                          node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
                          break

                  if not node_run_result:
                      raise ValueError("Node run failed with no run result")

                  # single step debug mode error handling return
                  if node_run_result.status != WorkflowNodeExecutionStatus.FAILED:
                      # anything other than a failure ends the loop
                      should_retry = False
                  elif (
                      retries == max_retries
                      and node_instance.node_type == NodeType.HTTP_REQUEST
                      and node_run_result.outputs
                      and not node_instance.should_continue_on_error
                  ):
                      # a failed HTTP request that still has outputs is reported as succeeded
                      # once no retries remain
                      node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
                      should_retry = False
                  elif node_instance.should_retry:
                      node_run_result.status = WorkflowNodeExecutionStatus.RETRY
                      retries += 1
                      node_run_result.retry_index = retries
                      retry_events.append(
                          SingleStepRetryEvent(
                              inputs=self._special_values_or_none(node_run_result.inputs),
                              error=node_run_result.error,
                              outputs=self._special_values_or_none(node_run_result.outputs),
                              retry_index=node_run_result.retry_index,
                              elapsed_time=time.perf_counter() - retry_start_at,
                              execution_metadata=self._special_values_or_none(node_run_result.metadata),
                          )
                      )
                      # only wait when another attempt will actually follow
                      if retries <= max_retries:
                          time.sleep(retry_interval)
                  else:
                      should_retry = False
                      if node_instance.should_continue_on_error:
                          node_run_result = self._apply_error_strategy(node_instance, node_run_result)

                  run_succeeded = node_run_result.status in (
                      WorkflowNodeExecutionStatus.SUCCEEDED,
                      WorkflowNodeExecutionStatus.EXCEPTION,
                  )
                  error = node_run_result.error if not run_succeeded else None
          except WorkflowNodeRunFailedError as e:
              node_instance = e.node_instance
              run_succeeded = False
              node_run_result = None
              error = e.error

          # create workflow node execution
          workflow_node_execution = WorkflowNodeExecution()
          workflow_node_execution.tenant_id = app_model.tenant_id
          workflow_node_execution.app_id = app_model.id
          workflow_node_execution.workflow_id = draft_workflow.id
          workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
          workflow_node_execution.index = 1
          workflow_node_execution.node_id = node_id
          workflow_node_execution.node_type = node_instance.node_type
          workflow_node_execution.title = node_instance.node_data.title
          # covers every attempt, including waits between retries
          workflow_node_execution.elapsed_time = time.perf_counter() - start_at
          workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
          workflow_node_execution.created_by = account.id
          workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
          workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)

          if run_succeeded and node_run_result:
              inputs = self._special_values_or_none(node_run_result.inputs)
              process_data = self._special_values_or_none(node_run_result.process_data)
              outputs = self._special_values_or_none(node_run_result.outputs)

              workflow_node_execution.inputs = json.dumps(inputs)
              workflow_node_execution.process_data = json.dumps(process_data)
              workflow_node_execution.outputs = json.dumps(outputs)
              workflow_node_execution.execution_metadata = (
                  json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
              )
              if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
                  workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
              elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
                  # continue-on-error result: keep the error next to the substituted outputs
                  workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
                  workflow_node_execution.error = node_run_result.error
          else:
              workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
              workflow_node_execution.error = error

          db.session.add(workflow_node_execution)
          db.session.commit()

          # assigned after the commit, as in the original flow
          workflow_node_execution.retry_events = retry_events

          return workflow_node_execution

      def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
          """
          Basic mode of chatbot app(expert mode) to workflow
          Completion App to Workflow App

          :param app_model: App instance
          :param account: Account instance
          :param args: dict; optional keys name, icon_type, icon and icon_background
          :return: the App returned by WorkflowConverter.convert_to_workflow
          :raises ValueError: the app mode is neither chat nor completion
          """
          # chatbot convert to workflow mode
          workflow_converter = WorkflowConverter()

          if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
              raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")

          # convert to workflow
          return workflow_converter.convert_to_workflow(
              app_model=app_model,
              account=account,
              name=args.get("name", "Default Name"),
              icon_type=args.get("icon_type", "emoji"),
              icon=args.get("icon", "🤖"),
              icon_background=args.get("icon_background", "#FFEAD5"),
          )

      def validate_features_structure(self, app_model: App, features: dict) -> dict:
          """
          Validate the structure of a features config with the config manager for the app's mode.

          :param app_model: App instance; its mode selects the config manager
          :param features: features config to validate
          :return: the result of the selected manager's config_validate
          :raises ValueError: the app mode is neither advanced-chat nor workflow
          """
          if app_model.mode == AppMode.ADVANCED_CHAT.value:
              return AdvancedChatAppConfigManager.config_validate(
                  tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
              )
          elif app_model.mode == AppMode.WORKFLOW.value:
              return WorkflowAppConfigManager.config_validate(
                  tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
              )
          else:
              raise ValueError(f"Invalid app mode: {app_model.mode}")