|
@@ -2,16 +2,19 @@ import Emitter from 'component-emitter'
|
|
|
import has from '@uppy/utils/lib/hasProperty'
|
|
|
import NetworkError from '@uppy/utils/lib/NetworkError'
|
|
|
import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'
|
|
|
+// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
|
|
+// @ts-ignore untyped
|
|
|
+import type {
|
|
|
+ RateLimitedQueue,
|
|
|
+ WrapPromiseFunctionType,
|
|
|
+} from '@uppy/utils/lib/RateLimitedQueue'
|
|
|
+import type { AssemblyResponse } from '.'
|
|
|
|
|
|
const ASSEMBLY_UPLOADING = 'ASSEMBLY_UPLOADING'
|
|
|
const ASSEMBLY_EXECUTING = 'ASSEMBLY_EXECUTING'
|
|
|
const ASSEMBLY_COMPLETED = 'ASSEMBLY_COMPLETED'
|
|
|
|
|
|
-const statusOrder = [
|
|
|
- ASSEMBLY_UPLOADING,
|
|
|
- ASSEMBLY_EXECUTING,
|
|
|
- ASSEMBLY_COMPLETED,
|
|
|
-]
|
|
|
+const statusOrder = [ASSEMBLY_UPLOADING, ASSEMBLY_EXECUTING, ASSEMBLY_COMPLETED]
|
|
|
|
|
|
/**
|
|
|
* Check that an assembly status is equal to or larger than some desired status.
|
|
@@ -23,20 +26,26 @@ const statusOrder = [
|
|
|
* …so that we can emit the 'executing' event even if the execution step was so
|
|
|
* fast that we missed it.
|
|
|
*/
|
|
|
-function isStatus (status, test) {
|
|
|
+function isStatus(status: string, test: string) {
|
|
|
return statusOrder.indexOf(status) >= statusOrder.indexOf(test)
|
|
|
}
|
|
|
|
|
|
class TransloaditAssembly extends Emitter {
|
|
|
- #rateLimitedQueue
|
|
|
+ #rateLimitedQueue: RateLimitedQueue
|
|
|
|
|
|
- #fetchWithNetworkError
|
|
|
+ #fetchWithNetworkError: WrapPromiseFunctionType<typeof fetchWithNetworkError>
|
|
|
|
|
|
#previousFetchStatusStillPending = false
|
|
|
|
|
|
- #sse
|
|
|
+ #sse: EventSource | null
|
|
|
|
|
|
- constructor (assembly, rateLimitedQueue) {
|
|
|
+ status: AssemblyResponse
|
|
|
+
|
|
|
+ pollInterval: ReturnType<typeof setInterval> | null
|
|
|
+
|
|
|
+ closed: boolean
|
|
|
+
|
|
|
+ constructor(assembly: AssemblyResponse, rateLimitedQueue: RateLimitedQueue) {
|
|
|
super()
|
|
|
|
|
|
// The current assembly status.
|
|
@@ -47,29 +56,28 @@ class TransloaditAssembly extends Emitter {
|
|
|
this.closed = false
|
|
|
|
|
|
this.#rateLimitedQueue = rateLimitedQueue
|
|
|
- this.#fetchWithNetworkError = rateLimitedQueue.wrapPromiseFunction(fetchWithNetworkError)
|
|
|
+ this.#fetchWithNetworkError = rateLimitedQueue.wrapPromiseFunction(
|
|
|
+ fetchWithNetworkError,
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
- connect () {
|
|
|
+ connect(): void {
|
|
|
this.#connectServerSentEvents()
|
|
|
this.#beginPolling()
|
|
|
}
|
|
|
|
|
|
- #onFinished () {
|
|
|
+ #onFinished() {
|
|
|
this.emit('finished')
|
|
|
this.close()
|
|
|
}
|
|
|
|
|
|
- #connectServerSentEvents () {
|
|
|
- this.#sse = new EventSource(`${this.status.websocket_url}?assembly=${this.status.assembly_id}`)
|
|
|
+ #connectServerSentEvents() {
|
|
|
+ this.#sse = new EventSource(
|
|
|
+ `${this.status.websocket_url}?assembly=${this.status.assembly_id}`,
|
|
|
+ )
|
|
|
|
|
|
this.#sse.addEventListener('open', () => {
|
|
|
- // if server side events works, we don't need websockets anymore (it's just a fallback)
|
|
|
- if (this.socket) {
|
|
|
- this.socket.disconnect()
|
|
|
- this.socket = null
|
|
|
- }
|
|
|
- clearInterval(this.pollInterval)
|
|
|
+ clearInterval(this.pollInterval!)
|
|
|
this.pollInterval = null
|
|
|
})
|
|
|
|
|
@@ -115,15 +123,18 @@ class TransloaditAssembly extends Emitter {
|
|
|
try {
|
|
|
this.#onError(JSON.parse(e.data))
|
|
|
} catch {
|
|
|
- this.#onError({ msg: e.data })
|
|
|
+ this.#onError(new Error(e.data))
|
|
|
}
|
|
|
// Refetch for updated status code
|
|
|
this.#fetchStatus({ diff: false })
|
|
|
})
|
|
|
}
|
|
|
|
|
|
- #onError (status) {
|
|
|
- this.emit('error', Object.assign(new Error(status.msg), status))
|
|
|
+ #onError(assemblyOrError: AssemblyResponse | NetworkError | Error) {
|
|
|
+ this.emit(
|
|
|
+ 'error',
|
|
|
+ Object.assign(new Error(assemblyOrError.message), assemblyOrError),
|
|
|
+ )
|
|
|
this.close()
|
|
|
}
|
|
|
|
|
@@ -133,7 +144,7 @@ class TransloaditAssembly extends Emitter {
|
|
|
* If the SSE connection fails or takes a long time, we won't miss any
|
|
|
* events.
|
|
|
*/
|
|
|
- #beginPolling () {
|
|
|
+ #beginPolling() {
|
|
|
this.pollInterval = setInterval(() => {
|
|
|
this.#fetchStatus()
|
|
|
}, 2000)
|
|
@@ -145,12 +156,19 @@ class TransloaditAssembly extends Emitter {
|
|
|
* Pass `diff: false` to avoid emitting diff events, instead only emitting
|
|
|
* 'status'.
|
|
|
*/
|
|
|
- async #fetchStatus ({ diff = true } = {}) {
|
|
|
- if (this.closed || this.#rateLimitedQueue.isPaused || this.#previousFetchStatusStillPending) return
|
|
|
+ async #fetchStatus({ diff = true } = {}) {
|
|
|
+ if (
|
|
|
+ this.closed ||
|
|
|
+ this.#rateLimitedQueue.isPaused ||
|
|
|
+ this.#previousFetchStatusStillPending
|
|
|
+ )
|
|
|
+ return
|
|
|
|
|
|
try {
|
|
|
this.#previousFetchStatusStillPending = true
|
|
|
- const response = await this.#fetchWithNetworkError(this.status.assembly_ssl_url)
|
|
|
+ const response = await this.#fetchWithNetworkError(
|
|
|
+ this.status.assembly_ssl_url,
|
|
|
+ )
|
|
|
this.#previousFetchStatusStillPending = false
|
|
|
|
|
|
if (this.closed) return
|
|
@@ -166,6 +184,7 @@ class TransloaditAssembly extends Emitter {
|
|
|
}
|
|
|
|
|
|
const status = await response.json()
|
|
|
+
|
|
|
// Avoid updating if we closed during this request's lifetime.
|
|
|
if (this.closed) return
|
|
|
this.emit('status', status)
|
|
@@ -180,17 +199,15 @@ class TransloaditAssembly extends Emitter {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- update () {
|
|
|
+ update(): Promise<void> {
|
|
|
return this.#fetchStatus({ diff: true })
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Update this assembly's status with a full new object. Events will be
|
|
|
* emitted for status changes, new files, and new results.
|
|
|
- *
|
|
|
- * @param {object} next The new assembly status object.
|
|
|
*/
|
|
|
- updateStatus (next) {
|
|
|
+ updateStatus(next: AssemblyResponse): void {
|
|
|
this.#diffStatus(this.status, next)
|
|
|
this.status = next
|
|
|
}
|
|
@@ -198,11 +215,8 @@ class TransloaditAssembly extends Emitter {
|
|
|
/**
|
|
|
* Diff two assembly statuses, and emit the events necessary to go from `prev`
|
|
|
* to `next`.
|
|
|
- *
|
|
|
- * @param {object} prev The previous assembly status.
|
|
|
- * @param {object} next The new assembly status.
|
|
|
*/
|
|
|
- #diffStatus (prev, next) {
|
|
|
+ #diffStatus(prev: AssemblyResponse, next: AssemblyResponse) {
|
|
|
const prevStatus = prev.ok
|
|
|
const nextStatus = next.ok
|
|
|
|
|
@@ -219,8 +233,9 @@ class TransloaditAssembly extends Emitter {
|
|
|
// The below checks run in this order, that way even if we jump from
|
|
|
// UPLOADING straight to FINISHED all the events are emitted as expected.
|
|
|
|
|
|
- const nowExecuting = isStatus(nextStatus, ASSEMBLY_EXECUTING)
|
|
|
- && !isStatus(prevStatus, ASSEMBLY_EXECUTING)
|
|
|
+ const nowExecuting =
|
|
|
+ isStatus(nextStatus, ASSEMBLY_EXECUTING) &&
|
|
|
+ !isStatus(prevStatus, ASSEMBLY_EXECUTING)
|
|
|
if (nowExecuting) {
|
|
|
// Without SSE, this is our only way to tell if uploading finished.
|
|
|
// Hence, we emit this just before the 'upload's and before the 'metadata'
|
|
@@ -229,10 +244,13 @@ class TransloaditAssembly extends Emitter {
|
|
|
this.emit('executing')
|
|
|
}
|
|
|
|
|
|
- // Find new uploaded files.
|
|
|
+ // Only emit if the upload is new (not in prev.uploads).
|
|
|
Object.keys(next.uploads)
|
|
|
.filter((upload) => !has(prev.uploads, upload))
|
|
|
.forEach((upload) => {
|
|
|
+ // eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
|
|
+ // @ts-ignore either the types are wrong or the tests are wrong.
|
|
|
+ // types think next.uploads is an array, but the tests pass an object.
|
|
|
this.emit('upload', next.uploads[upload])
|
|
|
})
|
|
|
|
|
@@ -252,8 +270,10 @@ class TransloaditAssembly extends Emitter {
|
|
|
})
|
|
|
})
|
|
|
|
|
|
- if (isStatus(nextStatus, ASSEMBLY_COMPLETED)
|
|
|
- && !isStatus(prevStatus, ASSEMBLY_COMPLETED)) {
|
|
|
+ if (
|
|
|
+ isStatus(nextStatus, ASSEMBLY_COMPLETED) &&
|
|
|
+ !isStatus(prevStatus, ASSEMBLY_COMPLETED)
|
|
|
+ ) {
|
|
|
this.emit('finished')
|
|
|
}
|
|
|
|
|
@@ -263,13 +283,13 @@ class TransloaditAssembly extends Emitter {
|
|
|
/**
|
|
|
* Stop updating this assembly.
|
|
|
*/
|
|
|
- close () {
|
|
|
+ close(): void {
|
|
|
this.closed = true
|
|
|
if (this.#sse) {
|
|
|
this.#sse.close()
|
|
|
this.#sse = null
|
|
|
}
|
|
|
- clearInterval(this.pollInterval)
|
|
|
+ clearInterval(this.pollInterval!)
|
|
|
this.pollInterval = null
|
|
|
}
|
|
|
}
|