use-workflow-run.ts 19 KB


  1. import { useCallback } from 'react'
  2. import {
  3. getIncomers,
  4. useReactFlow,
  5. useStoreApi,
  6. } from 'reactflow'
  7. import produce from 'immer'
  8. import { v4 as uuidV4 } from 'uuid'
  9. import { usePathname } from 'next/navigation'
  10. import { useWorkflowStore } from '../store'
  11. import { useNodesSyncDraft } from '../hooks'
  12. import type { Node } from '../types'
  13. import {
  14. NodeRunningStatus,
  15. WorkflowRunningStatus,
  16. } from '../types'
  17. import { useWorkflowUpdate } from './use-workflow-interactions'
  18. import { useStore as useAppStore } from '@/app/components/app/store'
  19. import type { IOtherOptions } from '@/service/base'
  20. import { ssePost } from '@/service/base'
  21. import {
  22. fetchPublishedWorkflow,
  23. stopWorkflowRun,
  24. } from '@/service/workflow'
  25. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  26. import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
  27. import {
  28. getProcessedFilesFromResponse,
  29. } from '@/app/components/base/file-uploader/utils'
  30. export const useWorkflowRun = () => {
  31. const store = useStoreApi()
  32. const workflowStore = useWorkflowStore()
  33. const reactflow = useReactFlow()
  34. const featuresStore = useFeaturesStore()
  35. const { doSyncWorkflowDraft } = useNodesSyncDraft()
  36. const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
  37. const pathname = usePathname()
  38. const handleBackupDraft = useCallback(() => {
  39. const {
  40. getNodes,
  41. edges,
  42. } = store.getState()
  43. const { getViewport } = reactflow
  44. const {
  45. backupDraft,
  46. setBackupDraft,
  47. environmentVariables,
  48. } = workflowStore.getState()
  49. const { features } = featuresStore!.getState()
  50. if (!backupDraft) {
  51. setBackupDraft({
  52. nodes: getNodes(),
  53. edges,
  54. viewport: getViewport(),
  55. features,
  56. environmentVariables,
  57. })
  58. doSyncWorkflowDraft()
  59. }
  60. }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])
  61. const handleLoadBackupDraft = useCallback(() => {
  62. const {
  63. backupDraft,
  64. setBackupDraft,
  65. setEnvironmentVariables,
  66. } = workflowStore.getState()
  67. if (backupDraft) {
  68. const {
  69. nodes,
  70. edges,
  71. viewport,
  72. features,
  73. environmentVariables,
  74. } = backupDraft
  75. handleUpdateWorkflowCanvas({
  76. nodes,
  77. edges,
  78. viewport,
  79. })
  80. setEnvironmentVariables(environmentVariables)
  81. featuresStore!.setState({ features })
  82. setBackupDraft(undefined)
  83. }
  84. }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
  85. const handleRun = useCallback(async (
  86. params: any,
  87. callback?: IOtherOptions,
  88. ) => {
  89. const {
  90. getNodes,
  91. setNodes,
  92. } = store.getState()
  93. const newNodes = produce(getNodes(), (draft) => {
  94. draft.forEach((node) => {
  95. node.data.selected = false
  96. node.data._runningStatus = undefined
  97. })
  98. })
  99. setNodes(newNodes)
  100. await doSyncWorkflowDraft()
  101. const {
  102. onWorkflowStarted,
  103. onWorkflowFinished,
  104. onNodeStarted,
  105. onNodeFinished,
  106. onIterationStart,
  107. onIterationNext,
  108. onIterationFinish,
  109. onError,
  110. ...restCallback
  111. } = callback || {}
  112. workflowStore.setState({ historyWorkflowData: undefined })
  113. const appDetail = useAppStore.getState().appDetail
  114. const workflowContainer = document.getElementById('workflow-container')
  115. const {
  116. clientWidth,
  117. clientHeight,
  118. } = workflowContainer!
  119. let url = ''
  120. if (appDetail?.mode === 'advanced-chat')
  121. url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
  122. if (appDetail?.mode === 'workflow')
  123. url = `/apps/${appDetail.id}/workflows/draft/run`
  124. let prevNodeId = ''
  125. const {
  126. setWorkflowRunningData,
  127. } = workflowStore.getState()
  128. setWorkflowRunningData({
  129. result: {
  130. status: WorkflowRunningStatus.Running,
  131. },
  132. tracing: [],
  133. resultText: '',
  134. })
  135. let ttsUrl = ''
  136. let ttsIsPublic = false
  137. if (params.token) {
  138. ttsUrl = '/text-to-audio'
  139. ttsIsPublic = true
  140. }
  141. else if (params.appId) {
  142. if (pathname.search('explore/installed') > -1)
  143. ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
  144. else
  145. ttsUrl = `/apps/${params.appId}/text-to-audio`
  146. }
  147. const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {})
  148. ssePost(
  149. url,
  150. {
  151. body: params,
  152. },
  153. {
  154. onWorkflowStarted: (params) => {
  155. const { task_id, data } = params
  156. const {
  157. workflowRunningData,
  158. setWorkflowRunningData,
  159. } = workflowStore.getState()
  160. const {
  161. edges,
  162. setEdges,
  163. } = store.getState()
  164. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  165. draft.task_id = task_id
  166. draft.result = {
  167. ...draft?.result,
  168. ...data,
  169. status: WorkflowRunningStatus.Running,
  170. }
  171. }))
  172. const newEdges = produce(edges, (draft) => {
  173. draft.forEach((edge) => {
  174. edge.data = {
  175. ...edge.data,
  176. _run: false,
  177. }
  178. })
  179. })
  180. setEdges(newEdges)
  181. if (onWorkflowStarted)
  182. onWorkflowStarted(params)
  183. },
  184. onWorkflowFinished: (params) => {
  185. const { data } = params
  186. const {
  187. workflowRunningData,
  188. setWorkflowRunningData,
  189. } = workflowStore.getState()
  190. const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string'
  191. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  192. draft.result = {
  193. ...draft.result,
  194. ...data,
  195. files: getProcessedFilesFromResponse(data.files || []),
  196. } as any
  197. if (isStringOutput) {
  198. draft.resultTabActive = true
  199. draft.resultText = data.outputs[Object.keys(data.outputs)[0]]
  200. }
  201. }))
  202. prevNodeId = ''
  203. if (onWorkflowFinished)
  204. onWorkflowFinished(params)
  205. },
  206. onError: (params) => {
  207. const {
  208. workflowRunningData,
  209. setWorkflowRunningData,
  210. } = workflowStore.getState()
  211. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  212. draft.result = {
  213. ...draft.result,
  214. status: WorkflowRunningStatus.Failed,
  215. }
  216. }))
  217. if (onError)
  218. onError(params)
  219. },
  220. onNodeStarted: (params) => {
  221. const { data } = params
  222. const {
  223. workflowRunningData,
  224. setWorkflowRunningData,
  225. } = workflowStore.getState()
  226. const {
  227. getNodes,
  228. setNodes,
  229. edges,
  230. setEdges,
  231. transform,
  232. } = store.getState()
  233. const nodes = getNodes()
  234. const node = nodes.find(node => node.id === data.node_id)
  235. if (node?.parentId) {
  236. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  237. const tracing = draft.tracing!
  238. const iterations = tracing.find(trace => trace.node_id === node?.parentId)
  239. const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1]
  240. currIteration?.push({
  241. ...data,
  242. status: NodeRunningStatus.Running,
  243. } as any)
  244. }))
  245. }
  246. else {
  247. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  248. draft.tracing!.push({
  249. ...data,
  250. status: NodeRunningStatus.Running,
  251. } as any)
  252. }))
  253. const {
  254. setViewport,
  255. } = reactflow
  256. const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
  257. const currentNode = nodes[currentNodeIndex]
  258. const position = currentNode.position
  259. const zoom = transform[2]
  260. if (!currentNode.parentId) {
  261. setViewport({
  262. x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
  263. y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
  264. zoom: transform[2],
  265. })
  266. }
  267. const newNodes = produce(nodes, (draft) => {
  268. draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
  269. })
  270. setNodes(newNodes)
  271. const incomeNodesId = getIncomers({ id: data.node_id } as Node, newNodes, edges).filter(node => node.data._runningStatus === NodeRunningStatus.Succeeded).map(node => node.id)
  272. const newEdges = produce(edges, (draft) => {
  273. draft.forEach((edge) => {
  274. if (edge.target === data.node_id && incomeNodesId.includes(edge.source))
  275. edge.data = { ...edge.data, _run: true } as any
  276. })
  277. })
  278. setEdges(newEdges)
  279. }
  280. if (onNodeStarted)
  281. onNodeStarted(params)
  282. },
  283. onNodeFinished: (params) => {
  284. const { data } = params
  285. const {
  286. workflowRunningData,
  287. setWorkflowRunningData,
  288. } = workflowStore.getState()
  289. const {
  290. getNodes,
  291. setNodes,
  292. } = store.getState()
  293. const nodes = getNodes()
  294. const nodeParentId = nodes.find(node => node.id === data.node_id)!.parentId
  295. if (nodeParentId) {
  296. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  297. const tracing = draft.tracing!
  298. const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node
  299. if (iterations && iterations.details) {
  300. const iterationIndex = data.execution_metadata?.iteration_index || 0
  301. if (!iterations.details[iterationIndex])
  302. iterations.details[iterationIndex] = []
  303. const currIteration = iterations.details[iterationIndex]
  304. const nodeIndex = currIteration.findIndex(node =>
  305. node.node_id === data.node_id && (
  306. node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id),
  307. )
  308. if (data.status === NodeRunningStatus.Succeeded) {
  309. if (nodeIndex !== -1) {
  310. currIteration[nodeIndex] = {
  311. ...currIteration[nodeIndex],
  312. ...data,
  313. } as any
  314. }
  315. else {
  316. currIteration.push({
  317. ...data,
  318. } as any)
  319. }
  320. }
  321. }
  322. }))
  323. }
  324. else {
  325. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  326. const currentIndex = draft.tracing!.findIndex((trace) => {
  327. if (!trace.execution_metadata?.parallel_id)
  328. return trace.node_id === data.node_id
  329. return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id
  330. })
  331. if (currentIndex > -1 && draft.tracing) {
  332. draft.tracing[currentIndex] = {
  333. ...(draft.tracing[currentIndex].extras
  334. ? { extras: draft.tracing[currentIndex].extras }
  335. : {}),
  336. ...data,
  337. } as any
  338. }
  339. }))
  340. const newNodes = produce(nodes, (draft) => {
  341. const currentNode = draft.find(node => node.id === data.node_id)!
  342. currentNode.data._runningStatus = data.status as any
  343. })
  344. setNodes(newNodes)
  345. prevNodeId = data.node_id
  346. }
  347. if (onNodeFinished)
  348. onNodeFinished(params)
  349. },
  350. onIterationStart: (params) => {
  351. const { data } = params
  352. const {
  353. workflowRunningData,
  354. setWorkflowRunningData,
  355. } = workflowStore.getState()
  356. const {
  357. getNodes,
  358. setNodes,
  359. edges,
  360. setEdges,
  361. transform,
  362. } = store.getState()
  363. const nodes = getNodes()
  364. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  365. draft.tracing!.push({
  366. ...data,
  367. status: NodeRunningStatus.Running,
  368. details: [],
  369. } as any)
  370. }))
  371. const {
  372. setViewport,
  373. } = reactflow
  374. const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
  375. const currentNode = nodes[currentNodeIndex]
  376. const position = currentNode.position
  377. const zoom = transform[2]
  378. if (!currentNode.parentId) {
  379. setViewport({
  380. x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
  381. y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
  382. zoom: transform[2],
  383. })
  384. }
  385. const newNodes = produce(nodes, (draft) => {
  386. draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
  387. draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
  388. })
  389. setNodes(newNodes)
  390. const newEdges = produce(edges, (draft) => {
  391. const edge = draft.find(edge => edge.target === data.node_id && edge.source === prevNodeId)
  392. if (edge)
  393. edge.data = { ...edge.data, _run: true } as any
  394. })
  395. setEdges(newEdges)
  396. if (onIterationStart)
  397. onIterationStart(params)
  398. },
  399. onIterationNext: (params) => {
  400. const {
  401. workflowRunningData,
  402. setWorkflowRunningData,
  403. } = workflowStore.getState()
  404. const { data } = params
  405. const {
  406. getNodes,
  407. setNodes,
  408. } = store.getState()
  409. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  410. const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id)
  411. if (iteration) {
  412. if (iteration.details!.length >= iteration.metadata.iterator_length!)
  413. return
  414. }
  415. iteration?.details!.push([])
  416. }))
  417. const nodes = getNodes()
  418. const newNodes = produce(nodes, (draft) => {
  419. const currentNode = draft.find(node => node.id === data.node_id)!
  420. currentNode.data._iterationIndex = data.index > 0 ? data.index : 1
  421. })
  422. setNodes(newNodes)
  423. if (onIterationNext)
  424. onIterationNext(params)
  425. },
  426. onIterationFinish: (params) => {
  427. const { data } = params
  428. const {
  429. workflowRunningData,
  430. setWorkflowRunningData,
  431. } = workflowStore.getState()
  432. const {
  433. getNodes,
  434. setNodes,
  435. } = store.getState()
  436. const nodes = getNodes()
  437. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  438. const tracing = draft.tracing!
  439. const currIterationNode = tracing.find(trace => trace.node_id === data.node_id)
  440. if (currIterationNode) {
  441. Object.assign(currIterationNode, {
  442. ...data,
  443. status: NodeRunningStatus.Succeeded,
  444. })
  445. }
  446. }))
  447. const newNodes = produce(nodes, (draft) => {
  448. const currentNode = draft.find(node => node.id === data.node_id)!
  449. currentNode.data._runningStatus = data.status
  450. })
  451. setNodes(newNodes)
  452. prevNodeId = data.node_id
  453. if (onIterationFinish)
  454. onIterationFinish(params)
  455. },
  456. onParallelBranchStarted: (params) => {
  457. // console.log(params, 'parallel start')
  458. },
  459. onParallelBranchFinished: (params) => {
  460. // console.log(params, 'finished')
  461. },
  462. onTextChunk: (params) => {
  463. const { data: { text } } = params
  464. const {
  465. workflowRunningData,
  466. setWorkflowRunningData,
  467. } = workflowStore.getState()
  468. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  469. draft.resultTabActive = true
  470. draft.resultText += text
  471. }))
  472. },
  473. onTextReplace: (params) => {
  474. const { data: { text } } = params
  475. const {
  476. workflowRunningData,
  477. setWorkflowRunningData,
  478. } = workflowStore.getState()
  479. setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
  480. draft.resultText = text
  481. }))
  482. },
  483. onTTSChunk: (messageId: string, audio: string, audioType?: string) => {
  484. if (!audio || audio === '')
  485. return
  486. player.playAudioWithAudio(audio, true)
  487. AudioPlayerManager.getInstance().resetMsgId(messageId)
  488. },
  489. onTTSEnd: (messageId: string, audio: string, audioType?: string) => {
  490. player.playAudioWithAudio(audio, false)
  491. },
  492. ...restCallback,
  493. },
  494. )
  495. }, [store, reactflow, workflowStore, doSyncWorkflowDraft])
  496. const handleStopRun = useCallback((taskId: string) => {
  497. const appId = useAppStore.getState().appDetail?.id
  498. stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
  499. }, [])
  500. const handleRestoreFromPublishedWorkflow = useCallback(async () => {
  501. const appDetail = useAppStore.getState().appDetail
  502. const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail?.id}/workflows/publish`)
  503. if (publishedWorkflow) {
  504. const nodes = publishedWorkflow.graph.nodes
  505. const edges = publishedWorkflow.graph.edges
  506. const viewport = publishedWorkflow.graph.viewport!
  507. handleUpdateWorkflowCanvas({
  508. nodes,
  509. edges,
  510. viewport,
  511. })
  512. featuresStore?.setState({ features: publishedWorkflow.features })
  513. workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
  514. workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || [])
  515. }
  516. }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])
  517. return {
  518. handleBackupDraft,
  519. handleLoadBackupDraft,
  520. handleRun,
  521. handleStopRun,
  522. handleRestoreFromPublishedWorkflow,
  523. }
  524. }