123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- const Emitter = require('component-emitter')
- const has = require('@uppy/utils/lib/hasProperty')
- const NetworkError = require('@uppy/utils/lib/NetworkError')
- const fetchWithNetworkError = require('@uppy/utils/lib/fetchWithNetworkError')
- const parseUrl = require('./parseUrl')
- // Lazy load socket.io to avoid a console error
- // in IE 10 when the Transloadit plugin is not used.
- // (The console.error call comes from `buffer`. I
- // think we actually don't use that part of socket.io
- // at all…)
- let socketIo
- function requireSocketIo () {
- // eslint-disable-next-line global-require
- socketIo ??= require('socket.io-client')
- return socketIo
- }
const ASSEMBLY_UPLOADING = 'ASSEMBLY_UPLOADING'
const ASSEMBLY_EXECUTING = 'ASSEMBLY_EXECUTING'
const ASSEMBLY_COMPLETED = 'ASSEMBLY_COMPLETED'

// Assembly statuses in the order an assembly moves through them. A status'
// position in this list is what `isStatus` compares.
const statusOrder = [
  ASSEMBLY_UPLOADING,
  ASSEMBLY_EXECUTING,
  ASSEMBLY_COMPLETED,
]

/**
 * Tell whether `status` has reached `test`, i.e. whether it sits at the same
 * position as `test` in `statusOrder` or at a later one.
 *
 * "Reached" rather than "equals" is deliberate. It makes a transition check
 * like
 *
 *   !isStatus(oldStatus, ASSEMBLY_EXECUTING) && isStatus(newStatus, ASSEMBLY_EXECUTING)
 *
 * succeed even when the assembly went from uploading straight to completed
 * between two observations, so the 'executing' event can still be emitted for
 * an execution step that was too fast to be seen.
 *
 * A value that is not listed in `statusOrder` ranks below every listed status.
 *
 * @param {string} status The status to examine.
 * @param {string} test The status that should have been reached.
 * @returns {boolean} Whether `status` ranks at or after `test`.
 */
function isStatus (status, test) {
  const actualRank = statusOrder.indexOf(status)
  const requiredRank = statusOrder.indexOf(test)
  return actualRank >= requiredRank
}
/**
 * Tracks one Transloadit assembly and reports its progress as events.
 *
 * Updates arrive over a socket.io connection and, whenever that socket is not
 * connected, from polling the assembly status endpoint.
 *
 * Emitted events:
 * - 'connect': the socket connected.
 * - 'status' (status): a full status object was fetched.
 * - 'executing': uploading finished.
 * - 'upload' (file): a file finished uploading.
 * - 'metadata': upload metadata was extracted.
 * - 'result' (stepName, result): a step produced a result.
 * - 'finished': the assembly completed; the assembly is closed afterwards.
 * - 'error' (Error): the assembly failed; the assembly is closed afterwards.
 */
class TransloaditAssembly extends Emitter {
  /**
   * @param {object} assembly The initial assembly status object. This class
   *   reads its `websocket_url`, `assembly_id`, `assembly_ssl_url`, `ok`,
   *   `error`, `uploads` and `results` properties.
   */
  constructor (assembly) {
    super()

    // The current assembly status.
    this.status = assembly
    // The socket.io connection.
    this.socket = null
    // The interval timer for full status updates.
    this.pollInterval = null
    // Whether this assembly has been closed (finished or errored)
    this.closed = false
  }

  /**
   * Start listening for updates: open the socket and begin polling.
   */
  connect () {
    this.#connectSocket()
    this.#beginPolling()
  }

