Assembly.js 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. const Emitter = require('component-emitter')
  2. const has = require('@uppy/utils/lib/hasProperty')
  3. const NetworkError = require('@uppy/utils/lib/NetworkError')
  4. const fetchWithNetworkError = require('@uppy/utils/lib/fetchWithNetworkError')
  5. const parseUrl = require('./parseUrl')
  // socket.io is loaded lazily so that IE 10 does not log a console error
  // when the Transloadit plugin is not used. (The console.error call comes
  // from `buffer`, a part of socket.io that we probably don't use at all.)
  let socketIo

  /**
   * Return the `socket.io-client` module, requiring it on first use and
   * reusing the cached module on every later call.
   *
   * @returns {object} The `socket.io-client` module.
   */
  function requireSocketIo () {
    if (socketIo == null) {
      // eslint-disable-next-line global-require
      socketIo = require('socket.io-client')
    }
    return socketIo
  }
  // Assembly statuses, listed from earliest to latest. A status's position in
  // this list is what `isStatus` compares.
  const statusOrder = [
    'ASSEMBLY_UPLOADING',
    'ASSEMBLY_EXECUTING',
    'ASSEMBLY_COMPLETED',
  ]
  // Named constants for the individual statuses, taken from the list above.
  const [
    ASSEMBLY_UPLOADING,
    ASSEMBLY_EXECUTING,
    ASSEMBLY_COMPLETED,
  ] = statusOrder
  /**
   * Tell whether an assembly status has reached (or passed) a desired status,
   * based on the positions of both values in `statusOrder`.
   *
   * "Reached or passed" rather than "equal" makes a comparison like this work
   * when the old status is UPLOADING but the new one is already COMPLETED:
   *
   *   !isStatus(oldStatus, ASSEMBLY_EXECUTING) && isStatus(newStatus, ASSEMBLY_EXECUTING)
   *
   * …so the 'executing' event can still be emitted even if the execution step
   * was so fast that it was never observed.
   *
   * @param {string} status The status to check.
   * @param {string} test The status that should have been reached.
   * @returns {boolean} Whether `status` is at or after `test` in `statusOrder`.
   */
  function isStatus (status, test) {
    const reachedIndex = statusOrder.indexOf(status)
    const requiredIndex = statusOrder.indexOf(test)
    return reachedIndex >= requiredIndex
  }
  /**
   * Tracks a single assembly and re-emits its progress as events.
   *
   * Updates arrive over a socket.io connection and, whenever that socket is not
   * connected, by polling the assembly status endpoint.
   *
   * Events emitted:
   * - `connect`: the socket connected.
   * - `status` (status): a full status object was fetched.
   * - `executing`: uploading finished.
   * - `upload` (file): a file finished uploading.
   * - `metadata`: upload metadata was extracted.
   * - `result` (stepName, result): a result became available for a step.
   * - `finished`: the assembly completed; the instance is closed afterwards.
   * - `error` (error): the assembly failed; the instance is closed afterwards.
   */
  class TransloaditAssembly extends Emitter {
    /**
     * @param {object} assembly The initial assembly status object. This class
     *   reads `websocket_url`, `assembly_id`, `assembly_ssl_url`, `ok`, `error`,
     *   `uploads` and `results` from it.
     */
    constructor (assembly) {
      super()

      // The current assembly status.
      this.status = assembly
      // The socket.io connection.
      this.socket = null
      // The interval timer for full status updates.
      this.pollInterval = null
      // Whether this assembly has been closed (finished or errored)
      this.closed = false
    }

    /**
     * Open the socket connection and start polling for status changes.
     */
    connect () {
      this.#connectSocket()
      this.#beginPolling()
    }

    /**
     * Emit 'finished' and stop updating this assembly.
     */
    #onFinished () {
      this.emit('finished')
      this.close()
    }

    /**
     * Connect to the assembly's websocket and translate socket events into
     * events on this emitter, keeping `this.status` in sync along the way.
     */
    #connectSocket () {
      // Drop a socket left over from an earlier connect() call, so it cannot
      // keep delivering events after we lose our only reference to it.
      if (this.socket) {
        this.socket.disconnect()
        this.socket = null
      }

      const parsed = parseUrl(this.status.websocket_url)
      const socket = requireSocketIo().connect(parsed.origin, {
        transports: ['websocket'],
        path: parsed.pathname,
      })

      socket.on('connect', () => {
        socket.emit('assembly_connect', {
          id: this.status.assembly_id,
        })

        this.emit('connect')
      })

      socket.on('connect_error', () => {
        socket.disconnect()
        // Only forget the socket if it is still the current one; a newer
        // connection must not be discarded because an older one failed.
        if (this.socket === socket) {
          this.socket = null
        }
      })

      socket.on('assembly_finished', () => {
        this.#onFinished()
      })

      socket.on('assembly_upload_finished', (file) => {
        this.emit('upload', file)
        this.status.uploads.push(file)
      })

      socket.on('assembly_uploading_finished', () => {
        this.emit('executing')
      })

      socket.on('assembly_upload_meta_data_extracted', () => {
        this.emit('metadata')
        this.#fetchStatus({ diff: false })
      })

      socket.on('assembly_result_finished', (stepName, result) => {
        this.emit('result', stepName, result)
        if (!this.status.results[stepName]) {
          this.status.results[stepName] = []
        }
        this.status.results[stepName].push(result)
      })

      socket.on('assembly_error', (err) => {
        this.#onError(err)
        // Refetch for updated status code
        // NOTE(review): #onError() closes this assembly and #fetchStatus()
        // returns immediately once closed, so this refetch currently never
        // reaches the network. Left unchanged pending a decision on whether a
        // refetch should still happen after an error.
        this.#fetchStatus({ diff: false })
      })

      this.socket = socket
    }

    /**
     * Emit 'error' and stop updating this assembly.
     *
     * The emitted value is always a fresh Error carrying `err`'s message, with
     * the own enumerable properties of `err` copied onto it.
     *
     * @param {object} err An Error, or a plain object with a `message` (such as
     *   an assembly status or a socket payload).
     */
    #onError (err) {
      // `err?.message` keeps a missing payload from throwing a TypeError here,
      // which would prevent both the 'error' event and the close().
      this.emit('error', Object.assign(new Error(err?.message), err))
      this.close()
    }

    /**
     * Begin polling for assembly status changes. This sends a request to the
     * assembly status endpoint every so often, if the socket is not connected.
     * If the socket connection fails or takes a long time, we won't miss any
     * events.
     */
    #beginPolling () {
      // Never keep more than one timer: a previous interval would otherwise be
      // orphaned and could no longer be cleared by close().
      clearInterval(this.pollInterval)
      this.pollInterval = setInterval(() => {
        if (!this.socket || !this.socket.connected) {
          this.#fetchStatus()
        }
      }, 2000)
    }

    /**
     * Reload assembly status. Useful if the socket doesn't work.
     *
     * Pass `diff: false` to avoid emitting diff events, instead only emitting
     * 'status'.
     *
     * Does nothing once the assembly is closed. Rate-limited responses (429)
     * are ignored; any other failure is reported through 'error'.
     *
     * @param {object} [options]
     * @param {boolean} [options.diff=true] Whether to emit diff events.
     * @returns {Promise<void>}
     */
    async #fetchStatus ({ diff = true } = {}) {
      if (this.closed) return

      try {
        const response = await fetchWithNetworkError(this.status.assembly_ssl_url)

        if (this.closed) return

        // In case of rate-limiting, ignore the error.
        if (response.status === 429) return

        if (!response.ok) {
          this.#onError(new NetworkError(response.statusText))
          return
        }

        const status = await response.json()
        // Avoid updating if we closed during this request's lifetime.
        if (this.closed) return

        this.emit('status', status)

        if (diff) {
          this.updateStatus(status)
        } else {
          this.status = status
        }
      } catch (err) {
        this.#onError(err)
      }
    }

    /**
     * Fetch the latest status and emit events for everything that changed.
     *
     * @returns {Promise<void>}
     */
    update () {
      return this.#fetchStatus({ diff: true })
    }

    /**
     * Update this assembly's status with a full new object. Events will be
     * emitted for status changes, new files, and new results.
     *
     * @param {object} next The new assembly status object.
     */
    updateStatus (next) {
      this.#diffStatus(this.status, next)
      this.status = next
    }

    /**
     * Diff two assembly statuses, and emit the events necessary to go from `prev`
     * to `next`.
     *
     * @param {object} prev The previous assembly status.
     * @param {object} next The new assembly status.
     */
    #diffStatus (prev, next) {
      const prevStatus = prev.ok
      const nextStatus = next.ok

      if (next.error && !prev.error) {
        return this.#onError(next)
      }

      // Desired emit order:
      //  - executing
      //  - (n × upload)
      //  - metadata
      //  - (m × result)
      //  - finished
      // The below checks run in this order, that way even if we jump from
      // UPLOADING straight to FINISHED all the events are emitted as expected.

      const nowExecuting = isStatus(nextStatus, ASSEMBLY_EXECUTING)
        && !isStatus(prevStatus, ASSEMBLY_EXECUTING)
      if (nowExecuting) {
        // Without WebSockets, this is our only way to tell if uploading finished.
        // Hence, we emit this just before the 'upload's and before the 'metadata'
        // event for the most intuitive ordering, corresponding to the _usual_
        // ordering (if not guaranteed) that you'd get on the WebSocket.
        this.emit('executing')
      }

      // Find new uploaded files.
      Object.keys(next.uploads)
        .filter((upload) => !has(prev.uploads, upload))
        .forEach((upload) => {
          this.emit('upload', next.uploads[upload])
        })

      if (nowExecuting) {
        this.emit('metadata')
      }

      // Find new results.
      Object.keys(next.results).forEach((stepName) => {
        const nextResults = next.results[stepName]
        const prevResults = prev.results[stepName]
        // Index the previous ids once instead of rescanning the previous
        // results for every new result.
        const knownIds = new Set((prevResults || []).map((result) => result.id))

        nextResults
          .filter((result) => !knownIds.has(result.id))
          .forEach((result) => {
            this.emit('result', stepName, result)
          })
      })

      if (isStatus(nextStatus, ASSEMBLY_COMPLETED)
          && !isStatus(prevStatus, ASSEMBLY_COMPLETED)) {
        // Same path as the socket's 'assembly_finished': emit, then close, so
        // polling does not keep running after the assembly has completed.
        this.#onFinished()
      }

      return undefined
    }

    /**
     * Stop updating this assembly.
     *
     * Marks the assembly as closed, disconnects the socket and stops polling.
     * Safe to call more than once.
     */
    close () {
      this.closed = true
      if (this.socket) {
        this.socket.disconnect()
        this.socket = null
      }
      clearInterval(this.pollInterval)
      this.pollInterval = null
    }
  }
  229. module.exports = TransloaditAssembly