Browse Source

When file is removed (or all are canceled), controller.abort queued requests (#4504)

* When file is removed (or all are canceled), controller.abort queued requests

* add controller.abort to all other remote uploaders too

* fix the queue

---------

Co-authored-by: Antoine du Hamel <duhamelantoine1995@gmail.com>
Artur Paikin 1 year ago
parent
commit
b521a40ebb

+ 26 - 8
packages/@uppy/aws-s3-multipart/src/index.js

@@ -660,7 +660,7 @@ export default class AwsS3Multipart extends BasePlugin {
     })
   }
 
-  #requestSocketToken = async (file) => {
+  #requestSocketToken = async (file, options) => {
     const Client = file.remote.providerOptions.provider ? Provider : RequestClient
     const client = new Client(this.uppy, file.remote.providerOptions)
     const opts = { ...this.opts }
@@ -679,29 +679,33 @@ export default class AwsS3Multipart extends BasePlugin {
       protocol: 's3-multipart',
       size: file.data.size,
       metadata: file.meta,
-    })
+    }, options)
     return res.token
   }
 
   // NOTE! Keep this duplicated code in sync with other plugins
   // TODO we should probably abstract this into a common function
-  async #uploadRemote (file) {
+  async #uploadRemote (file, options) {
     this.resetUploaderReferences(file.id)
 
     try {
       if (file.serverToken) {
         return await this.connectToServerSocket(file)
       }
-      const serverToken = await this.#queueRequestSocketToken(file)
+      const serverToken = await this.#queueRequestSocketToken(file, options).abortOn(options?.signal)
 
       if (!this.uppy.getState().files[file.id]) return undefined
 
       this.uppy.setFileState(file.id, { serverToken })
       return await this.connectToServerSocket(this.uppy.getFile(file.id))
     } catch (err) {
-      this.uppy.setFileState(file.id, { serverToken: undefined })
-      this.uppy.emit('upload-error', file, err)
-      throw err
+      if (err?.cause?.name !== 'AbortError') {
+        this.uppy.setFileState(file.id, { serverToken: undefined })
+        this.uppy.emit('upload-error', file, err)
+        throw err
+      }
+      // The file upload was aborted, it’s not an error
+      return undefined
     }
   }
 
@@ -831,7 +835,21 @@ export default class AwsS3Multipart extends BasePlugin {
 
     const promises = filesFiltered.map((file) => {
       if (file.isRemote) {
-        return this.#uploadRemote(file)
+        const controller = new AbortController()
+
+        const removedHandler = (removedFile) => {
+          if (removedFile.id === file.id) controller.abort()
+        }
+
+        this.uppy.on('file-removed', removedHandler)
+
+        const uploadPromise = this.#uploadRemote(file, { signal: controller.signal })
+
+        this.requests.wrapSyncFunction(() => {
+          this.uppy.off('file-removed', removedHandler)
+        }, { priority: -1 })()
+
+        return uploadPromise
       }
       return this.#uploadFile(file)
     })

+ 26 - 8
packages/@uppy/aws-s3/src/MiniXHRUpload.js

@@ -91,10 +91,25 @@ export default class MiniXHRUpload {
 
   uploadFile (id, current, total) {
     const file = this.uppy.getFile(id)
+    this.uppy.log(`uploading ${current} of ${total}`)
+
     if (file.error) {
       throw new Error(file.error)
     } else if (file.isRemote) {
-      return this.#uploadRemoteFile(file, current, total)
+      const controller = new AbortController()
+
+      const removedHandler = (removedFile) => {
+        if (removedFile.id === file.id) controller.abort()
+      }
+      this.uppy.on('file-removed', removedHandler)
+
+      const uploadPromise = this.#uploadRemoteFile(file, { signal: controller.signal })
+
+      this.requests.wrapSyncFunction(() => {
+        this.uppy.off('file-removed', removedHandler)
+      }, { priority: -1 })()
+
+      return uploadPromise
     }
     return this.#uploadLocalFile(file, current, total)
   }
@@ -115,10 +130,9 @@ export default class MiniXHRUpload {
     })
   }
 
