workflow_service.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. import json
  2. import time
  3. from collections.abc import Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional, cast
  6. from uuid import uuid4
  7. from sqlalchemy import desc
  8. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  9. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  10. from core.model_runtime.utils.encoders import jsonable_encoder
  11. from core.variables import Variable
  12. from core.workflow.entities.node_entities import NodeRunResult
  13. from core.workflow.errors import WorkflowNodeRunFailedError
  14. from core.workflow.nodes import NodeType
  15. from core.workflow.nodes.base.entities import BaseNodeData
  16. from core.workflow.nodes.base.node import BaseNode
  17. from core.workflow.nodes.enums import ErrorStrategy
  18. from core.workflow.nodes.event import RunCompletedEvent
  19. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  20. from core.workflow.workflow_entry import WorkflowEntry
  21. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  22. from extensions.ext_database import db
  23. from models.account import Account
  24. from models.enums import CreatedByRole
  25. from models.model import App, AppMode
  26. from models.workflow import (
  27. Workflow,
  28. WorkflowNodeExecution,
  29. WorkflowNodeExecutionStatus,
  30. WorkflowNodeExecutionTriggeredFrom,
  31. WorkflowType,
  32. )
  33. from services.errors.app import WorkflowHashNotEqualError
  34. from services.workflow.workflow_converter import WorkflowConverter
  35. class WorkflowService:
  36. """
  37. Workflow Service
  38. """
  39. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  40. """
  41. Get draft workflow
  42. """
  43. # fetch draft workflow by app_model
  44. workflow = (
  45. db.session.query(Workflow)
  46. .filter(
  47. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  48. )
  49. .first()
  50. )
  51. # return draft workflow
  52. return workflow
  53. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  54. """
  55. Get published workflow
  56. """
  57. if not app_model.workflow_id:
  58. return None
  59. # fetch published workflow by workflow_id
  60. workflow = (
  61. db.session.query(Workflow)
  62. .filter(
  63. Workflow.tenant_id == app_model.tenant_id,
  64. Workflow.app_id == app_model.id,
  65. Workflow.id == app_model.workflow_id,
  66. )
  67. .first()
  68. )
  69. return workflow
  70. def get_all_published_workflow(self, app_model: App, page: int, limit: int) -> tuple[list[Workflow], bool]:
  71. """
  72. Get published workflow with pagination
  73. """
  74. if not app_model.workflow_id:
  75. return [], False
  76. workflows = (
  77. db.session.query(Workflow)
  78. .filter(Workflow.app_id == app_model.id)
  79. .order_by(desc(Workflow.version))
  80. .offset((page - 1) * limit)
  81. .limit(limit + 1)
  82. .all()
  83. )
  84. has_more = len(workflows) > limit
  85. if has_more:
  86. workflows = workflows[:-1]
  87. return workflows, has_more
  88. def sync_draft_workflow(
  89. self,
  90. *,
  91. app_model: App,
  92. graph: dict,
  93. features: dict,
  94. unique_hash: Optional[str],
  95. account: Account,
  96. environment_variables: Sequence[Variable],
  97. conversation_variables: Sequence[Variable],
  98. ) -> Workflow:
  99. """
  100. Sync draft workflow
  101. :raises WorkflowHashNotEqualError
  102. """
  103. # fetch draft workflow by app_model
  104. workflow = self.get_draft_workflow(app_model=app_model)
  105. if workflow and workflow.unique_hash != unique_hash:
  106. raise WorkflowHashNotEqualError()
  107. # validate features structure
  108. self.validate_features_structure(app_model=app_model, features=features)
  109. # create draft workflow if not found
  110. if not workflow:
  111. workflow = Workflow(
  112. tenant_id=app_model.tenant_id,
  113. app_id=app_model.id,
  114. type=WorkflowType.from_app_mode(app_model.mode).value,
  115. version="draft",
  116. graph=json.dumps(graph),
  117. features=json.dumps(features),
  118. created_by=account.id,
  119. environment_variables=environment_variables,
  120. conversation_variables=conversation_variables,
  121. )
  122. db.session.add(workflow)
  123. # update draft workflow if found
  124. else:
  125. workflow.graph = json.dumps(graph)
  126. workflow.features = json.dumps(features)
  127. workflow.updated_by = account.id
  128. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  129. workflow.environment_variables = environment_variables
  130. workflow.conversation_variables = conversation_variables
  131. # commit db session changes
  132. db.session.commit()
  133. # trigger app workflow events
  134. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  135. # return draft workflow
  136. return workflow
  137. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  138. """
  139. Publish workflow from draft
  140. :param app_model: App instance
  141. :param account: Account instance
  142. :param draft_workflow: Workflow instance
  143. """
  144. if not draft_workflow:
  145. # fetch draft workflow by app_model
  146. draft_workflow = self.get_draft_workflow(app_model=app_model)
  147. if not draft_workflow:
  148. raise ValueError("No valid workflow found.")
  149. # create new workflow
  150. workflow = Workflow(
  151. tenant_id=app_model.tenant_id,
  152. app_id=app_model.id,
  153. type=draft_workflow.type,
  154. version=str(datetime.now(UTC).replace(tzinfo=None)),
  155. graph=draft_workflow.graph,
  156. features=draft_workflow.features,
  157. created_by=account.id,
  158. environment_variables=draft_workflow.environment_variables,
  159. conversation_variables=draft_workflow.conversation_variables,
  160. )
  161. # commit db session changes
  162. db.session.add(workflow)
  163. db.session.flush()
  164. db.session.commit()
  165. app_model.workflow_id = workflow.id
  166. db.session.commit()
  167. # trigger app workflow events
  168. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  169. # return new workflow
  170. return workflow
  171. def get_default_block_configs(self) -> list[dict]:
  172. """
  173. Get default block configs
  174. """
  175. # return default block config
  176. default_block_configs = []
  177. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  178. node_class = node_class_mapping[LATEST_VERSION]
  179. default_config = node_class.get_default_config()
  180. if default_config:
  181. default_block_configs.append(default_config)
  182. return default_block_configs
  183. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  184. """
  185. Get default config of node.
  186. :param node_type: node type
  187. :param filters: filter by node config parameters.
  188. :return:
  189. """
  190. node_type_enum = NodeType(node_type)
  191. # return default block config
  192. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  193. return None
  194. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  195. default_config = node_class.get_default_config(filters=filters)
  196. if not default_config:
  197. return None
  198. return default_config
  199. def run_draft_workflow_node(
  200. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  201. ) -> WorkflowNodeExecution:
  202. """
  203. Run draft workflow node
  204. """
  205. # fetch draft workflow by app_model
  206. draft_workflow = self.get_draft_workflow(app_model=app_model)
  207. if not draft_workflow:
  208. raise ValueError("Workflow not initialized")
  209. # run draft workflow node
  210. start_at = time.perf_counter()
  211. try:
  212. node_instance, generator = WorkflowEntry.single_step_run(
  213. workflow=draft_workflow,
  214. node_id=node_id,
  215. user_inputs=user_inputs,
  216. user_id=account.id,
  217. )
  218. node_instance = cast(BaseNode[BaseNodeData], node_instance)
  219. node_run_result: NodeRunResult | None = None
  220. for event in generator:
  221. if isinstance(event, RunCompletedEvent):
  222. node_run_result = event.run_result
  223. # sign output files
  224. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  225. break
  226. if not node_run_result:
  227. raise ValueError("Node run failed with no run result")
  228. # single step debug mode error handling return
  229. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  230. node_error_args: dict[str, Any] = {
  231. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  232. "error": node_run_result.error,
  233. "inputs": node_run_result.inputs,
  234. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  235. }
  236. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  237. node_run_result = NodeRunResult(
  238. **node_error_args,
  239. outputs={
  240. **node_instance.node_data.default_value_dict,
  241. "error_message": node_run_result.error,
  242. "error_type": node_run_result.error_type,
  243. },
  244. )
  245. else:
  246. node_run_result = NodeRunResult(
  247. **node_error_args,
  248. outputs={
  249. "error_message": node_run_result.error,
  250. "error_type": node_run_result.error_type,
  251. },
  252. )
  253. run_succeeded = node_run_result.status in (
  254. WorkflowNodeExecutionStatus.SUCCEEDED,
  255. WorkflowNodeExecutionStatus.EXCEPTION,
  256. )
  257. error = node_run_result.error if not run_succeeded else None
  258. except WorkflowNodeRunFailedError as e:
  259. node_instance = e.node_instance
  260. run_succeeded = False
  261. node_run_result = None
  262. error = e.error
  263. workflow_node_execution = WorkflowNodeExecution()
  264. workflow_node_execution.id = str(uuid4())
  265. workflow_node_execution.tenant_id = app_model.tenant_id
  266. workflow_node_execution.app_id = app_model.id
  267. workflow_node_execution.workflow_id = draft_workflow.id
  268. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  269. workflow_node_execution.index = 1
  270. workflow_node_execution.node_id = node_id
  271. workflow_node_execution.node_type = node_instance.node_type
  272. workflow_node_execution.title = node_instance.node_data.title
  273. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  274. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  275. workflow_node_execution.created_by = account.id
  276. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  277. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  278. if run_succeeded and node_run_result:
  279. # create workflow node execution
  280. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  281. process_data = (
  282. WorkflowEntry.handle_special_values(node_run_result.process_data)
  283. if node_run_result.process_data
  284. else None
  285. )
  286. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  287. workflow_node_execution.inputs = json.dumps(inputs)
  288. workflow_node_execution.process_data = json.dumps(process_data)
  289. workflow_node_execution.outputs = json.dumps(outputs)
  290. workflow_node_execution.execution_metadata = (
  291. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  292. )
  293. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  294. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  295. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  296. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  297. workflow_node_execution.error = node_run_result.error
  298. else:
  299. # create workflow node execution
  300. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  301. workflow_node_execution.error = error
  302. db.session.add(workflow_node_execution)
  303. db.session.commit()
  304. return workflow_node_execution
  305. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  306. """
  307. Basic mode of chatbot app(expert mode) to workflow
  308. Completion App to Workflow App
  309. :param app_model: App instance
  310. :param account: Account instance
  311. :param args: dict
  312. :return:
  313. """
  314. # chatbot convert to workflow mode
  315. workflow_converter = WorkflowConverter()
  316. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  317. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  318. # convert to workflow
  319. new_app: App = workflow_converter.convert_to_workflow(
  320. app_model=app_model,
  321. account=account,
  322. name=args.get("name", "Default Name"),
  323. icon_type=args.get("icon_type", "emoji"),
  324. icon=args.get("icon", "🤖"),
  325. icon_background=args.get("icon_background", "#FFEAD5"),
  326. )
  327. return new_app
  328. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  329. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  330. return AdvancedChatAppConfigManager.config_validate(
  331. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  332. )
  333. elif app_model.mode == AppMode.WORKFLOW.value:
  334. return WorkflowAppConfigManager.config_validate(
  335. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  336. )
  337. else:
  338. raise ValueError(f"Invalid app mode: {app_model.mode}")