workflow_service.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import json
  2. import time
  3. from collections.abc import Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional, cast
  6. from uuid import uuid4
  7. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  8. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  9. from core.model_runtime.utils.encoders import jsonable_encoder
  10. from core.variables import Variable
  11. from core.workflow.entities.node_entities import NodeRunResult
  12. from core.workflow.errors import WorkflowNodeRunFailedError
  13. from core.workflow.nodes import NodeType
  14. from core.workflow.nodes.base.entities import BaseNodeData
  15. from core.workflow.nodes.base.node import BaseNode
  16. from core.workflow.nodes.enums import ErrorStrategy
  17. from core.workflow.nodes.event import RunCompletedEvent
  18. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  19. from core.workflow.workflow_entry import WorkflowEntry
  20. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  21. from extensions.ext_database import db
  22. from models.account import Account
  23. from models.enums import CreatedByRole
  24. from models.model import App, AppMode
  25. from models.workflow import (
  26. Workflow,
  27. WorkflowNodeExecution,
  28. WorkflowNodeExecutionStatus,
  29. WorkflowNodeExecutionTriggeredFrom,
  30. WorkflowType,
  31. )
  32. from services.errors.app import WorkflowHashNotEqualError
  33. from services.workflow.workflow_converter import WorkflowConverter
  34. class WorkflowService:
  35. """
  36. Workflow Service
  37. """
  38. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  39. """
  40. Get draft workflow
  41. """
  42. # fetch draft workflow by app_model
  43. workflow = (
  44. db.session.query(Workflow)
  45. .filter(
  46. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  47. )
  48. .first()
  49. )
  50. # return draft workflow
  51. return workflow
  52. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  53. """
  54. Get published workflow
  55. """
  56. if not app_model.workflow_id:
  57. return None
  58. # fetch published workflow by workflow_id
  59. workflow = (
  60. db.session.query(Workflow)
  61. .filter(
  62. Workflow.tenant_id == app_model.tenant_id,
  63. Workflow.app_id == app_model.id,
  64. Workflow.id == app_model.workflow_id,
  65. )
  66. .first()
  67. )
  68. return workflow
  69. def sync_draft_workflow(
  70. self,
  71. *,
  72. app_model: App,
  73. graph: dict,
  74. features: dict,
  75. unique_hash: Optional[str],
  76. account: Account,
  77. environment_variables: Sequence[Variable],
  78. conversation_variables: Sequence[Variable],
  79. ) -> Workflow:
  80. """
  81. Sync draft workflow
  82. :raises WorkflowHashNotEqualError
  83. """
  84. # fetch draft workflow by app_model
  85. workflow = self.get_draft_workflow(app_model=app_model)
  86. if workflow and workflow.unique_hash != unique_hash:
  87. raise WorkflowHashNotEqualError()
  88. # validate features structure
  89. self.validate_features_structure(app_model=app_model, features=features)
  90. # create draft workflow if not found
  91. if not workflow:
  92. workflow = Workflow(
  93. tenant_id=app_model.tenant_id,
  94. app_id=app_model.id,
  95. type=WorkflowType.from_app_mode(app_model.mode).value,
  96. version="draft",
  97. graph=json.dumps(graph),
  98. features=json.dumps(features),
  99. created_by=account.id,
  100. environment_variables=environment_variables,
  101. conversation_variables=conversation_variables,
  102. )
  103. db.session.add(workflow)
  104. # update draft workflow if found
  105. else:
  106. workflow.graph = json.dumps(graph)
  107. workflow.features = json.dumps(features)
  108. workflow.updated_by = account.id
  109. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  110. workflow.environment_variables = environment_variables
  111. workflow.conversation_variables = conversation_variables
  112. # commit db session changes
  113. db.session.commit()
  114. # trigger app workflow events
  115. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  116. # return draft workflow
  117. return workflow
  118. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  119. """
  120. Publish workflow from draft
  121. :param app_model: App instance
  122. :param account: Account instance
  123. :param draft_workflow: Workflow instance
  124. """
  125. if not draft_workflow:
  126. # fetch draft workflow by app_model
  127. draft_workflow = self.get_draft_workflow(app_model=app_model)
  128. if not draft_workflow:
  129. raise ValueError("No valid workflow found.")
  130. # create new workflow
  131. workflow = Workflow(
  132. tenant_id=app_model.tenant_id,
  133. app_id=app_model.id,
  134. type=draft_workflow.type,
  135. version=str(datetime.now(UTC).replace(tzinfo=None)),
  136. graph=draft_workflow.graph,
  137. features=draft_workflow.features,
  138. created_by=account.id,
  139. environment_variables=draft_workflow.environment_variables,
  140. conversation_variables=draft_workflow.conversation_variables,
  141. )
  142. # commit db session changes
  143. db.session.add(workflow)
  144. db.session.flush()
  145. db.session.commit()
  146. app_model.workflow_id = workflow.id
  147. db.session.commit()
  148. # trigger app workflow events
  149. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  150. # return new workflow
  151. return workflow
  152. def get_default_block_configs(self) -> list[dict]:
  153. """
  154. Get default block configs
  155. """
  156. # return default block config
  157. default_block_configs = []
  158. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  159. node_class = node_class_mapping[LATEST_VERSION]
  160. default_config = node_class.get_default_config()
  161. if default_config:
  162. default_block_configs.append(default_config)
  163. return default_block_configs
  164. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  165. """
  166. Get default config of node.
  167. :param node_type: node type
  168. :param filters: filter by node config parameters.
  169. :return:
  170. """
  171. node_type_enum = NodeType(node_type)
  172. # return default block config
  173. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  174. return None
  175. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  176. default_config = node_class.get_default_config(filters=filters)
  177. if not default_config:
  178. return None
  179. return default_config
  180. def run_draft_workflow_node(
  181. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  182. ) -> WorkflowNodeExecution:
  183. """
  184. Run draft workflow node
  185. """
  186. # fetch draft workflow by app_model
  187. draft_workflow = self.get_draft_workflow(app_model=app_model)
  188. if not draft_workflow:
  189. raise ValueError("Workflow not initialized")
  190. # run draft workflow node
  191. start_at = time.perf_counter()
  192. try:
  193. node_instance, generator = WorkflowEntry.single_step_run(
  194. workflow=draft_workflow,
  195. node_id=node_id,
  196. user_inputs=user_inputs,
  197. user_id=account.id,
  198. )
  199. node_instance = cast(BaseNode[BaseNodeData], node_instance)
  200. node_run_result: NodeRunResult | None = None
  201. for event in generator:
  202. if isinstance(event, RunCompletedEvent):
  203. node_run_result = event.run_result
  204. # sign output files
  205. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  206. break
  207. if not node_run_result:
  208. raise ValueError("Node run failed with no run result")
  209. # single step debug mode error handling return
  210. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  211. node_error_args: dict[str, Any] = {
  212. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  213. "error": node_run_result.error,
  214. "inputs": node_run_result.inputs,
  215. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  216. }
  217. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  218. node_run_result = NodeRunResult(
  219. **node_error_args,
  220. outputs={
  221. **node_instance.node_data.default_value_dict,
  222. "error_message": node_run_result.error,
  223. "error_type": node_run_result.error_type,
  224. },
  225. )
  226. else:
  227. node_run_result = NodeRunResult(
  228. **node_error_args,
  229. outputs={
  230. "error_message": node_run_result.error,
  231. "error_type": node_run_result.error_type,
  232. },
  233. )
  234. run_succeeded = node_run_result.status in (
  235. WorkflowNodeExecutionStatus.SUCCEEDED,
  236. WorkflowNodeExecutionStatus.EXCEPTION,
  237. )
  238. error = node_run_result.error if not run_succeeded else None
  239. except WorkflowNodeRunFailedError as e:
  240. node_instance = e.node_instance
  241. run_succeeded = False
  242. node_run_result = None
  243. error = e.error
  244. workflow_node_execution = WorkflowNodeExecution()
  245. workflow_node_execution.id = str(uuid4())
  246. workflow_node_execution.tenant_id = app_model.tenant_id
  247. workflow_node_execution.app_id = app_model.id
  248. workflow_node_execution.workflow_id = draft_workflow.id
  249. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  250. workflow_node_execution.index = 1
  251. workflow_node_execution.node_id = node_id
  252. workflow_node_execution.node_type = node_instance.node_type
  253. workflow_node_execution.title = node_instance.node_data.title
  254. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  255. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  256. workflow_node_execution.created_by = account.id
  257. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  258. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  259. if run_succeeded and node_run_result:
  260. # create workflow node execution
  261. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  262. process_data = (
  263. WorkflowEntry.handle_special_values(node_run_result.process_data)
  264. if node_run_result.process_data
  265. else None
  266. )
  267. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  268. workflow_node_execution.inputs = json.dumps(inputs)
  269. workflow_node_execution.process_data = json.dumps(process_data)
  270. workflow_node_execution.outputs = json.dumps(outputs)
  271. workflow_node_execution.execution_metadata = (
  272. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  273. )
  274. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  275. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  276. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  277. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  278. workflow_node_execution.error = node_run_result.error
  279. else:
  280. # create workflow node execution
  281. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  282. workflow_node_execution.error = error
  283. db.session.add(workflow_node_execution)
  284. db.session.commit()
  285. return workflow_node_execution
  286. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  287. """
  288. Basic mode of chatbot app(expert mode) to workflow
  289. Completion App to Workflow App
  290. :param app_model: App instance
  291. :param account: Account instance
  292. :param args: dict
  293. :return:
  294. """
  295. # chatbot convert to workflow mode
  296. workflow_converter = WorkflowConverter()
  297. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  298. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  299. # convert to workflow
  300. new_app: App = workflow_converter.convert_to_workflow(
  301. app_model=app_model,
  302. account=account,
  303. name=args.get("name", "Default Name"),
  304. icon_type=args.get("icon_type", "emoji"),
  305. icon=args.get("icon", "🤖"),
  306. icon_background=args.get("icon_background", "#FFEAD5"),
  307. )
  308. return new_app
  309. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  310. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  311. return AdvancedChatAppConfigManager.config_validate(
  312. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  313. )
  314. elif app_model.mode == AppMode.WORKFLOW.value:
  315. return WorkflowAppConfigManager.config_validate(
  316. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  317. )
  318. else:
  319. raise ValueError(f"Invalid app mode: {app_model.mode}")