-  #uploadLocalFile (file, current, total) {
+  #uploadLocalFile (file) {
     const opts = this.#getOptions(file)
 
-    this.uppy.log(`uploading ${current} of ${total}`)
     return new Promise((resolve, reject) => {
       // This is done in index.js in the S3 plugin.
       // this.uppy.emit('upload-started', file)
@@ -281,22 +295,26 @@ export default class MiniXHRUpload {
 
   // NOTE! Keep this duplicated code in sync with other plugins
   // TODO we should probably abstract this into a common function
-  async #uploadRemoteFile (file) {
+  async #uploadRemoteFile (file, options) {
     // TODO: we could rewrite this to use server-sent events instead of creating WebSockets.
     try {
       if (file.serverToken) {
         return await this.connectToServerSocket(file)
       }
-      const serverToken = await this.#queueRequestSocketToken(file)
+      const serverToken = await this.#queueRequestSocketToken(file, options).abortOn(options?.signal)
 
       if (!this.uppy.getState().files[file.id]) return undefined
 
       this.uppy.setFileState(file.id, { serverToken })
       return await this.connectToServerSocket(this.uppy.getFile(file.id))
     } catch (err) {
-      this.uppy.setFileState(file.id, { serverToken: undefined })
-      this.uppy.emit('upload-error', file, err)
-      throw err
+      if (err?.cause?.name !== 'AbortError') {
+        this.uppy.setFileState(file.id, { serverToken: undefined })
+        this.uppy.emit('upload-error', file, err)
+        throw err
+      }
+      // The file upload was aborted, it’s not an error
+      return undefined
     }
   }
 

+ 25 - 8
packages/@uppy/tus/src/index.js

@@ -431,7 +431,7 @@ export default class Tus extends BasePlugin {
     })
   }
 
-  #requestSocketToken = async (file) => {
+  #requestSocketToken = async (file, options) => {
     const Client = file.remote.providerOptions.provider ? Provider : RequestClient
     const client = new Client(this.uppy, file.remote.providerOptions)
     const opts = { ...this.opts }
@@ -449,7 +449,7 @@ export default class Tus extends BasePlugin {
       size: file.data.size,
       headers: opts.headers,
       metadata: file.meta,
-    })
+    }, options)
     return res.token
   }
 
@@ -459,23 +459,27 @@ export default class Tus extends BasePlugin {
    * @param {UppyFile} file for use with upload
    * @returns {Promise<void>}
    */
-  async #uploadRemote (file) {
+  async #uploadRemote (file, options) {
     this.resetUploaderReferences(file.id)
 
     try {
       if (file.serverToken) {
         return await this.connectToServerSocket(file)
       }
-      const serverToken = await this.#queueRequestSocketToken(file)
+      const serverToken = await this.#queueRequestSocketToken(file, options).abortOn(options?.signal)
 
       if (!this.uppy.getState().files[file.id]) return undefined
 
       this.uppy.setFileState(file.id, { serverToken })
       return await this.connectToServerSocket(this.uppy.getFile(file.id))
     } catch (err) {
-      this.uppy.setFileState(file.id, { serverToken: undefined })
-      this.uppy.emit('upload-error', file, err)
-      throw err
+      if (err?.cause?.name !== 'AbortError') {
+        this.uppy.setFileState(file.id, { serverToken: undefined })
+        this.uppy.emit('upload-error', file, err)
+        throw err
+      }
+      // The file upload was aborted, it’s not an error
+      return undefined
     }
   }
 
@@ -732,7 +736,20 @@ export default class Tus extends BasePlugin {
       const total = files.length
 
       if (file.isRemote) {
-        return this.#uploadRemote(file, current, total)
+        const controller = new AbortController()
+
+        const removedHandler = (removedFile) => {
+          if (removedFile.id === file.id) controller.abort()
+        }
+        this.uppy.on('file-removed', removedHandler)
+
+        const uploadPromise = this.#uploadRemote(file, { signal: controller.signal })
+
+        this.requests.wrapSyncFunction(() => {
+          this.uppy.off('file-removed', removedHandler)
+        }, { priority: -1 })()
+
+        return uploadPromise
       }
       return this.#upload(file, current, total)
     }))