  /**
   * Announce completion and stop updating this assembly.
   */
  #onFinished () {
    this.emit('finished')
    this.close()
  }

  /**
   * Open the socket.io connection described by `status.websocket_url` and
   * translate the socket's assembly messages into events on this object.
   */
  #connectSocket () {
    const parsed = parseUrl(this.status.websocket_url)
    const socket = requireSocketIo().connect(parsed.origin, {
      transports: ['websocket'],
      path: parsed.pathname,
    })

    socket.on('connect', () => {
      // Tell the server which assembly this connection is interested in.
      socket.emit('assembly_connect', {
        id: this.status.assembly_id,
      })

      this.emit('connect')
    })

    socket.on('connect_error', () => {
      // Drop the socket; with `this.socket` unset, polling takes over.
      socket.disconnect()
      this.socket = null
    })

    socket.on('assembly_finished', () => {
      this.#onFinished()
    })

    socket.on('assembly_upload_finished', (file) => {
      this.emit('upload', file)
      // Record the file so a later status diff does not report it again.
      this.status.uploads.push(file)
    })

    socket.on('assembly_uploading_finished', () => {
      this.emit('executing')
    })

    socket.on('assembly_upload_meta_data_extracted', () => {
      this.emit('metadata')
      this.#fetchStatus({ diff: false })
    })

    socket.on('assembly_result_finished', (stepName, result) => {
      this.emit('result', stepName, result)
      // Record the result so a later status diff does not report it again.
      if (!this.status.results[stepName]) {
        this.status.results[stepName] = []
      }
      this.status.results[stepName].push(result)
    })

    socket.on('assembly_error', (err) => {
      this.#onError(err)
      // Refetch for updated status code
      // NOTE(review): `#onError` closes the assembly and `#fetchStatus`
      // returns immediately once closed, so this refetch currently does
      // nothing. Confirm whether the status should still be refreshed here.
      this.#fetchStatus({ diff: false })
    })

    this.socket = socket
  }

  /**
   * Emit 'error' and stop updating this assembly.
   *
   * The emitted value is a new `Error` created from `err.message` onto which
   * the own enumerable properties of `err` are copied.
   *
   * @param {object} err An error, or an error-like object such as a failed
   *   assembly status.
   */
  #onError (err) {
    this.emit('error', Object.assign(new Error(err.message), err))
    this.close()
  }

  /**
   * Begin polling for assembly status changes. This sends a request to the
   * assembly status endpoint every two seconds, but only while the socket is
   * not connected. If the socket connection fails or takes a long time, we
   * won't miss any events.
   */
  #beginPolling () {
    this.pollInterval = setInterval(() => {
      if (!this.socket || !this.socket.connected) {
        this.#fetchStatus()
      }
    }, 2000)
  }

  /**
   * Reload assembly status. Useful if the socket doesn't work.
   *
   * Does nothing once the assembly is closed, including when it gets closed
   * while the request is in flight. A 429 (rate-limited) response is ignored;
   * any other unsuccessful response, and any thrown error, is reported
   * through 'error'.
   *
   * @param {object} [options]
   * @param {boolean} [options.diff=true] Pass `false` to avoid emitting diff
   *   events, instead only emitting 'status'.
   * @returns {Promise<void>}
   */
  async #fetchStatus ({ diff = true } = {}) {
    if (this.closed) return

    try {
      const response = await fetchWithNetworkError(this.status.assembly_ssl_url)

      if (this.closed) return

      // In case of rate-limiting, ignore the error.
      if (response.status === 429) return

      if (!response.ok) {
        this.#onError(new NetworkError(response.statusText))
        return
      }

      const status = await response.json()

      // Avoid updating if we closed during this request's lifetime.
      if (this.closed) return

      this.emit('status', status)

      if (diff) {
        this.updateStatus(status)
      } else {
        this.status = status
      }
    } catch (err) {
      this.#onError(err)
    }
  }

  /**
   * Fetch the current status and emit events for everything that changed.
   *
   * @returns {Promise<void>}
   */
  update () {
    return this.#fetchStatus({ diff: true })
  }

  /**
   * Update this assembly's status with a full new object. Events will be
   * emitted for status changes, new files, and new results. The events are
   * emitted before `this.status` is replaced.
   *
   * @param {object} next The new assembly status object.
   */
  updateStatus (next) {
    this.#diffStatus(this.status, next)

    this.status = next
  }

  /**
   * Diff two assembly statuses, and emit the events necessary to go from `prev`
   * to `next`.
   *
   * @param {object} prev The previous assembly status.
   * @param {object} next The new assembly status.
   */
  #diffStatus (prev, next) {
    const prevStatus = prev.ok
    const nextStatus = next.ok

    // A newly reported error ends the assembly; nothing else is diffed.
    if (next.error && !prev.error) {
      return this.#onError(next)
    }

    // Desired emit order:
    //  - executing
    //  - (n × upload)
    //  - metadata
    //  - (m × result)
    //  - finished
    // The below checks run in this order, that way even if we jump from
    // UPLOADING straight to FINISHED all the events are emitted as expected.

    const nowExecuting = isStatus(nextStatus, ASSEMBLY_EXECUTING)
      && !isStatus(prevStatus, ASSEMBLY_EXECUTING)
    if (nowExecuting) {
      // Without WebSockets, this is our only way to tell if uploading finished.
      // Hence, we emit this just before the 'upload's and before the 'metadata'
      // event for the most intuitive ordering, corresponding to the _usual_
      // ordering (if not guaranteed) that you'd get on the WebSocket.
      this.emit('executing')
    }

    // Find new uploaded files.
    Object.keys(next.uploads)
      .filter((upload) => !has(prev.uploads, upload))
      .forEach((upload) => {
        this.emit('upload', next.uploads[upload])
      })

    if (nowExecuting) {
      this.emit('metadata')
    }

    // Find new results. A result is new when no previous result of the same
    // step has the same `id`.
    Object.keys(next.results).forEach((stepName) => {
      const nextResults = next.results[stepName]
      const prevResults = prev.results[stepName]

      nextResults
        .filter((n) => !prevResults || !prevResults.some((p) => p.id === n.id))
        .forEach((result) => {
          this.emit('result', stepName, result)
        })
    })

    if (isStatus(nextStatus, ASSEMBLY_COMPLETED)
        && !isStatus(prevStatus, ASSEMBLY_COMPLETED)) {
      // Take the same path as the socket's `assembly_finished` message, so
      // that a completion discovered by polling also closes the assembly and
      // stops the poll timer instead of leaving it running.
      this.#onFinished()
    }

    return undefined
  }

  /**
   * Stop updating this assembly: mark it closed, disconnect the socket if
   * there is one, and stop polling.
   */
  close () {
    this.closed = true
    if (this.socket) {
      this.socket.disconnect()
      this.socket = null
    }
    clearInterval(this.pollInterval)
  }
}
- module.exports = TransloaditAssembly
|