use-workflow-run.ts 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. import { useCallback } from 'react'
  2. import {
  3. useReactFlow,
  4. useStoreApi,
  5. } from 'reactflow'
  6. import produce from 'immer'
  7. import { v4 as uuidV4 } from 'uuid'
  8. import { usePathname } from 'next/navigation'
  9. import { useWorkflowStore } from '../store'
  10. import { useNodesSyncDraft } from '../hooks'
  11. import {
  12. BlockEnum,
  13. NodeRunningStatus,
  14. WorkflowRunningStatus,
  15. } from '../types'
  16. import { DEFAULT_ITER_TIMES } from '../constants'
  17. import { useWorkflowUpdate } from './use-workflow-interactions'
  18. import { useStore as useAppStore } from '@/app/components/app/store'
  19. import type { IOtherOptions } from '@/service/base'
  20. import { ssePost } from '@/service/base'
  21. import {
  22. fetchPublishedWorkflow,
  23. stopWorkflowRun,
  24. } from '@/service/workflow'
  25. import { useFeaturesStore } from '@/app/components/base/features/hooks'
  26. import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
  27. import {
  28. getFilesInLogs,
  29. } from '@/app/components/base/file-uploader/utils'
  30. import { ErrorHandleTypeEnum } from '@/app/components/workflow/nodes/_base/components/error-handle/types'
  31. import type { NodeTracing } from '@/types/workflow'
  /**
   * React hook that bundles the handlers used to run a workflow draft.
   *
   * It reads the reactflow store, the workflow store and the features store,
   * and returns callbacks for backing up / restoring the draft, starting a run
   * over SSE, stopping a run, and restoring the canvas from the published
   * workflow.
   */
  32. export const useWorkflowRun = () => {
  33. const store = useStoreApi()
  34. const workflowStore = useWorkflowStore()
  35. const reactflow = useReactFlow()
  36. const featuresStore = useFeaturesStore()
  37. const { doSyncWorkflowDraft } = useNodesSyncDraft()
  38. const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
  // Current route; handleRun reads it to choose the text-to-audio endpoint.
  39. const pathname = usePathname()
  /**
   * Stores a snapshot of the current canvas (nodes, edges, viewport), the
   * features and the environment variables as the backup draft in the workflow
   * store, then kicks off a draft sync. When a backup draft already exists the
   * call is a no-op.
   */
  const handleBackupDraft = useCallback(() => {
    const { getNodes, edges } = store.getState()
    const { getViewport } = reactflow
    const { backupDraft, setBackupDraft, environmentVariables } = workflowStore.getState()
    const { features } = featuresStore!.getState()

    // An existing backup is kept as-is; only the first call takes a snapshot.
    if (backupDraft)
      return

    const snapshot = {
      nodes: getNodes(),
      edges,
      viewport: getViewport(),
      features,
      environmentVariables,
    }
    setBackupDraft(snapshot)
    // Fire-and-forget: the sync result is not awaited here.
    doSyncWorkflowDraft()
  }, [reactflow, workflowStore, store, featuresStore, doSyncWorkflowDraft])
  /**
   * Restores the canvas, environment variables and features from the backup
   * draft held in the workflow store, then clears the backup. Does nothing when
   * no backup draft is present.
   */
  const handleLoadBackupDraft = useCallback(() => {
    const { backupDraft, setBackupDraft, setEnvironmentVariables } = workflowStore.getState()
    if (!backupDraft)
      return

    const { nodes, edges, viewport, features, environmentVariables } = backupDraft
    handleUpdateWorkflowCanvas({ nodes, edges, viewport })
    setEnvironmentVariables(environmentVariables)
    featuresStore!.setState({ features })
    // The backup is single-use: drop it once it has been applied.
    setBackupDraft(undefined)
  }, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
  /**
   * Starts a draft run of the current app and mirrors the SSE event stream into
   * the workflow store and the reactflow canvas.
   *
   * Before the request is sent, every node is deselected and its running status
   * cleared, and the draft is synced. The run URL depends on the app mode
   * (`advanced-chat` or `workflow`).
   *
   * @param params Request body posted to the run endpoint. `params.token` and
   *   `params.appId` are also read to choose the text-to-audio endpoint.
   * @param callback Optional SSE callbacks. The workflow / node / iteration /
   *   retry / error callbacks are invoked after this hook has applied its own
   *   state updates for the event. All remaining callbacks are spread last into
   *   the `ssePost` options, so they replace the defaults defined here
   *   (for example `onTextChunk`).
   */
  const handleRun = useCallback(async (
    params: any,
    callback?: IOtherOptions,
  ) => {
    const {
      getNodes,
      setNodes,
    } = store.getState()
    // Reset selection and any status left over from a previous run.
    const newNodes = produce(getNodes(), (draft) => {
      draft.forEach((node) => {
        node.data.selected = false
        node.data._runningStatus = undefined
      })
    })
    setNodes(newNodes)
    await doSyncWorkflowDraft()

    const {
      onWorkflowStarted,
      onWorkflowFinished,
      onNodeStarted,
      onNodeFinished,
      onIterationStart,
      onIterationNext,
      onIterationFinish,
      onNodeRetry,
      onError,
      ...restCallback
    } = callback || {}
    workflowStore.setState({ historyWorkflowData: undefined })
    const appDetail = useAppStore.getState().appDetail
    // The container size is only used to centre the viewport on the running
    // node; fall back to 0 instead of throwing when the element is absent.
    const workflowContainer = document.getElementById('workflow-container')
    const clientWidth = workflowContainer?.clientWidth ?? 0
    const clientHeight = workflowContainer?.clientHeight ?? 0

    let url = ''
    if (appDetail?.mode === 'advanced-chat')
      url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
    if (appDetail?.mode === 'workflow')
      url = `/apps/${appDetail.id}/workflows/draft/run`

    const {
      setWorkflowRunningData,
    } = workflowStore.getState()
    setWorkflowRunningData({
      result: {
        status: WorkflowRunningStatus.Running,
      },
      tracing: [],
      resultText: '',
    })

    // Pick the text-to-audio endpoint: public when a token is supplied,
    // otherwise app-scoped (installed-app route when on the explore page).
    let ttsUrl = ''
    let ttsIsPublic = false
    if (params.token) {
      ttsUrl = '/text-to-audio'
      ttsIsPublic = true
    }
    else if (params.appId) {
      if (pathname.includes('explore/installed'))
        ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
      else
        ttsUrl = `/apps/${params.appId}/text-to-audio`
    }
    const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', (_: any): any => {})

    ssePost(
      url,
      {
        body: params,
      },
      {
        // Run accepted: record the task id and mark every node/edge as waiting.
        onWorkflowStarted: (params) => {
          const { task_id, data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
            setIterParallelLogMap,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
            edges,
            setEdges,
          } = store.getState()
          setIterParallelLogMap(new Map())
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.task_id = task_id
            draft.result = {
              ...draft?.result,
              ...data,
              status: WorkflowRunningStatus.Running,
            }
          }))
          const nodes = getNodes()
          const newNodes = produce(nodes, (draft) => {
            draft.forEach((node) => {
              node.data._waitingRun = true
              node.data._runningBranchId = undefined
            })
          })
          setNodes(newNodes)
          const newEdges = produce(edges, (draft) => {
            draft.forEach((edge) => {
              edge.data = {
                ...edge.data,
                _sourceRunningStatus: undefined,
                _targetRunningStatus: undefined,
                _waitingRun: true,
              }
            })
          })
          setEdges(newEdges)

          if (onWorkflowStarted)
            onWorkflowStarted(params)
        },
        // Run finished: merge the final result; a single string output is also
        // surfaced as the result text.
        onWorkflowFinished: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          const isStringOutput = data.outputs && Object.keys(data.outputs).length === 1 && typeof data.outputs[Object.keys(data.outputs)[0]] === 'string'
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.result = {
              ...draft.result,
              ...data,
              files: getFilesInLogs(data.outputs),
            } as any
            if (isStringOutput) {
              draft.resultTabActive = true
              draft.resultText = data.outputs[Object.keys(data.outputs)[0]]
            }
          }))

          if (onWorkflowFinished)
            onWorkflowFinished(params)
        },
        onError: (params) => {
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.result = {
              ...draft.result,
              status: WorkflowRunningStatus.Failed,
            }
          }))

          if (onError)
            onError(params)
        },
        onNodeStarted: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
            iterParallelLogMap,
            setIterParallelLogMap,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
            edges,
            setEdges,
            transform,
          } = store.getState()
          const nodes = getNodes()
          const node = nodes.find(node => node.id === data.node_id)
          if (node?.parentId) {
            // Node has a parent: log it under the parent's tracing entry.
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!
              const iterations = tracing.find(trace => trace.node_id === node?.parentId)
              const currIteration = iterations?.details![node.data.iteration_index] || iterations?.details![iterations.details!.length - 1]
              if (!data.parallel_run_id) {
                currIteration?.push({
                  ...data,
                  status: NodeRunningStatus.Running,
                } as any)
              }
              else {
                // Parallel mode: logs are grouped per parallel run id.
                const nodeId = iterations?.node_id as string
                if (!iterParallelLogMap.has(nodeId))
                  iterParallelLogMap.set(nodeId, new Map())

                const currentIterLogMap = iterParallelLogMap.get(nodeId)!
                if (!currentIterLogMap.has(data.parallel_run_id))
                  currentIterLogMap.set(data.parallel_run_id, [{ ...data, status: NodeRunningStatus.Running } as any])
                else
                  currentIterLogMap.get(data.parallel_run_id)!.push({ ...data, status: NodeRunningStatus.Running } as any)
                setIterParallelLogMap(iterParallelLogMap)
                if (iterations)
                  iterations.details = Array.from(currentIterLogMap.values())
              }
            }))
          }
          else {
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              draft.tracing!.push({
                ...data,
                status: NodeRunningStatus.Running,
              } as any)
            }))

            const {
              setViewport,
            } = reactflow
            const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
            const currentNode = nodes[currentNodeIndex]
            // The event may reference a node that is no longer on the canvas;
            // skip the canvas updates for it rather than throwing.
            if (currentNode) {
              const position = currentNode.position
              const zoom = transform[2]
              // Centre the node in the container. This branch only runs for
              // nodes without a parent, so no parentId check is needed.
              // NOTE(review): the 400px offset presumably reserves room for a
              // side panel - confirm.
              setViewport({
                x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
                y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
                zoom,
              })
              const newNodes = produce(nodes, (draft) => {
                draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
                draft[currentNodeIndex].data._waitingRun = false
              })
              setNodes(newNodes)
            }
            const newEdges = produce(edges, (draft) => {
              const incomeEdges = draft.filter(edge => edge.target === data.node_id)

              incomeEdges.forEach((edge) => {
                const incomeNode = nodes.find(node => node.id === edge.source)
                if (!incomeNode)
                  return
                // Only light up the edge that belongs to the branch actually taken.
                if (
                  (!incomeNode.data._runningBranchId && edge.sourceHandle === 'source')
                  || (incomeNode.data._runningBranchId && edge.sourceHandle === incomeNode.data._runningBranchId)
                ) {
                  edge.data = {
                    ...edge.data,
                    _sourceRunningStatus: incomeNode.data._runningStatus,
                    _targetRunningStatus: NodeRunningStatus.Running,
                    _waitingRun: false,
                  }
                }
              })
            })
            setEdges(newEdges)
          }
          if (onNodeStarted)
            onNodeStarted(params)
        },
        onNodeFinished: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
            iterParallelLogMap,
            setIterParallelLogMap,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
            edges,
            setEdges,
          } = store.getState()
          const nodes = getNodes()
          const nodeParentId = nodes.find(node => node.id === data.node_id)?.parentId
          if (nodeParentId) {
            if (!data.execution_metadata?.parallel_mode_run_id) {
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node

                if (iterations && iterations.details) {
                  const iterationIndex = data.execution_metadata?.iteration_index || 0
                  if (!iterations.details[iterationIndex])
                    iterations.details[iterationIndex] = []

                  const currIteration = iterations.details[iterationIndex]
                  const nodeIndex = currIteration.findIndex(node =>
                    node.node_id === data.node_id && (
                      node.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id || node.parallel_id === data.execution_metadata?.parallel_id),
                  )
                  if (nodeIndex !== -1) {
                    currIteration[nodeIndex] = {
                      ...currIteration[nodeIndex],
                      ...(currIteration[nodeIndex].retryDetail
                        ? { retryDetail: currIteration[nodeIndex].retryDetail }
                        : {}),
                      ...data,
                    } as any
                  }
                  else {
                    currIteration.push({
                      ...data,
                    } as any)
                  }
                }
              }))
            }
            else {
              // open parallel mode
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const iterations = tracing.find(trace => trace.node_id === nodeParentId) // the iteration node

                if (iterations && iterations.details) {
                  const iterRunID = data.execution_metadata?.parallel_mode_run_id

                  const currIteration = iterParallelLogMap.get(iterations.node_id)?.get(iterRunID)
                  const nodeIndex = currIteration?.findIndex(node =>
                    node.node_id === data.node_id && (
                      node?.parallel_run_id === data.execution_metadata?.parallel_mode_run_id),
                  )
                  if (currIteration) {
                    if (nodeIndex !== undefined && nodeIndex !== -1) {
                      currIteration[nodeIndex] = {
                        ...currIteration[nodeIndex],
                        ...data,
                      } as any
                    }
                    else {
                      currIteration.push({
                        ...data,
                      } as any)
                    }
                  }
                  setIterParallelLogMap(iterParallelLogMap)
                  const iterLogMap = iterParallelLogMap.get(iterations.node_id)
                  if (iterLogMap)
                    iterations.details = Array.from(iterLogMap.values())
                }
              }))
            }
          }
          else {
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const currentIndex = draft.tracing!.findIndex((trace) => {
                if (!trace.execution_metadata?.parallel_id)
                  return trace.node_id === data.node_id
                return trace.node_id === data.node_id && trace.execution_metadata?.parallel_id === data.execution_metadata?.parallel_id
              })
              if (currentIndex > -1 && draft.tracing) {
                // Replace the entry with the final data but keep extras and
                // retry details collected while the node was running.
                draft.tracing[currentIndex] = {
                  ...data,
                  ...(draft.tracing[currentIndex].extras
                    ? { extras: draft.tracing[currentIndex].extras }
                    : {}),
                  ...(draft.tracing[currentIndex].retryDetail
                    ? { retryDetail: draft.tracing[currentIndex].retryDetail }
                    : {}),
                } as any
              }
            }))
            const newNodes = produce(nodes, (draft) => {
              const currentNode = draft.find(node => node.id === data.node_id)
              // Ignore events for nodes that are no longer on the canvas.
              if (!currentNode)
                return
              currentNode.data._runningStatus = data.status as any
              if (data.status === NodeRunningStatus.Exception) {
                if (data.execution_metadata?.error_strategy === ErrorHandleTypeEnum.failBranch)
                  currentNode.data._runningBranchId = ErrorHandleTypeEnum.failBranch
              }
              else {
                // Remember which branch was taken so outgoing edges can be matched.
                if (data.node_type === BlockEnum.IfElse)
                  currentNode.data._runningBranchId = data?.outputs?.selected_case_id

                if (data.node_type === BlockEnum.QuestionClassifier)
                  currentNode.data._runningBranchId = data?.outputs?.class_id
              }
            })
            setNodes(newNodes)
            const newEdges = produce(edges, (draft) => {
              const incomeEdges = draft.filter(edge => edge.target === data.node_id)
              incomeEdges.forEach((edge) => {
                edge.data = {
                  ...edge.data,
                  _targetRunningStatus: data.status as any,
                }
              })
            })
            setEdges(newEdges)
          }

          if (onNodeFinished)
            onNodeFinished(params)
        },
        onIterationStart: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
            setIterTimes,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
            edges,
            setEdges,
            transform,
          } = store.getState()
          const nodes = getNodes()
          setIterTimes(DEFAULT_ITER_TIMES)
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.tracing!.push({
              ...data,
              status: NodeRunningStatus.Running,
              details: [],
              iterDurationMap: {},
            } as any)
          }))

          const {
            setViewport,
          } = reactflow
          const currentNodeIndex = nodes.findIndex(node => node.id === data.node_id)
          const currentNode = nodes[currentNodeIndex]
          // Skip canvas updates when the iteration node is no longer present.
          if (currentNode) {
            const position = currentNode.position
            const zoom = transform[2]

            if (!currentNode.parentId) {
              setViewport({
                x: (clientWidth - 400 - currentNode.width! * zoom) / 2 - position.x * zoom,
                y: (clientHeight - currentNode.height! * zoom) / 2 - position.y * zoom,
                zoom,
              })
            }
            const newNodes = produce(nodes, (draft) => {
              draft[currentNodeIndex].data._runningStatus = NodeRunningStatus.Running
              draft[currentNodeIndex].data._iterationLength = data.metadata.iterator_length
              draft[currentNodeIndex].data._waitingRun = false
            })
            setNodes(newNodes)
          }
          const newEdges = produce(edges, (draft) => {
            const incomeEdges = draft.filter(edge => edge.target === data.node_id)

            incomeEdges.forEach((edge) => {
              edge.data = {
                ...edge.data,
                _sourceRunningStatus: nodes.find(node => node.id === edge.source)?.data._runningStatus,
                _targetRunningStatus: NodeRunningStatus.Running,
                _waitingRun: false,
              }
            })
          })
          setEdges(newEdges)

          if (onIterationStart)
            onIterationStart(params)
        },
        onIterationNext: (params) => {
          const {
            workflowRunningData,
            setWorkflowRunningData,
            iterTimes,
            setIterTimes,
          } = workflowStore.getState()

          const { data } = params
          const {
            getNodes,
            setNodes,
          } = store.getState()

          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id)
            if (iteration) {
              if (iteration.iterDurationMap && data.duration)
                iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration
              // Never create more detail slots than the iterator has items.
              if (iteration.details!.length >= iteration.metadata.iterator_length!)
                return
            }
            if (!data.parallel_mode_run_id)
              iteration?.details!.push([])
          }))
          const nodes = getNodes()
          const newNodes = produce(nodes, (draft) => {
            const currentNode = draft.find(node => node.id === data.node_id)
            if (currentNode)
              currentNode.data._iterationIndex = iterTimes
          })
          // Advance the counter outside the immer recipe so the recipe has no
          // side effects.
          setIterTimes(iterTimes + 1)
          setNodes(newNodes)

          if (onIterationNext)
            onIterationNext(params)
        },
        onIterationFinish: (params) => {
          const { data } = params

          const {
            workflowRunningData,
            setWorkflowRunningData,
            setIterTimes,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
          } = store.getState()
          const nodes = getNodes()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            const tracing = draft.tracing!
            const currIterationNode = tracing.find(trace => trace.node_id === data.node_id)
            if (currIterationNode) {
              Object.assign(currIterationNode, {
                ...data,
                status: NodeRunningStatus.Succeeded,
              })
            }
          }))
          setIterTimes(DEFAULT_ITER_TIMES)
          const newNodes = produce(nodes, (draft) => {
            const currentNode = draft.find(node => node.id === data.node_id)
            if (currentNode)
              currentNode.data._runningStatus = data.status
          })
          setNodes(newNodes)

          if (onIterationFinish)
            onIterationFinish(params)
        },
        onNodeRetry: (params) => {
          const { data } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
            iterParallelLogMap,
            setIterParallelLogMap,
          } = workflowStore.getState()
          const {
            getNodes,
            setNodes,
          } = store.getState()

          const nodes = getNodes()
          const currentNode = nodes.find(node => node.id === data.node_id)
          const nodeParent = nodes.find(node => node.id === currentNode?.parentId)
          if (nodeParent) {
            if (!data.execution_metadata?.parallel_mode_run_id) {
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const iteration = tracing.find(trace => trace.node_id === nodeParent.id)

                if (iteration && iteration.details?.length) {
                  const currentNodeRetry = iteration.details[nodeParent.data._iterationIndex - 1]?.find(item => item.node_id === data.node_id)

                  if (currentNodeRetry) {
                    if (currentNodeRetry?.retryDetail)
                      currentNodeRetry?.retryDetail.push(data as NodeTracing)
                    else
                      currentNodeRetry.retryDetail = [data as NodeTracing]
                  }
                }
              }))
            }
            else {
              setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
                const tracing = draft.tracing!
                const iteration = tracing.find(trace => trace.node_id === nodeParent.id)

                if (iteration && iteration.details?.length) {
                  const iterRunID = data.execution_metadata?.parallel_mode_run_id

                  const currIteration = iterParallelLogMap.get(iteration.node_id)?.get(iterRunID)
                  const currentNodeRetry = currIteration?.find(item => item.node_id === data.node_id)

                  if (currentNodeRetry) {
                    if (currentNodeRetry?.retryDetail)
                      currentNodeRetry?.retryDetail.push(data as NodeTracing)
                    else
                      currentNodeRetry.retryDetail = [data as NodeTracing]
                  }
                  setIterParallelLogMap(iterParallelLogMap)
                  const iterLogMap = iterParallelLogMap.get(iteration.node_id)
                  if (iterLogMap)
                    iteration.details = Array.from(iterLogMap.values())
                }
              }))
            }
          }
          else {
            setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
              const tracing = draft.tracing!
              const currentRetryNodeIndex = tracing.findIndex(trace => trace.node_id === data.node_id)

              if (currentRetryNodeIndex > -1) {
                const currentRetryNode = tracing[currentRetryNodeIndex]
                if (currentRetryNode.retryDetail)
                  draft.tracing![currentRetryNodeIndex].retryDetail!.push(data as NodeTracing)
                else
                  draft.tracing![currentRetryNodeIndex].retryDetail = [data as NodeTracing]
              }
            }))
          }
          const newNodes = produce(nodes, (draft) => {
            const retriedNode = draft.find(node => node.id === data.node_id)
            if (retriedNode)
              retriedNode.data._retryIndex = data.retry_index
          })
          setNodes(newNodes)

          if (onNodeRetry)
            onNodeRetry(params)
        },
        onParallelBranchStarted: (params) => {
          // console.log(params, 'parallel start')
        },
        onParallelBranchFinished: (params) => {
          // console.log(params, 'finished')
        },
        // Streamed text is appended to the result text and activates the result tab.
        onTextChunk: (params) => {
          const { data: { text } } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.resultTabActive = true
            draft.resultText += text
          }))
        },
        onTextReplace: (params) => {
          const { data: { text } } = params
          const {
            workflowRunningData,
            setWorkflowRunningData,
          } = workflowStore.getState()
          setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
            draft.resultText = text
          }))
        },
        onTTSChunk: (messageId: string, audio: string, audioType?: string) => {
          if (!audio || audio === '')
            return
          player.playAudioWithAudio(audio, true)
          AudioPlayerManager.getInstance().resetMsgId(messageId)
        },
        onTTSEnd: (messageId: string, audio: string, audioType?: string) => {
          player.playAudioWithAudio(audio, false)
        },
        // Caller-supplied callbacks not handled above go last so they override
        // the defaults defined in this object.
        ...restCallback,
      },
    )
    // pathname is read when building the TTS URL, so it must be a dependency.
  }, [store, reactflow, workflowStore, doSyncWorkflowDraft, pathname])
  /**
   * Asks the backend to stop the run identified by `taskId` for the app
   * currently held in the app store.
   *
   * @param taskId Task id of the run to stop.
   */
  const handleStopRun = useCallback((taskId: string) => {
    const { appDetail } = useAppStore.getState()
    const stopUrl = `/apps/${appDetail?.id}/workflow-runs/tasks/${taskId}/stop`
    stopWorkflowRun(stopUrl)
  }, [])
  /**
   * Fetches the published workflow of the current app and, when one is
   * returned, replaces the canvas, features, published-at timestamp and
   * environment variables with the published values.
   */
  const handleRestoreFromPublishedWorkflow = useCallback(async () => {
    const { appDetail } = useAppStore.getState()
    const publishedWorkflow = await fetchPublishedWorkflow(`/apps/${appDetail?.id}/workflows/publish`)
    if (!publishedWorkflow)
      return

    const { nodes, edges } = publishedWorkflow.graph
    const viewport = publishedWorkflow.graph.viewport!
    handleUpdateWorkflowCanvas({ nodes, edges, viewport })
    featuresStore?.setState({ features: publishedWorkflow.features })
    workflowStore.getState().setPublishedAt(publishedWorkflow.created_at)
    // Environment variables default to an empty list when none are published.
    workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || [])
  }, [featuresStore, handleUpdateWorkflowCanvas, workflowStore])
  // Handlers exposed to consumers of the hook.
  720. return {
  721. handleBackupDraft,
  722. handleLoadBackupDraft,
  723. handleRun,
  724. handleStopRun,
  725. handleRestoreFromPublishedWorkflow,
  726. }
  727. }