Browse Source

@uppy/aws-s3-multipart: refactor rate limiting approach (#4187)

Co-authored-by: Mikael Finstad <finstaden@gmail.com>
Co-authored-by: Murderlon <merlijn@soverin.net>
Antoine du Hamel 2 years ago
parent
commit
60ae8c197b

+ 12 - 2
e2e/clients/dashboard-aws-multipart/app.js

@@ -1,15 +1,25 @@
 import { Uppy } from '@uppy/core'
 import Dashboard from '@uppy/dashboard'
-import AwsS3 from '@uppy/aws-s3-multipart'
+import AwsS3Multipart from '@uppy/aws-s3-multipart'
 
 import '@uppy/core/dist/style.css'
 import '@uppy/dashboard/dist/style.css'
 
 const uppy = new Uppy()
   .use(Dashboard, { target: '#app', inline: true })
-  .use(AwsS3, {
+  .use(AwsS3Multipart, {
     limit: 2,
     companionUrl: process.env.VITE_COMPANION_URL,
+    // This way we can test that the user-provided API still works
+    // as expected in the flow. We call the default internal function for this;
+    // otherwise we would have to run another server to presign requests,
+    // and we don't care about that, just that the flow works.
+    async prepareUploadParts (file, { uploadId, key, parts, signal }) {
+      const { number: partNumber, chunk: body } = parts[0]
+      const plugin = uppy.getPlugin('AwsS3Multipart')
+      const { url } = await plugin.signPart(file, { uploadId, key, partNumber, body, signal })
+      return { presignedUrls: { [partNumber]: url } }
+    },
   })
 
 // Keep this here to access uppy in tests
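
For comparison, a minimal sketch (not part of this commit) of the same wiring using the new `signPart` option directly, instead of going through the deprecated `prepareUploadParts`. The `/sign-part` route is a hypothetical server endpoint that responds with `{ url }` for a single part:

```js
import { Uppy } from '@uppy/core'
import AwsS3Multipart from '@uppy/aws-s3-multipart'

const uppy = new Uppy().use(AwsS3Multipart, {
  limit: 2,
  async signPart (file, { uploadId, key, partNumber, signal }) {
    // Hypothetical endpoint: presigns one part and responds with `{ url }`.
    const response = await fetch(
      `/sign-part/${uploadId}/${partNumber}?key=${encodeURIComponent(key)}`,
      { signal },
    )
    if (!response.ok) throw new Error(`Failed to sign part ${partNumber}`)
    return response.json()
  },
})
```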

+ 1 - 1
e2e/cypress/integration/dashboard-aws-multipart.spec.ts

@@ -3,7 +3,7 @@ describe('Dashboard with @uppy/aws-s3-multipart', () => {
     cy.visit('/dashboard-aws-multipart')
     cy.get('.uppy-Dashboard-input:first').as('file-input')
     cy.intercept({ method: 'POST', pathname: '/s3/multipart' }).as('post')
-    cy.intercept({ method: 'GET', pathname: '/s3/multipart/*/batch' }).as('get')
+    cy.intercept({ method: 'GET', pathname: '/s3/multipart/*/1' }).as('get')
     cy.intercept({ method: 'PUT' }).as('put')
   })
 

+ 80 - 378
packages/@uppy/aws-s3-multipart/src/MultipartUploader.js

@@ -1,15 +1,11 @@
-import { AbortController, createAbortError } from '@uppy/utils/lib/AbortController'
-import delay from '@uppy/utils/lib/delay'
+import { AbortController } from '@uppy/utils/lib/AbortController'
 
 const MB = 1024 * 1024
 
 const defaultOptions = {
-  limit: 6,
-  retryDelays: [0, 1000, 3000, 5000],
   getChunkSize (file) {
     return Math.ceil(file.size / 10000)
   },
-  onStart () {},
   onProgress () {},
   onPartComplete () {},
   onSuccess () {},
@@ -28,418 +24,115 @@ function ensureInt (value) {
   throw new TypeError('Expected a number')
 }
 
+const pausingUploadReason = Symbol('pausing upload, not an actual error')
+
 class MultipartUploader {
-  constructor (file, options) {
+  #abortController = new AbortController()
+
+  #chunks
+
+  #chunkState
+
+  #data
+
+  #file
+
+  #uploadPromise
+
+  #onError
+
+  #onSuccess
+
+  #onReject = (err) => (err?.cause === pausingUploadReason ? null : this.#onError(err))
+
+  constructor (data, options) {
     this.options = {
       ...defaultOptions,
       ...options,
     }
     // Use default `getChunkSize` if it was null or something
-    if (!this.options.getChunkSize) {
-      this.options.getChunkSize = defaultOptions.getChunkSize
-    }
+    this.options.getChunkSize ??= defaultOptions.getChunkSize
 
-    this.file = file
-    this.abortController = new AbortController()
-
-    this.key = this.options.key || null
-    this.uploadId = this.options.uploadId || null
-    this.parts = []
-
-    // Do `this.createdPromise.then(OP)` to execute an operation `OP` _only_ if the
-    // upload was created already. That also ensures that the sequencing is right
-    // (so the `OP` definitely happens if the upload is created).
-    //
-    // This mostly exists to make `#abortUpload` work well: only sending the abort request if
-    // the upload was already created, and if the createMultipartUpload request is still in flight,
-    // aborting it immediately after it finishes.
-    this.createdPromise = Promise.reject() // eslint-disable-line prefer-promise-reject-errors
-    this.isPaused = false
-    this.partsInProgress = 0
-    this.chunks = null
-    this.chunkState = null
+    this.#data = data
+    this.#file = options.file
+    this.#onSuccess = this.options.onSuccess
+    this.#onError = this.options.onError
 
     this.#initChunks()
-
-    this.createdPromise.catch(() => {}) // silence uncaught rejection warning
-  }
-
-  /**
-   * Was this upload aborted?
-   *
-   * If yes, we may need to throw an AbortError.
-   *
-   * @returns {boolean}
-   */
-  #aborted () {
-    return this.abortController.signal.aborted
   }
 
   #initChunks () {
-    const chunks = []
-    const desiredChunkSize = this.options.getChunkSize(this.file)
+    const desiredChunkSize = this.options.getChunkSize(this.#data)
     // at least 5MB per request, at most 10k requests
-    const minChunkSize = Math.max(5 * MB, Math.ceil(this.file.size / 10000))
+    const fileSize = this.#data.size
+    const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
     const chunkSize = Math.max(desiredChunkSize, minChunkSize)
 
     // Upload zero-sized files in one zero-sized chunk
-    if (this.file.size === 0) {
-      chunks.push(this.file)
+    if (this.#data.size === 0) {
+      this.#chunks = [this.#data]
+      this.#data.onProgress = this.#onPartProgress(0)
+      this.#data.onComplete = this.#onPartComplete(0)
     } else {
-      for (let i = 0; i < this.file.size; i += chunkSize) {
-        const end = Math.min(this.file.size, i + chunkSize)
-        chunks.push(this.file.slice(i, end))
+      const arraySize = Math.ceil(fileSize / chunkSize)
+      this.#chunks = Array(arraySize)
+      let j = 0
+      for (let i = 0; i < fileSize; i += chunkSize) {
+        const end = Math.min(fileSize, i + chunkSize)
+        const chunk = this.#data.slice(i, end)
+        chunk.onProgress = this.#onPartProgress(j)
+        chunk.onComplete = this.#onPartComplete(j)
+        this.#chunks[j++] = chunk
       }
     }
 
-    this.chunks = chunks
-    this.chunkState = chunks.map(() => ({
-      uploaded: 0,
-      busy: false,
-      done: false,
-    }))
+    this.#chunkState = this.#chunks.map(() => ({ uploaded: 0 }))
   }
 
   #createUpload () {
-    this.createdPromise = Promise.resolve().then(() => this.options.createMultipartUpload())
-    return this.createdPromise.then((result) => {
-      if (this.#aborted()) throw createAbortError()
-
-      const valid = typeof result === 'object' && result
-        && typeof result.uploadId === 'string'
-        && typeof result.key === 'string'
-      if (!valid) {
-        throw new TypeError('AwsS3/Multipart: Got incorrect result from `createMultipartUpload()`, expected an object `{ uploadId, key }`.')
-      }
-
-      this.key = result.key
-      this.uploadId = result.uploadId
-
-      this.options.onStart(result)
-      this.#uploadParts()
-    }).catch((err) => {
-      this.#onError(err)
-    })
-  }
-
-  async #resumeUpload () {
-    try {
-      const parts = await this.options.listParts({
-        uploadId: this.uploadId,
-        key: this.key,
-      })
-      if (this.#aborted()) throw createAbortError()
-
-      parts.forEach((part) => {
-        const i = part.PartNumber - 1
-
-        this.chunkState[i] = {
-          uploaded: ensureInt(part.Size),
-          etag: part.ETag,
-          done: true,
-        }
-
-        // Only add if we did not yet know about this part.
-        if (!this.parts.some((p) => p.PartNumber === part.PartNumber)) {
-          this.parts.push({
-            PartNumber: part.PartNumber,
-            ETag: part.ETag,
-          })
-        }
-      })
-      this.#uploadParts()
-    } catch (err) {
-      this.#onError(err)
-    }
-  }
-
-  #uploadParts () {
-    if (this.isPaused) return
-
-    // All parts are uploaded.
-    if (this.chunkState.every((state) => state.done)) {
-      this.#completeUpload()
-      return
-    }
-
-    const getChunkIndexes = () => {
-      // For a 100MB file, with the default min chunk size of 5MB and a limit of 10:
-      //
-      // Total 20 parts
-      // ---------
-      // Need 1 is 10
-      // Need 2 is 5
-      // Need 3 is 5
-      const need = this.options.limit - this.partsInProgress
-      const completeChunks = this.chunkState.filter((state) => state.done).length
-      const remainingChunks = this.chunks.length - completeChunks
-      let minNeeded = Math.ceil(this.options.limit / 2)
-      if (minNeeded > remainingChunks) {
-        minNeeded = remainingChunks
-      }
-      if (need < minNeeded) return []
-
-      const chunkIndexes = []
-      for (let i = 0; i < this.chunkState.length; i++) {
-        const state = this.chunkState[i]
-        // eslint-disable-next-line no-continue
-        if (state.done || state.busy) continue
-
-        chunkIndexes.push(i)
-        if (chunkIndexes.length >= need) {
-          break
-        }
-      }
-
-      return chunkIndexes
-    }
-
-    const chunkIndexes = getChunkIndexes()
-
-    if (chunkIndexes.length === 0) return
-
-    this.#prepareUploadPartsRetryable(chunkIndexes).then(
-      ({ presignedUrls, headers }) => {
-        for (const index of chunkIndexes) {
-          const partNumber = index + 1
-          const prePreparedPart = {
-            url: presignedUrls[partNumber],
-            headers: headers?.[partNumber],
-          }
-          this.#uploadPartRetryable(index, prePreparedPart).then(
-            () => this.#uploadParts(),
-            (err) => this.#onError(err),
-          )
-        }
-      },
-      (err) => this.#onError(err),
-    )
+    this.#uploadPromise = this
+      .options.companionComm.uploadFile(this.#file, this.#chunks, this.#abortController.signal)
+      .then(this.#onSuccess, this.#onReject)
   }
 
-  #retryable ({ before, attempt, after }) {
-    const { retryDelays } = this.options
-    const { signal } = this.abortController
-
-    if (before) before()
-
-    function shouldRetry (err) {
-      if (err.source && typeof err.source.status === 'number') {
-        const { status } = err.source
-        // 0 probably indicates network failure
-        return status === 0 || status === 409 || status === 423 || (status >= 500 && status < 600)
-      }
-      return false
-    }
-
-    const doAttempt = (retryAttempt) => attempt().catch((err) => {
-      if (this.#aborted()) throw createAbortError()
-
-      if (shouldRetry(err) && retryAttempt < retryDelays.length) {
-        return delay(retryDelays[retryAttempt], { signal })
-          .then(() => doAttempt(retryAttempt + 1))
-      }
-      throw err
-    })
-
-    return doAttempt(0).then((result) => {
-      if (after) after()
-      return result
-    }, (err) => {
-      if (after) after()
-      throw err
-    })
+  #resumeUpload () {
+    this.#uploadPromise = this
+      .options.companionComm.resumeUploadFile(this.#file, this.#chunks, this.#abortController.signal)
+      .then(this.#onSuccess, this.#onReject)
   }
 
-  async #prepareUploadPartsRetryable (chunkIndexes) {
-    chunkIndexes.forEach((i) => {
-      this.chunkState[i].busy = true
-    })
-
-    const result = await this.#retryable({
-      attempt: () => this.options.prepareUploadParts({
-        key: this.key,
-        uploadId: this.uploadId,
-        parts: chunkIndexes.map((index) => ({
-          number: index + 1, // Use the part number as the index
-          chunk: this.chunks[index],
-        })),
-      }),
-    })
-
-    if (typeof result?.presignedUrls !== 'object') {
-      throw new TypeError(
-        'AwsS3/Multipart: Got incorrect result from `prepareUploadParts()`, expected an object `{ presignedUrls }`.',
-      )
-    }
+  #onPartProgress = (index) => (ev) => {
+    if (!ev.lengthComputable) return
 
-    return result
-  }
+    const sent = ev.loaded
+    this.#chunkState[index].uploaded = ensureInt(sent)
 
-  #uploadPartRetryable (index, prePreparedPart) {
-    return this.#retryable({
-      before: () => {
-        this.chunkState[index].busy = true
-        this.partsInProgress += 1
-      },
-      attempt: () => this.#uploadPart(index, prePreparedPart),
-      after: () => {
-        this.chunkState[index].busy = false
-        this.partsInProgress -= 1
-      },
-    })
+    const totalUploaded = this.#chunkState.reduce((n, c) => n + c.uploaded, 0)
+    this.options.onProgress(totalUploaded, this.#data.size)
   }
 
-  #uploadPart (index, prePreparedPart) {
-    const valid = typeof prePreparedPart?.url === 'string'
-    if (!valid) {
-      throw new TypeError('AwsS3/Multipart: Got incorrect result for `prePreparedPart`, expected an object `{ url }`.')
-    }
-
-    const { url, headers } = prePreparedPart
-    if (this.#aborted()) {
-      throw createAbortError()
-    }
-
-    return this.#uploadPartBytes(index, url, headers)
-  }
-
-  #onPartProgress (index, sent) {
-    this.chunkState[index].uploaded = ensureInt(sent)
-
-    const totalUploaded = this.chunkState.reduce((n, c) => n + c.uploaded, 0)
-    this.options.onProgress(totalUploaded, this.file.size)
-  }
-
-  #onPartComplete (index, etag) {
-    this.chunkState[index].etag = etag
-    this.chunkState[index].done = true
+  #onPartComplete = (index) => (etag) => {
+    // This avoids the net::ERR_OUT_OF_MEMORY in Chromium Browsers.
+    this.#chunks[index] = null
+    this.#chunkState[index].etag = etag
+    this.#chunkState[index].done = true
 
     const part = {
       PartNumber: index + 1,
       ETag: etag,
     }
-    this.parts.push(part)
-
     this.options.onPartComplete(part)
   }
 
-  #uploadPartBytes (index, url, headers) {
-    const body = this.chunks[index]
-    const { signal } = this.abortController
-
-    let defer
-    const promise = new Promise((resolve, reject) => {
-      defer = { resolve, reject }
-    })
-
-    const xhr = new XMLHttpRequest()
-    xhr.open('PUT', url, true)
-    if (headers) {
-      Object.keys(headers).forEach((key) => {
-        xhr.setRequestHeader(key, headers[key])
-      })
-    }
-    xhr.responseType = 'text'
-
-    function cleanup () {
-      // eslint-disable-next-line no-use-before-define
-      signal.removeEventListener('abort', onabort)
-    }
-    function onabort () {
-      xhr.abort()
-    }
-    signal.addEventListener('abort', onabort)
-
-    xhr.upload.addEventListener('progress', (ev) => {
-      if (!ev.lengthComputable) return
-
-      this.#onPartProgress(index, ev.loaded, ev.total)
-    })
-
-    xhr.addEventListener('abort', () => {
-      cleanup()
-
-      defer.reject(createAbortError())
-    })
-
-    xhr.addEventListener('load', (ev) => {
-      cleanup()
-
-      if (ev.target.status < 200 || ev.target.status >= 300) {
-        const error = new Error('Non 2xx')
-        error.source = ev.target
-        defer.reject(error)
-        return
-      }
-
-      // This avoids the net::ERR_OUT_OF_MEMORY in Chromium Browsers.
-      this.chunks[index] = null
-
-      this.#onPartProgress(index, body.size, body.size)
-
-      // NOTE This must be allowed by CORS.
-      const etag = ev.target.getResponseHeader('ETag')
-
-      if (etag === null) {
-        defer.reject(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
-        return
-      }
-
-      this.#onPartComplete(index, etag)
-      defer.resolve()
-    })
-
-    xhr.addEventListener('error', (ev) => {
-      cleanup()
-
-      const error = new Error('Unknown error')
-      error.source = ev.target
-      defer.reject(error)
-    })
-
-    xhr.send(body)
-
-    return promise
-  }
-
-  async #completeUpload () {
-    // Parts may not have completed uploading in sorted order, if limit > 1.
-    this.parts.sort((a, b) => a.PartNumber - b.PartNumber)
-
-    try {
-      const result = await this.options.completeMultipartUpload({
-        key: this.key,
-        uploadId: this.uploadId,
-        parts: this.parts,
-      })
-      this.options.onSuccess(result)
-    } catch (err) {
-      this.#onError(err)
-    }
-  }
-
   #abortUpload () {
-    this.abortController.abort()
-
-    this.createdPromise.then(() => this.options.abortMultipartUpload({
-      key: this.key,
-      uploadId: this.uploadId,
-    })).catch(() => {
-      // if the creation failed we do not need to abort
-    })
-  }
-
-  #onError (err) {
-    if (err && err.name === 'AbortError') {
-      return
-    }
-
-    this.options.onError(err)
+    this.#abortController.abort()
+    this.options.companionComm.abortFileUpload(this.#file).catch((err) => this.options.log(err))
   }
 
   start () {
-    this.isPaused = false
-    if (this.uploadId) {
+    if (this.#uploadPromise) {
+      if (!this.#abortController.signal.aborted) this.#abortController.abort(pausingUploadReason)
+      this.#abortController = new AbortController()
       this.#resumeUpload()
     } else {
       this.#createUpload()
@@ -447,17 +140,26 @@ class MultipartUploader {
   }
 
   pause () {
-    this.abortController.abort()
-    // Swap it out for a new controller, because this instance may be resumed later.
-    this.abortController = new AbortController()
+    const onError = this.#onError
+    // We expect an AbortError to be thrown, which can be ignored.
+    this.#onError = (err) => (err?.name === 'AbortError' ? null : onError(err))
+    // Using setTimeout here to give time to the promises to reject.
+    setTimeout(() => { this.#onError = onError })
 
-    this.isPaused = true
+    this.#abortController.abort(pausingUploadReason)
+    // Swap it out for a new controller, because this instance may be resumed later.
+    this.#abortController = new AbortController()
   }
 
   abort (opts = undefined) {
     if (opts?.really) this.#abortUpload()
     else this.pause()
   }
+
+  // TODO: remove this in the next major
+  get chunkState () {
+    return this.#chunkState
+  }
 }
 
 export default MultipartUploader
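
As a standalone illustration (a sketch, not part of this commit) of the chunk-sizing rule in `#initChunks` above: the size requested by `getChunkSize` is clamped so that each part is at least 5 MiB and the upload stays within S3's 10,000-part limit.

```js
const MB = 1024 * 1024

// Same rule as #initChunks: honor the desired size, but enforce S3's limits.
function computeChunkSize (fileSize, desiredChunkSize) {
  const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
  return Math.max(desiredChunkSize, minChunkSize)
}

// With the default getChunkSize (fileSize / 10000), a 100 MiB file asks for
// ~10 KiB parts; the 5 MiB floor wins, so the file splits into 20 parts.
const fileSize = 100 * MB
const chunkSize = computeChunkSize(fileSize, Math.ceil(fileSize / 10000))
console.log(chunkSize === 5 * MB, Math.ceil(fileSize / chunkSize)) // true 20
```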

+ 339 - 75
packages/@uppy/aws-s3-multipart/src/index.js

@@ -5,6 +5,7 @@ import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
 import getSocketHost from '@uppy/utils/lib/getSocketHost'
 import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
 
+import { createAbortError } from '@uppy/utils/lib/AbortController'
 import packageJson from '../package.json'
 import MultipartUploader from './MultipartUploader.js'
 
@@ -17,11 +18,192 @@ function assertServerError (res) {
   return res
 }
 
+function throwIfAborted (signal) {
+  if (signal?.aborted) { throw createAbortError('The operation was aborted', { cause: signal.reason }) }
+}
+
+class HTTPCommunicationQueue {
+  #abortMultipartUpload
+
+  #cache = new WeakMap()
+
+  #createMultipartUpload
+
+  #fetchSignature
+
+  #listParts
+
+  #requests
+
+  #retryDelayIterator
+
+  #sendCompletionRequest
+
+  #setS3MultipartState
+
+  #uploadPartBytes
+
+  constructor (requests, options, setS3MultipartState) {
+    this.#requests = requests
+    this.#setS3MultipartState = setS3MultipartState
+    this.setOptions(options)
+  }
+
+  setOptions (options) {
+    const requests = this.#requests
+
+    if ('abortMultipartUpload' in options) {
+      this.#abortMultipartUpload = requests.wrapPromiseFunction(options.abortMultipartUpload)
+    }
+    if ('createMultipartUpload' in options) {
+      this.#createMultipartUpload = requests.wrapPromiseFunction(options.createMultipartUpload, { priority: -1 })
+    }
+    if ('signPart' in options) {
+      this.#fetchSignature = requests.wrapPromiseFunction(options.signPart)
+    }
+    if ('listParts' in options) {
+      this.#listParts = requests.wrapPromiseFunction(options.listParts)
+    }
+    if ('completeMultipartUpload' in options) {
+      this.#sendCompletionRequest = requests.wrapPromiseFunction(options.completeMultipartUpload)
+    }
+    if ('retryDelays' in options) {
+      this.#retryDelayIterator = options.retryDelays?.values()
+    }
+    if ('uploadPartBytes' in options) {
+      this.#uploadPartBytes = requests.wrapPromiseFunction(options.uploadPartBytes, { priority: Infinity })
+    }
+  }
+
+  async #shouldRetry (err) {
+    const requests = this.#requests
+    const status = err?.source?.status
+
+    // TODO: this retry logic is taken out of Tus. We should have a centralized place for retrying,
+    // perhaps the rate limited queue, and dedupe all plugins with that.
+    if (status == null) {
+      return false
+    }
+    if (status === 403 && err.message === 'Request has expired') {
+      if (!requests.isPaused) {
+        const next = this.#retryDelayIterator?.next()
+        if (next == null || next.done) {
+          return false
+        }
+        // No need to stop the other requests, we just want to lower the limit.
+        requests.rateLimit(0)
+        await new Promise(resolve => setTimeout(resolve, next.value))
+      }
+    } else if (status === 429) {
+      // HTTP 429 Too Many Requests => to avoid the whole upload failing, pause all requests.
+      if (!requests.isPaused) {
+        const next = this.#retryDelayIterator?.next()
+        if (next == null || next.done) {
+          return false
+        }
+        requests.rateLimit(next.value)
+      }
+    } else if (status > 400 && status < 500 && status !== 409) {
+      // HTTP 4xx, the server rejected the request; it doesn't make sense to retry
+      return false
+    } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
+      // The navigator is offline, let's wait for it to come back online.
+      if (!requests.isPaused) {
+        requests.pause()
+        window.addEventListener('online', () => {
+          requests.resume()
+        }, { once: true })
+      }
+    } else {
+      // Other error code means the request can be retried later.
+      const next = this.#retryDelayIterator?.next()
+      if (next == null || next.done) {
+        return false
+      }
+      await new Promise(resolve => setTimeout(resolve, next.value))
+    }
+    return true
+  }
+
+  async getUploadId (file, signal) {
+    const cachedResult = this.#cache.get(file.data)
+    if (cachedResult != null) {
+      return cachedResult
+    }
+
+    const promise = this.#createMultipartUpload(file, signal).then(async (result) => {
+      this.#setS3MultipartState(file, result)
+      this.#cache.set(file.data, result)
+      return result
+    })
+    this.#cache.set(file.data, promise)
+    return promise
+  }
+
+  async abortFileUpload (file) {
+    const result = this.#cache.get(file.data)
+    if (result != null) {
+      // If the createMultipartUpload request never was made, we don't
+      // need to send the abortMultipartUpload request.
+      await this.#abortMultipartUpload(file, await result)
+    }
+  }
+
+  async uploadFile (file, chunks, signal) {
+    throwIfAborted(signal)
+    const { uploadId, key } = await this.getUploadId(file, signal)
+    throwIfAborted(signal)
+    const parts = await Promise.all(chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)))
+    throwIfAborted(signal)
+    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal })
+  }
+
+  async resumeUploadFile (file, chunks, signal) {
+    throwIfAborted(signal)
+    const { uploadId, key } = await this.getUploadId(file, signal)
+    throwIfAborted(signal)
+    const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal })
+    throwIfAborted(signal)
+    const parts = await Promise.all(
+      chunks
+        .map((chunk, i) => {
+          const partNumber = i + 1
+          const alreadyUploadedInfo = alreadyUploadedParts.find(({ PartNumber }) => PartNumber === partNumber)
+          return alreadyUploadedInfo == null
+            ? this.uploadChunk(file, partNumber, chunk, signal)
+            : { PartNumber: partNumber, ETag: alreadyUploadedInfo.ETag }
+        }),
+    )
+    throwIfAborted(signal)
+    return this.#sendCompletionRequest(file, { key, uploadId, parts, signal })
+  }
+
+  async uploadChunk (file, partNumber, body, signal) {
+    throwIfAborted(signal)
+    const { uploadId, key } = await this.getUploadId(file, signal)
+    throwIfAborted(signal)
+    for (;;) {
+      const signature = await this.#fetchSignature(file, { uploadId, key, partNumber, body, signal })
+      throwIfAborted(signal)
+      try {
+        return {
+          PartNumber: partNumber,
+          ...await this.#uploadPartBytes(signature, body, signal),
+        }
+      } catch (err) {
+        if (!await this.#shouldRetry(err)) throw err
+      }
+    }
+  }
+}
+
 export default class AwsS3Multipart extends BasePlugin {
   static VERSION = packageJson.version
 
   #queueRequestSocketToken
 
+  #companionCommunicationQueue
+
   #client
 
   constructor (uppy, opts) {
@@ -36,17 +218,31 @@ export default class AwsS3Multipart extends BasePlugin {
       retryDelays: [0, 1000, 3000, 5000],
       createMultipartUpload: this.createMultipartUpload.bind(this),
       listParts: this.listParts.bind(this),
-      prepareUploadParts: this.prepareUploadParts.bind(this),
       abortMultipartUpload: this.abortMultipartUpload.bind(this),
       completeMultipartUpload: this.completeMultipartUpload.bind(this),
+      signPart: this.signPart.bind(this),
+      uploadPartBytes: AwsS3Multipart.uploadPartBytes,
       companionHeaders: {},
     }
 
     this.opts = { ...defaultOptions, ...opts }
+    if (opts?.prepareUploadParts != null && opts.signPart == null) {
+      this.opts.signPart = async (file, { uploadId, key, partNumber, body, signal }) => {
+        const { presignedUrls, headers } = await opts
+          .prepareUploadParts(file, { uploadId, key, parts: [{ number: partNumber, chunk: body }], signal })
+        return { url: presignedUrls?.[partNumber], headers: headers?.[partNumber] }
+      }
+    }
 
     this.upload = this.upload.bind(this)
 
-    this.requests = new RateLimitedQueue(this.opts.limit)
+    /**
+     * Simultaneous upload limiting is shared across all uploads with this plugin.
+     *
+     * @type {RateLimitedQueue}
+     */
+    this.requests = this.opts.rateLimitedQueue ?? new RateLimitedQueue(this.opts.limit)
+    this.#companionCommunicationQueue = new HTTPCommunicationQueue(this.requests, this.opts, this.#setS3MultipartState)
 
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
@@ -57,6 +253,11 @@ export default class AwsS3Multipart extends BasePlugin {
 
   [Symbol.for('uppy test: getClient')] () { return this.#client }
 
+  setOptions (newOptions) {
+    this.#companionCommunicationQueue.setOptions(newOptions)
+    return super.setOptions(newOptions)
+  }
+
   /**
    * Clean up all references for a file's upload: the MultipartUploader instance,
    * any events related to the file, and the Companion WebSocket connection.
@@ -79,18 +280,20 @@ export default class AwsS3Multipart extends BasePlugin {
     }
   }
 
+  // TODO: make this a private method in the next major
   assertHost (method) {
     if (!this.opts.companionUrl) {
       throw new Error(`Expected a \`companionUrl\` option containing a Companion address, or if you are not using Companion, a custom \`${method}\` implementation.`)
     }
   }
 
-  createMultipartUpload (file) {
+  createMultipartUpload (file, signal) {
     this.assertHost('createMultipartUpload')
+    throwIfAborted(signal)
 
     const metadata = {}
 
-    Object.keys(file.meta).forEach(key => {
+    Object.keys(file.meta || {}).forEach(key => {
       if (file.meta[key] != null) {
         metadata[key] = file.meta[key].toString()
       }
@@ -100,59 +303,149 @@ export default class AwsS3Multipart extends BasePlugin {
       filename: file.name,
       type: file.type,
       metadata,
-    }).then(assertServerError)
+    }, { signal }).then(assertServerError)
   }
 
-  listParts (file, { key, uploadId }) {
+  listParts (file, { key, uploadId }, signal) {
     this.assertHost('listParts')
+    throwIfAborted(signal)
 
     const filename = encodeURIComponent(key)
-    return this.#client.get(`s3/multipart/${uploadId}?key=${filename}`)
+    return this.#client.get(`s3/multipart/${uploadId}?key=${filename}`, { signal })
       .then(assertServerError)
   }
 
-  prepareUploadParts (file, { key, uploadId, parts }) {
-    this.assertHost('prepareUploadParts')
+  completeMultipartUpload (file, { key, uploadId, parts }, signal) {
+    this.assertHost('completeMultipartUpload')
+    throwIfAborted(signal)
 
     const filename = encodeURIComponent(key)
-    const partNumbers = parts.map((part) => part.number).join(',')
-    return this.#client.get(`s3/multipart/${uploadId}/batch?key=${filename}&partNumbers=${partNumbers}`)
+    const uploadIdEnc = encodeURIComponent(uploadId)
+    return this.#client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts }, { signal })
       .then(assertServerError)
   }
 
-  completeMultipartUpload (file, { key, uploadId, parts }) {
-    this.assertHost('completeMultipartUpload')
+  signPart (file, { uploadId, key, partNumber, signal }) {
+    this.assertHost('signPart')
+    throwIfAborted(signal)
+
+    if (uploadId == null || key == null || partNumber == null) {
+      throw new Error('Cannot sign without a key, an uploadId, and a partNumber')
+    }
 
     const filename = encodeURIComponent(key)
-    const uploadIdEnc = encodeURIComponent(uploadId)
-    return this.#client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts })
+    return this.#client.get(`s3/multipart/${uploadId}/${partNumber}?key=${filename}`, { signal })
       .then(assertServerError)
   }
 
-  abortMultipartUpload (file, { key, uploadId }) {
+  abortMultipartUpload (file, { key, uploadId }, signal) {
     this.assertHost('abortMultipartUpload')
 
     const filename = encodeURIComponent(key)
     const uploadIdEnc = encodeURIComponent(uploadId)
-    return this.#client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`)
+    return this.#client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`, undefined, { signal })
       .then(assertServerError)
   }
 
-  uploadFile (file) {
-    return new Promise((resolve, reject) => {
-      let queuedRequest
+  static async uploadPartBytes ({ url, expires, headers }, body, signal) {
+    throwIfAborted(signal)
 
-      const onStart = (data) => {
-        const cFile = this.uppy.getFile(file.id)
-        this.uppy.setFileState(file.id, {
-          s3Multipart: {
-            ...cFile.s3Multipart,
-            key: data.key,
-            uploadId: data.uploadId,
-          },
+    if (url == null) {
+      throw new Error('Cannot upload to an undefined URL')
+    }
+
+    return new Promise((resolve, reject) => {
+      const xhr = new XMLHttpRequest()
+      xhr.open('PUT', url, true)
+      if (headers) {
+        Object.keys(headers).forEach((key) => {
+          xhr.setRequestHeader(key, headers[key])
         })
       }
+      xhr.responseType = 'text'
+      if (typeof expires === 'number') {
+        xhr.timeout = expires * 1000
+      }
+
+      function onabort () {
+        xhr.abort()
+      }
+      function cleanup () {
+        signal.removeEventListener('abort', onabort)
+      }
+      signal.addEventListener('abort', onabort)
+
+      xhr.upload.addEventListener('progress', body.onProgress)
+
+      xhr.addEventListener('abort', () => {
+        cleanup()
+
+        reject(createAbortError())
+      })
+
+      xhr.addEventListener('timeout', () => {
+        cleanup()
+
+        const error = new Error('Request has expired')
+        error.source = { status: 403 }
+        reject(error)
+      })
+      xhr.addEventListener('load', (ev) => {
+        cleanup()
+
+        if (ev.target.status === 403 && ev.target.responseText.includes('<Message>Request has expired</Message>')) {
+          const error = new Error('Request has expired')
+          error.source = ev.target
+          reject(error)
+          return
+        } else if (ev.target.status < 200 || ev.target.status >= 300) {
+          const error = new Error('Non 2xx')
+          error.source = ev.target
+          reject(error)
+          return
+        }
+
+        body.onProgress?.(body.size)
+
+        // NOTE This must be allowed by CORS.
+        const etag = ev.target.getResponseHeader('ETag')
+
+        if (etag === null) {
+          reject(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
+          return
+        }
+
+        body.onComplete?.(etag)
+        resolve({
+          ETag: etag,
+        })
+      })
+
+      xhr.addEventListener('error', (ev) => {
+        cleanup()
+
+        const error = new Error('Unknown error')
+        error.source = ev.target
+        reject(error)
+      })
+
+      xhr.send(body)
+    })
+  }
+
+  #setS3MultipartState = (file, { key, uploadId }) => {
+    const cFile = this.uppy.getFile(file.id)
+    this.uppy.setFileState(file.id, {
+      s3Multipart: {
+        ...cFile.s3Multipart,
+        key,
+        uploadId,
+      },
+    })
+  }
 
+  uploadFile (file) {
+    return new Promise((resolve, reject) => {
       const onProgress = (bytesUploaded, bytesTotal) => {
         this.uppy.emit('upload-progress', file, {
           uploader: this,
@@ -165,7 +458,6 @@ export default class AwsS3Multipart extends BasePlugin {
         this.uppy.log(err)
         this.uppy.emit('upload-error', file, err)
 
-        queuedRequest.done()
         this.resetUploaderReferences(file.id)
         reject(err)
       }
@@ -179,14 +471,13 @@ export default class AwsS3Multipart extends BasePlugin {
           uploadURL: result.location,
         }
 
-        queuedRequest.done()
         this.resetUploaderReferences(file.id)
 
         const cFile = this.uppy.getFile(file.id)
         this.uppy.emit('upload-success', cFile || file, uploadResp)
 
         if (result.location) {
-          this.uppy.log(`Download ${uploadObject.file.name} from ${result.location}`)
+          this.uppy.log(`Download ${file.name} from ${result.location}`)
         }
 
         resolve(uploadObject)
@@ -203,47 +494,33 @@ export default class AwsS3Multipart extends BasePlugin {
 
       const upload = new MultipartUploader(file.data, {
         // .bind to pass the file object to each handler.
-        createMultipartUpload: this.opts.createMultipartUpload.bind(this, file),
-        listParts: this.opts.listParts.bind(this, file),
-        prepareUploadParts: this.opts.prepareUploadParts.bind(this, file),
-        completeMultipartUpload: this.opts.completeMultipartUpload.bind(this, file),
-        abortMultipartUpload: this.opts.abortMultipartUpload.bind(this, file),
+        companionComm: this.#companionCommunicationQueue,
+
+        log: (...args) => this.uppy.log(...args),
         getChunkSize: this.opts.getChunkSize ? this.opts.getChunkSize.bind(this) : null,
 
-        onStart,
         onProgress,
         onError,
         onSuccess,
         onPartComplete,
 
-        limit: this.opts.limit || 5,
-        retryDelays: this.opts.retryDelays || [],
+        file,
+
         ...file.s3Multipart,
       })
 
       this.uploaders[file.id] = upload
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
-      queuedRequest = this.requests.run(() => {
-        if (!file.isPaused) {
-          upload.start()
-        }
-        // Don't do anything here, the caller will take care of cancelling the upload itself
-        // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
-        // called when this request is still in the queue, and has not been started yet, too. At
-        // that point this cancellation function is not going to be called.
-        return () => {}
-      })
-
       this.onFileRemove(file.id, (removed) => {
-        queuedRequest.abort()
+        upload.abort()
         this.resetUploaderReferences(file.id, { abort: true })
         resolve(`upload ${removed.id} was removed`)
       })
 
       this.onCancelAll(file.id, ({ reason } = {}) => {
         if (reason === 'user') {
-          queuedRequest.abort()
+          upload.abort()
           this.resetUploaderReferences(file.id, { abort: true })
         }
         resolve(`upload ${file.id} was canceled`)
@@ -251,38 +528,23 @@ export default class AwsS3Multipart extends BasePlugin {
 
       this.onFilePause(file.id, (isPaused) => {
         if (isPaused) {
-          // Remove this file from the queue so another file can start in its place.
-          queuedRequest.abort()
           upload.pause()
         } else {
-          // Resuming an upload should be queued, else you could pause and then
-          // resume a queued upload to make it skip the queue.
-          queuedRequest.abort()
-          queuedRequest = this.requests.run(() => {
-            upload.start()
-            return () => {}
-          })
+          upload.start()
         }
       })
 
       this.onPauseAll(file.id, () => {
-        queuedRequest.abort()
         upload.pause()
       })
 
       this.onResumeAll(file.id, () => {
-        queuedRequest.abort()
-        if (file.error) {
-          upload.abort()
-        }
-        queuedRequest = this.requests.run(() => {
-          upload.start()
-          return () => {}
-        })
+        upload.start()
       })
 
       // Don't double-emit upload-started for Golden Retriever-restored files that were already started
       if (!file.progress.uploadStarted || !file.isRestored) {
+        upload.start()
         this.uppy.emit('upload-started', file)
       }
     })
@@ -298,6 +560,10 @@ export default class AwsS3Multipart extends BasePlugin {
       Object.assign(opts, file.tus)
     }
 
+    if (file.remote.url == null) {
+      throw new Error('Cannot connect to an undefined URL')
+    }
+
     const res = await client.post(file.remote.url, {
       ...file.remote.body,
       protocol: 's3-multipart',
@@ -329,7 +595,7 @@ export default class AwsS3Multipart extends BasePlugin {
     }
   }
 
-  connectToServerSocket (file) {
+  async connectToServerSocket (file) {
     return new Promise((resolve, reject) => {
       let queuedRequest
 
@@ -434,8 +700,8 @@ export default class AwsS3Multipart extends BasePlugin {
     })
   }
 
-  upload (fileIDs) {
-    if (fileIDs.length === 0) return Promise.resolve()
+  async upload (fileIDs) {
+    if (fileIDs.length === 0) return undefined
 
     const promises = fileIDs.map((id) => {
       const file = this.uppy.getFile(id)
@@ -450,7 +716,6 @@ export default class AwsS3Multipart extends BasePlugin {
 
   #setCompanionHeaders = () => {
     this.#client.setCompanionHeaders(this.opts.companionHeaders)
-    return Promise.resolve()
   }
 
   onFileRemove (fileID, cb) {
@@ -462,7 +727,6 @@ export default class AwsS3Multipart extends BasePlugin {
   onFilePause (fileID, cb) {
     this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
       if (fileID === targetFileID) {
-        // const isPaused = this.uppy.pauseResume(fileID)
         cb(isPaused)
       }
     })
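
To make the new retry approach in `#shouldRetry` explicit, here is a condensed sketch (not the actual implementation; the 403 "Request has expired" and offline branches are omitted, and `queue` stands in for the shared `RateLimitedQueue`). The design choice worth noting: `retryDelays` is consumed through a single iterator, so all retries draw from one shared budget, and HTTP 429 throttles the whole queue instead of failing one request.

```js
async function shouldRetry (err, retryDelayIterator, queue) {
  const status = err?.source?.status
  if (status == null) return false // not an HTTP failure we can reason about

  if (status === 429) {
    // Too Many Requests: slow the shared queue down rather than give up.
    const next = retryDelayIterator?.next()
    if (next == null || next.done) return false
    queue.rateLimit(next.value)
    return true
  }
  if (status > 400 && status < 500 && status !== 409) {
    return false // other 4xx are permanent; retrying will not help
  }

  // 5xx, 409, or network errors: wait out the next delay, then try again.
  const next = retryDelayIterator?.next()
  if (next == null || next.done) return false
  await new Promise((resolve) => { setTimeout(resolve, next.value) })
  return true
}
```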

+ 23 - 27
packages/@uppy/aws-s3-multipart/src/index.test.js

@@ -35,7 +35,7 @@ describe('AwsS3Multipart', () => {
       expect(() => awsS3Multipart.opts.listParts(file, opts)).toThrow(err)
       expect(() => awsS3Multipart.opts.completeMultipartUpload(file, opts)).toThrow(err)
       expect(() => awsS3Multipart.opts.abortMultipartUpload(file, opts)).toThrow(err)
-      expect(() => awsS3Multipart.opts.prepareUploadParts(file, opts)).toThrow(err)
+      expect(() => awsS3Multipart.opts.signPart(file, opts)).toThrow(err)
     })
   })
 
@@ -113,7 +113,7 @@ describe('AwsS3Multipart', () => {
 
       expect(
         awsS3Multipart.opts.prepareUploadParts.mock.calls.length,
-      ).toEqual(1)
+      ).toEqual(2)
 
       scope.done()
     })
@@ -161,11 +161,11 @@ describe('AwsS3Multipart', () => {
         }
       }
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(3)
+      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(10)
 
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[0][1], [1, 2, 3, 4, 5])
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[1][1], [6, 7, 8])
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[2][1], [9, 10])
+      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[0][1], [1])
+      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[1][1], [2])
+      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[2][1], [3])
 
       const completeCall = awsS3Multipart.opts.completeMultipartUpload.mock.calls[0][1]
 
@@ -214,6 +214,7 @@ describe('AwsS3Multipart', () => {
       let busySpy
       let doneSpy
       awsS3Multipart.setOptions({
+        retryDelays: [10],
         createMultipartUpload: jest.fn((file) => {
           const multipartUploader = awsS3Multipart.uploaders[file.id]
           const testChunkState = multipartUploader.chunkState[6]
@@ -255,7 +256,7 @@ describe('AwsS3Multipart', () => {
         }
       }
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(3)
+      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(10)
     })
   })
 
@@ -267,31 +268,25 @@ describe('AwsS3Multipart', () => {
       }
     })
 
-    const prepareUploadParts = jest
-      .fn(async () => {
-        const presignedUrls = {}
-        const possiblePartNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-
-        possiblePartNumbers.forEach((partNumber) => {
-          presignedUrls[
-            partNumber
-          ] = `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`
-        })
-
-        return { presignedUrls }
+    const signPart = jest
+      .fn(async (file, { partNumber }) => {
+        return { url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test` }
       })
 
+    const uploadPartBytes = jest.fn()
+
     afterEach(() => jest.clearAllMocks())
 
-    it('retries prepareUploadParts when it fails once', async () => {
+    it('retries uploadPartBytes when it fails once', async () => {
       const core = new Core()
         .use(AwsS3Multipart, {
           createMultipartUpload,
           completeMultipartUpload: jest.fn(async () => ({ location: 'test' })),
           // eslint-disable-next-line no-throw-literal
           abortMultipartUpload: jest.fn(() => { throw 'should ignore' }),
-          prepareUploadParts:
-            prepareUploadParts
+          signPart,
+          uploadPartBytes:
+            uploadPartBytes
               // eslint-disable-next-line prefer-promise-reject-errors
               .mockImplementationOnce(() => Promise.reject({ source: { status: 500 } })),
         })
@@ -309,17 +304,18 @@ describe('AwsS3Multipart', () => {
 
       await core.upload()
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(2)
+      expect(awsS3Multipart.opts.uploadPartBytes.mock.calls.length).toEqual(3)
     })
 
-    it('calls `upload-error` when prepareUploadParts fails after all retries', async () => {
+    it('calls `upload-error` when uploadPartBytes fails after all retries', async () => {
       const core = new Core()
         .use(AwsS3Multipart, {
-          retryDelays: [100],
+          retryDelays: [10],
           createMultipartUpload,
           completeMultipartUpload: jest.fn(async () => ({ location: 'test' })),
           abortMultipartUpload: jest.fn(),
-          prepareUploadParts: prepareUploadParts
+          signPart,
+          uploadPartBytes: uploadPartBytes
             // eslint-disable-next-line prefer-promise-reject-errors
             .mockImplementation(() => Promise.reject({ source: { status: 500 } })),
         })
@@ -339,7 +335,7 @@ describe('AwsS3Multipart', () => {
 
       await expect(core.upload()).rejects.toEqual({ source: { status: 500 } })
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(2)
+      expect(awsS3Multipart.opts.uploadPartBytes.mock.calls.length).toEqual(2)
       expect(mock.mock.calls.length).toEqual(1)
     })
   })

+ 9 - 4
packages/@uppy/aws-s3-multipart/types/index.d.ts

@@ -18,19 +18,24 @@ export interface AwsS3MultipartOptions extends PluginOptions {
     ) => MaybePromise<{ uploadId: string; key: string }>
     listParts?: (
       file: UppyFile,
-      opts: { uploadId: string; key: string }
+      opts: { uploadId: string; key: string; signal: AbortSignal }
     ) => MaybePromise<AwsS3Part[]>
+    signPart?: (
+      file: UppyFile,
+      opts: { uploadId: string; key: string; partNumber: number; body: Blob, signal: AbortSignal }
+    ) => MaybePromise<AwsS3Part>
+    /** @deprecated Use signPart instead */
     prepareUploadParts?: (
       file: UppyFile,
-      partData: { uploadId: string; key: string; parts: Array<{ number: number, chunk: Blob }> }
+      partData: { uploadId: string; key: string; parts: [{ number: number, chunk: Blob }], signal: AbortSignal }
     ) => MaybePromise<{ presignedUrls: { [k: number]: string }, headers?: { [k: string]: string } }>
     abortMultipartUpload?: (
       file: UppyFile,
-      opts: { uploadId: string; key: string }
+      opts: { uploadId: string; key: string; signal: AbortSignal }
     ) => MaybePromise<void>
     completeMultipartUpload?: (
       file: UppyFile,
-      opts: { uploadId: string; key: string; parts: AwsS3Part[] }
+      opts: { uploadId: string; key: string; parts: AwsS3Part[]; signal: AbortSignal }
     ) => MaybePromise<{ location?: string }>
     limit?: number
     retryDelays?: number[] | null

+ 1 - 1
packages/@uppy/aws-s3-multipart/types/index.test-d.ts

@@ -21,7 +21,7 @@ import type { AwsS3Part } from '..'
       expectType<UppyFile>(file)
       expectType<string>(partData.uploadId)
       expectType<string>(partData.key)
-      expectType<Array<{number: number, chunk: Blob}>>(partData.parts)
+      expectType<[{number: number, chunk: Blob}]>(partData.parts)
       return { presignedUrls: {} }
     },
     abortMultipartUpload (file, opts) {

+ 28 - 14
website/src/docs/aws-s3-multipart.md

@@ -41,15 +41,14 @@ The `@uppy/aws-s3-multipart` plugin has the following configurable options:
 
 ### `limit: 6`
 
-The maximum amount of chunks to upload simultaneously. This affects [`prepareUploadParts()`](#prepareUploadParts-file-partData) as well; after the initial batch of `limit` parts is presigned, a minimum of `limit / 2` rounded up will be presigned at a time. You should set the limit carefully. Setting it to a value too high could cause issues where the presigned URLs begin to expire before the chunks they are for start uploading. Too low and you will end up with a lot of extra round trips to your server (or Companion) than necessary to presign URLs. If the default chunk size of 5MB is used, a `limit` between 5 and 6 is recommended.
+The maximum number of chunks to upload simultaneously. You should set the limit carefully: too high, and the presigned URLs may start to expire before their chunks begin uploading; too low, and you will make more round trips to your server (or Companion) than necessary to presign URLs. If the default chunk size of 5MB is used, a `limit` between 5 and 6 is recommended.
 
 Because HTTP/1.1 limits the number of concurrent requests to one origin to 6, it’s recommended to always set a limit of 6 or smaller for all your uploads, or to not override the default.
 
-For example, with a 50MB file and a `limit` of 5 we end up with 10 chunks. 5 of these are presigned in one batch, then 3, then 2, for a total of 3 round trips to the server via [`prepareUploadParts()`](#prepareUploadParts-file-partData) and 10 requests sent to AWS via the presigned URLs generated.
 
 ### `retryDelays: [0, 1000, 3000, 5000]`
 
-`retryDelays` are the intervals in milliseconds used to retry a failed chunk as well as [`prepareUploadParts`](#prepareUploadParts-file-partData).
+`retryDelays` are the intervals in milliseconds used to retry a failed chunk.
 
 By default, we first retry instantly; if that fails, we retry after 1 second; if that fails, we retry after 3 seconds, etc.
 
@@ -75,7 +74,7 @@ This option correlates to the [RequestCredentials value](https://developer.mozil
 
 A function that returns the minimum chunk size to use when uploading the given file.
 
-The S3 Multipart plugin uploads files in chunks. Chunks are sent in batches to have presigned URLs generated via ([`prepareUploadParts()`](#prepareUploadParts-file-partData)). To reduce the amount of requests for large files, you can choose a larger chunk size, at the cost of having to re-upload more data if one chunk fails to upload.
+The S3 Multipart plugin uploads files in chunks. Each chunk is presigned individually via [`signPart()`][]. To reduce the number of requests for large files, you can choose a larger chunk size, at the cost of having to re-upload more data if one chunk fails to upload.
 
 S3 requires a minimum chunk size of 5MB, and supports at most 10,000 chunks per multipart upload. If `getChunkSize()` returns a size that’s too small, Uppy will increase it to S3’s minimum requirements.
 
@@ -107,11 +106,14 @@ The default implementation calls out to Companion’s S3 signing endpoints.
 
 ### `prepareUploadParts(file, partData)`
 
+> This option is deprecated. Use [`signPart()`][] instead.
+
 A function that generates a batch of signed URLs for the specified part numbers. Receives the `file` object from Uppy’s state. The `partData` argument is an object with keys:
 
 * `uploadId` - The UploadID of this Multipart upload.
 * `key` - The object key in the S3 bucket.
-* `parts` - An array of objects with the part number and chunk (`Array<{ number: number, chunk: blob }>`). `number` can’t be zero.
+* `parts` - An array containing a single object with the part number and chunk (`[{ number: number, chunk: Blob }]`). `number` can’t be zero.
+* `signal` – An `AbortSignal` that may be used to abort an ongoing request.
 
 `prepareUploadParts` should return a `Promise` with an `Object` with keys:
 
@@ -124,27 +126,37 @@ An example of what the return value should look like:
 {
   "presignedUrls": {
     "1": "https://bucket.region.amazonaws.com/path/to/file.jpg?partNumber=1&...",
-    "2": "https://bucket.region.amazonaws.com/path/to/file.jpg?partNumber=2&...",
-    "3": "https://bucket.region.amazonaws.com/path/to/file.jpg?partNumber=3&..."
   },
   "headers": { 
     "1": { "Content-MD5": "foo" },
-    "2": { "Content-MD5": "bar" },
-    "3": { "Content-MD5": "baz" }
   }
 }
 ```
 
-If an error occured, reject the `Promise` with an `Object` with the following keys:
+### `signPart(file, partData)`
+
+A function that generates a signed URL for the specified part number. The `partData` argument is an object with the keys:
+
+* `uploadId` - The UploadID of this Multipart upload.
+* `key` - The object key in the S3 bucket.
+* `partNumber` – the part number being signed; can’t be zero.
+* `body` – The data that will be signed.
+* `signal` – An `AbortSignal` that may be used to abort an ongoing request.
+
+This function should return an object, or a promise that resolves to an object, with the following keys:
 
-<!-- eslint-disable -->
+* `url` – the presigned URL, as a `string`.
+* `headers` – **(Optional)** Custom headers to send along with the request to the S3 endpoint.
+
+An example of what the return value should look like:
 
 ```json
-{ "source": { "status": 500 } }
+{
+  "url": "https://bucket.region.amazonaws.com/path/to/file.jpg?partNumber=1&...",
+  "headers": { "Content-MD5": "foo" }
+}
 ```
 
-`status` is the HTTP code and is required for determining whether to retry the request. `prepareUploadParts` will be retried if the code is `0`, `409`, `423`, or between `500` and `600`.
-
 ### `abortMultipartUpload(file, { uploadId, key })`
 
 A function that calls the S3 Multipart API to abort a Multipart upload, and removes all parts that have been uploaded so far. Receives the `file` object from Uppy’s state, and an object with keys:
@@ -197,3 +209,5 @@ While the AWS S3 plugin uses `POST` requests when uploading files to an S3 bucke
   }
 ]
 ```
+
+[`signPart()`]: #signPart-file-partData
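
For readers not using Companion, `signPart()` pairs naturally with a small server endpoint. A sketch under stated assumptions (Express with `@aws-sdk/client-s3` and `@aws-sdk/s3-request-presigner`; the region, bucket name, and route shape are illustrative):

```js
import express from 'express'
import { S3Client, UploadPartCommand } from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'

const app = express()
const s3 = new S3Client({ region: 'us-east-1' }) // assumed region

app.get('/sign-part/:uploadId/:partNumber', async (req, res) => {
  const command = new UploadPartCommand({
    Bucket: 'my-bucket', // illustrative bucket name
    Key: req.query.key,
    UploadId: req.params.uploadId,
    PartNumber: Number(req.params.partNumber),
  })
  // Presign for 10 minutes; the client-side `signPart()` returns `{ url }`.
  const url = await getSignedUrl(s3, command, { expiresIn: 600 })
  res.json({ url })
})

app.listen(8080)
```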