workflow.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. import json
  2. import logging
  3. from flask import abort, request
  4. from flask_restful import Resource, marshal_with, reqparse
  5. from werkzeug.exceptions import InternalServerError, NotFound
  6. import services
  7. from controllers.console import api
  8. from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
  9. from controllers.console.app.wraps import get_app_model
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.app.apps.base_app_queue_manager import AppQueueManager
  13. from core.app.entities.app_invoke_entities import InvokeFrom
  14. from fields.workflow_fields import workflow_fields
  15. from fields.workflow_run_fields import workflow_run_node_execution_fields
  16. from libs import helper
  17. from libs.helper import TimestampField, uuid_value
  18. from libs.login import current_user, login_required
  19. from models.model import App, AppMode
  20. from services.app_generate_service import AppGenerateService
  21. from services.errors.app import WorkflowHashNotEqualError
  22. from services.workflow_service import WorkflowService
  23. logger = logging.getLogger(__name__)
  24. class DraftWorkflowApi(Resource):
  25. @setup_required
  26. @login_required
  27. @account_initialization_required
  28. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  29. @marshal_with(workflow_fields)
  30. def get(self, app_model: App):
  31. """
  32. Get draft workflow
  33. """
  34. # fetch draft workflow by app_model
  35. workflow_service = WorkflowService()
  36. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  37. if not workflow:
  38. raise DraftWorkflowNotExist()
  39. # return workflow, if not found, return None (initiate graph by frontend)
  40. return workflow
  41. @setup_required
  42. @login_required
  43. @account_initialization_required
  44. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  45. def post(self, app_model: App):
  46. """
  47. Sync draft workflow
  48. """
  49. content_type = request.headers.get('Content-Type')
  50. if 'application/json' in content_type:
  51. parser = reqparse.RequestParser()
  52. parser.add_argument('graph', type=dict, required=True, nullable=False, location='json')
  53. parser.add_argument('features', type=dict, required=True, nullable=False, location='json')
  54. parser.add_argument('hash', type=str, required=False, location='json')
  55. args = parser.parse_args()
  56. elif 'text/plain' in content_type:
  57. try:
  58. data = json.loads(request.data.decode('utf-8'))
  59. if 'graph' not in data or 'features' not in data:
  60. raise ValueError('graph or features not found in data')
  61. if not isinstance(data.get('graph'), dict) or not isinstance(data.get('features'), dict):
  62. raise ValueError('graph or features is not a dict')
  63. args = {
  64. 'graph': data.get('graph'),
  65. 'features': data.get('features'),
  66. 'hash': data.get('hash')
  67. }
  68. except json.JSONDecodeError:
  69. return {'message': 'Invalid JSON data'}, 400
  70. else:
  71. abort(415)
  72. workflow_service = WorkflowService()
  73. try:
  74. workflow = workflow_service.sync_draft_workflow(
  75. app_model=app_model,
  76. graph=args.get('graph'),
  77. features=args.get('features'),
  78. unique_hash=args.get('hash'),
  79. account=current_user
  80. )
  81. except WorkflowHashNotEqualError:
  82. raise DraftWorkflowNotSync()
  83. return {
  84. "result": "success",
  85. "hash": workflow.unique_hash,
  86. "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at)
  87. }
  88. class AdvancedChatDraftWorkflowRunApi(Resource):
  89. @setup_required
  90. @login_required
  91. @account_initialization_required
  92. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  93. def post(self, app_model: App):
  94. """
  95. Run draft workflow
  96. """
  97. parser = reqparse.RequestParser()
  98. parser.add_argument('inputs', type=dict, location='json')
  99. parser.add_argument('query', type=str, required=True, location='json', default='')
  100. parser.add_argument('files', type=list, location='json')
  101. parser.add_argument('conversation_id', type=uuid_value, location='json')
  102. args = parser.parse_args()
  103. try:
  104. response = AppGenerateService.generate(
  105. app_model=app_model,
  106. user=current_user,
  107. args=args,
  108. invoke_from=InvokeFrom.DEBUGGER,
  109. streaming=True
  110. )
  111. return helper.compact_generate_response(response)
  112. except services.errors.conversation.ConversationNotExistsError:
  113. raise NotFound("Conversation Not Exists.")
  114. except services.errors.conversation.ConversationCompletedError:
  115. raise ConversationCompletedError()
  116. except ValueError as e:
  117. raise e
  118. except Exception as e:
  119. logging.exception("internal server error.")
  120. raise InternalServerError()
  121. class AdvancedChatDraftRunIterationNodeApi(Resource):
  122. @setup_required
  123. @login_required
  124. @account_initialization_required
  125. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  126. def post(self, app_model: App, node_id: str):
  127. """
  128. Run draft workflow iteration node
  129. """
  130. parser = reqparse.RequestParser()
  131. parser.add_argument('inputs', type=dict, location='json')
  132. args = parser.parse_args()
  133. try:
  134. response = AppGenerateService.generate_single_iteration(
  135. app_model=app_model,
  136. user=current_user,
  137. node_id=node_id,
  138. args=args,
  139. streaming=True
  140. )
  141. return helper.compact_generate_response(response)
  142. except services.errors.conversation.ConversationNotExistsError:
  143. raise NotFound("Conversation Not Exists.")
  144. except services.errors.conversation.ConversationCompletedError:
  145. raise ConversationCompletedError()
  146. except ValueError as e:
  147. raise e
  148. except Exception as e:
  149. logging.exception("internal server error.")
  150. raise InternalServerError()
  151. class WorkflowDraftRunIterationNodeApi(Resource):
  152. @setup_required
  153. @login_required
  154. @account_initialization_required
  155. @get_app_model(mode=[AppMode.WORKFLOW])
  156. def post(self, app_model: App, node_id: str):
  157. """
  158. Run draft workflow iteration node
  159. """
  160. parser = reqparse.RequestParser()
  161. parser.add_argument('inputs', type=dict, location='json')
  162. args = parser.parse_args()
  163. try:
  164. response = AppGenerateService.generate_single_iteration(
  165. app_model=app_model,
  166. user=current_user,
  167. node_id=node_id,
  168. args=args,
  169. streaming=True
  170. )
  171. return helper.compact_generate_response(response)
  172. except services.errors.conversation.ConversationNotExistsError:
  173. raise NotFound("Conversation Not Exists.")
  174. except services.errors.conversation.ConversationCompletedError:
  175. raise ConversationCompletedError()
  176. except ValueError as e:
  177. raise e
  178. except Exception as e:
  179. logging.exception("internal server error.")
  180. raise InternalServerError()
  181. class DraftWorkflowRunApi(Resource):
  182. @setup_required
  183. @login_required
  184. @account_initialization_required
  185. @get_app_model(mode=[AppMode.WORKFLOW])
  186. def post(self, app_model: App):
  187. """
  188. Run draft workflow
  189. """
  190. parser = reqparse.RequestParser()
  191. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  192. parser.add_argument('files', type=list, required=False, location='json')
  193. args = parser.parse_args()
  194. try:
  195. response = AppGenerateService.generate(
  196. app_model=app_model,
  197. user=current_user,
  198. args=args,
  199. invoke_from=InvokeFrom.DEBUGGER,
  200. streaming=True
  201. )
  202. return helper.compact_generate_response(response)
  203. except ValueError as e:
  204. raise e
  205. except Exception as e:
  206. logging.exception("internal server error.")
  207. raise InternalServerError()
  208. class WorkflowTaskStopApi(Resource):
  209. @setup_required
  210. @login_required
  211. @account_initialization_required
  212. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  213. def post(self, app_model: App, task_id: str):
  214. """
  215. Stop workflow task
  216. """
  217. AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
  218. return {
  219. "result": "success"
  220. }
  221. class DraftWorkflowNodeRunApi(Resource):
  222. @setup_required
  223. @login_required
  224. @account_initialization_required
  225. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  226. @marshal_with(workflow_run_node_execution_fields)
  227. def post(self, app_model: App, node_id: str):
  228. """
  229. Run draft workflow node
  230. """
  231. parser = reqparse.RequestParser()
  232. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  233. args = parser.parse_args()
  234. workflow_service = WorkflowService()
  235. workflow_node_execution = workflow_service.run_draft_workflow_node(
  236. app_model=app_model,
  237. node_id=node_id,
  238. user_inputs=args.get('inputs'),
  239. account=current_user
  240. )
  241. return workflow_node_execution
  242. class PublishedWorkflowApi(Resource):
  243. @setup_required
  244. @login_required
  245. @account_initialization_required
  246. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  247. @marshal_with(workflow_fields)
  248. def get(self, app_model: App):
  249. """
  250. Get published workflow
  251. """
  252. # fetch published workflow by app_model
  253. workflow_service = WorkflowService()
  254. workflow = workflow_service.get_published_workflow(app_model=app_model)
  255. # return workflow, if not found, return None
  256. return workflow
  257. @setup_required
  258. @login_required
  259. @account_initialization_required
  260. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  261. def post(self, app_model: App):
  262. """
  263. Publish workflow
  264. """
  265. workflow_service = WorkflowService()
  266. workflow = workflow_service.publish_workflow(app_model=app_model, account=current_user)
  267. return {
  268. "result": "success",
  269. "created_at": TimestampField().format(workflow.created_at)
  270. }
  271. class DefaultBlockConfigsApi(Resource):
  272. @setup_required
  273. @login_required
  274. @account_initialization_required
  275. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  276. def get(self, app_model: App):
  277. """
  278. Get default block config
  279. """
  280. # Get default block configs
  281. workflow_service = WorkflowService()
  282. return workflow_service.get_default_block_configs()
  283. class DefaultBlockConfigApi(Resource):
  284. @setup_required
  285. @login_required
  286. @account_initialization_required
  287. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  288. def get(self, app_model: App, block_type: str):
  289. """
  290. Get default block config
  291. """
  292. parser = reqparse.RequestParser()
  293. parser.add_argument('q', type=str, location='args')
  294. args = parser.parse_args()
  295. filters = None
  296. if args.get('q'):
  297. try:
  298. filters = json.loads(args.get('q'))
  299. except json.JSONDecodeError:
  300. raise ValueError('Invalid filters')
  301. # Get default block configs
  302. workflow_service = WorkflowService()
  303. return workflow_service.get_default_block_config(
  304. node_type=block_type,
  305. filters=filters
  306. )
  307. class ConvertToWorkflowApi(Resource):
  308. @setup_required
  309. @login_required
  310. @account_initialization_required
  311. @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
  312. def post(self, app_model: App):
  313. """
  314. Convert basic mode of chatbot app to workflow mode
  315. Convert expert mode of chatbot app to workflow mode
  316. Convert Completion App to Workflow App
  317. """
  318. if request.data:
  319. parser = reqparse.RequestParser()
  320. parser.add_argument('name', type=str, required=False, nullable=True, location='json')
  321. parser.add_argument('icon', type=str, required=False, nullable=True, location='json')
  322. parser.add_argument('icon_background', type=str, required=False, nullable=True, location='json')
  323. args = parser.parse_args()
  324. else:
  325. args = {}
  326. # convert to workflow mode
  327. workflow_service = WorkflowService()
  328. new_app_model = workflow_service.convert_to_workflow(
  329. app_model=app_model,
  330. account=current_user,
  331. args=args
  332. )
  333. # return app id
  334. return {
  335. 'new_app_id': new_app_model.id,
  336. }
  337. api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
  338. api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
  339. api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
  340. api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop')
  341. api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
  342. api.add_resource(AdvancedChatDraftRunIterationNodeApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run')
  343. api.add_resource(WorkflowDraftRunIterationNodeApi, '/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run')
  344. api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/publish')
  345. api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
  346. api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
  347. '/<string:block_type>')
  348. api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')