use-workflow-run.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { useWorkflowStore } from '../store'
  8. import { useNodesSyncDraft } from '../hooks'
  9. import {
  10. NodeRunningStatus,
  11. WorkflowRunningStatus,
  12. } from '../types'
  13. import { useWorkflowUpdate } from './use-workflow-interactions'
  14. import { useStore as useAppStore } from '@/app/components/app/store'
  15. import type { IOtherOptions } from '@/service/base'
  16. import { ssePost } from '@/service/base'
  17. import {
  18. fetchPublishedWorkflow,
  19. stopWorkflowRun,
  20. } from '@/service/workflow'
  21. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  /**
   * Hook exposing the handlers used to run a draft workflow and to back up,
   * restore and stop it.
   *
   * Returned handlers:
   * - `handleBackupDraft`: snapshot the current canvas and features once.
   * - `handleLoadBackupDraft`: restore that snapshot and clear it.
   * - `handleRun`: start a draft run over SSE and mirror its events into the
   *   workflow store and the canvas.
   * - `handleStopRun`: request the stop of a running task.
   * - `handleRestoreFromPublishedWorkflow`: replace the canvas with the
   *   published workflow.
   */
  export const useWorkflowRun = () => {
    const store = useStoreApi()
    const workflowStore = useWorkflowStore()
    const reactflow = useReactFlow()
    const featuresStore = useFeaturesStore()
    const { doSyncWorkflowDraft } = useNodesSyncDraft()
    const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()

    /**
     * Saves nodes, edges, viewport and features into `backupDraft` when no
     * backup exists yet, then triggers a draft sync (not awaited).
     */
    const handleBackupDraft = useCallback(() => {
      const {
        getNodes,
        edges,
      } = store.getState()
      const { getViewport } = reactflow
      const {
        backupDraft,
        setBackupDraft,
      } = workflowStore.getState()
      const { features } = featuresStore!.getState()

      if (!backupDraft) {
        setBackupDraft({
          nodes: getNodes(),
          edges,
          viewport: getViewport(),
          features,
        })
        doSyncWorkflowDraft()
      }
    }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])

    /**
     * Restores the canvas and features from `backupDraft` (if any) and then
     * clears the backup.
     */
    const handleLoadBackupDraft = useCallback(() => {
      const {
        backupDraft,
        setBackupDraft,
      } = workflowStore.getState()

      if (backupDraft) {
        const {
          nodes,
          edges,
          viewport,
          features,
        } = backupDraft
        handleUpdateWorkflowCanvas({
          nodes,
          edges,
          viewport,
        })
        featuresStore!.setState({ features })
        setBackupDraft(undefined)
      }
    }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])

    /**
     * Starts a draft run and keeps the workflow store and canvas in sync with
     * the streamed events.
     *
     * @param params - Sent as the request body of the run request.
     * @param callback - Optional handlers. The workflow/node/iteration/error
     *   handlers are invoked after the built-in handling of each event; any
     *   other entries are spread last into the options passed to `ssePost`,
     *   so they take precedence over the defaults defined here.
     */
    const handleRun = useCallback(async (
      params: any,
      callback?: IOtherOptions,
    ) => {
      const {
        getNodes,
        setNodes,
      } = store.getState()
      type CanvasNode = ReturnType<typeof getNodes>[number]

      // Clear selection and any running status left over from a previous run.
      const newNodes = produce(getNodes(), (draft) => {
        draft.forEach((node) => {
          node.data.selected = false
          node.data._runningStatus = undefined
        })
      })
      setNodes(newNodes)
      await doSyncWorkflowDraft()

      const {
        onWorkflowStarted,
        onWorkflowFinished,
        onNodeStarted,
        onNodeFinished,
        onIterationStart,
        onIterationNext,
        onIterationFinish,
        onError,
        ...restCallback
      } = callback || {}
      workflowStore.setState({ historyWorkflowData: undefined })
      const appDetail = useAppStore.getState().appDetail

      // The container size is captured once, at run start. When the element is
      // not mounted the run still proceeds; only viewport centering is skipped.
      const workflowContainer = document.getElementById('workflow-container')
      const containerSize = workflowContainer
        ? {
          width: workflowContainer.clientWidth,
          height: workflowContainer.clientHeight,
        }
        : undefined

      let url = ''
      if (appDetail?.mode === 'advanced-chat')
        url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`

      if (appDetail?.mode === 'workflow')
        url = `/apps/${appDetail.id}/workflows/draft/run`

      // Id of the most recently finished node; used to find the edge that
      // leads into the next node that starts.
      let prevNodeId = ''

      const {
        setWorkflowRunningData,
      } = workflowStore.getState()
      setWorkflowRunningData({
        result: {
          status: WorkflowRunningStatus.Running,
        },
        tracing: [],
        resultText: '',
      })

      let isInIteration = false
      let iterationLength = 0

      /** Centers the viewport on a top-level node; nodes with a parent are left alone. */
      const centerViewportOnNode = (node: CanvasNode) => {
        if (!containerSize || node.parentId)
          return

        const { transform } = store.getState()
        const { setViewport } = reactflow
        const zoom = transform[2]
        const { position } = node
        setViewport({
          // NOTE(review): 400 presumably reserves room for the side panel — confirm.
          x: (containerSize.width - 400 - node.width! * zoom) / 2 - position.x * zoom,
          y: (containerSize.height - node.height! * zoom) / 2 - position.y * zoom,
          zoom,
        })
      }

      /** Flags the edge going from the previously finished node into `targetNodeId` as runned. */
      const markIncomingEdgeRunned = (targetNodeId: string) => {
        const {
          edges,
          setEdges,
        } = store.getState()
        const newEdges = produce(edges, (draft) => {
          const edge = draft.find(edge => edge.target === targetNodeId && edge.source === prevNodeId)

          if (edge)
            edge.data = { ...edge.data, _runned: true } as any
        })
        setEdges(newEdges)
      }

      ssePost(
        url,
        {
          body: params,
        },
        {
          onWorkflowStarted: (params) => {
            const { task_id, data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const {
              edges,
              setEdges,
            } = store.getState()
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.task_id = task_id
              draft.result = {
                ...draft?.result,
                ...data,
                status: WorkflowRunningStatus.Running,
              }
            }))

            // Reset the runned flag on every edge for the new run.
            const newEdges = produce(edges, (draft) => {
              draft.forEach((edge) => {
                edge.data = {
                  ...edge.data,
                  _runned: false,
                }
              })
            })
            setEdges(newEdges)

            if (onWorkflowStarted)
              onWorkflowStarted(params)
          },
          onWorkflowFinished: (params) => {
            const { data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()

            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.result = {
                ...draft.result,
                ...data,
              } as any
            }))

            prevNodeId = ''

            if (onWorkflowFinished)
              onWorkflowFinished(params)
          },
          onError: (params) => {
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()

            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.result = {
                ...draft.result,
                status: WorkflowRunningStatus.Failed,
              }
            }))

            if (onError)
              onError(params)
          },
          onNodeStarted: (params) => {
            const { data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const {
              getNodes,
              setNodes,
            } = store.getState()

            if (isInIteration) {
              // Inside an iteration the node is recorded under the current
              // round of the last trace entry instead of at the top level.
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const details = tracing[tracing.length - 1]?.details
                const currIteration = details?.[details.length - 1]

                // No round has been opened yet: nothing to append to.
                if (!currIteration)
                  return

                currIteration.push({
                  ...data,
                  status: NodeRunningStatus.Running,
                } as any)
              }))
            }
            else {
              const nodes = getNodes()
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                draft.tracing!.push({
                  ...data,
                  status: NodeRunningStatus.Running,
                } as any)
              }))

              const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)

              // findIndex yields -1 when the event refers to a node that is
              // not on the canvas; skip the canvas update in that case.
              if (currentNodeIndex > -1) {
                centerViewportOnNode(nodes[currentNodeIndex])
                const newNodes = produce(nodes, (draft) => {
                  draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
                })
                setNodes(newNodes)
              }

              markIncomingEdgeRunned(data.node_id)
            }

            if (onNodeStarted)
              onNodeStarted(params)
          },
          onNodeFinished: (params) => {
            const { data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const {
              getNodes,
              setNodes,
            } = store.getState()

            if (isInIteration) {
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const details = tracing[tracing.length - 1]?.details
                const currIteration = details?.[details.length - 1]

                // Nothing was recorded for this round: no entry to complete.
                if (!currIteration || currIteration.length === 0)
                  return

                const lastIndex = currIteration.length - 1
                currIteration[lastIndex] = {
                  ...currIteration[lastIndex],
                  ...data,
                  status: NodeRunningStatus.Succeeded,
                } as any
              }))
            }
            else {
              const nodes = getNodes()
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const currentIndex = draft.tracing!.findIndex(trace => trace.node_id === data.node_id)

                if (currentIndex > -1 && draft.tracing) {
                  // Replace the trace with the final payload, keeping only the
                  // previously stored `extras`.
                  draft.tracing[currentIndex] = {
                    ...(draft.tracing[currentIndex].extras
                      ? { extras: draft.tracing[currentIndex].extras }
                      : {}),
                    ...data,
                  } as any
                }
              }))

              const newNodes = produce(nodes, (draft) => {
                const currentNode = draft.find(node => node.id === data.node_id)

                if (currentNode)
                  currentNode.data._runningStatus = data.status as any
              })
              setNodes(newNodes)

              prevNodeId = data.node_id
            }

            if (onNodeFinished)
              onNodeFinished(params)
          },
          onIterationStart: (params) => {
            const { data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const {
              getNodes,
              setNodes,
            } = store.getState()
            const nodes = getNodes()

            // `details` collects one array of node traces per iteration round.
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.tracing!.push({
                ...data,
                status: NodeRunningStatus.Running,
                details: [],
              } as any)
            }))
            isInIteration = true
            iterationLength = data.metadata.iterator_length

            const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)

            // Skip the canvas update when the iteration node is not on the canvas.
            if (currentNodeIndex > -1) {
              centerViewportOnNode(nodes[currentNodeIndex])
              const newNodes = produce(nodes, (draft) => {
                draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
                draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
              })
              setNodes(newNodes)
            }

            markIncomingEdgeRunned(data.node_id)

            if (onIterationStart)
              onIterationStart(params)
          },
          onIterationNext: (params) => {
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const { data } = params
            const {
              getNodes,
              setNodes,
            } = store.getState()

            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!
              const details = tracing[tracing.length - 1]?.details

              // Open a new round unless the expected number of rounds is
              // already recorded (or there is no iteration trace at all).
              if (!details || details.length >= iterationLength)
                return

              details.push([])
            }))

            const nodes = getNodes()
            const newNodes = produce(nodes, (draft) => {
              const currentNode = draft.find(node => node.id === data.node_id)

              // The displayed index never goes below 1.
              if (currentNode)
                currentNode.data._iterationIndex = data.index > 0 ? data.index : 1
            })
            setNodes(newNodes)

            if (onIterationNext)
              onIterationNext(params)
          },
          onIterationFinish: (params) => {
            const { data } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()
            const {
              getNodes,
              setNodes,
            } = store.getState()
            const nodes = getNodes()

            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!

              // Without a trace entry there is nothing to finalize.
              if (tracing.length === 0)
                return

              tracing[tracing.length - 1] = {
                ...tracing[tracing.length - 1],
                ...data,
                status: NodeRunningStatus.Succeeded,
              } as any
            }))
            isInIteration = false

            const newNodes = produce(nodes, (draft) => {
              const currentNode = draft.find(node => node.id === data.node_id)

              if (currentNode)
                currentNode.data._runningStatus = data.status
            })
            setNodes(newNodes)

            prevNodeId = data.node_id

            if (onIterationFinish)
              onIterationFinish(params)
          },
          onTextChunk: (params) => {
            const { data: { text } } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()

            // Append the streamed chunk and switch to the result tab.
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.resultTabActive = true
              draft.resultText += text
            }))
          },
          onTextReplace: (params) => {
            const { data: { text } } = params
            const {
              workflowRunningData,
              setWorkflowRunningData,
            } = workflowStore.getState()

            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.resultText = text
            }))
          },
          ...restCallback,
        },
      )
    }, [store, reactflow, workflowStore, doSyncWorkflowDraft])

    /**
     * Requests the stop of the run identified by `taskId` for the current app.
     *
     * @param taskId - Id of the running task to stop.
     */
    const handleStopRun = useCallback((taskId: string) => {
      const appId = useAppStore.getState().appDetail?.id

      stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
    }, [])

    /**
     * Fetches the published workflow of the current app and, when one is
     * returned, applies its graph and features to the canvas and records its
     * `created_at` as the published time.
     */
    const handleRestoreFromPublishedWorkflow = useCallback(async () => {
      const appDetail = useAppStore.getState().appDetail
      const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail?.id}/workflows/publish`)

      if (publishedWorkflow) {
        const nodes = publishedWorkflow.graph.nodes
        const edges = publishedWorkflow.graph.edges
        const viewport = publishedWorkflow.graph.viewport!

        handleUpdateWorkflowCanvas({
          nodes,
          edges,
          viewport,
        })
        featuresStore?.setState({ features: publishedWorkflow.features })
        workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
      }
    }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])

    return {
      handleBackupDraft,
      handleLoadBackupDraft,
      handleRun,
      handleStopRun,
      handleRestoreFromPublishedWorkflow,
    }
  }