Просмотр исходного кода

aws-s3-multipart: implement retries (#2312)

Renée Kooi 4 лет назад
Родитель
Сommit
5012eb4124

+ 1 - 0
CHANGELOG.md

@@ -105,6 +105,7 @@ PRs are welcome! Please do open an issue to discuss first if it's a big feature,
 - [ ] core: add maxTotalFileSize restriction #514 (@arturi)
 - [ ] companion: what happens if access token expires during/between an download & upload (@ife)
 - [ ] providers: Provider Browser don't handle uppy restrictions, can we hide things that don't match the restrictions in Google Drive and Instagram? #1827 (@arturi)
+- [x] aws-s3-multipart: retry uploading failed parts (#2312 / @goto-bus-stop)
 
 ## 1.16.1
 

+ 5 - 0
examples/dev/Dashboard.js

@@ -11,6 +11,7 @@ const ScreenCapture = require('@uppy/screen-capture/src')
 const GoldenRetriever = require('@uppy/golden-retriever/src')
 const Tus = require('@uppy/tus/src')
 const AwsS3 = require('@uppy/aws-s3/src')
+const AwsS3Multipart = require('@uppy/aws-s3-multipart/src')
 const XHRUpload = require('@uppy/xhr-upload/src')
 const Transloadit = require('@uppy/transloadit/src')
 const Form = require('@uppy/form/src')
@@ -19,6 +20,7 @@ const Form = require('@uppy/form/src')
 
 const UPLOADER = 'tus'
 // const UPLOADER = 's3'
+// const UPLOADER = 's3-multipart'
 // const UPLOADER = 'xhr'
 // const UPLOADER = 'transloadit'
 
@@ -76,6 +78,9 @@ module.exports = () => {
     case 's3':
       uppyDashboard.use(AwsS3, { companionUrl: COMPANION_URL, limit: 6 })
       break
+    case 's3-multipart':
+      uppyDashboard.use(AwsS3Multipart, { companionUrl: COMPANION_URL, limit: 6 })
+      break
     case 'xhr':
       uppyDashboard.use(XHRUpload, { endpoint: XHR_ENDPOINT, limit: 6, bundle: true })
       break

+ 6 - 0
package-lock.json

@@ -7786,6 +7786,7 @@
     "@uppy/utils": {
       "version": "file:packages/@uppy/utils",
       "requires": {
+        "abortcontroller-polyfill": "^1.4.0",
         "lodash.throttle": "^4.1.1"
       }
     },
@@ -8973,6 +8974,11 @@
         }
       }
     },
