// Assembly.ts
  1. import Emitter from 'component-emitter'
  2. import has from '@uppy/utils/lib/hasProperty'
  3. import NetworkError from '@uppy/utils/lib/NetworkError'
  4. import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'
  5. // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  6. // @ts-ignore untyped
  7. import type {
  8. RateLimitedQueue,
  9. WrapPromiseFunctionType,
  10. } from '@uppy/utils/lib/RateLimitedQueue'
  11. import type { AssemblyResponse } from './index.js'
  // Assembly lifecycle states, listed in the order an assembly moves through
  // them. `statusOrder` encodes that ordering for `isStatus`.
  const ASSEMBLY_UPLOADING = 'ASSEMBLY_UPLOADING'
  const ASSEMBLY_EXECUTING = 'ASSEMBLY_EXECUTING'
  const ASSEMBLY_COMPLETED = 'ASSEMBLY_COMPLETED'
  const statusOrder = [ASSEMBLY_UPLOADING, ASSEMBLY_EXECUTING, ASSEMBLY_COMPLETED]

  /**
   * Check that an assembly status is equal to or later than some desired status.
   * It accepts later statuses so that a comparison like this works when the old
   * assembly status is UPLOADING but the new one is already COMPLETED:
   *
   *   !isStatus(oldStatus, ASSEMBLY_EXECUTING) && isStatus(newStatus, ASSEMBLY_EXECUTING)
   *
   * …so that we can emit the 'executing' event even if the execution step was so
   * fast that we missed it.
   *
   * @param status - The status to examine. A value that is not in `statusOrder`
   *   ranks before every known status.
   * @param test - The status that should have been reached.
   * @returns `true` when `status` is at or past `test`; always `false` when
   *   `test` itself is not a known status.
   */
  function isStatus(status: string, test: string): boolean {
    const required = statusOrder.indexOf(test)
    // An unknown target would otherwise compare as -1 and be "reached" by
    // every status, including unknown ones.
    if (required === -1) return false
    return statusOrder.indexOf(status) >= required
  }
  /**
   * Tracks one assembly and re-emits its progress as events.
   *
   * Updates arrive over a Server-Sent Events connection; until that connection
   * opens, the status endpoint is polled instead.
   *
   * Events emitted:
   * - `'executing'`
   * - `'upload'` (file)
   * - `'metadata'`
   * - `'result'` (stepName, result)
   * - `'execution-progress'` (details)
   * - `'status'` (status) — after every successful status fetch
   * - `'finished'`
   * - `'error'` (Error)
   */
  class TransloaditAssembly extends Emitter {
    #rateLimitedQueue: RateLimitedQueue

    #fetchWithNetworkError: WrapPromiseFunctionType<typeof fetchWithNetworkError>

    // Guards against overlapping status requests.
    #previousFetchStatusStillPending = false

    // Initialised to null so `close()` is well-defined before `connect()`.
    #sse: EventSource | null = null

    status: AssemblyResponse

    pollInterval: ReturnType<typeof setInterval> | null

    closed: boolean

    /**
     * @param assembly - The initial assembly status.
     * @param rateLimitedQueue - Queue used to wrap the status fetch; its
     *   `isPaused` flag suppresses fetches and `rateLimit` is called on HTTP 429.
     */
    constructor(assembly: AssemblyResponse, rateLimitedQueue: RateLimitedQueue) {
      super()

      // The current assembly status.
      this.status = assembly
      // The interval timer for full status updates.
      this.pollInterval = null
      // Whether this assembly has been closed (finished or errored)
      this.closed = false

      this.#rateLimitedQueue = rateLimitedQueue
      this.#fetchWithNetworkError = rateLimitedQueue.wrapPromiseFunction(
        fetchWithNetworkError,
      )
    }

    /**
     * Open the SSE connection and start polling. Polling stops as soon as the
     * SSE connection reports `open`.
     */
    connect(): void {
      this.#connectServerSentEvents()
      this.#beginPolling()
    }

    /**
     * Emit `'finished'` and close. The assembly is closed even when a listener
     * throws, so the EventSource and poll timer are never left running.
     */
    #onFinished() {
      try {
        this.emit('finished')
      } finally {
        this.close()
      }
    }

    /**
     * Create the EventSource for this assembly and translate its events into
     * emitter events.
     */
    #connectServerSentEvents() {
      // Release a connection left over from an earlier `connect()` call.
      this.#sse?.close()

      const sse = new EventSource(
        `${this.status.websocket_url}?assembly=${this.status.assembly_id}`,
      )
      this.#sse = sse

      sse.addEventListener('open', () => {
        // SSE is delivering updates now; polling is no longer needed.
        this.#stopPolling()
      })

      /*
       * The "message" event is a special case: it receives events that have
       * no `event` field as well as events explicitly typed "message".
       */
      sse.addEventListener('message', (e) => {
        switch (e.data) {
          case 'assembly_finished':
            this.#onFinished()
            break
          case 'assembly_uploading_finished':
            this.emit('executing')
            break
          case 'assembly_upload_meta_data_extracted':
            this.emit('metadata')
            void this.#fetchStatus({ diff: false })
            break
          default:
            break
        }
      })

      sse.addEventListener('assembly_upload_finished', (e) => {
        const file = JSON.parse(e.data)
        this.status.uploads.push(file)
        this.emit('upload', file)
      })

      sse.addEventListener('assembly_result_finished', (e) => {
        const [stepName, result] = JSON.parse(e.data)
        ;(this.status.results[stepName] ??= []).push(result)
        this.emit('result', stepName, result)
      })

      sse.addEventListener('assembly_execution_progress', (e) => {
        const details = JSON.parse(e.data)
        this.emit('execution-progress', details)
      })

      sse.addEventListener('assembly_error', (e) => {
        // Only the parse is guarded: if `#onError` were inside the `try`, a
        // throwing 'error' listener would trigger a second 'error' emission.
        let failure: AssemblyResponse | Error
        try {
          failure = JSON.parse(e.data)
        } catch {
          failure = new Error(e.data)
        }
        this.#onError(failure)
        // Refetch for updated status code
        // NOTE(review): `#onError` has already closed the assembly, and
        // `#fetchStatus` returns early when closed, so this call is currently a
        // no-op. Kept to preserve the original intent — confirm what is wanted.
        void this.#fetchStatus({ diff: false })
      })
    }

    /**
     * Emit `'error'` with an `Error` carrying the message and own properties of
     * `assemblyOrError`, then close. The assembly is closed even when a listener
     * throws.
     */
    #onError(assemblyOrError: AssemblyResponse | NetworkError | Error) {
      try {
        this.emit(
          'error',
          Object.assign(new Error(assemblyOrError.message), assemblyOrError),
        )
      } finally {
        this.close()
      }
    }

    /**
     * Begin polling for assembly status changes: request the status endpoint
     * every 2 seconds until the SSE connection opens or the assembly is closed.
     */
    #beginPolling() {
      // Never keep more than one timer alive.
      this.#stopPolling()
      this.pollInterval = setInterval(() => {
        void this.#fetchStatus()
      }, 2000)
    }

    /** Cancel the polling timer, if one is running. */
    #stopPolling() {
      if (this.pollInterval != null) clearInterval(this.pollInterval)
      this.pollInterval = null
    }

    /**
     * Issue the status request, marking it as pending for its duration. The
     * flag is reset in `finally` so a rejected request cannot leave it set.
     */
    async #requestStatus() {
      this.#previousFetchStatusStillPending = true
      try {
        return await this.#fetchWithNetworkError(this.status.assembly_ssl_url)
      } finally {
        this.#previousFetchStatusStillPending = false
      }
    }

    /**
     * Reload assembly status. Useful if SSE doesn't work.
     *
     * Does nothing when the assembly is closed, the queue is paused, or a
     * previous request is still in flight.
     *
     * Pass `diff: false` to avoid emitting diff events, instead only emitting
     * 'status'.
     */
    async #fetchStatus({ diff = true } = {}): Promise<void> {
      if (
        this.closed ||
        this.#rateLimitedQueue.isPaused ||
        this.#previousFetchStatusStillPending
      ) {
        return
      }

      try {
        const response = await this.#requestStatus()

        if (this.closed) return

        if (response.status === 429) {
          this.#rateLimitedQueue.rateLimit(2_000)
          return
        }

        if (!response.ok) {
          this.#onError(new NetworkError(response.statusText))
          return
        }

        const status = await response.json()

        // Avoid updating if we closed during this request's lifetime.
        if (this.closed) return

        this.emit('status', status)

        if (diff) {
          this.updateStatus(status)
        } else {
          this.status = status
        }
      } catch (err) {
        // Caught values are `unknown`; normalise anything that is not an Error.
        this.#onError(err instanceof Error ? err : new Error(String(err)))
      }
    }

    /**
     * Fetch the latest status and emit events for whatever changed.
     */
    update(): Promise<void> {
      return this.#fetchStatus({ diff: true })
    }

    /**
     * Update this assembly's status with a full new object. Events will be
     * emitted for status changes, new files, and new results.
     */
    updateStatus(next: AssemblyResponse): void {
      this.#diffStatus(this.status, next)
      this.status = next
    }

    /**
     * Diff two assembly statuses, and emit the events necessary to go from `prev`
     * to `next`.
     */
    #diffStatus(prev: AssemblyResponse, next: AssemblyResponse): void {
      const prevStatus = prev.ok
      const nextStatus = next.ok

      if (next.error && !prev.error) {
        this.#onError(next)
        return
      }

      // Desired emit order:
      //  - executing
      //  - (n × upload)
      //  - metadata
      //  - (m × result)
      //  - finished
      // The below checks run in this order, that way even if we jump from
      // UPLOADING straight to COMPLETED all the events are emitted as expected.

      const nowExecuting =
        isStatus(nextStatus, ASSEMBLY_EXECUTING) &&
        !isStatus(prevStatus, ASSEMBLY_EXECUTING)
      if (nowExecuting) {
        // Without SSE, this is our only way to tell if uploading finished.
        // Hence, we emit this just before the 'upload's and before the 'metadata'
        // event for the most intuitive ordering, corresponding to the _usual_
        // ordering (if not guaranteed) that you'd get on SSE.
        this.emit('executing')
      }

      // Only emit if the upload is new (not in prev.uploads).
      Object.keys(next.uploads)
        .filter((upload) => !has(prev.uploads, upload))
        .forEach((upload) => {
          // eslint-disable-next-line @typescript-eslint/ban-ts-comment
          // @ts-ignore either the types are wrong or the tests are wrong.
          // types think next.uploads is an array, but the tests pass an object.
          this.emit('upload', next.uploads[upload])
        })

      if (nowExecuting) {
        this.emit('metadata')
      }

      // Find new results: those whose `id` was not present for the same step.
      Object.keys(next.results).forEach((stepName) => {
        const nextResults = next.results[stepName]
        const prevResults = prev.results[stepName]

        nextResults
          .filter((n) => !prevResults?.some((p) => p.id === n.id))
          .forEach((result) => {
            this.emit('result', stepName, result)
          })
      })

      if (
        isStatus(nextStatus, ASSEMBLY_COMPLETED) &&
        !isStatus(prevStatus, ASSEMBLY_COMPLETED)
      ) {
        // NOTE(review): unlike the SSE 'assembly_finished' path, this does not
        // close the assembly; presumably the owner calls `close()` — confirm.
        this.emit('finished')
      }
    }

    /**
     * Stop updating this assembly: mark it closed, close the SSE connection
     * and cancel polling.
     */
    close(): void {
      this.closed = true
      this.#sse?.close()
      this.#sse = null
      this.#stopPolling()
    }
  }

  export default TransloaditAssembly