Kaynağa Gözat

@uppy/aws-s3-multipart: empty the queue when pausing (#4203)

This is to avoid leaving aborted requests in the queue that would
be reported as errors instead of being ignored.
Antoine du Hamel 2 yıl önce
ebeveyn
işleme
0375a490b5

+ 0 - 6
packages/@uppy/aws-s3-multipart/src/MultipartUploader.js

@@ -140,12 +140,6 @@ class MultipartUploader {
   }
 
   pause () {
-    const onError = this.#onError
-    // We expect an AbortError to be thrown, which can be ignored.
-    this.#onError = (err) => (err?.name === 'AbortError' ? null : onError(err))
-    // Using setTimeout here to give time to the promises to reject.
-    setTimeout(() => { this.#onError = onError })
-
     this.#abortController.abort(pausingUploadReason)
     // Swap it out for a new controller, because this instance may be resumed later.
     this.#abortController = new AbortController()

+ 17 - 9
packages/@uppy/aws-s3-multipart/src/index.js

@@ -131,12 +131,20 @@ class HTTPCommunicationQueue {
       return cachedResult
     }
 
-    const promise = this.#createMultipartUpload(file, signal).then(async (result) => {
+    const promise = this.#createMultipartUpload(file, signal)
+
+    const abortPromise = () => {
+      promise.abort(signal.reason)
+      this.#cache.delete(file.data)
+    }
+    signal.addEventListener('abort', abortPromise, { once: true })
+    this.#cache.set(file.data, promise)
+    promise.then(async (result) => {
+      signal.removeEventListener('abort', abortPromise)
       this.#setS3MultipartState(file, result)
       this.#cache.set(file.data, result)
-      return result
-    })
-    this.#cache.set(file.data, promise)
+    }, () => { signal.removeEventListener('abort', abortPromise) })
+
     return promise
   }
 
@@ -155,14 +163,14 @@ class HTTPCommunicationQueue {
     throwIfAborted(signal)
     const parts = await Promise.all(chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)))
     throwIfAborted(signal)
-    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal })
+    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal }).abortOn(signal)
   }
 
   async resumeUploadFile (file, chunks, signal) {
     throwIfAborted(signal)
     const { uploadId, key } = await this.getUploadId(file, signal)
     throwIfAborted(signal)
-    const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal })
+    const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal }).abortOn(signal)
     throwIfAborted(signal)
     const parts = await Promise.all(
       chunks
@@ -175,7 +183,7 @@ class HTTPCommunicationQueue {
         }),
     )
     throwIfAborted(signal)
-    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal })
+    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal }).abortOn(signal)
   }
 
   async uploadChunk (file, partNumber, body, signal) {
@@ -183,12 +191,12 @@ class HTTPCommunicationQueue {
     const { uploadId, key } = await this.getUploadId(file, signal)
     throwIfAborted(signal)
     for (;;) {
-      const signature = await this.#fetchSignature(file, { uploadId, key, partNumber, body, signal })
+      const signature = await this.#fetchSignature(file, { uploadId, key, partNumber, body, signal }).abortOn(signal)
       throwIfAborted(signal)
       try {
         return {
           PartNumber: partNumber,
-          ...await this.#uploadPartBytes(signature, body, signal),
+          ...await this.#uploadPartBytes(signature, body, signal).abortOn(signal),
         }
       } catch (err) {
         if (!await this.#shouldRetry(err)) throw err

+ 20 - 8
packages/@uppy/utils/src/RateLimitedQueue.js

@@ -1,5 +1,16 @@
-function createCancelError () {
-  return new Error('Cancelled')
+function createCancelError (cause) {
+  return new Error('Cancelled', { cause })
+}
+
+function abortOn (signal) {
+  if (signal != null) {
+    const abortPromise = () => this.abort(signal.reason)
+    signal.addEventListener('abort', abortPromise, { once: true })
+    const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
+    this.then(removeAbortListener, removeAbortListener)
+  }
+
+  return this
 }
 
 export class RateLimitedQueue {
@@ -39,11 +50,11 @@ export class RateLimitedQueue {
     }
 
     return {
-      abort: () => {
+      abort: (cause) => {
         if (done) return
         done = true
         this.#activeRequests -= 1
-        cancelActive()
+        cancelActive(cause)
         this.#queueNext()
       },
 
@@ -146,15 +157,16 @@ export class RateLimitedQueue {
             }
           })
 
-          return () => {
-            cancelError = createCancelError()
+          return (cause) => {
+            cancelError = createCancelError(cause)
           }
         }, queueOptions)
       })
 
-      outerPromise.abort = () => {
-        queuedRequest.abort()
+      outerPromise.abort = (cause) => {
+        queuedRequest.abort(cause)
       }
+      outerPromise.abortOn = abortOn
 
       return outerPromise
     }