use-workflow-run.ts 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { useWorkflowStore } from '../store'
  8. import { useNodesSyncDraft } from '../hooks'
  9. import {
  10. NodeRunningStatus,
  11. WorkflowRunningStatus,
  12. } from '../types'
  13. import { useWorkflowInteractions } from './use-workflow-interactions'
  14. import { useStore as useAppStore } from '@/app/components/app/store'
  15. import type { IOtherOptions } from '@/service/base'
  16. import { ssePost } from '@/service/base'
  17. import {
  18. fetchPublishedWorkflow,
  19. stopWorkflowRun,
  20. } from '@/service/workflow'
  21. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  22. export const useWorkflowRun = () => {
  23. const store = useStoreApi()
  24. const workflowStore = useWorkflowStore()
  25. const reactflow = useReactFlow()
  26. const featuresStore = useFeaturesStore()
  27. const { doSyncWorkflowDraft } = useNodesSyncDraft()
  28. const { handleUpdateWorkflowCanvas } = useWorkflowInteractions()
  29. const handleBackupDraft = useCallback(() => {
  30. const {
  31. getNodes,
  32. edges,
  33. } = store.getState()
  34. const { getViewport } = reactflow
  35. const {
  36. backupDraft,
  37. setBackupDraft,
  38. } = workflowStore.getState()
  39. const { features } = featuresStore!.getState()
  40. if (!backupDraft) {
  41. setBackupDraft({
  42. nodes: getNodes(),
  43. edges,
  44. viewport: getViewport(),
  45. features,
  46. })
  47. doSyncWorkflowDraft()
  48. }
  49. }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])
  50. const handleLoadBackupDraft = useCallback(() => {
  51. const {
  52. backupDraft,
  53. setBackupDraft,
  54. } = workflowStore.getState()
  55. if (backupDraft) {
  56. const {
  57. nodes,
  58. edges,
  59. viewport,
  60. features,
  61. } = backupDraft
  62. handleUpdateWorkflowCanvas({
  63. nodes,
  64. edges,
  65. viewport,
  66. })
  67. featuresStore!.setState({ features })
  68. setBackupDraft(undefined)
  69. }
  70. }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
  71. const handleRun = useCallback(async (
  72. params: any,
  73. callback?: IOtherOptions,
  74. ) => {
  75. const {
  76. getNodes,
  77. setNodes,
  78. } = store.getState()
  79. const newNodes = produce(getNodes(), (draft) => {
  80. draft.forEach((node) => {
  81. node.data.selected = false
  82. })
  83. })
  84. setNodes(newNodes)
  85. await doSyncWorkflowDraft()
  86. const {
  87. onWorkflowStarted,
  88. onWorkflowFinished,
  89. onNodeStarted,
  90. onNodeFinished,
  91. onError,
  92. ...restCallback
  93. } = callback || {}
  94. workflowStore.setState({ historyWorkflowData: undefined })
  95. const appDetail = useAppStore.getState().appDetail
  96. const workflowContainer = document.getElementById('workflow-container')
  97. const {
  98. clientWidth,
  99. clientHeight,
  100. } = workflowContainer!
  101. let url = ''
  102. if (appDetail?.mode === 'advanced-chat')
  103. url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
  104. if (appDetail?.mode === 'workflow')
  105. url = `/apps/${appDetail.id}/workflows/draft/run`
  106. let prevNodeId = ''
  107. const {
  108. setWorkflowRunningData,
  109. } = workflowStore.getState()
  110. setWorkflowRunningData({
  111. result: {
  112. status: WorkflowRunningStatus.Running,
  113. },
  114. tracing: [],
  115. })
  116. ssePost(
  117. url,
  118. {
  119. body: params,
  120. },
  121. {
  122. onWorkflowStarted: (params) => {
  123. const { task_id, data } = params
  124. const {
  125. workflowRunningData,
  126. setWorkflowRunningData,
  127. } = workflowStore.getState()
  128. const {
  129. edges,
  130. setEdges,
  131. } = store.getState()
  132. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  133. draft.task_id = task_id
  134. draft.result = {
  135. ...draft?.result,
  136. ...data,
  137. status: WorkflowRunningStatus.Running,
  138. }
  139. }))
  140. const newEdges = produce(edges, (draft) => {
  141. draft.forEach((edge) => {
  142. edge.data = {
  143. ...edge.data,
  144. _runned: false,
  145. }
  146. })
  147. })
  148. setEdges(newEdges)
  149. if (onWorkflowStarted)
  150. onWorkflowStarted(params)
  151. },
  152. onWorkflowFinished: (params) => {
  153. const { data } = params
  154. const {
  155. workflowRunningData,
  156. setWorkflowRunningData,
  157. } = workflowStore.getState()
  158. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  159. draft.result = {
  160. ...draft.result,
  161. ...data,
  162. }
  163. }))
  164. prevNodeId = ''
  165. if (onWorkflowFinished)
  166. onWorkflowFinished(params)
  167. },
  168. onError: (params) => {
  169. const {
  170. workflowRunningData,
  171. setWorkflowRunningData,
  172. } = workflowStore.getState()
  173. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  174. draft.result = {
  175. ...draft.result,
  176. status: WorkflowRunningStatus.Failed,
  177. }
  178. }))
  179. if (onError)
  180. onError(params)
  181. },
  182. onNodeStarted: (params) => {
  183. const { data } = params
  184. const {
  185. workflowRunningData,
  186. setWorkflowRunningData,
  187. } = workflowStore.getState()
  188. const {
  189. getNodes,
  190. setNodes,
  191. edges,
  192. setEdges,
  193. transform,
  194. } = store.getState()
  195. const nodes = getNodes()
  196. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  197. draft.tracing!.push({
  198. ...data,
  199. status: NodeRunningStatus.Running,
  200. } as any)
  201. }))
  202. const {
  203. setViewport,
  204. } = reactflow
  205. const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
  206. const currentNode = nodes[currentNodeIndex]
  207. const position = currentNode.position
  208. const zoom = transform[2]
  209. setViewport({
  210. x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
  211. y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
  212. zoom: transform[2],
  213. })
  214. const newNodes = produce(nodes, (draft) => {
  215. draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
  216. })
  217. setNodes(newNodes)
  218. const newEdges = produce(edges, (draft) => {
  219. const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId)
  220. if (edge)
  221. edge.data = { ...edge.data, _runned: true } as any
  222. })
  223. setEdges(newEdges)
  224. if (onNodeStarted)
  225. onNodeStarted(params)
  226. },
  227. onNodeFinished: (params) => {
  228. const { data } = params
  229. const {
  230. workflowRunningData,
  231. setWorkflowRunningData,
  232. } = workflowStore.getState()
  233. const {
  234. getNodes,
  235. setNodes,
  236. } = store.getState()
  237. const nodes = getNodes()
  238. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  239. const currentIndex = draft.tracing!.findIndex(trace => trace.node_id === data.node_id)
  240. if (currentIndex > -1 && draft.tracing) {
  241. draft.tracing[currentIndex] = {
  242. ...(draft.tracing[currentIndex].extras
  243. ? { extras: draft.tracing[currentIndex].extras }
  244. : {}),
  245. ...data,
  246. } as any
  247. }
  248. }))
  249. const newNodes = produce(nodes, (draft) => {
  250. const currentNode = draft.find(node => node.id === data.node_id)!
  251. currentNode.data._runningStatus = data.status as any
  252. })
  253. setNodes(newNodes)
  254. prevNodeId = data.node_id
  255. if (onNodeFinished)
  256. onNodeFinished(params)
  257. },
  258. ...restCallback,
  259. },
  260. )
  261. }, [store, reactflow, workflowStore, doSyncWorkflowDraft])
  262. const handleStopRun = useCallback((taskId: string) => {
  263. const appId = useAppStore.getState().appDetail?.id
  264. stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
  265. }, [])
  266. const handleRestoreFromPublishedWorkflow = useCallback(async () => {
  267. const appDetail = useAppStore.getState().appDetail
  268. const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail?.id}/workflows/publish`)
  269. if (publishedWorkflow) {
  270. const nodes = publishedWorkflow.graph.nodes
  271. const edges = publishedWorkflow.graph.edges
  272. const viewport = publishedWorkflow.graph.viewport!
  273. handleUpdateWorkflowCanvas({
  274. nodes,
  275. edges,
  276. viewport,
  277. })
  278. featuresStore?.setState({ features: publishedWorkflow.features })
  279. workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
  280. }
  281. }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])
  282. return {
  283. handleBackupDraft,
  284. handleLoadBackupDraft,
  285. handleRun,
  286. handleStopRun,
  287. handleRestoreFromPublishedWorkflow,
  288. }
  289. }