+ 18 - 1
packages/@uppy/utils/src/RateLimitedQueue.js

@@ -7,7 +7,7 @@ function abortOn (signal) {
     const abortPromise = () => this.abort(signal.reason)
     signal.addEventListener('abort', abortPromise, { once: true })
     const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
-    this.then(removeAbortListener, removeAbortListener)
+    this.then?.(removeAbortListener, removeAbortListener)
   }
 
   return this
@@ -128,6 +128,23 @@ export class RateLimitedQueue {
     return this.#queue(fn, queueOptions)
   }
 
+  wrapSyncFunction (fn, queueOptions) {
+    return (...args) => {
+      const queuedRequest = this.run(() => {
+        fn(...args)
+        queueMicrotask(() => queuedRequest.done())
+        return () => {}
+      }, queueOptions)
+
+      return {
+        abortOn,
+        abort () {
+          queuedRequest.abort()
+        },
+      }
+    }
+  }
+
   wrapPromiseFunction (fn, queueOptions) {
     return (...args) => {
       let queuedRequest

+ 25 - 8
packages/@uppy/xhr-upload/src/index.js

@@ -351,7 +351,7 @@ export default class XHRUpload extends BasePlugin {
     })
   }
 
-  #requestSocketToken = async (file) => {
+  #requestSocketToken = async (file, options) => {
     const opts = this.getOptions(file)
     const Client = file.remote.providerOptions.provider ? Provider : RequestClient
     const client = new Client(this.uppy, file.remote.providerOptions)
@@ -369,28 +369,32 @@ export default class XHRUpload extends BasePlugin {
       httpMethod: opts.method,
       useFormData: opts.formData,
       headers: opts.headers,
-    })
+    }, options)
     return res.token
   }
 
   // NOTE! Keep this duplicated code in sync with other plugins
   // TODO we should probably abstract this into a common function
-  async #uploadRemote (file) {
+  async #uploadRemote (file, options) {
     // TODO: we could rewrite this to use server-sent events instead of creating WebSockets.
     try {
       if (file.serverToken) {
         return await this.connectToServerSocket(file)
       }
-      const serverToken = await this.#queueRequestSocketToken(file)
+      const serverToken = await this.#queueRequestSocketToken(file, options).abortOn(options?.signal)
 
       if (!this.uppy.getState().files[file.id]) return undefined
 
       this.uppy.setFileState(file.id, { serverToken })
       return await this.connectToServerSocket(this.uppy.getFile(file.id))
     } catch (err) {
-      this.uppy.setFileState(file.id, { serverToken: undefined })
-      this.uppy.emit('upload-error', file, err)
-      throw err
+      if (err?.cause?.name !== 'AbortError') {
+        this.uppy.setFileState(file.id, { serverToken: undefined })
+        this.uppy.emit('upload-error', file, err)
+        throw err
+      }
+      // The file upload was aborted, it’s not an error
+      return undefined
     }
   }
 
@@ -592,7 +596,20 @@ export default class XHRUpload extends BasePlugin {
       const total = files.length
 
       if (file.isRemote) {
-        return this.#uploadRemote(file, current, total)
+        const controller = new AbortController()
+
+        const removedHandler = (removedFile) => {
+          if (removedFile.id === file.id) controller.abort()
+        }
+        this.uppy.on('file-removed', removedHandler)
+
+        const uploadPromise = this.#uploadRemote(file, { signal: controller.signal })
+
+        this.requests.wrapSyncFunction(() => {
+          this.uppy.off('file-removed', removedHandler)
+        }, { priority: -1 })()
+
+        return uploadPromise
       }
       return this.#upload(file, current, total)
     }))