+    "abortcontroller-polyfill": {
+      "version": "1.4.0",
+      "resolved": "https://registry.npmjs.org/abortcontroller-polyfill/-/abortcontroller-polyfill-1.4.0.tgz",
+      "integrity": "sha512-3ZFfCRfDzx3GFjO6RAkYx81lPGpUS20ISxux9gLxuKnqafNcFQo59+IoZqpO2WvQlyc287B62HDnDdNYRmlvWA=="
+    },
     "absolute-path": {
       "version": "0.0.0",
       "resolved": "https://registry.npmjs.org/absolute-path/-/absolute-path-0.0.0.tgz",

+ 117 - 26
packages/@uppy/aws-s3-multipart/src/MultipartUploader.js

@@ -1,7 +1,11 @@
+const { AbortController, createAbortError } = require('@uppy/utils/lib/AbortController')
+const delay = require('@uppy/utils/lib/delay')
+
 const MB = 1024 * 1024
 
 const defaultOptions = {
   limit: 1,
+  retryDelays: [0, 1000, 3000, 5000],
   getChunkSize (file) {
     return Math.ceil(file.size / 10000)
   },
@@ -14,11 +18,6 @@ const defaultOptions = {
   }
 }
 
-function remove (arr, el) {
-  const i = arr.indexOf(el)
-  if (i !== -1) arr.splice(i, 1)
-}
-
 class MultipartUploader {
   constructor (file, options) {
     this.options = {
@@ -31,6 +30,7 @@ class MultipartUploader {
     }
 
     this.file = file
+    this.abortController = new AbortController()
 
     this.key = this.options.key || null
     this.uploadId = this.options.uploadId || null
@@ -45,15 +45,26 @@ class MultipartUploader {
     // aborting it immediately after it finishes.
     this.createdPromise = Promise.reject() // eslint-disable-line prefer-promise-reject-errors
     this.isPaused = false
+    this.partsInProgress = 0
     this.chunks = null
     this.chunkState = null
-    this.uploading = []
 
     this._initChunks()
 
     this.createdPromise.catch(() => {}) // silence uncaught rejection warning
   }
 
+  /**
+   * Was this upload aborted?
+   *
+   * If yes, we may need to throw an AbortError.
+   *
+   * @returns {boolean}
+   */
+  _aborted () {
+    return this.abortController.signal.aborted
+  }
+
   _initChunks () {
     const chunks = []
     const desiredChunkSize = this.options.getChunkSize(this.file)
@@ -79,6 +90,8 @@ class MultipartUploader {
       this.options.createMultipartUpload()
     )
     return this.createdPromise.then((result) => {
+      if (this._aborted()) throw createAbortError()
+
       const valid = typeof result === 'object' && result &&
         typeof result.uploadId === 'string' &&
         typeof result.key === 'string'
@@ -103,6 +116,8 @@ class MultipartUploader {
         key: this.key
       })
     ).then((parts) => {
+      if (this._aborted()) throw createAbortError()
+
       parts.forEach((part) => {
         const i = part.PartNumber - 1
         this.chunkState[i] = {
@@ -128,7 +143,7 @@ class MultipartUploader {
   _uploadParts () {
     if (this.isPaused) return
 
-    const need = this.options.limit - this.uploading.length
+    const need = this.options.limit - this.partsInProgress
     if (need === 0) return
 
     // All parts are uploaded.
@@ -149,7 +164,57 @@ class MultipartUploader {
     }
 
     candidates.forEach((index) => {
-      this._uploadPart(index)
+      this._uploadPartRetryable(index).catch((err) => {
+        this._onError(err)
+      })
+    })
+  }
+
+  _retryable ({ before, attempt, after }) {
+    const { retryDelays } = this.options
+    const { signal } = this.abortController
+
+    if (before) before()
+
+    function shouldRetry (err) {
+      if (err.source && typeof err.source.status === 'number') {
+        const { status } = err.source
+        // 0 probably indicates network failure
+        return status === 0 || status === 409 || status === 423 || (status >= 500 && status < 600)
+      }
+      return false
+    }
+
+    const doAttempt = (retryAttempt) =>
+      attempt().catch((err) => {
+        if (this._aborted()) throw createAbortError()
+
+        if (shouldRetry(err) && retryAttempt < retryDelays.length) {
+          return delay(retryDelays[retryAttempt], { signal })
+            .then(() => doAttempt(retryAttempt + 1))
+        } else {
+          throw err
+        }
+      })
+
+    return doAttempt(0).then((result) => {
+      if (after) after()
+      return result
+    }, (err) => {
+      if (after) after()
+      throw err
+    })
+  }
+
+  _uploadPartRetryable (index) {
+    return this._retryable({
+      before: () => {
+        this.partsInProgress += 1
+      },
+      attempt: () => this._uploadPart(index),
+      after: () => {
+        this.partsInProgress -= 1
+      }
     })
   }
 
@@ -170,11 +235,15 @@ class MultipartUploader {
       if (!valid) {
         throw new TypeError('AwsS3/Multipart: Got incorrect result from `prepareUploadPart()`, expected an object `{ url }`.')
       }
+
       return result
     }).then(({ url, headers }) => {
-      this._uploadPartBytes(index, url, headers)
-    }, (err) => {
-      this._onError(err)
+      if (this._aborted()) {
+        this.chunkState[index].busy = false
+        throw createAbortError()
+      }
+
+      return this._uploadPartBytes(index, url, headers)
     })
   }
 
@@ -202,6 +271,13 @@ class MultipartUploader {
 
   _uploadPartBytes (index, url, headers) {
     const body = this.chunks[index]
+    const { signal } = this.abortController
+
+    let defer
+    const promise = new Promise((resolve, reject) => {
+      defer = { resolve, reject }
+    })
+
     const xhr = new XMLHttpRequest()
     xhr.open('PUT', url, true)
     if (headers) {
@@ -211,7 +287,13 @@ class MultipartUploader {
     }
     xhr.responseType = 'text'
 
-    this.uploading.push(xhr)
+    function cleanup () {
+      signal.removeEventListener('abort', onabort)
+    }
+    function onabort () {
+      xhr.abort()
+    }
+    signal.addEventListener('abort', onabort)
 
     xhr.upload.addEventListener('progress', (ev) => {
       if (!ev.lengthComputable) return
@@ -220,16 +302,20 @@ class MultipartUploader {
     })
 
     xhr.addEventListener('abort', (ev) => {
-      remove(this.uploading, ev.target)
+      cleanup()
       this.chunkState[index].busy = false
+
+      defer.reject(createAbortError())
     })
 
     xhr.addEventListener('load', (ev) => {
-      remove(this.uploading, ev.target)
+      cleanup()
       this.chunkState[index].busy = false
 
       if (ev.target.status < 200 || ev.target.status >= 300) {
-        this._onError(new Error('Non 2xx'))
+        const error = new Error('Non 2xx')
+        error.source = ev.target
+        defer.reject(error)
         return
       }
 
@@ -238,23 +324,26 @@ class MultipartUploader {
       // NOTE This must be allowed by CORS.
       const etag = ev.target.getResponseHeader('ETag')
       if (etag === null) {
-        this._onError(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. Seee https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
+        defer.reject(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. Seee https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
         return
       }
 
       this._onPartComplete(index, etag)
+      defer.resolve()
     })
 
     xhr.addEventListener('error', (ev) => {
-      remove(this.uploading, ev.target)
+      cleanup()
       this.chunkState[index].busy = false
 
       const error = new Error('Unknown error')
       error.source = ev.target
-      this._onError(error)
+      defer.reject(error)
     })
 
     xhr.send(body)
+
+    return promise
   }
 
   _completeUpload () {
@@ -275,9 +364,8 @@ class MultipartUploader {
   }
 
   _abortUpload () {
-    this.uploading.slice().forEach(xhr => {
-      xhr.abort()
-    })
+    this.abortController.abort()
+
     this.createdPromise.then(() => {
       this.options.abortMultipartUpload({
         key: this.key,
@@ -286,10 +374,13 @@ class MultipartUploader {
     }, () => {
       // if the creation failed we do not need to abort
     })
-    this.uploading = []
   }
 
   _onError (err) {
+    if (err && err.name === 'AbortError') {
+      return
+    }
+
     this.options.onError(err)
   }
 
@@ -303,10 +394,10 @@ class MultipartUploader {
   }
 
   pause () {
-    const inProgress = this.uploading.slice()
-    inProgress.forEach((xhr) => {
-      xhr.abort()
-    })
+    this.abortController.abort()
+    // Swap it out for a new controller, because this instance may be resumed later.
+    this.abortController = new AbortController()
+
     this.isPaused = true
   }
 

+ 2 - 0
packages/@uppy/aws-s3-multipart/src/index.js

@@ -28,6 +28,7 @@ module.exports = class AwsS3Multipart extends Plugin {
     const defaultOptions = {
       timeout: 30 * 1000,
       limit: 0,
+      retryDelays: [0, 1000, 3000, 5000],
       createMultipartUpload: this.createMultipartUpload.bind(this),
       listParts: this.listParts.bind(this),
       prepareUploadPart: this.prepareUploadPart.bind(this),
@@ -198,6 +199,7 @@ module.exports = class AwsS3Multipart extends Plugin {
         onPartComplete,
 
         limit: this.opts.limit || 5,
+        retryDelays: this.opts.retryDelays || [],
         ...file.s3Multipart
       })
 

+ 1 - 0
packages/@uppy/aws-s3-multipart/types/index.d.ts

@@ -33,6 +33,7 @@ declare module AwsS3Multipart {
     ) => MaybePromise<{ location?: string }>
     timeout?: number
     limit?: number
+    retryDelays?: number[] | null
   }
 }
 

+ 1 - 0
packages/@uppy/utils/package.json

@@ -18,6 +18,7 @@
     "url": "git+https://github.com/transloadit/uppy.git"
   },
   "dependencies": {
+    "abortcontroller-polyfill": "^1.4.0",
     "lodash.throttle": "^4.1.1"
   }
 }

+ 20 - 0
packages/@uppy/utils/src/AbortController.js

@@ -0,0 +1,20 @@
+/**
+ * Little AbortController proxy module so we can swap out the implementation easily later.
+ */
+
+const { AbortController, AbortSignal } = require('abortcontroller-polyfill/dist/abortcontroller')
+
+function createAbortError (message = 'Aborted') {
+  try {
+    return new DOMException(message, 'AbortError')
+  } catch {
+    // For Internet Explorer
+    const error = new Error(message)
+    error.name = 'AbortError'
+    return error
+  }
+}
+
+exports.AbortController = AbortController
+exports.AbortSignal = AbortSignal
+exports.createAbortError = createAbortError

+ 51 - 0
packages/@uppy/utils/src/AbortController.test.js

@@ -0,0 +1,51 @@
+const { AbortController, AbortSignal } = require('./AbortController')
+
+function flushInstantTimeouts () {
+  return new Promise(resolve => setTimeout(resolve, 0))
+}
+
+describe('AbortController', () => {
+  it('has the expected shape', () => {
+    const controller = new AbortController()
+    expect(typeof controller.abort).toBe('function')
+    expect(controller.signal).toBeInstanceOf(AbortSignal)
+  })
+
+  it('emits "abort" when abort() is called', async () => {
+    const controller = new AbortController()
+    const callback = jest.fn()
+
+    controller.signal.addEventListener('abort', callback)
+    controller.abort()
+
+    await flushInstantTimeouts()
+
+    expect(callback).toHaveBeenCalled()
+    expect(callback.mock.calls[0][0]).toBeInstanceOf(Event)
+  })
+
+  it('add and remove events', async () => {
+    const controller = new AbortController()
+    const callback = jest.fn()
+    const callback2 = jest.fn()
+
+    controller.signal.addEventListener('abort', callback)
+    controller.signal.addEventListener('abort', callback2)
+    controller.signal.removeEventListener('abort', callback)
+    controller.abort()
+
+    await flushInstantTimeouts()
+
+    expect(callback2).toHaveBeenCalled()
+    expect(callback2.mock.calls[0][0]).toBeInstanceOf(Event)
+    expect(callback).not.toHaveBeenCalled()
+  })
+
+  it('sets `signal.aborted` property when abort() is called', () => {
+    const controller = new AbortController()
+
+    expect(controller.signal.aborted).toBe(false)
+    controller.abort()
+    expect(controller.signal.aborted).toBe(true)
+  })
+})

+ 36 - 0
packages/@uppy/utils/src/delay.js

@@ -0,0 +1,36 @@
+const { createAbortError } = require('./AbortController')
+
+/**
+ * Return a Promise that resolves after `ms` milliseconds.
+ *
+ * @param {number} ms - Number of milliseconds to wait.
+ * @param {{ signal?: AbortSignal }} [opts] - An abort signal that can be used to cancel the delay early.
+ * @returns {Promise<void>} A Promise that resolves after the given amount of `ms`.
+ */
+module.exports = function delay (ms, opts) {
+  return new Promise((resolve, reject) => {
+    if (opts && opts.signal && opts.signal.aborted) {
+      return reject(createAbortError())
+    }
+
+    function onabort () {
+      clearTimeout(timeout)
+      cleanup()
+      reject(createAbortError())
+    }
+
+    const timeout = setTimeout(() => {
+      cleanup()
+      resolve()
+    }, ms)
+
+    if (opts && opts.signal) {
+      opts.signal.addEventListener('abort', onabort)
+    }
+    function cleanup () {
+      if (opts && opts.signal) {
+        opts.signal.removeEventListener('abort', onabort)
+      }
+    }
+  })
+}

+ 33 - 0
packages/@uppy/utils/src/delay.test.js

@@ -0,0 +1,33 @@
+const delay = require('./delay')
+const { AbortController } = require('./AbortController')
+
+describe('delay', () => {
+  it('should wait for the specified time', async () => {
+    const start = Date.now()
+    await delay(100)
+    expect(Date.now() - start).toBeGreaterThanOrEqual(100)
+  })
+
+  it('should reject if signal is already aborted', async () => {
+    const signal = { aborted: true }
+    const start = Date.now()
+    await expect(delay(100, { signal })).rejects.toHaveProperty('name', 'AbortError')
+    // should really be instant but using a very large range in case CI decides to be super busy and block the event loop for a while
+    expect(Date.now() - start).toBeLessThan(50)
+  })
+
+  it('should reject when signal is aborted', async () => {
+    const controller = new AbortController()
+    const start = Date.now()
+    const testDelay = delay(100, { signal: controller.signal })
+    await Promise.all([
+      delay(50).then(() => controller.abort()),
+      expect(testDelay).rejects.toHaveProperty('name', 'AbortError')
+    ])
+
+    // should have rejected before the timer is done
+    const time = Date.now() - start
+    expect(time).toBeGreaterThanOrEqual(50)
+    expect(time).toBeLessThan(100)
+  })
+})

+ 15 - 9
website/src/docs/aws-s3-multipart.md

@@ -38,23 +38,29 @@ const AwsS3Multipart = Uppy.AwsS3Multipart
 
 The `@uppy/aws-s3-multipart` plugin has the following configurable options:
 
-### limit: 0
+### `limit: 0`
 
 The maximum amount of chunks to upload simultaneously. Set to `0` to disable limiting.
 
-### companionUrl: null
+### `retryDelays: [0, 1000, 3000, 5000]`
+
+When uploading a chunk fails, automatically try again after the millisecond intervals specified in this array. By default, we first retry instantly; if that fails, we retry after 1 second; if that fails, we retry after 3 seconds, etc.
+
+Set to `null` to disable automatic retries, and fail instantly if any chunk fails to upload.
+
+### `companionUrl: null`
 
 URL of the [Companion](/docs/companion) instance to use for proxying calls to the S3 Multipart API.
 
 This will be used by the default implementations of the upload-related functions below. If you provide your own implementations, a `companionUrl` is unnecessary.
 
-### companionHeaders: {}
+### `companionHeaders: {}`
 
 Custom headers that should be sent along to [Companion](/docs/companion) on every request.
 
 This will be used by the default implementations of the upload-related functions below. If you provide your own implementations, these headers are not sent automatically.
 
-### getChunkSize(file)
+### `getChunkSize(file)`
 
 A function that returns the minimum chunk size to use when uploading the given file.
 
@@ -62,7 +68,7 @@ The S3 Multipart plugin uploads files in chunks. Each chunk requires a signing r
 
 S3 requires a minimum chunk size of 5MB, and supports at most 10,000 chunks per multipart upload. If `getChunkSize()` returns a size that's too small, Uppy will increase it to S3's minimum requirements.
 
-### createMultipartUpload(file)
+### `createMultipartUpload(file)`
 
 A function that calls the S3 Multipart API to create a new upload. `file` is the file object from Uppy's state. The most relevant keys are `file.name` and `file.type`.
 
@@ -73,7 +79,7 @@ Return a Promise for an object with keys:
 
 The default implementation calls out to Companion's S3 signing endpoints.
 
-### listParts(file, { uploadId, key })
+### `listParts(file, { uploadId, key })`
 
 A function that calls the S3 Multipart API to list the parts of a file that have already been uploaded. Receives the `file` object from Uppy's state, and an object with keys:
 
@@ -88,7 +94,7 @@ Return a Promise for an array of S3 Part objects, as returned by the S3 Multipar
 
 The default implementation calls out to Companion's S3 signing endpoints.
 
-### prepareUploadPart(file, partData)
+### `prepareUploadPart(file, partData)`
 
 A function that generates a signed URL to upload a single part. Receives the `file` object from Uppy's state. The `partData` argument is an object with keys:
 
@@ -113,7 +119,7 @@ Return a Promise for an object with keys:
    ```
  - `headers` - **(Optional)** Custom headers that should be sent to the S3 presigned URL.
 
-### abortMultipartUpload(file, { uploadId, key })
+### `abortMultipartUpload(file, { uploadId, key })`
 
 A function that calls the S3 Multipart API to abort a Multipart upload, and delete all parts that have been uploaded so far. Receives the `file` object from Uppy's state, and an object with keys:
 
@@ -124,7 +130,7 @@ This is typically called when the user cancels an upload. Cancellation cannot fa
 
 The default implementation calls out to Companion's S3 signing endpoints.
 
-### completeMultipartUpload(file, { uploadId, key, parts })
+### `completeMultipartUpload(file, { uploadId, key, parts })`
 
 A function that calls the S3 Multipart API to complete a Multipart upload, combining all parts into a single object in the S3 bucket. Receives the `file` object from Uppy's state, and an object with keys: