Parcourir la source

@uppy/aws-s3-multipart: refactor to TS (#4902)

Antoine du Hamel il y a 1 an
Parent
commit
7b0533b4bb

+ 427 - 0
packages/@uppy/aws-s3-multipart/src/HTTPCommunicationQueue.ts

@@ -0,0 +1,427 @@
+import type { Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
+import type {
+  RateLimitedQueue,
+  WrapPromiseFunctionType,
+} from '@uppy/utils/lib/RateLimitedQueue'
+import { pausingUploadReason, type Chunk } from './MultipartUploader.ts'
+import type AwsS3Multipart from './index.ts'
+import { throwIfAborted } from './utils.ts'
+import type { Body, UploadPartBytesResult, UploadResult } from './utils.ts'
+import type { AwsS3MultipartOptions, uploadPartBytes } from './index.ts'
+
+function removeMetadataFromURL(urlString: string) {
+  const urlObject = new URL(urlString)
+  urlObject.search = ''
+  urlObject.hash = ''
+  return urlObject.href
+}
+
+export class HTTPCommunicationQueue<M extends Meta, B extends Body> {
+  #abortMultipartUpload: WrapPromiseFunctionType<
+    AwsS3Multipart<M, B>['abortMultipartUpload']
+  >
+
+  #cache = new WeakMap()
+
+  #createMultipartUpload: WrapPromiseFunctionType<
+    AwsS3Multipart<M, B>['createMultipartUpload']
+  >
+
+  #fetchSignature: WrapPromiseFunctionType<AwsS3Multipart<M, B>['signPart']>
+
+  #getUploadParameters: WrapPromiseFunctionType<
+    AwsS3Multipart<M, B>['getUploadParameters']
+  >
+
+  #listParts: WrapPromiseFunctionType<AwsS3Multipart<M, B>['listParts']>
+
+  #previousRetryDelay: number
+
+  #requests
+
+  #retryDelays: { values: () => Iterator<number> }
+
+  #sendCompletionRequest: WrapPromiseFunctionType<
+    AwsS3Multipart<M, B>['completeMultipartUpload']
+  >
+
+  #setS3MultipartState
+
+  #uploadPartBytes: WrapPromiseFunctionType<uploadPartBytes>
+
+  #getFile
+
+  constructor(
+    requests: RateLimitedQueue,
+    options: AwsS3MultipartOptions<M, B>,
+    setS3MultipartState: (file: UppyFile<M, B>, result: UploadResult) => void,
+    getFile: (file: UppyFile<M, B>) => UppyFile<M, B>,
+  ) {
+    this.#requests = requests
+    this.#setS3MultipartState = setS3MultipartState
+    this.#getFile = getFile
+    this.setOptions(options)
+  }
+
+  setOptions(options: Partial<AwsS3MultipartOptions<M, B>>): void {
+    const requests = this.#requests
+
+    if ('abortMultipartUpload' in options) {
+      this.#abortMultipartUpload = requests.wrapPromiseFunction(
+        options.abortMultipartUpload as any,
+        { priority: 1 },
+      )
+    }
+    if ('createMultipartUpload' in options) {
+      this.#createMultipartUpload = requests.wrapPromiseFunction(
+        options.createMultipartUpload as any,
+        { priority: -1 },
+      )
+    }
+    if ('signPart' in options) {
+      this.#fetchSignature = requests.wrapPromiseFunction(
+        options.signPart as any,
+      )
+    }
+    if ('listParts' in options) {
+      this.#listParts = requests.wrapPromiseFunction(options.listParts as any)
+    }
+    if ('completeMultipartUpload' in options) {
+      this.#sendCompletionRequest = requests.wrapPromiseFunction(
+        options.completeMultipartUpload as any,
+        { priority: 1 },
+      )
+    }
+    if ('retryDelays' in options) {
+      this.#retryDelays = options.retryDelays ?? []
+    }
+    if ('uploadPartBytes' in options) {
+      this.#uploadPartBytes = requests.wrapPromiseFunction(
+        options.uploadPartBytes as any,
+        { priority: Infinity },
+      )
+    }
+    if ('getUploadParameters' in options) {
+      this.#getUploadParameters = requests.wrapPromiseFunction(
+        options.getUploadParameters as any,
+      )
+    }
+  }
+
+  async #shouldRetry(err: any, retryDelayIterator: Iterator<number>) {
+    const requests = this.#requests
+    const status = err?.source?.status
+
+    // TODO: this retry logic is taken out of Tus. We should have a centralized place for retrying,
+    // perhaps the rate limited queue, and dedupe all plugins with that.
+    if (status == null) {
+      return false
+    }
+    if (status === 403 && err.message === 'Request has expired') {
+      if (!requests.isPaused) {
+        // We don't want to exhaust the retryDelayIterator as long as there are
+        // more than one request in parallel, to give slower connection a chance
+        // to catch up with the expiry set in Companion.
+        if (requests.limit === 1 || this.#previousRetryDelay == null) {
+          const next = retryDelayIterator.next()
+          if (next == null || next.done) {
+            return false
+          }
+          // If there are more than 1 request done in parallel, the RLQ limit is
+          // decreased and the failed request is requeued after waiting for a bit.
+          // If there is only one request in parallel, the limit can't be
+          // decreased, so we iterate over `retryDelayIterator` as we do for
+          // other failures.
+          // `#previousRetryDelay` caches the value so we can re-use it next time.
+          this.#previousRetryDelay = next.value
+        }
+        // No need to stop the other requests, we just want to lower the limit.
+        requests.rateLimit(0)
+        await new Promise((resolve) =>
+          setTimeout(resolve, this.#previousRetryDelay),
+        )
+      }
+    } else if (status === 429) {
+      // HTTP 429 Too Many Requests => to avoid the whole download to fail, pause all requests.
+      if (!requests.isPaused) {
+        const next = retryDelayIterator.next()
+        if (next == null || next.done) {
+          return false
+        }
+        requests.rateLimit(next.value)
+      }
+    } else if (status > 400 && status < 500 && status !== 409) {
+      // HTTP 4xx, the server won't send anything, it's doesn't make sense to retry
+      return false
+    } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
+      // The navigator is offline, let's wait for it to come back online.
+      if (!requests.isPaused) {
+        requests.pause()
+        window.addEventListener(
+          'online',
+          () => {
+            requests.resume()
+          },
+          { once: true },
+        )
+      }
+    } else {
+      // Other error code means the request can be retried later.
+      const next = retryDelayIterator.next()
+      if (next == null || next.done) {
+        return false
+      }
+      await new Promise((resolve) => setTimeout(resolve, next.value))
+    }
+    return true
+  }
+
+  async getUploadId(
+    file: UppyFile<M, B>,
+    signal: AbortSignal,
+  ): Promise<UploadResult> {
+    let cachedResult
+    // As the cache is updated asynchronously, there could be a race condition
+    // where we just miss a new result so we loop here until we get nothing back,
+    // at which point it's out turn to create a new cache entry.
+    // eslint-disable-next-line no-cond-assign
+    while ((cachedResult = this.#cache.get(file.data)) != null) {
+      try {
+        return await cachedResult
+      } catch {
+        // In case of failure, we want to ignore the cached error.
+        // At this point, either there's a new cached value, or we'll exit the loop a create a new one.
+      }
+    }
+
+    const promise = this.#createMultipartUpload(this.#getFile(file), signal)
+
+    const abortPromise = () => {
+      promise.abort(signal.reason)
+      this.#cache.delete(file.data)
+    }
+    signal.addEventListener('abort', abortPromise, { once: true })
+    this.#cache.set(file.data, promise)
+    promise.then(
+      async (result) => {
+        signal.removeEventListener('abort', abortPromise)
+        this.#setS3MultipartState(file, result)
+        this.#cache.set(file.data, result)
+      },
+      () => {
+        signal.removeEventListener('abort', abortPromise)
+        this.#cache.delete(file.data)
+      },
+    )
+
+    return promise
+  }
+
+  async abortFileUpload(file: UppyFile<M, B>): Promise<void> {
+    const result = this.#cache.get(file.data)
+    if (result == null) {
+      // If the createMultipartUpload request never was made, we don't
+      // need to send the abortMultipartUpload request.
+      return
+    }
+    // Remove the cache entry right away for follow-up requests do not try to
+    // use the soon-to-be aborted cached values.
+    this.#cache.delete(file.data)
+    this.#setS3MultipartState(file, Object.create(null))
+    let awaitedResult
+    try {
+      awaitedResult = await result
+    } catch {
+      // If the cached result rejects, there's nothing to abort.
+      return
+    }
+    await this.#abortMultipartUpload(this.#getFile(file), awaitedResult)
+  }
+
+  async #nonMultipartUpload(
+    file: UppyFile<M, B>,
+    chunk: Chunk,
+    signal?: AbortSignal,
+  ): Promise<UploadPartBytesResult & B> {
+    const {
+      method = 'POST',
+      url,
+      fields,
+      headers,
+    } = await this.#getUploadParameters(this.#getFile(file), {
+      signal,
+    }).abortOn(signal)
+
+    let body
+    const data = chunk.getData()
+    if (method.toUpperCase() === 'POST') {
+      const formData = new FormData()
+      Object.entries(fields!).forEach(([key, value]) =>
+        formData.set(key, value),
+      )
+      formData.set('file', data)
+      body = formData
+    } else {
+      body = data
+    }
+
+    const { onProgress, onComplete } = chunk
+
+    const result = await this.#uploadPartBytes({
+      signature: { url, headers, method } as any,
+      body,
+      size: data.size,
+      onProgress,
+      onComplete,
+      signal,
+    }).abortOn(signal)
+
+    return 'location' in result ?
+        (result as UploadPartBytesResult & B)
+      : ({
+          location: removeMetadataFromURL(url),
+          ...result,
+        } as any)
+  }
+
+  async uploadFile(
+    file: UppyFile<M, B>,
+    chunks: Chunk[],
+    signal: AbortSignal,
+  ): Promise<B & Partial<UploadPartBytesResult>> {
+    throwIfAborted(signal)
+    if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
+      return this.#nonMultipartUpload(file, chunks[0], signal)
+    }
+    const { uploadId, key } = await this.getUploadId(file, signal)
+    throwIfAborted(signal)
+    try {
+      const parts = await Promise.all(
+        chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)),
+      )
+      throwIfAborted(signal)
+      return await this.#sendCompletionRequest(
+        this.#getFile(file),
+        { key, uploadId, parts, signal },
+        signal,
+      ).abortOn(signal)
+    } catch (err) {
+      if (err?.cause !== pausingUploadReason && err?.name !== 'AbortError') {
+        // We purposefully don't wait for the promise and ignore its status,
+        // because we want the error `err` to bubble up ASAP to report it to the
+        // user. A failure to abort is not that big of a deal anyway.
+        this.abortFileUpload(file)
+      }
+      throw err
+    }
+  }
+
+  restoreUploadFile(file: UppyFile<M, B>, uploadIdAndKey: UploadResult): void {
+    this.#cache.set(file.data, uploadIdAndKey)
+  }
+
+  async resumeUploadFile(
+    file: UppyFile<M, B>,
+    chunks: Array<Chunk | null>,
+    signal: AbortSignal,
+  ): Promise<B> {
+    throwIfAborted(signal)
+    if (
+      chunks.length === 1 &&
+      chunks[0] != null &&
+      !chunks[0].shouldUseMultipart
+    ) {
+      return this.#nonMultipartUpload(file, chunks[0], signal)
+    }
+    const { uploadId, key } = await this.getUploadId(file, signal)
+    throwIfAborted(signal)
+    const alreadyUploadedParts = await this.#listParts(
+      this.#getFile(file),
+      { uploadId, key, signal },
+      signal,
+    ).abortOn(signal)
+    throwIfAborted(signal)
+    const parts = await Promise.all(
+      chunks.map((chunk, i) => {
+        const partNumber = i + 1
+        const alreadyUploadedInfo = alreadyUploadedParts.find(
+          ({ PartNumber }) => PartNumber === partNumber,
+        )
+        if (alreadyUploadedInfo == null) {
+          return this.uploadChunk(file, partNumber, chunk!, signal)
+        }
+        // Already uploaded chunks are set to null. If we are restoring the upload, we need to mark it as already uploaded.
+        chunk?.setAsUploaded?.()
+        return { PartNumber: partNumber, ETag: alreadyUploadedInfo.ETag }
+      }),
+    )
+    throwIfAborted(signal)
+    return this.#sendCompletionRequest(
+      this.#getFile(file),
+      { key, uploadId, parts, signal },
+      signal,
+    ).abortOn(signal)
+  }
+
+  async uploadChunk(
+    file: UppyFile<M, B>,
+    partNumber: number,
+    chunk: Chunk,
+    signal: AbortSignal,
+  ): Promise<UploadPartBytesResult & { PartNumber: number }> {
+    throwIfAborted(signal)
+    const { uploadId, key } = await this.getUploadId(file, signal)
+
+    const signatureRetryIterator = this.#retryDelays.values()
+    const chunkRetryIterator = this.#retryDelays.values()
+    const shouldRetrySignature = () => {
+      const next = signatureRetryIterator.next()
+      if (next == null || next.done) {
+        return null
+      }
+      return next.value
+    }
+
+    for (;;) {
+      throwIfAborted(signal)
+      const chunkData = chunk.getData()
+      const { onProgress, onComplete } = chunk
+      let signature
+
+      try {
+        signature = await this.#fetchSignature(this.#getFile(file), {
+          uploadId,
+          key,
+          partNumber,
+          body: chunkData,
+          signal,
+        }).abortOn(signal)
+      } catch (err) {
+        const timeout = shouldRetrySignature()
+        if (timeout == null || signal.aborted) {
+          throw err
+        }
+        await new Promise((resolve) => setTimeout(resolve, timeout))
+        // eslint-disable-next-line no-continue
+        continue
+      }
+
+      throwIfAborted(signal)
+      try {
+        return {
+          PartNumber: partNumber,
+          ...(await this.#uploadPartBytes({
+            signature,
+            body: chunkData,
+            size: chunkData.size,
+            onProgress,
+            onComplete,
+            signal,
+          }).abortOn(signal)),
+        }
+      } catch (err) {
+        if (!(await this.#shouldRetry(err, chunkRetryIterator))) throw err
+      }
+    }
+  }
+}

+ 97 - 56
packages/@uppy/aws-s3-multipart/src/MultipartUploader.js → packages/@uppy/aws-s3-multipart/src/MultipartUploader.ts

@@ -1,24 +1,53 @@
+import type { Uppy } from '@uppy/core'
 import { AbortController } from '@uppy/utils/lib/AbortController'
+import type { Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
+import type { HTTPCommunicationQueue } from './HTTPCommunicationQueue'
+import type { Body } from './utils'
 
 const MB = 1024 * 1024
 
+interface MultipartUploaderOptions<M extends Meta, B extends Body> {
+  getChunkSize?: (file: { size: number }) => number
+  onProgress?: (bytesUploaded: number, bytesTotal: number) => void
+  onPartComplete?: (part: { PartNumber: number; ETag: string }) => void
+  shouldUseMultipart?: boolean | ((file: UppyFile<M, B>) => boolean)
+  onSuccess?: (result: B) => void
+  onError?: (err: unknown) => void
+  companionComm: HTTPCommunicationQueue<M, B>
+  file: UppyFile<M, B>
+  log: Uppy<M, B>['log']
+
+  uploadId: string
+  key: string
+}
+
 const defaultOptions = {
-  getChunkSize (file) {
+  getChunkSize(file: { size: number }) {
     return Math.ceil(file.size / 10000)
   },
-  onProgress () {},
-  onPartComplete () {},
-  onSuccess () {},
-  onError (err) {
+  onProgress() {},
+  onPartComplete() {},
+  onSuccess() {},
+  onError(err: unknown) {
     throw err
   },
+} satisfies Partial<MultipartUploaderOptions<any, any>>
+
+export interface Chunk {
+  getData: () => Blob
+  onProgress: (ev: ProgressEvent) => void
+  onComplete: (etag: string) => void
+  shouldUseMultipart: boolean
+  setAsUploaded?: () => void
 }
 
-function ensureInt (value) {
+function ensureInt<T>(value: T): T extends number | string ? number : never {
   if (typeof value === 'string') {
+    // @ts-expect-error TS is not able to recognize it's fine.
     return parseInt(value, 10)
   }
   if (typeof value === 'number') {
+    // @ts-expect-error TS is not able to recognize it's fine.
     return value
   }
   throw new TypeError('Expected a number')
@@ -32,47 +61,41 @@ export const pausingUploadReason = Symbol('pausing upload, not an actual error')
  * (based on the user-provided `shouldUseMultipart` option value) and to manage
  * the chunk splitting.
  */
-class MultipartUploader {
+class MultipartUploader<M extends Meta, B extends Body> {
+  options: MultipartUploaderOptions<M, B> &
+    Required<Pick<MultipartUploaderOptions<M, B>, keyof typeof defaultOptions>>
+
   #abortController = new AbortController()
 
-  /** @type {import("../types/chunk").Chunk[]} */
-  #chunks
+  #chunks: Array<Chunk | null>
 
-  /** @type {{ uploaded: number, etag?: string, done?: boolean }[]} */
-  #chunkState
+  #chunkState: { uploaded: number; etag?: string; done?: boolean }[]
 
   /**
    * The (un-chunked) data to upload.
-   *
-   * @type {Blob}
    */
-  #data
+  #data: Blob
 
-  /** @type {import("@uppy/core").UppyFile} */
-  #file
+  #file: UppyFile<M, B>
 
-  /** @type {boolean} */
   #uploadHasStarted = false
 
-  /** @type {(err?: Error | any) => void} */
-  #onError
+  #onError: (err: unknown) => void
 
-  /** @type {() => void} */
-  #onSuccess
+  #onSuccess: (result: B) => void
 
-  /** @type {import('../types/index').AwsS3MultipartOptions["shouldUseMultipart"]} */
-  #shouldUseMultipart
+  #shouldUseMultipart: MultipartUploaderOptions<M, B>['shouldUseMultipart']
 
-  /** @type {boolean} */
-  #isRestoring
+  #isRestoring: boolean
 
-  #onReject = (err) => (err?.cause === pausingUploadReason ? null : this.#onError(err))
+  #onReject = (err: unknown) =>
+    (err as any)?.cause === pausingUploadReason ? null : this.#onError(err)
 
   #maxMultipartParts = 10_000
 
   #minPartSize = 5 * MB
 
-  constructor (data, options) {
+  constructor(data: Blob, options: MultipartUploaderOptions<M, B>) {
     this.options = {
       ...defaultOptions,
       ...options,
@@ -89,7 +112,7 @@ class MultipartUploader {
     // When we are restoring an upload, we already have an UploadId and a Key. Otherwise
     // we need to call `createMultipartUpload` to get an `uploadId` and a `key`.
     // Non-multipart uploads are not restorable.
-    this.#isRestoring = options.uploadId && options.key
+    this.#isRestoring = (options.uploadId && options.key) as any as boolean
 
     this.#initChunks()
   }
@@ -98,15 +121,19 @@ class MultipartUploader {
   // and calculates the optimal part size. When using multipart part uploads every part except for the last has
   // to be at least 5 MB and there can be no more than 10K parts.
   // This means we sometimes need to change the preferred part size from the user in order to meet these requirements.
-  #initChunks () {
+  #initChunks() {
     const fileSize = this.#data.size
-    const shouldUseMultipart = typeof this.#shouldUseMultipart === 'function'
-      ? this.#shouldUseMultipart(this.#file)
+    const shouldUseMultipart =
+      typeof this.#shouldUseMultipart === 'function' ?
+        this.#shouldUseMultipart(this.#file)
       : Boolean(this.#shouldUseMultipart)
 
     if (shouldUseMultipart && fileSize > this.#minPartSize) {
       // At least 5MB per request:
-      let chunkSize = Math.max(this.options.getChunkSize(this.#data), this.#minPartSize)
+      let chunkSize = Math.max(
+        this.options.getChunkSize(this.#data),
+        this.#minPartSize,
+      )
       let arraySize = Math.floor(fileSize / chunkSize)
 
       // At most 10k requests per file:
@@ -132,41 +159,48 @@ class MultipartUploader {
           shouldUseMultipart,
         }
         if (this.#isRestoring) {
-          const size = offset + chunkSize > fileSize ? fileSize - offset : chunkSize
+          const size =
+            offset + chunkSize > fileSize ? fileSize - offset : chunkSize
           // setAsUploaded is called by listPart, to keep up-to-date the
           // quantity of data that is left to actually upload.
-          this.#chunks[j].setAsUploaded = () => {
+          this.#chunks[j]!.setAsUploaded = () => {
             this.#chunks[j] = null
             this.#chunkState[j].uploaded = size
           }
         }
       }
     } else {
-      this.#chunks = [{
-        getData: () => this.#data,
-        onProgress: this.#onPartProgress(0),
-        onComplete: this.#onPartComplete(0),
-        shouldUseMultipart,
-      }]
+      this.#chunks = [
+        {
+          getData: () => this.#data,
+          onProgress: this.#onPartProgress(0),
+          onComplete: this.#onPartComplete(0),
+          shouldUseMultipart,
+        },
+      ]
     }
 
     this.#chunkState = this.#chunks.map(() => ({ uploaded: 0 }))
   }
 
-  #createUpload () {
-    this
-      .options.companionComm.uploadFile(this.#file, this.#chunks, this.#abortController.signal)
+  #createUpload() {
+    this.options.companionComm
+      .uploadFile(
+        this.#file,
+        this.#chunks as Chunk[],
+        this.#abortController.signal,
+      )
       .then(this.#onSuccess, this.#onReject)
     this.#uploadHasStarted = true
   }
 
-  #resumeUpload () {
-    this
-      .options.companionComm.resumeUploadFile(this.#file, this.#chunks, this.#abortController.signal)
+  #resumeUpload() {
+    this.options.companionComm
+      .resumeUploadFile(this.#file, this.#chunks, this.#abortController.signal)
       .then(this.#onSuccess, this.#onReject)
   }
 
-  #onPartProgress = (index) => (ev) => {
+  #onPartProgress = (index: number) => (ev: ProgressEvent) => {
     if (!ev.lengthComputable) return
 
     this.#chunkState[index].uploaded = ensureInt(ev.loaded)
@@ -175,7 +209,7 @@ class MultipartUploader {
     this.options.onProgress(totalUploaded, this.#data.size)
   }
 
-  #onPartComplete = (index) => (etag) => {
+  #onPartComplete = (index: number) => (etag: string) => {
     // This avoids the net::ERR_OUT_OF_MEMORY in Chromium Browsers.
     this.#chunks[index] = null
     this.#chunkState[index].etag = etag
@@ -188,37 +222,44 @@ class MultipartUploader {
     this.options.onPartComplete(part)
   }
 
-  #abortUpload () {
+  #abortUpload() {
     this.#abortController.abort()
-    this.options.companionComm.abortFileUpload(this.#file).catch((err) => this.options.log(err))
+    this.options.companionComm
+      .abortFileUpload(this.#file)
+      .catch((err: unknown) => this.options.log(err as Error))
   }
 
-  start () {
+  start(): void {
     if (this.#uploadHasStarted) {
-      if (!this.#abortController.signal.aborted) this.#abortController.abort(pausingUploadReason)
+      if (!this.#abortController.signal.aborted)
+        this.#abortController.abort(pausingUploadReason)
       this.#abortController = new AbortController()
       this.#resumeUpload()
     } else if (this.#isRestoring) {
-      this.options.companionComm.restoreUploadFile(this.#file, { uploadId: this.options.uploadId, key: this.options.key })
+      this.options.companionComm.restoreUploadFile(this.#file, {
+        uploadId: this.options.uploadId,
+        key: this.options.key,
+      })
       this.#resumeUpload()
     } else {
       this.#createUpload()
     }
   }
 
-  pause () {
+  pause(): void {
     this.#abortController.abort(pausingUploadReason)
     // Swap it out for a new controller, because this instance may be resumed later.
     this.#abortController = new AbortController()
   }
 
-  abort (opts = undefined) {
+  abort(opts?: { really?: boolean }): void {
     if (opts?.really) this.#abortUpload()
     else this.pause()
   }
 
   // TODO: remove this in the next major
-  get chunkState () {
+  // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+  get chunkState() {
     return this.#chunkState
   }
 }

+ 0 - 113
packages/@uppy/aws-s3-multipart/src/createSignedURL.test.js

@@ -1,113 +0,0 @@
-import { describe, it, beforeEach, afterEach } from 'vitest'
-import assert from 'node:assert'
-import { S3Client, UploadPartCommand, PutObjectCommand } from '@aws-sdk/client-s3'
-import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
-import createSignedURL from './createSignedURL.js'
-
-const bucketName = 'some-bucket'
-const s3ClientOptions = {
-  region: 'us-bar-1',
-  credentials: {
-    accessKeyId: 'foo',
-    secretAccessKey: 'bar',
-    sessionToken: 'foobar',
-  },
-}
-const { Date: OriginalDate } = globalThis
-
-describe('createSignedURL', () => {
-  beforeEach(() => {
-    const now_ms = OriginalDate.now()
-    globalThis.Date = function Date () {
-      if (new.target) {
-        return Reflect.construct(OriginalDate, [now_ms])
-      }
-      return Reflect.apply(OriginalDate, this, [now_ms])
-    }
-    globalThis.Date.now = function now () {
-      return now_ms
-    }
-  })
-  afterEach(() => {
-    globalThis.Date = OriginalDate
-  })
-  it('should be able to sign non-multipart upload', async () => {
-    const client = new S3Client(s3ClientOptions)
-    assert.strictEqual(
-      (await createSignedURL({
-        accountKey: s3ClientOptions.credentials.accessKeyId,
-        accountSecret: s3ClientOptions.credentials.secretAccessKey,
-        sessionToken: s3ClientOptions.credentials.sessionToken,
-        bucketName,
-        Key: 'some/key',
-        Region: s3ClientOptions.region,
-        expires: 900,
-      })).searchParams.get('X-Amz-Signature'),
-      new URL(await getSignedUrl(client, new PutObjectCommand({
-        Bucket: bucketName,
-        Fields: {},
-        Key: 'some/key',
-      }), { expiresIn: 900 })).searchParams.get('X-Amz-Signature'),
-    )
-  })
-  it('should be able to sign multipart upload', async () => {
-    const client = new S3Client(s3ClientOptions)
-    const partNumber = 99
-    const uploadId = 'dummyUploadId'
-    assert.strictEqual(
-      (await createSignedURL({
-        accountKey: s3ClientOptions.credentials.accessKeyId,
-        accountSecret: s3ClientOptions.credentials.secretAccessKey,
-        sessionToken: s3ClientOptions.credentials.sessionToken,
-        uploadId,
-        partNumber,
-        bucketName,
-        Key: 'some/key',
-        Region: s3ClientOptions.region,
-        expires: 900,
-      })).searchParams.get('X-Amz-Signature'),
-      new URL(await getSignedUrl(client, new UploadPartCommand({
-        Bucket: bucketName,
-        UploadId: uploadId,
-        PartNumber: partNumber,
-        Key: 'some/key',
-      }), { expiresIn: 900 })).searchParams.get('X-Amz-Signature'),
-    )
-  })
-  it('should escape path and query as restricted to RFC 3986', async () => {
-    const client = new S3Client(s3ClientOptions)
-    const partNumber = 99
-    const specialChars = ';?:@&=+$,#!\'()'
-    const uploadId = `Upload${specialChars}Id`
-    // '.*' chars of path should be encoded
-    const Key = `${specialChars}.*/${specialChars}.*.ext`
-    const implResult =
-      await createSignedURL({
-        accountKey: s3ClientOptions.credentials.accessKeyId,
-        accountSecret: s3ClientOptions.credentials.secretAccessKey,
-        sessionToken: s3ClientOptions.credentials.sessionToken,
-        uploadId,
-        partNumber,
-        bucketName,
-        Key,
-        Region: s3ClientOptions.region,
-        expires: 900,
-      })
-    const sdkResult =
-      new URL(
-        await getSignedUrl(client, new UploadPartCommand({
-          Bucket: bucketName,
-          UploadId: uploadId,
-          PartNumber: partNumber,
-          Key,
-        }), { expiresIn: 900 }
-      )
-    )
-    assert.strictEqual(implResult.pathname, sdkResult.pathname)
-
-    const extractUploadId = /([?&])uploadId=([^&]+?)(&|$)/
-    const extractSignature = /([?&])X-Amz-Signature=([^&]+?)(&|$)/
-    assert.strictEqual(implResult.search.match(extractUploadId)[2], sdkResult.search.match(extractUploadId)[2])
-    assert.strictEqual(implResult.search.match(extractSignature)[2], sdkResult.search.match(extractSignature)[2])
-  })
-})

+ 141 - 0
packages/@uppy/aws-s3-multipart/src/createSignedURL.test.ts

@@ -0,0 +1,141 @@
+import { describe, it, beforeEach, afterEach } from 'vitest'
+import assert from 'node:assert'
+import {
+  S3Client,
+  UploadPartCommand,
+  PutObjectCommand,
+} from '@aws-sdk/client-s3'
+import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
+import createSignedURL from './createSignedURL.ts'
+
+const bucketName = 'some-bucket'
+const s3ClientOptions = {
+  region: 'us-bar-1',
+  credentials: {
+    accessKeyId: 'foo',
+    secretAccessKey: 'bar',
+    sessionToken: 'foobar',
+  },
+}
+const { Date: OriginalDate } = globalThis
+
+describe('createSignedURL', () => {
+  beforeEach(() => {
+    const now_ms = OriginalDate.now()
+    // @ts-expect-error we're touching globals for test purposes.
+    globalThis.Date = function Date() {
+      if (new.target) {
+        return Reflect.construct(OriginalDate, [now_ms])
+      }
+      return Reflect.apply(OriginalDate, this, [now_ms])
+    }
+    globalThis.Date.now = function now() {
+      return now_ms
+    }
+  })
+  afterEach(() => {
+    globalThis.Date = OriginalDate
+  })
+  it('should be able to sign non-multipart upload', async () => {
+    const client = new S3Client(s3ClientOptions)
+    assert.strictEqual(
+      (
+        await createSignedURL({
+          accountKey: s3ClientOptions.credentials.accessKeyId,
+          accountSecret: s3ClientOptions.credentials.secretAccessKey,
+          sessionToken: s3ClientOptions.credentials.sessionToken,
+          bucketName,
+          Key: 'some/key',
+          Region: s3ClientOptions.region,
+          expires: 900,
+        })
+      ).searchParams.get('X-Amz-Signature'),
+      new URL(
+        await getSignedUrl(
+          client,
+          new PutObjectCommand({
+            Bucket: bucketName,
+            Key: 'some/key',
+          }),
+          { expiresIn: 900 },
+        ),
+      ).searchParams.get('X-Amz-Signature'),
+    )
+  })
+  it('should be able to sign multipart upload', async () => {
+    const client = new S3Client(s3ClientOptions)
+    const partNumber = 99
+    const uploadId = 'dummyUploadId'
+    assert.strictEqual(
+      (
+        await createSignedURL({
+          accountKey: s3ClientOptions.credentials.accessKeyId,
+          accountSecret: s3ClientOptions.credentials.secretAccessKey,
+          sessionToken: s3ClientOptions.credentials.sessionToken,
+          uploadId,
+          partNumber,
+          bucketName,
+          Key: 'some/key',
+          Region: s3ClientOptions.region,
+          expires: 900,
+        })
+      ).searchParams.get('X-Amz-Signature'),
+      new URL(
+        await getSignedUrl(
+          client,
+          new UploadPartCommand({
+            Bucket: bucketName,
+            UploadId: uploadId,
+            PartNumber: partNumber,
+            Key: 'some/key',
+          }),
+          { expiresIn: 900 },
+        ),
+      ).searchParams.get('X-Amz-Signature'),
+    )
+  })
+
+  it('should escape path and query as restricted to RFC 3986', async () => {
+    const client = new S3Client(s3ClientOptions)
+    const partNumber = 99
+    const specialChars = ";?:@&=+$,#!'()"
+    const uploadId = `Upload${specialChars}Id`
+    // '.*' chars of path should be encoded
+    const Key = `${specialChars}.*/${specialChars}.*.ext`
+    const implResult = await createSignedURL({
+      accountKey: s3ClientOptions.credentials.accessKeyId,
+      accountSecret: s3ClientOptions.credentials.secretAccessKey,
+      sessionToken: s3ClientOptions.credentials.sessionToken,
+      uploadId,
+      partNumber,
+      bucketName,
+      Key,
+      Region: s3ClientOptions.region,
+      expires: 900,
+    })
+    const sdkResult = new URL(
+      await getSignedUrl(
+        client,
+        new UploadPartCommand({
+          Bucket: bucketName,
+          UploadId: uploadId,
+          PartNumber: partNumber,
+          Key,
+        }),
+        { expiresIn: 900 },
+      ),
+    )
+    assert.strictEqual(implResult.pathname, sdkResult.pathname)
+
+    const extractUploadId = /([?&])uploadId=([^&]+?)(&|$)/
+    const extractSignature = /([?&])X-Amz-Signature=([^&]+?)(&|$)/
+    assert.strictEqual(
+      implResult.search.match(extractUploadId)![2],
+      sdkResult.search.match(extractUploadId)![2],
+    )
+    assert.strictEqual(
+      implResult.search.match(extractSignature)![2],
+      sdkResult.search.match(extractSignature)![2],
+    )
+  })
+})

+ 55 - 31
packages/@uppy/aws-s3-multipart/src/createSignedURL.js → packages/@uppy/aws-s3-multipart/src/createSignedURL.ts

@@ -5,44 +5,51 @@
  *
  * @see https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html#create-canonical-request
  *
- * @param {object} param0
- * @param {string} param0.method – The HTTP method.
- * @param {string} param0.CanonicalUri – The URI-encoded version of the absolute
+ * @param param0
+ * @param param0.method – The HTTP method.
+ * @param param0.CanonicalUri – The URI-encoded version of the absolute
  * path component URL (everything between the host and the question mark
  * character (?) that starts the query string parameters). If the absolute path
  * is empty, use a forward slash character (/).
- * @param {string} param0.CanonicalQueryString – The URL-encoded query string
+ * @param param0.CanonicalQueryString – The URL-encoded query string
  * parameters, separated by ampersands (&). Percent-encode reserved characters,
  * including the space character. Encode names and values separately. If there
  * are empty parameters, append the equals sign to the parameter name before
  * encoding. After encoding, sort the parameters alphabetically by key name. If
  * there is no query string, use an empty string ("").
- * @param {Record<string, string>} param0.SignedHeaders – The request headers,
+ * @param param0.SignedHeaders – The request headers,
  * that will be signed, and their values, separated by newline characters.
  * For the values, trim any leading or trailing spaces, convert sequential
  * spaces to a single space, and separate the values for a multi-value header
  * using commas. You must include the host header (HTTP/1.1), and any x-amz-*
  * headers in the signature. You can optionally include other standard headers
  * in the signature, such as content-type.
- * @param {string} param0.HashedPayload – A string created using the payload in
+ * @param param0.HashedPayload – A string created using the payload in
  * the body of the HTTP request as input to a hash function. This string uses
  * lowercase hexadecimal characters. If the payload is empty, use an empty
  * string as the input to the hash function.
- * @returns {string}
  */
-function createCanonicalRequest ({
+function createCanonicalRequest({
   method = 'PUT',
   CanonicalUri = '/',
   CanonicalQueryString = '',
   SignedHeaders,
   HashedPayload,
-}) {
-  const headerKeys = Object.keys(SignedHeaders).map(k => k.toLowerCase()).sort()
+}: {
+  method?: string
+  CanonicalUri: string
+  CanonicalQueryString: string
+  SignedHeaders: Record<string, string>
+  HashedPayload: string
+}): string {
+  const headerKeys = Object.keys(SignedHeaders)
+    .map((k) => k.toLowerCase())
+    .sort()
   return [
     method,
     CanonicalUri,
     CanonicalQueryString,
-    ...headerKeys.map(k => `${k}:${SignedHeaders[k]}`),
+    ...headerKeys.map((k) => `${k}:${SignedHeaders[k]}`),
     '',
     headerKeys.join(';'),
     HashedPayload,
@@ -52,17 +59,23 @@ function createCanonicalRequest ({
 const ec = new TextEncoder()
 const algorithm = { name: 'HMAC', hash: 'SHA-256' }
 
-async function digest (data) {
+async function digest(data: string): ReturnType<SubtleCrypto['digest']> {
   const { subtle } = globalThis.crypto
   return subtle.digest(algorithm.hash, ec.encode(data))
 }
 
-async function generateHmacKey (secret) {
+async function generateHmacKey(secret: string | Uint8Array | ArrayBuffer) {
   const { subtle } = globalThis.crypto
-  return subtle.importKey('raw', typeof secret === 'string' ? ec.encode(secret) : secret, algorithm, false, ['sign'])
+  return subtle.importKey(
+    'raw',
+    typeof secret === 'string' ? ec.encode(secret) : secret,
+    algorithm,
+    false,
+    ['sign'],
+  )
 }
 
-function arrayBufferToHexString (arrayBuffer) {
+function arrayBufferToHexString(arrayBuffer: ArrayBuffer) {
   const byteArray = new Uint8Array(arrayBuffer)
   let hexString = ''
   for (let i = 0; i < byteArray.length; i++) {
@@ -71,27 +84,35 @@ function arrayBufferToHexString (arrayBuffer) {
   return hexString
 }
 
-async function hash (key, data) {
+async function hash(key: Parameters<typeof generateHmacKey>[0], data: string) {
   const { subtle } = globalThis.crypto
   return subtle.sign(algorithm, await generateHmacKey(key), ec.encode(data))
 }
 
-function percentEncode(c) {
-  return `%${c.charCodeAt(0).toString(16).toUpperCase()}`
-}
-
 /**
  * @see https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
- * @param {Record<string,string>} param0
- * @returns {Promise<URL>} the signed URL
  */
-export default async function createSignedURL ({
-  accountKey, accountSecret, sessionToken,
+export default async function createSignedURL({
+  accountKey,
+  accountSecret,
+  sessionToken,
   bucketName,
-  Key, Region,
+  Key,
+  Region,
   expires,
-  uploadId, partNumber,
-}) {
+  uploadId,
+  partNumber,
+}: {
+  accountKey: string
+  accountSecret: string
+  sessionToken: string
+  bucketName: string
+  Key: string
+  Region: string
+  expires: string | number
+  uploadId?: string
+  partNumber?: string | number
+}): Promise<URL> {
   const Service = 's3'
   const host = `${bucketName}.${Service}.${Region}.amazonaws.com`
   /**
@@ -100,7 +121,7 @@ export default async function createSignedURL ({
    *
    * @see https://tc39.es/ecma262/#sec-encodeuri-uri
    */
-  const CanonicalUri = `/${encodeURI(Key).replace(/[;?:@&=+$,#!'()*]/g, percentEncode)}`
+  const CanonicalUri = `/${encodeURI(Key).replace(/[;?:@&=+$,#!'()*]/g, (c) => `%${c.charCodeAt(0).toString(16).toUpperCase()}`)}`
   const payload = 'UNSIGNED-PAYLOAD'
 
   const requestDateTime = new Date().toISOString().replace(/[-:]|\.\d+/g, '') // YYYYMMDDTHHMMSSZ
@@ -113,14 +134,17 @@ export default async function createSignedURL ({
   url.searchParams.set('X-Amz-Content-Sha256', payload)
   url.searchParams.set('X-Amz-Credential', `${accountKey}/${scope}`)
   url.searchParams.set('X-Amz-Date', requestDateTime)
-  url.searchParams.set('X-Amz-Expires', expires)
+  url.searchParams.set('X-Amz-Expires', expires as string)
   // We are signing on the client, so we expect there's going to be a session token:
   url.searchParams.set('X-Amz-Security-Token', sessionToken)
   url.searchParams.set('X-Amz-SignedHeaders', 'host')
   // Those two are present only for Multipart Uploads:
-  if (partNumber) url.searchParams.set('partNumber', partNumber)
+  if (partNumber) url.searchParams.set('partNumber', partNumber as string)
   if (uploadId) url.searchParams.set('uploadId', uploadId)
-  url.searchParams.set('x-id', partNumber && uploadId ? 'UploadPart' : 'PutObject')
+  url.searchParams.set(
+    'x-id',
+    partNumber && uploadId ? 'UploadPart' : 'PutObject',
+  )
 
   // Step 1: Create a canonical request
   const canonical = createCanonicalRequest({

+ 0 - 934
packages/@uppy/aws-s3-multipart/src/index.js

@@ -1,934 +0,0 @@
-import BasePlugin from '@uppy/core/lib/BasePlugin.js'
-import { RequestClient } from '@uppy/companion-client'
-import EventManager from '@uppy/utils/lib/EventManager'
-import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
-import { filterNonFailedFiles, filterFilesToEmitUploadStarted } from '@uppy/utils/lib/fileFilters'
-import { createAbortError } from '@uppy/utils/lib/AbortController'
-
-import MultipartUploader, { pausingUploadReason } from './MultipartUploader.js'
-import createSignedURL from './createSignedURL.js'
-import packageJson from '../package.json'
-
-function assertServerError (res) {
-  if (res && res.error) {
-    const error = new Error(res.message)
-    Object.assign(error, res.error)
-    throw error
-  }
-  return res
-}
-
-function removeMetadataFromURL (urlString) {
-  const urlObject = new URL(urlString)
-  urlObject.search = ''
-  urlObject.hash = ''
-  return urlObject.href
-}
-
-/**
- * Computes the expiry time for a request signed with temporary credentials. If
- * no expiration was provided, or an invalid value (e.g. in the past) is
- * provided, undefined is returned. This function assumes the client clock is in
- * sync with the remote server, which is a requirement for the signature to be
- * validated for AWS anyway.
- *
- * @param {import('../types/index.js').AwsS3STSResponse['credentials']} credentials
- * @returns {number | undefined}
- */
-function getExpiry (credentials) {
-  const expirationDate = credentials.Expiration
-  if (expirationDate) {
-    const timeUntilExpiry = Math.floor((new Date(expirationDate) - Date.now()) / 1000)
-    if (timeUntilExpiry > 9) {
-      return timeUntilExpiry
-    }
-  }
-  return undefined
-}
-
-function getAllowedMetadata ({ meta, allowedMetaFields, querify = false }) {
-  const metaFields = allowedMetaFields ?? Object.keys(meta)
-
-  if (!meta) return {}
-
-  return Object.fromEntries(
-    metaFields
-      .filter(key => meta[key] != null)
-      .map((key) => {
-        const realKey = querify ? `metadata[${key}]` : key
-        const value = String(meta[key])
-        return [realKey, value]
-      }),
-  )
-}
-
-function throwIfAborted (signal) {
-  if (signal?.aborted) { throw createAbortError('The operation was aborted', { cause: signal.reason }) }
-}
-
-class HTTPCommunicationQueue {
-  #abortMultipartUpload
-
-  #cache = new WeakMap()
-
-  #createMultipartUpload
-
-  #fetchSignature
-
-  #getUploadParameters
-
-  #listParts
-
-  #previousRetryDelay
-
-  #requests
-
-  #retryDelays
-
-  #sendCompletionRequest
-
-  #setS3MultipartState
-
-  #uploadPartBytes
-
-  #getFile
-
-  constructor (requests, options, setS3MultipartState, getFile) {
-    this.#requests = requests
-    this.#setS3MultipartState = setS3MultipartState
-    this.#getFile = getFile
-    this.setOptions(options)
-  }
-
-  setOptions (options) {
-    const requests = this.#requests
-
-    if ('abortMultipartUpload' in options) {
-      this.#abortMultipartUpload = requests.wrapPromiseFunction(options.abortMultipartUpload, { priority:1 })
-    }
-    if ('createMultipartUpload' in options) {
-      this.#createMultipartUpload = requests.wrapPromiseFunction(options.createMultipartUpload, { priority:-1 })
-    }
-    if ('signPart' in options) {
-      this.#fetchSignature = requests.wrapPromiseFunction(options.signPart)
-    }
-    if ('listParts' in options) {
-      this.#listParts = requests.wrapPromiseFunction(options.listParts)
-    }
-    if ('completeMultipartUpload' in options) {
-      this.#sendCompletionRequest = requests.wrapPromiseFunction(options.completeMultipartUpload, { priority:1 })
-    }
-    if ('retryDelays' in options) {
-      this.#retryDelays = options.retryDelays ?? []
-    }
-    if ('uploadPartBytes' in options) {
-      this.#uploadPartBytes = requests.wrapPromiseFunction(options.uploadPartBytes, { priority:Infinity })
-    }
-    if ('getUploadParameters' in options) {
-      this.#getUploadParameters = requests.wrapPromiseFunction(options.getUploadParameters)
-    }
-  }
-
-  async #shouldRetry (err, retryDelayIterator) {
-    const requests = this.#requests
-    const status = err?.source?.status
-
-    // TODO: this retry logic is taken out of Tus. We should have a centralized place for retrying,
-    // perhaps the rate limited queue, and dedupe all plugins with that.
-    if (status == null) {
-      return false
-    }
-    if (status === 403 && err.message === 'Request has expired') {
-      if (!requests.isPaused) {
-        // We don't want to exhaust the retryDelayIterator as long as there are
-        // more than one request in parallel, to give slower connection a chance
-        // to catch up with the expiry set in Companion.
-        if (requests.limit === 1 || this.#previousRetryDelay == null) {
-          const next = retryDelayIterator.next()
-          if (next == null || next.done) {
-            return false
-          }
-          // If there are more than 1 request done in parallel, the RLQ limit is
-          // decreased and the failed request is requeued after waiting for a bit.
-          // If there is only one request in parallel, the limit can't be
-          // decreased, so we iterate over `retryDelayIterator` as we do for
-          // other failures.
-          // `#previousRetryDelay` caches the value so we can re-use it next time.
-          this.#previousRetryDelay = next.value
-        }
-        // No need to stop the other requests, we just want to lower the limit.
-        requests.rateLimit(0)
-        await new Promise(resolve => setTimeout(resolve, this.#previousRetryDelay))
-      }
-    } else if (status === 429) {
-      // HTTP 429 Too Many Requests => to avoid the whole download to fail, pause all requests.
-      if (!requests.isPaused) {
-        const next = retryDelayIterator.next()
-        if (next == null || next.done) {
-          return false
-        }
-        requests.rateLimit(next.value)
-      }
-    } else if (status > 400 && status < 500 && status !== 409) {
-      // HTTP 4xx, the server won't send anything, it's doesn't make sense to retry
-      return false
-    } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
-      // The navigator is offline, let's wait for it to come back online.
-      if (!requests.isPaused) {
-        requests.pause()
-        window.addEventListener('online', () => {
-          requests.resume()
-        }, { once: true })
-      }
-    } else {
-      // Other error code means the request can be retried later.
-      const next = retryDelayIterator.next()
-      if (next == null || next.done) {
-        return false
-      }
-      await new Promise(resolve => setTimeout(resolve, next.value))
-    }
-    return true
-  }
-
-  async getUploadId (file, signal) {
-    let cachedResult
-    // As the cache is updated asynchronously, there could be a race condition
-    // where we just miss a new result so we loop here until we get nothing back,
-    // at which point it's out turn to create a new cache entry.
-    while ((cachedResult = this.#cache.get(file.data)) != null) {
-      try {
-        return await cachedResult
-      } catch {
-        // In case of failure, we want to ignore the cached error.
-        // At this point, either there's a new cached value, or we'll exit the loop a create a new one.
-      }
-    }
-
-    const promise = this.#createMultipartUpload(this.#getFile(file), signal)
-
-    const abortPromise = () => {
-      promise.abort(signal.reason)
-      this.#cache.delete(file.data)
-    }
-    signal.addEventListener('abort', abortPromise, { once: true })
-    this.#cache.set(file.data, promise)
-    promise.then(async (result) => {
-      signal.removeEventListener('abort', abortPromise)
-      this.#setS3MultipartState(file, result)
-      this.#cache.set(file.data, result)
-    }, () => {
-      signal.removeEventListener('abort', abortPromise)
-      this.#cache.delete(file.data)
-    })
-
-    return promise
-  }
-
-  async abortFileUpload (file) {
-    const result = this.#cache.get(file.data)
-    if (result == null) {
-      // If the createMultipartUpload request never was made, we don't
-      // need to send the abortMultipartUpload request.
-      return
-    }
-    // Remove the cache entry right away for follow-up requests do not try to
-    // use the soon-to-be aborted chached values.
-    this.#cache.delete(file.data)
-    this.#setS3MultipartState(file, Object.create(null))
-    let awaitedResult
-    try {
-      awaitedResult = await result
-    } catch {
-      // If the cached result rejects, there's nothing to abort.
-      return
-    }
-    await this.#abortMultipartUpload(this.#getFile(file), awaitedResult)
-  }
-
-  async #nonMultipartUpload (file, chunk, signal) {
-    const {
-      method = 'POST',
-      url,
-      fields,
-      headers,
-    } = await this.#getUploadParameters(this.#getFile(file), { signal }).abortOn(signal)
-
-    let body
-    const data = chunk.getData()
-    if (method.toUpperCase() === 'POST') {
-      const formData = new FormData()
-      Object.entries(fields).forEach(([key, value]) => formData.set(key, value))
-      formData.set('file', data)
-      body = formData
-    } else {
-      body = data
-    }
-
-    const { onProgress, onComplete } = chunk
-
-    const result = await this.#uploadPartBytes({
-      signature: { url, headers, method },
-      body,
-      size: data.size,
-      onProgress,
-      onComplete,
-      signal,
-    }).abortOn(signal)
-
-    return 'location' in result ? result : {
-      location: removeMetadataFromURL(url),
-      ...result,
-    }
-  }
-
-  /**
-   * @param {import("@uppy/core").UppyFile} file
-   * @param {import("../types/chunk").Chunk[]} chunks
-   * @param {AbortSignal} signal
-   * @returns {Promise<void>}
-   */
-  async uploadFile (file, chunks, signal) {
-    throwIfAborted(signal)
-    if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
-      return this.#nonMultipartUpload(file, chunks[0], signal)
-    }
-    const { uploadId, key } = await this.getUploadId(file, signal)
-    throwIfAborted(signal)
-    try {
-      const parts = await Promise.all(chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)))
-      throwIfAborted(signal)
-      return await this.#sendCompletionRequest(
-        this.#getFile(file),
-        { key, uploadId, parts, signal },
-        signal,
-      ).abortOn(signal)
-    } catch (err) {
-      if (err?.cause !== pausingUploadReason && err?.name !== 'AbortError') {
-        // We purposefully don't wait for the promise and ignore its status,
-        // because we want the error `err` to bubble up ASAP to report it to the
-        // user. A failure to abort is not that big of a deal anyway.
-        this.abortFileUpload(file)
-      }
-      throw err
-    }
-  }
-
-  restoreUploadFile (file, uploadIdAndKey) {
-    this.#cache.set(file.data, uploadIdAndKey)
-  }
-
-  async resumeUploadFile (file, chunks, signal) {
-    throwIfAborted(signal)
-    if (chunks.length === 1 && chunks[0] != null && !chunks[0].shouldUseMultipart) {
-      return this.#nonMultipartUpload(file, chunks[0], signal)
-    }
-    const { uploadId, key } = await this.getUploadId(file, signal)
-    throwIfAborted(signal)
-    const alreadyUploadedParts = await this.#listParts(
-      this.#getFile(file),
-      { uploadId, key, signal },
-      signal,
-    ).abortOn(signal)
-    throwIfAborted(signal)
-    const parts = await Promise.all(
-      chunks
-        .map((chunk, i) => {
-          const partNumber = i + 1
-          const alreadyUploadedInfo = alreadyUploadedParts.find(({ PartNumber }) => PartNumber === partNumber)
-          if (alreadyUploadedInfo == null) {
-            return this.uploadChunk(file, partNumber, chunk, signal)
-          }
-          // Already uploaded chunks are set to null. If we are restoring the upload, we need to mark it as already uploaded.
-          chunk?.setAsUploaded?.()
-          return { PartNumber: partNumber, ETag: alreadyUploadedInfo.ETag }
-        }),
-    )
-    throwIfAborted(signal)
-    return this.#sendCompletionRequest(
-      this.#getFile(file),
-      { key, uploadId, parts, signal },
-      signal,
-    ).abortOn(signal)
-  }
-
-  /**
-   *
-   * @param {import("@uppy/core").UppyFile} file
-   * @param {number} partNumber
-   * @param {import("../types/chunk").Chunk} chunk
-   * @param {AbortSignal} signal
-   * @returns {Promise<object>}
-   */
-  async uploadChunk (file, partNumber, chunk, signal) {
-    throwIfAborted(signal)
-    const { uploadId, key } = await this.getUploadId(file, signal)
-
-    const signatureRetryIterator = this.#retryDelays.values()
-    const chunkRetryIterator = this.#retryDelays.values()
-    const shouldRetrySignature = () => {
-      const next = signatureRetryIterator.next()
-      if (next == null || next.done) {
-        return null
-      }
-      return next.value
-    }
-
-    for (;;) {
-      throwIfAborted(signal)
-      const chunkData = chunk.getData()
-      const { onProgress, onComplete } = chunk
-      let signature
-
-      try {
-        signature = await this.#fetchSignature(this.#getFile(file), {
-          uploadId, key, partNumber, body: chunkData, signal,
-        }).abortOn(signal)
-      } catch (err) {
-        const timeout = shouldRetrySignature()
-        if (timeout == null || signal.aborted) {
-          throw err
-        }
-        await new Promise(resolve => setTimeout(resolve, timeout))
-        // eslint-disable-next-line no-continue
-        continue
-      }
-
-      throwIfAborted(signal)
-      try {
-        return {
-          PartNumber: partNumber,
-          ...await this.#uploadPartBytes({
-            signature, body: chunkData, size: chunkData.size, onProgress, onComplete, signal,
-          }).abortOn(signal),
-        }
-      } catch (err) {
-        if (!await this.#shouldRetry(err, chunkRetryIterator)) throw err
-      }
-    }
-  }
-}
-
-export default class AwsS3Multipart extends BasePlugin {
-  static VERSION = packageJson.version
-
-  #companionCommunicationQueue
-
-  #client
-
-  constructor (uppy, opts) {
-    super(uppy, opts)
-    this.type = 'uploader'
-    this.id = this.opts.id || 'AwsS3Multipart'
-    this.title = 'AWS S3 Multipart'
-    this.#client = new RequestClient(uppy, opts)
-
-    const defaultOptions = {
-      // TODO: null here means “include all”, [] means include none.
-      // This is inconsistent with @uppy/aws-s3 and @uppy/transloadit
-      allowedMetaFields: null,
-      limit: 6,
-      shouldUseMultipart: (file) => file.size !== 0, // TODO: Switch default to:
-      // eslint-disable-next-line no-bitwise
-      // shouldUseMultipart: (file) => file.size >> 10 >> 10 > 100,
-      retryDelays: [0, 1000, 3000, 5000],
-      createMultipartUpload: this.createMultipartUpload.bind(this),
-      listParts: this.listParts.bind(this),
-      abortMultipartUpload: this.abortMultipartUpload.bind(this),
-      completeMultipartUpload: this.completeMultipartUpload.bind(this),
-      getTemporarySecurityCredentials: false,
-      signPart: opts?.getTemporarySecurityCredentials ? this.createSignedURL.bind(this) : this.signPart.bind(this),
-      uploadPartBytes: AwsS3Multipart.uploadPartBytes,
-      getUploadParameters: opts?.getTemporarySecurityCredentials
-        ? this.createSignedURL.bind(this)
-        : this.getUploadParameters.bind(this),
-      companionHeaders: {},
-    }
-
-    this.opts = { ...defaultOptions, ...opts }
-    if (opts?.prepareUploadParts != null && opts.signPart == null) {
-      this.opts.signPart = async (file, { uploadId, key, partNumber, body, signal }) => {
-        const { presignedUrls, headers } = await opts
-          .prepareUploadParts(file, { uploadId, key, parts: [{ number: partNumber, chunk: body }], signal })
-        return { url: presignedUrls?.[partNumber], headers: headers?.[partNumber] }
-      }
-    }
-
-    /**
-     * Simultaneous upload limiting is shared across all uploads with this plugin.
-     *
-     * @type {RateLimitedQueue}
-     */
-    this.requests = this.opts.rateLimitedQueue ?? new RateLimitedQueue(this.opts.limit)
-    this.#companionCommunicationQueue = new HTTPCommunicationQueue(
-      this.requests,
-      this.opts,
-      this.#setS3MultipartState,
-      this.#getFile,
-    )
-
-    this.uploaders = Object.create(null)
-    this.uploaderEvents = Object.create(null)
-    this.uploaderSockets = Object.create(null)
-  }
-
-  [Symbol.for('uppy test: getClient')] () { return this.#client }
-
-  setOptions (newOptions) {
-    this.#companionCommunicationQueue.setOptions(newOptions)
-    super.setOptions(newOptions)
-    this.#setCompanionHeaders()
-  }
-
-  /**
-   * Clean up all references for a file's upload: the MultipartUploader instance,
-   * any events related to the file, and the Companion WebSocket connection.
-   *
-   * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed.
-   * This should be done when the user cancels the upload, not when the upload is completed or errored.
-   */
-  resetUploaderReferences (fileID, opts = {}) {
-    if (this.uploaders[fileID]) {
-      this.uploaders[fileID].abort({ really: opts.abort || false })
-      this.uploaders[fileID] = null
-    }
-    if (this.uploaderEvents[fileID]) {
-      this.uploaderEvents[fileID].remove()
-      this.uploaderEvents[fileID] = null
-    }
-    if (this.uploaderSockets[fileID]) {
-      this.uploaderSockets[fileID].close()
-      this.uploaderSockets[fileID] = null
-    }
-  }
-
-  // TODO: make this a private method in the next major
-  assertHost (method) {
-    if (!this.opts.companionUrl) {
-      throw new Error(`Expected a \`companionUrl\` option containing a Companion address, or if you are not using Companion, a custom \`${method}\` implementation.`)
-    }
-  }
-
-  createMultipartUpload (file, signal) {
-    this.assertHost('createMultipartUpload')
-    throwIfAborted(signal)
-
-    const metadata = getAllowedMetadata({ meta: file.meta, allowedMetaFields: this.opts.allowedMetaFields })
-
-    return this.#client.post('s3/multipart', {
-      filename: file.name,
-      type: file.type,
-      metadata,
-    }, { signal }).then(assertServerError)
-  }
-
-  listParts (file, { key, uploadId }, signal) {
-    this.assertHost('listParts')
-    throwIfAborted(signal)
-
-    const filename = encodeURIComponent(key)
-    return this.#client.get(`s3/multipart/${uploadId}?key=${filename}`, { signal })
-      .then(assertServerError)
-  }
-
-  completeMultipartUpload (file, { key, uploadId, parts }, signal) {
-    this.assertHost('completeMultipartUpload')
-    throwIfAborted(signal)
-
-    const filename = encodeURIComponent(key)
-    const uploadIdEnc = encodeURIComponent(uploadId)
-    return this.#client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts }, { signal })
-      .then(assertServerError)
-  }
-
-  /**
-   * @type {import("../types").AwsS3STSResponse | Promise<import("../types").AwsS3STSResponse>}
-   */
-  #cachedTemporaryCredentials
-
-  async #getTemporarySecurityCredentials (options) {
-    throwIfAborted(options?.signal)
-
-    if (this.#cachedTemporaryCredentials == null) {
-      // We do not await it just yet, so concurrent calls do not try to override it:
-      if (this.opts.getTemporarySecurityCredentials === true) {
-        this.assertHost('getTemporarySecurityCredentials')
-        this.#cachedTemporaryCredentials = this.#client.get('s3/sts', null, options).then(assertServerError)
-      } else {
-        this.#cachedTemporaryCredentials = this.opts.getTemporarySecurityCredentials(options)
-      }
-      this.#cachedTemporaryCredentials = await this.#cachedTemporaryCredentials
-      setTimeout(() => {
-        // At half the time left before expiration, we clear the cache. That's
-        // an arbitrary tradeoff to limit the number of requests made to the
-        // remote while limiting the risk of using an expired token in case the
-        // clocks are not exactly synced.
-        // The HTTP cache should be configured to ensure a client doesn't request
-        // more tokens than it needs, but this timeout provides a second layer of
-        // security in case the HTTP cache is disabled or misconfigured.
-        this.#cachedTemporaryCredentials = null
-      }, (getExpiry(this.#cachedTemporaryCredentials.credentials) || 0) * 500)
-    }
-
-    return this.#cachedTemporaryCredentials
-  }
-
-  async createSignedURL (file, options) {
-    const data = await this.#getTemporarySecurityCredentials(options)
-    const expires = getExpiry(data.credentials) || 604_800 // 604 800 is the max value accepted by AWS.
-
-    const { uploadId, key, partNumber, signal } = options
-
-    // Return an object in the correct shape.
-    return {
-      method: 'PUT',
-      expires,
-      fields: {},
-      url: `${await createSignedURL({
-        accountKey: data.credentials.AccessKeyId,
-        accountSecret: data.credentials.SecretAccessKey,
-        sessionToken: data.credentials.SessionToken,
-        expires,
-        bucketName: data.bucket,
-        Region: data.region,
-        Key: key ?? `${crypto.randomUUID()}-${file.name}`,
-        uploadId,
-        partNumber,
-        signal,
-      })}`,
-      // Provide content type header required by S3
-      headers: {
-        'Content-Type': file.type,
-      },
-    }
-  }
-
-  signPart (file, { uploadId, key, partNumber, signal }) {
-    this.assertHost('signPart')
-    throwIfAborted(signal)
-
-    if (uploadId == null || key == null || partNumber == null) {
-      throw new Error('Cannot sign without a key, an uploadId, and a partNumber')
-    }
-
-    const filename = encodeURIComponent(key)
-    return this.#client.get(`s3/multipart/${uploadId}/${partNumber}?key=${filename}`, { signal })
-      .then(assertServerError)
-  }
-
-  abortMultipartUpload (file, { key, uploadId }, signal) {
-    this.assertHost('abortMultipartUpload')
-
-    const filename = encodeURIComponent(key)
-    const uploadIdEnc = encodeURIComponent(uploadId)
-    return this.#client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`, undefined, { signal })
-      .then(assertServerError)
-  }
-
-  getUploadParameters (file, options) {
-    const { meta } = file
-    const { type, name: filename } = meta
-    const metadata = getAllowedMetadata({ meta, allowedMetaFields: this.opts.allowedMetaFields, querify: true })
-
-    const query = new URLSearchParams({ filename, type, ...metadata })
-
-    return this.#client.get(`s3/params?${query}`, options)
-  }
-
-  static async uploadPartBytes ({ signature: { url, expires, headers, method = 'PUT' }, body, size = body.size, onProgress, onComplete, signal }) {
-    throwIfAborted(signal)
-
-    if (url == null) {
-      throw new Error('Cannot upload to an undefined URL')
-    }
-
-    return new Promise((resolve, reject) => {
-      const xhr = new XMLHttpRequest()
-      xhr.open(method, url, true)
-      if (headers) {
-        Object.keys(headers).forEach((key) => {
-          xhr.setRequestHeader(key, headers[key])
-        })
-      }
-      xhr.responseType = 'text'
-      if (typeof expires === 'number') {
-        xhr.timeout = expires * 1000
-      }
-
-      function onabort () {
-        xhr.abort()
-      }
-      function cleanup () {
-        signal.removeEventListener('abort', onabort)
-      }
-      signal.addEventListener('abort', onabort)
-
-      xhr.upload.addEventListener('progress', (ev) => {
-        onProgress(ev)
-      })
-
-      xhr.addEventListener('abort', () => {
-        cleanup()
-
-        reject(createAbortError())
-      })
-
-      xhr.addEventListener('timeout', () => {
-        cleanup()
-
-        const error = new Error('Request has expired')
-        error.source = { status: 403 }
-        reject(error)
-      })
-      xhr.addEventListener('load', (ev) => {
-        cleanup()
-
-        if (ev.target.status === 403 && ev.target.responseText.includes('<Message>Request has expired</Message>')) {
-          const error = new Error('Request has expired')
-          error.source = ev.target
-          reject(error)
-          return
-        } if (ev.target.status < 200 || ev.target.status >= 300) {
-          const error = new Error('Non 2xx')
-          error.source = ev.target
-          reject(error)
-          return
-        }
-
-        // todo make a proper onProgress API (breaking change)
-        onProgress?.({ loaded: size, lengthComputable: true })
-
-        // NOTE This must be allowed by CORS.
-        const etag = ev.target.getResponseHeader('ETag')
-        const location = ev.target.getResponseHeader('Location')
-
-        if (method.toUpperCase() === 'POST' && location === null) {
-          // Not being able to read the Location header is not a fatal error.
-          // eslint-disable-next-line no-console
-          console.warn('AwsS3/Multipart: Could not read the Location header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.')
-        }
-        if (etag === null) {
-          reject(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
-          return
-        }
-
-        onComplete?.(etag)
-        resolve({
-          ETag: etag,
-          ...(location ? { location } : undefined),
-        })
-      })
-
-      xhr.addEventListener('error', (ev) => {
-        cleanup()
-
-        const error = new Error('Unknown error')
-        error.source = ev.target
-        reject(error)
-      })
-
-      xhr.send(body)
-    })
-  }
-
-  #setS3MultipartState = (file, { key, uploadId }) => {
-    const cFile = this.uppy.getFile(file.id)
-    if (cFile == null) {
-      // file was removed from store
-      return
-    }
-
-    this.uppy.setFileState(file.id, {
-      s3Multipart: {
-        ...cFile.s3Multipart,
-        key,
-        uploadId,
-      },
-    })
-  }
-
-  #getFile = (file) => {
-    return this.uppy.getFile(file.id) || file
-  }
-
-  #uploadLocalFile (file) {
-    return new Promise((resolve, reject) => {
-      const onProgress = (bytesUploaded, bytesTotal) => {
-        this.uppy.emit('upload-progress', this.uppy.getFile(file.id), {
-          uploader: this,
-          bytesUploaded,
-          bytesTotal,
-        })
-      }
-
-      const onError = (err) => {
-        this.uppy.log(err)
-        this.uppy.emit('upload-error', file, err)
-
-        this.resetUploaderReferences(file.id)
-        reject(err)
-      }
-
-      const onSuccess = (result) => {
-        const uploadResp = {
-          body: {
-            ...result,
-          },
-          uploadURL: result.location,
-        }
-
-        this.resetUploaderReferences(file.id)
-
-        this.uppy.emit('upload-success', this.#getFile(file), uploadResp)
-
-        if (result.location) {
-          this.uppy.log(`Download ${file.name} from ${result.location}`)
-        }
-
-        resolve()
-      }
-
-      const onPartComplete = (part) => {
-        this.uppy.emit('s3-multipart:part-uploaded', this.#getFile(file), part)
-      }
-
-      const upload = new MultipartUploader(file.data, {
-        // .bind to pass the file object to each handler.
-        companionComm: this.#companionCommunicationQueue,
-
-        log: (...args) => this.uppy.log(...args),
-        getChunkSize: this.opts.getChunkSize ? this.opts.getChunkSize.bind(this) : null,
-
-        onProgress,
-        onError,
-        onSuccess,
-        onPartComplete,
-
-        file,
-        shouldUseMultipart: this.opts.shouldUseMultipart,
-
-        ...file.s3Multipart,
-      })
-
-      this.uploaders[file.id] = upload
-      const eventManager = new EventManager(this.uppy)
-      this.uploaderEvents[file.id] = eventManager
-
-      eventManager.onFileRemove(file.id, (removed) => {
-        upload.abort()
-        this.resetUploaderReferences(file.id, { abort: true })
-        resolve(`upload ${removed.id} was removed`)
-      })
-
-      eventManager.onCancelAll(file.id, ({ reason } = {}) => {
-        if (reason === 'user') {
-          upload.abort()
-          this.resetUploaderReferences(file.id, { abort: true })
-        }
-        resolve(`upload ${file.id} was canceled`)
-      })
-
-      eventManager.onFilePause(file.id, (isPaused) => {
-        if (isPaused) {
-          upload.pause()
-        } else {
-          upload.start()
-        }
-      })
-
-      eventManager.onPauseAll(file.id, () => {
-        upload.pause()
-      })
-
-      eventManager.onResumeAll(file.id, () => {
-        upload.start()
-      })
-
-      upload.start()
-    })
-  }
-
-  // eslint-disable-next-line class-methods-use-this
-  #getCompanionClientArgs (file) {
-    return {
-      ...file.remote.body,
-      protocol: 's3-multipart',
-      size: file.data.size,
-      metadata: file.meta,
-    }
-  }
-
-  #upload = async (fileIDs) => {
-    if (fileIDs.length === 0) return undefined
-
-    const files = this.uppy.getFilesByIds(fileIDs)
-    const filesFiltered = filterNonFailedFiles(files)
-    const filesToEmit = filterFilesToEmitUploadStarted(filesFiltered)
-
-    this.uppy.emit('upload-start', filesToEmit)
-
-    const promises = filesFiltered.map((file) => {
-      if (file.isRemote) {
-        const getQueue = () => this.requests
-        this.#setResumableUploadsCapability(false)
-        const controller = new AbortController()
-
-        const removedHandler = (removedFile) => {
-          if (removedFile.id === file.id) controller.abort()
-        }
-        this.uppy.on('file-removed', removedHandler)
-
-        const uploadPromise = this.uppy.getRequestClientForFile(file).uploadRemoteFile(
-          file,
-          this.#getCompanionClientArgs(file),
-          { signal: controller.signal, getQueue },
-        )
-
-        this.requests.wrapSyncFunction(() => {
-          this.uppy.off('file-removed', removedHandler)
-        }, { priority: -1 })()
-
-        return uploadPromise
-      }
-
-      return this.#uploadLocalFile(file)
-    })
-
-    const upload = await Promise.all(promises)
-    // After the upload is done, another upload may happen with only local files.
-    // We reset the capability so that the next upload can use resumable uploads.
-    this.#setResumableUploadsCapability(true)
-    return upload
-  }
-
-  #setCompanionHeaders = () => {
-    this.#client.setCompanionHeaders(this.opts.companionHeaders)
-  }
-
-  #setResumableUploadsCapability = (boolean) => {
-    const { capabilities } = this.uppy.getState()
-    this.uppy.setState({
-      capabilities: {
-        ...capabilities,
-        resumableUploads: boolean,
-      },
-    })
-  }
-
-  #resetResumableCapability = () => {
-    this.#setResumableUploadsCapability(true)
-  }
-
-  install () {
-    this.#setResumableUploadsCapability(true)
-    this.uppy.addPreProcessor(this.#setCompanionHeaders)
-    this.uppy.addUploader(this.#upload)
-    this.uppy.on('cancel-all', this.#resetResumableCapability)
-  }
-
-  uninstall () {
-    this.uppy.removePreProcessor(this.#setCompanionHeaders)
-    this.uppy.removeUploader(this.#upload)
-    this.uppy.off('cancel-all', this.#resetResumableCapability)
-  }
-}

+ 199 - 128
packages/@uppy/aws-s3-multipart/src/index.test.js → packages/@uppy/aws-s3-multipart/src/index.test.ts

@@ -3,7 +3,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
 import 'whatwg-fetch'
 import nock from 'nock'
 import Core from '@uppy/core'
-import AwsS3Multipart from './index.js'
+import AwsS3Multipart from './index.ts'
+import type { Body } from './utils.ts'
 
 const KB = 1024
 const MB = KB * KB
@@ -12,36 +13,41 @@ describe('AwsS3Multipart', () => {
   beforeEach(() => nock.disableNetConnect())
 
   it('Registers AwsS3Multipart upload plugin', () => {
-    const core = new Core()
+    const core = new Core<any, Body>()
     core.use(AwsS3Multipart)
 
-    const pluginNames = core[Symbol.for('uppy test: getPlugins')]('uploader').map((plugin) => plugin.constructor.name)
+    // @ts-expect-error private property
+    const pluginNames = core[Symbol.for('uppy test: getPlugins')](
+      'uploader',
+    ).map((plugin: AwsS3Multipart<any, Body>) => plugin.constructor.name)
     expect(pluginNames).toContain('AwsS3Multipart')
   })
 
   describe('companionUrl assertion', () => {
     it('Throws an error for main functions if configured without companionUrl', () => {
-      const core = new Core()
+      const core = new Core<any, Body>()
       core.use(AwsS3Multipart)
-      const awsS3Multipart = core.getPlugin('AwsS3Multipart')
+      const awsS3Multipart = core.getPlugin('AwsS3Multipart')!
 
       const err = 'Expected a `companionUrl` option'
       const file = {}
       const opts = {}
 
-      expect(() => awsS3Multipart.opts.createMultipartUpload(file)).toThrow(
-        err,
-      )
+      expect(() => awsS3Multipart.opts.createMultipartUpload(file)).toThrow(err)
       expect(() => awsS3Multipart.opts.listParts(file, opts)).toThrow(err)
-      expect(() => awsS3Multipart.opts.completeMultipartUpload(file, opts)).toThrow(err)
-      expect(() => awsS3Multipart.opts.abortMultipartUpload(file, opts)).toThrow(err)
+      expect(() =>
+        awsS3Multipart.opts.completeMultipartUpload(file, opts),
+      ).toThrow(err)
+      expect(() =>
+        awsS3Multipart.opts.abortMultipartUpload(file, opts),
+      ).toThrow(err)
       expect(() => awsS3Multipart.opts.signPart(file, opts)).toThrow(err)
     })
   })
 
   describe('non-multipart upload', () => {
     it('should handle POST uploads', async () => {
-      const core = new Core()
+      const core = new Core<any, Body>()
       core.use(AwsS3Multipart, {
         shouldUseMultipart: false,
         limit: 0,
@@ -71,7 +77,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -87,6 +93,7 @@ describe('AwsS3Multipart', () => {
           ETag: 'test',
           location: 'http://example.com',
         },
+        status: 200,
         uploadURL: 'http://example.com',
       })
 
@@ -95,11 +102,11 @@ describe('AwsS3Multipart', () => {
   })
 
   describe('without companionUrl (custom main functions)', () => {
-    let core
-    let awsS3Multipart
+    let core: Core<any, Body>
+    let awsS3Multipart: AwsS3Multipart<any, Body>
 
     beforeEach(() => {
-      core = new Core()
+      core = new Core<any, Body>()
       core.use(AwsS3Multipart, {
         limit: 0,
         createMultipartUpload: vi.fn(() => {
@@ -110,17 +117,19 @@ describe('AwsS3Multipart', () => {
         }),
         completeMultipartUpload: vi.fn(async () => ({ location: 'test' })),
         abortMultipartUpload: vi.fn(),
-        prepareUploadParts: vi.fn(async (file, { parts }) => {
-          const presignedUrls = {}
-          parts.forEach(({ number }) => {
-            presignedUrls[
-              number
-            ] = `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${number}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`
-          })
-          return { presignedUrls, headers: { 1: { 'Content-MD5': 'foo' } } }
-        }),
+        prepareUploadParts: vi.fn(
+          async (file, { parts }: { parts: { number: number }[] }) => {
+            const presignedUrls: Record<number, string> = {}
+            parts.forEach(({ number }) => {
+              presignedUrls[number] =
+                `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${number}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`
+            })
+            return { presignedUrls, headers: { 1: { 'Content-MD5': 'foo' } } }
+          },
+        ),
+        listParts: undefined as any,
       })
-      awsS3Multipart = core.getPlugin('AwsS3Multipart')
+      awsS3Multipart = core.getPlugin('AwsS3Multipart') as any
     })
 
     it('Calls the prepareUploadParts function totalChunks / limit times', async () => {
@@ -137,15 +146,23 @@ describe('AwsS3Multipart', () => {
       const fileSize = 5 * MB + 1 * MB
 
       scope
-        .options((uri) => uri.includes('test/upload/multitest.dat?partNumber=1'))
-        .reply(function replyFn () {
-          expect(this.req.headers['access-control-request-headers']).toEqual('Content-MD5')
+        .options((uri) =>
+          uri.includes('test/upload/multitest.dat?partNumber=1'),
+        )
+        .reply(function replyFn() {
+          expect(this.req.headers['access-control-request-headers']).toEqual(
+            'Content-MD5',
+          )
           return [200, '']
         })
       scope
-        .options((uri) => uri.includes('test/upload/multitest.dat?partNumber=2'))
-        .reply(function replyFn () {
-          expect(this.req.headers['access-control-request-headers']).toBeUndefined()
+        .options((uri) =>
+          uri.includes('test/upload/multitest.dat?partNumber=2'),
+        )
+        .reply(function replyFn() {
+          expect(
+            this.req.headers['access-control-request-headers'],
+          ).toBeUndefined()
           return [200, '']
         })
       scope
@@ -159,7 +176,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -167,7 +184,7 @@ describe('AwsS3Multipart', () => {
       await core.upload()
 
       expect(
-        awsS3Multipart.opts.prepareUploadParts.mock.calls.length,
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls.length,
       ).toEqual(2)
 
       scope.done()
@@ -201,14 +218,17 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
 
       await core.upload()
 
-      function validatePartData ({ parts }, expected) {
+      function validatePartData(
+        { parts }: { parts: { number: number; chunk: unknown }[] },
+        expected: number[],
+      ) {
         expect(parts.map((part) => part.number)).toEqual(expected)
 
         for (const part of parts) {
@@ -216,13 +236,25 @@ describe('AwsS3Multipart', () => {
         }
       }
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(10)
+      expect(
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls.length,
+      ).toEqual(10)
 
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[0][1], [1])
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[1][1], [2])
-      validatePartData(awsS3Multipart.opts.prepareUploadParts.mock.calls[2][1], [3])
+      validatePartData(
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls[0][1],
+        [1],
+      )
+      validatePartData(
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls[1][1],
+        [2],
+      )
+      validatePartData(
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls[2][1],
+        [3],
+      )
 
-      const completeCall = awsS3Multipart.opts.completeMultipartUpload.mock.calls[0][1]
+      const completeCall = (awsS3Multipart.opts as any).completeMultipartUpload
+        .mock.calls[0][1]
 
       expect(completeCall.parts).toEqual([
         { ETag: 'test', PartNumber: 1 },
@@ -254,13 +286,21 @@ describe('AwsS3Multipart', () => {
         .options((uri) => uri.includes('test/upload/multitest.dat'))
         .reply(200, '')
       scope
-        .put((uri) => uri.includes('test/upload/multitest.dat') && !uri.includes('partNumber=7'))
+        .put(
+          (uri) =>
+            uri.includes('test/upload/multitest.dat') &&
+            !uri.includes('partNumber=7'),
+        )
         .reply(200, '', { ETag: 'test' })
 
       // Fail the part 7 upload once, then let it succeed
       let calls = 0
       scope
-        .put((uri) => uri.includes('test/upload/multitest.dat') && uri.includes('partNumber=7'))
+        .put(
+          (uri) =>
+            uri.includes('test/upload/multitest.dat') &&
+            uri.includes('partNumber=7'),
+        )
         .reply(() => (calls++ === 0 ? [500] : [200, '', { ETag: 'test' }]))
 
       scope.persist()
@@ -271,14 +311,25 @@ describe('AwsS3Multipart', () => {
       awsS3Multipart.setOptions({
         retryDelays: [10],
         createMultipartUpload: vi.fn((file) => {
-          const multipartUploader = awsS3Multipart.uploaders[file.id]
+          // @ts-expect-error protected property
+          const multipartUploader = awsS3Multipart.uploaders[file.id]!
           const testChunkState = multipartUploader.chunkState[6]
           let busy = false
           let done = false
-          busySpy = vi.fn((value) => { busy = value })
-          doneSpy = vi.fn((value) => { done = value })
-          Object.defineProperty(testChunkState, 'busy', { get: () => busy, set: busySpy })
-          Object.defineProperty(testChunkState, 'done', { get: () => done, set: doneSpy })
+          busySpy = vi.fn((value) => {
+            busy = value
+          })
+          doneSpy = vi.fn((value) => {
+            done = value
+          })
+          Object.defineProperty(testChunkState, 'busy', {
+            get: () => busy,
+            set: busySpy,
+          })
+          Object.defineProperty(testChunkState, 'done', {
+            get: () => done,
+            set: doneSpy,
+          })
 
           return {
             uploadId: '6aeb1980f3fc7ce0b5454d25b71992',
@@ -291,7 +342,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -299,19 +350,23 @@ describe('AwsS3Multipart', () => {
       await core.upload()
 
       // The chunk should be marked as done once
-      expect(doneSpy.mock.calls.length).toEqual(1)
-      expect(doneSpy.mock.calls[0][0]).toEqual(true)
+      expect(doneSpy!.mock.calls.length).toEqual(1)
+      expect(doneSpy!.mock.calls[0][0]).toEqual(true)
 
       // Any changes that set busy to false should only happen after the chunk has been marked done,
       // otherwise a race condition occurs (see PR #3955)
-      const doneCallOrderNumber = doneSpy.mock.invocationCallOrder[0]
-      for (const [index, callArgs] of busySpy.mock.calls.entries()) {
+      const doneCallOrderNumber = doneSpy!.mock.invocationCallOrder[0]
+      for (const [index, callArgs] of busySpy!.mock.calls.entries()) {
         if (callArgs[0] === false) {
-          expect(busySpy.mock.invocationCallOrder[index]).toBeGreaterThan(doneCallOrderNumber)
+          expect(busySpy!.mock.invocationCallOrder[index]).toBeGreaterThan(
+            doneCallOrderNumber,
+          )
         }
       }
 
-      expect(awsS3Multipart.opts.prepareUploadParts.mock.calls.length).toEqual(10)
+      expect(
+        (awsS3Multipart.opts as any).prepareUploadParts.mock.calls.length,
+      ).toEqual(10)
     })
   })
 
@@ -323,36 +378,41 @@ describe('AwsS3Multipart', () => {
       }
     })
 
-    const signPart = vi
-      .fn(async (file, { partNumber }) => {
-        return { url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test` }
-      })
+    const signPart = vi.fn(async (file, { partNumber }) => {
+      return {
+        url: `https://bucket.s3.us-east-2.amazonaws.com/test/upload/multitest.dat?partNumber=${partNumber}&uploadId=6aeb1980f3fc7ce0b5454d25b71992&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIATEST%2F20210729%2Fus-east-2%2Fs3%2Faws4_request&X-Amz-Date=20210729T014044Z&X-Amz-Expires=600&X-Amz-SignedHeaders=host&X-Amz-Signature=test`,
+      }
+    })
 
     const uploadPartBytes = vi.fn()
 
-    afterEach(() => vi.clearAllMocks())
+    afterEach(() => {
+      vi.clearAllMocks()
+    })
 
     it('retries uploadPartBytes when it fails once', async () => {
-      const core = new Core()
-        .use(AwsS3Multipart, {
-          createMultipartUpload,
-          completeMultipartUpload: vi.fn(async () => ({ location: 'test' })),
+      const core = new Core<any, Body>().use(AwsS3Multipart, {
+        createMultipartUpload,
+        completeMultipartUpload: vi.fn(async () => ({ location: 'test' })),
+        abortMultipartUpload: vi.fn(() => {
           // eslint-disable-next-line no-throw-literal
-          abortMultipartUpload: vi.fn(() => { throw 'should ignore' }),
-          signPart,
-          uploadPartBytes:
-            uploadPartBytes
-              // eslint-disable-next-line prefer-promise-reject-errors
-              .mockImplementationOnce(() => Promise.reject({ source: { status: 500 } })),
-        })
-      const awsS3Multipart = core.getPlugin('AwsS3Multipart')
+          throw 'should ignore'
+        }),
+        signPart,
+        uploadPartBytes: uploadPartBytes.mockImplementationOnce(() =>
+          // eslint-disable-next-line prefer-promise-reject-errors
+          Promise.reject({ source: { status: 500 } }),
+        ),
+        listParts: undefined as any,
+      })
+      const awsS3Multipart = core.getPlugin('AwsS3Multipart')!
       const fileSize = 5 * MB + 1 * MB
 
       core.addFile({
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -363,18 +423,19 @@ describe('AwsS3Multipart', () => {
     })
 
     it('calls `upload-error` when uploadPartBytes fails after all retries', async () => {
-      const core = new Core()
-        .use(AwsS3Multipart, {
-          retryDelays: [10],
-          createMultipartUpload,
-          completeMultipartUpload: vi.fn(async () => ({ location: 'test' })),
-          abortMultipartUpload: vi.fn(),
-          signPart,
-          uploadPartBytes: uploadPartBytes
-            // eslint-disable-next-line prefer-promise-reject-errors
-            .mockImplementation(() => Promise.reject({ source: { status: 500 } })),
-        })
-      const awsS3Multipart = core.getPlugin('AwsS3Multipart')
+      const core = new Core<any, Body>().use(AwsS3Multipart, {
+        retryDelays: [10],
+        createMultipartUpload,
+        completeMultipartUpload: vi.fn(async () => ({ location: 'test' })),
+        abortMultipartUpload: vi.fn(),
+        signPart,
+        uploadPartBytes: uploadPartBytes.mockImplementation(() =>
+          // eslint-disable-next-line prefer-promise-reject-errors
+          Promise.reject({ source: { status: 500 } }),
+        ),
+        listParts: undefined as any,
+      })
+      const awsS3Multipart = core.getPlugin('AwsS3Multipart')!
       const fileSize = 5 * MB + 1 * MB
       const mock = vi.fn()
       core.on('upload-error', mock)
@@ -383,7 +444,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -396,19 +457,20 @@ describe('AwsS3Multipart', () => {
   })
 
   describe('dynamic companionHeader', () => {
-    let core
-    let awsS3Multipart
+    let core: Core<any, any>
+    let awsS3Multipart: AwsS3Multipart<any, any>
     const oldToken = 'old token'
     const newToken = 'new token'
 
     beforeEach(() => {
-      core = new Core()
+      core = new Core<any, Body>()
       core.use(AwsS3Multipart, {
+        companionUrl: '',
         companionHeaders: {
           authorization: oldToken,
         },
       })
-      awsS3Multipart = core.getPlugin('AwsS3Multipart')
+      awsS3Multipart = core.getPlugin('AwsS3Multipart') as any
     })
 
     it('companionHeader is updated before uploading file', async () => {
@@ -420,23 +482,29 @@ describe('AwsS3Multipart', () => {
 
       await core.upload()
 
+      // @ts-expect-error private property
       const client = awsS3Multipart[Symbol.for('uppy test: getClient')]()
 
-      expect(client[Symbol.for('uppy test: getCompanionHeaders')]().authorization).toEqual(newToken)
+      expect(
+        client[Symbol.for('uppy test: getCompanionHeaders')]().authorization,
+      ).toEqual(newToken)
     })
   })
 
   describe('dynamic companionHeader using setOption', () => {
-    let core
-    let awsS3Multipart
+    let core: Core<any, Body>
+    let awsS3Multipart: AwsS3Multipart<any, Body>
     const newToken = 'new token'
 
     it('companionHeader is updated before uploading file', async () => {
-      core = new Core()
+      core = new Core<any, Body>()
       core.use(AwsS3Multipart)
       /* Set up preprocessor */
       core.addPreProcessor(() => {
-        awsS3Multipart = core.getPlugin('AwsS3Multipart')
+        awsS3Multipart = core.getPlugin('AwsS3Multipart') as AwsS3Multipart<
+          any,
+          Body
+        >
         awsS3Multipart.setOptions({
           companionHeaders: {
             authorization: newToken,
@@ -446,15 +514,18 @@ describe('AwsS3Multipart', () => {
 
       await core.upload()
 
+      // @ts-expect-error private property
       const client = awsS3Multipart[Symbol.for('uppy test: getClient')]()
 
-      expect(client[Symbol.for('uppy test: getCompanionHeaders')]().authorization).toEqual(newToken)
+      expect(
+        client[Symbol.for('uppy test: getCompanionHeaders')]().authorization,
+      ).toEqual(newToken)
     })
   })
 
   describe('file metadata across custom main functions', () => {
-    let core
-    const createMultipartUpload = vi.fn(file => {
+    let core: Core<any, Body>
+    const createMultipartUpload = vi.fn((file) => {
       core.setFileMeta(file.id, {
         ...file.meta,
         createMultipartUpload: true,
@@ -487,8 +558,10 @@ describe('AwsS3Multipart', () => {
         listParts: true,
       })
 
-      const partKeys = Object.keys(file.meta).filter(metaKey => metaKey.startsWith('part'))
-      return partKeys.map(metaKey => ({
+      const partKeys = Object.keys(file.meta).filter((metaKey) =>
+        metaKey.startsWith('part'),
+      )
+      return partKeys.map((metaKey) => ({
         PartNumber: file.meta[metaKey],
         ETag: metaKey,
         Size: 5 * MB,
@@ -508,7 +581,6 @@ describe('AwsS3Multipart', () => {
       expect(file.meta.createMultipartUpload).toBe(true)
       expect(file.meta.signPart).toBe(true)
       expect(file.meta.abortingPart).toBe(5)
-      return {}
     })
 
     beforeEach(() => {
@@ -520,14 +592,13 @@ describe('AwsS3Multipart', () => {
     })
 
     it('preserves file metadata if upload is completed', async () => {
-      core = new Core()
-        .use(AwsS3Multipart, {
-          createMultipartUpload,
-          signPart,
-          listParts,
-          completeMultipartUpload,
-          abortMultipartUpload,
-        })
+      core = new Core<any, Body>().use(AwsS3Multipart, {
+        createMultipartUpload,
+        signPart,
+        listParts,
+        completeMultipartUpload,
+        abortMultipartUpload,
+      })
 
       nock('https://bucket.s3.us-east-2.amazonaws.com')
         .defaultReplyHeaders({
@@ -545,7 +616,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -565,7 +636,9 @@ describe('AwsS3Multipart', () => {
             abortingPart: partData.partNumber,
           })
           core.removeFile(file.id)
-          return {}
+          return {
+            url: undefined as any as string,
+          }
         }
 
         core.setFileMeta(file.id, {
@@ -579,14 +652,13 @@ describe('AwsS3Multipart', () => {
         }
       })
 
-      core = new Core()
-        .use(AwsS3Multipart, {
-          createMultipartUpload,
-          signPart: signPartWithAbort,
-          listParts,
-          completeMultipartUpload,
-          abortMultipartUpload,
-        })
+      core = new Core<any, Body>().use(AwsS3Multipart, {
+        createMultipartUpload,
+        signPart: signPartWithAbort,
+        listParts,
+        completeMultipartUpload,
+        abortMultipartUpload,
+      })
 
       nock('https://bucket.s3.us-east-2.amazonaws.com')
         .defaultReplyHeaders({
@@ -604,7 +676,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })
@@ -649,14 +721,13 @@ describe('AwsS3Multipart', () => {
         }
       })
 
-      core = new Core()
-        .use(AwsS3Multipart, {
-          createMultipartUpload,
-          signPart: signPartWithPause,
-          listParts,
-          completeMultipartUpload: completeMultipartUploadAfterPause,
-          abortMultipartUpload,
-        })
+      core = new Core<any, Body>().use(AwsS3Multipart, {
+        createMultipartUpload,
+        signPart: signPartWithPause,
+        listParts,
+        completeMultipartUpload: completeMultipartUploadAfterPause,
+        abortMultipartUpload,
+      })
 
       nock('https://bucket.s3.us-east-2.amazonaws.com')
         .defaultReplyHeaders({
@@ -674,7 +745,7 @@ describe('AwsS3Multipart', () => {
         source: 'vi',
         name: 'multitest.dat',
         type: 'application/octet-stream',
-        data: new File([new Uint8Array(fileSize)], {
+        data: new File([new Uint8Array(fileSize)], '', {
           type: 'application/octet-stream',
         }),
       })

+ 1007 - 0
packages/@uppy/aws-s3-multipart/src/index.ts

@@ -0,0 +1,1007 @@
+import BasePlugin, {
+  type DefinePluginOpts,
+  type PluginOpts,
+} from '@uppy/core/lib/BasePlugin.js'
+import { RequestClient } from '@uppy/companion-client'
+import type { RequestOptions } from '@uppy/utils/lib/CompanionClientProvider.ts'
+import type { Body as _Body, Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
+import type { Uppy } from '@uppy/core'
+import EventManager from '@uppy/core/lib/EventManager.js'
+import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
+import {
+  filterNonFailedFiles,
+  filterFilesToEmitUploadStarted,
+} from '@uppy/utils/lib/fileFilters'
+import { createAbortError } from '@uppy/utils/lib/AbortController'
+
+import MultipartUploader from './MultipartUploader.ts'
+import { throwIfAborted } from './utils.ts'
+import type {
+  UploadResult,
+  UploadResultWithSignal,
+  MultipartUploadResultWithSignal,
+  UploadPartBytesResult,
+  Body,
+} from './utils.ts'
+import createSignedURL from './createSignedURL.ts'
+import { HTTPCommunicationQueue } from './HTTPCommunicationQueue.ts'
+// eslint-disable-next-line @typescript-eslint/ban-ts-comment
+// @ts-ignore We don't want TS to generate types for the package.json
+import packageJson from '../package.json'
+
+interface MultipartFile<M extends Meta, B extends Body> extends UppyFile<M, B> {
+  s3Multipart: UploadResult
+}
+
+type PartUploadedCallback<M extends Meta, B extends _Body> = (
+  file: UppyFile<M, B>,
+  part: { PartNumber: number; ETag: string },
+) => void
+
+declare module '@uppy/core' {
+  export interface UppyEventMap<M extends Meta, B extends _Body> {
+    's3-multipart:part-uploaded': PartUploadedCallback<M, B>
+  }
+}
+
+function assertServerError<T>(res: T): T {
+  if ((res as any)?.error) {
+    const error = new Error((res as any).message)
+    Object.assign(error, (res as any).error)
+    throw error
+  }
+  return res
+}
+
+export interface AwsS3STSResponse {
+  credentials: {
+    AccessKeyId: string
+    SecretAccessKey: string
+    SessionToken: string
+    Expiration?: string
+  }
+  bucket: string
+  region: string
+}
+
+/**
+ * Computes the expiry time for a request signed with temporary credentials. If
+ * no expiration was provided, or an invalid value (e.g. in the past) is
+ * provided, undefined is returned. This function assumes the client clock is in
+ * sync with the remote server, which is a requirement for the signature to be
+ * validated for AWS anyway.
+ */
+function getExpiry(
+  credentials: AwsS3STSResponse['credentials'],
+): number | undefined {
+  const expirationDate = credentials.Expiration
+  if (expirationDate) {
+    const timeUntilExpiry = Math.floor(
+      ((new Date(expirationDate) as any as number) - Date.now()) / 1000,
+    )
+    if (timeUntilExpiry > 9) {
+      return timeUntilExpiry
+    }
+  }
+  return undefined
+}
+
+function getAllowedMetadata<M extends Record<string, any>>({
+  meta,
+  allowedMetaFields,
+  querify = false,
+}: {
+  meta: M
+  allowedMetaFields?: string[] | null
+  querify?: boolean
+}) {
+  const metaFields = allowedMetaFields ?? Object.keys(meta)
+
+  if (!meta) return {}
+
+  return Object.fromEntries(
+    metaFields
+      .filter((key) => meta[key] != null)
+      .map((key) => {
+        const realKey = querify ? `metadata[${key}]` : key
+        const value = String(meta[key])
+        return [realKey, value]
+      }),
+  )
+}
+
+type MaybePromise<T> = T | Promise<T>
+
+type SignPartOptions = {
+  uploadId: string
+  key: string
+  partNumber: number
+  body: Blob
+  signal?: AbortSignal
+}
+
+export type AwsS3UploadParameters =
+  | {
+      method: 'POST'
+      url: string
+      fields: Record<string, string>
+      expires?: number
+      headers?: Record<string, string>
+    }
+  | {
+      method?: 'PUT'
+      url: string
+      fields?: Record<string, never>
+      expires?: number
+      headers?: Record<string, string>
+    }
+
+export interface AwsS3Part {
+  PartNumber?: number
+  Size?: number
+  ETag?: string
+}
+
+type AWSS3WithCompanion = {
+  companionUrl: string
+  companionHeaders?: Record<string, string>
+  companionCookiesRule?: string
+  getTemporarySecurityCredentials?: true
+}
+type AWSS3WithoutCompanion = {
+  getTemporarySecurityCredentials?: (options?: {
+    signal?: AbortSignal
+  }) => MaybePromise<AwsS3STSResponse>
+  uploadPartBytes?: (options: {
+    signature: AwsS3UploadParameters
+    body: FormData | Blob
+    size?: number
+    onProgress: any
+    onComplete: any
+    signal?: AbortSignal
+  }) => Promise<UploadPartBytesResult>
+}
+
+type AWSS3NonMultipartWithCompanionMandatory = {
+  // No related options
+}
+
+type AWSS3NonMultipartWithoutCompanionMandatory<
+  M extends Meta,
+  B extends Body,
+> = {
+  getUploadParameters: (
+    file: UppyFile<M, B>,
+    options: RequestOptions,
+  ) => MaybePromise<AwsS3UploadParameters>
+}
+type AWSS3NonMultipartWithCompanion = AWSS3WithCompanion &
+  AWSS3NonMultipartWithCompanionMandatory & {
+    shouldUseMultipart: false
+  }
+
+type AWSS3NonMultipartWithoutCompanion<
+  M extends Meta,
+  B extends Body,
+> = AWSS3WithoutCompanion &
+  AWSS3NonMultipartWithoutCompanionMandatory<M, B> & {
+    shouldUseMultipart: false
+  }
+
+type AWSS3MultipartWithoutCompanionMandatorySignPart<
+  M extends Meta,
+  B extends Body,
+> = {
+  signPart: (
+    file: UppyFile<M, B>,
+    opts: SignPartOptions,
+  ) => MaybePromise<AwsS3UploadParameters>
+}
+/** @deprecated Use signPart instead */
+type AWSS3MultipartWithoutCompanionMandatoryPrepareUploadParts<
+  M extends Meta,
+  B extends Body,
+> = {
+  /** @deprecated Use signPart instead */
+  prepareUploadParts: (
+    file: UppyFile<M, B>,
+    partData: {
+      uploadId: string
+      key: string
+      parts: [{ number: number; chunk: Blob }]
+      signal?: AbortSignal
+    },
+  ) => MaybePromise<{
+    presignedUrls: Record<number, string>
+    headers?: Record<number, Record<string, string>>
+  }>
+}
+type AWSS3MultipartWithoutCompanionMandatory<M extends Meta, B extends Body> = {
+  getChunkSize?: (file: UppyFile<M, B>) => number
+  createMultipartUpload: (file: UppyFile<M, B>) => MaybePromise<UploadResult>
+  listParts: (
+    file: UppyFile<M, B>,
+    opts: UploadResultWithSignal,
+  ) => MaybePromise<AwsS3Part[]>
+  abortMultipartUpload: (
+    file: UppyFile<M, B>,
+    opts: UploadResultWithSignal,
+  ) => MaybePromise<void>
+  completeMultipartUpload: (
+    file: UppyFile<M, B>,
+    opts: {
+      uploadId: string
+      key: string
+      parts: AwsS3Part[]
+      signal: AbortSignal
+    },
+  ) => MaybePromise<{ location?: string }>
+} & (
+  | AWSS3MultipartWithoutCompanionMandatorySignPart<M, B>
+  | AWSS3MultipartWithoutCompanionMandatoryPrepareUploadParts<M, B>
+)
+
+type AWSS3MultipartWithoutCompanion<
+  M extends Meta,
+  B extends Body,
+> = AWSS3WithoutCompanion &
+  AWSS3MultipartWithoutCompanionMandatory<M, B> & {
+    shouldUseMultipart?: true
+  }
+
+type AWSS3MultipartWithCompanion<
+  M extends Meta,
+  B extends Body,
+> = AWSS3WithCompanion &
+  Partial<AWSS3MultipartWithoutCompanionMandatory<M, B>> & {
+    shouldUseMultipart?: true
+  }
+
+type AWSS3MaybeMultipartWithCompanion<
+  M extends Meta,
+  B extends Body,
+> = AWSS3WithCompanion &
+  Partial<AWSS3MultipartWithoutCompanionMandatory<M, B>> &
+  AWSS3NonMultipartWithCompanionMandatory & {
+    shouldUseMultipart: (file: UppyFile<M, B>) => boolean
+  }
+
+type AWSS3MaybeMultipartWithoutCompanion<
+  M extends Meta,
+  B extends Body,
+> = AWSS3WithoutCompanion &
+  AWSS3MultipartWithoutCompanionMandatory<M, B> &
+  AWSS3NonMultipartWithoutCompanionMandatory<M, B> & {
+    shouldUseMultipart: (file: UppyFile<M, B>) => boolean
+  }
+
+type RequestClientOptions = Partial<
+  ConstructorParameters<typeof RequestClient<any, any>>[1]
+>
+
+interface _AwsS3MultipartOptions extends PluginOpts, RequestClientOptions {
+  allowedMetaFields?: string[] | null
+  limit?: number
+  retryDelays?: number[] | null
+}
+
+export type AwsS3MultipartOptions<
+  M extends Meta,
+  B extends Body,
+> = _AwsS3MultipartOptions &
+  (
+    | AWSS3NonMultipartWithCompanion
+    | AWSS3NonMultipartWithoutCompanion<M, B>
+    | AWSS3MultipartWithCompanion<M, B>
+    | AWSS3MultipartWithoutCompanion<M, B>
+    | AWSS3MaybeMultipartWithCompanion<M, B>
+    | AWSS3MaybeMultipartWithoutCompanion<M, B>
+  )
+
+const defaultOptions = {
+  // TODO: null here means “include all”, [] means include none.
+  // This is inconsistent with @uppy/aws-s3 and @uppy/transloadit
+  allowedMetaFields: null,
+  limit: 6,
+  getTemporarySecurityCredentials: false as any,
+  shouldUseMultipart: ((file: UppyFile<any, any>) =>
+    file.size !== 0) as any as true, // TODO: Switch default to:
+  // eslint-disable-next-line no-bitwise
+  // shouldUseMultipart: (file) => file.size >> 10 >> 10 > 100,
+  retryDelays: [0, 1000, 3000, 5000],
+  companionHeaders: {},
+} satisfies Partial<AwsS3MultipartOptions<any, any>>
+
+export default class AwsS3Multipart<
+  M extends Meta,
+  B extends Body,
+> extends BasePlugin<
+  DefinePluginOpts<AwsS3MultipartOptions<M, B>, keyof typeof defaultOptions> &
+    // We also have a few dynamic options defined below:
+    Pick<
+      AWSS3MultipartWithoutCompanionMandatory<M, B>,
+      | 'getChunkSize'
+      | 'createMultipartUpload'
+      | 'listParts'
+      | 'abortMultipartUpload'
+      | 'completeMultipartUpload'
+    > &
+    Required<Pick<AWSS3WithoutCompanion, 'uploadPartBytes'>> &
+    AWSS3MultipartWithoutCompanionMandatorySignPart<M, B> &
+    AWSS3NonMultipartWithoutCompanionMandatory<M, B>,
+  M,
+  B
+> {
+  static VERSION = packageJson.version
+
+  #companionCommunicationQueue
+
+  #client: RequestClient<M, B>
+
+  protected requests: any
+
+  protected uploaderEvents: Record<string, EventManager<M, B> | null>
+
+  protected uploaders: Record<string, MultipartUploader<M, B> | null>
+
+  protected uploaderSockets: Record<string, never>
+
+  constructor(uppy: Uppy<M, B>, opts: AwsS3MultipartOptions<M, B>) {
+    super(uppy, {
+      ...defaultOptions,
+      uploadPartBytes: AwsS3Multipart.uploadPartBytes,
+      createMultipartUpload: null as any,
+      listParts: null as any,
+      abortMultipartUpload: null as any,
+      completeMultipartUpload: null as any,
+      signPart: null as any,
+      getUploadParameters: null as any,
+      ...opts,
+    })
+    // We need the `as any` here because of the dynamic default options.
+    this.type = 'uploader'
+    this.id = this.opts.id || 'AwsS3Multipart'
+    // @ts-expect-error TODO: remove unused
+    this.title = 'AWS S3 Multipart'
+    // TODO: only initiate `RequestClient` is `companionUrl` is defined.
+    this.#client = new RequestClient(uppy, opts as any)
+
+    const dynamicDefaultOptions = {
+      createMultipartUpload: this.createMultipartUpload,
+      listParts: this.listParts,
+      abortMultipartUpload: this.abortMultipartUpload,
+      completeMultipartUpload: this.completeMultipartUpload,
+      signPart:
+        opts?.getTemporarySecurityCredentials ?
+          this.createSignedURL
+        : this.signPart,
+      getUploadParameters:
+        opts?.getTemporarySecurityCredentials ?
+          (this.createSignedURL as any)
+        : this.getUploadParameters,
+    } satisfies Partial<AwsS3MultipartOptions<M, B>>
+
+    for (const key of Object.keys(dynamicDefaultOptions)) {
+      if (this.opts[key as keyof typeof dynamicDefaultOptions] == null) {
+        this.opts[key as keyof typeof dynamicDefaultOptions] =
+          dynamicDefaultOptions[key as keyof typeof dynamicDefaultOptions].bind(
+            this,
+          )
+      }
+    }
+    if (
+      (opts as AWSS3MultipartWithoutCompanionMandatoryPrepareUploadParts<M, B>)
+        ?.prepareUploadParts != null &&
+      (opts as AWSS3MultipartWithoutCompanionMandatorySignPart<M, B>)
+        .signPart == null
+    ) {
+      this.opts.signPart = async (
+        file: UppyFile<M, B>,
+        { uploadId, key, partNumber, body, signal }: SignPartOptions,
+      ) => {
+        const { presignedUrls, headers } = await (
+          opts as AWSS3MultipartWithoutCompanionMandatoryPrepareUploadParts<
+            M,
+            B
+          >
+        ).prepareUploadParts(file, {
+          uploadId,
+          key,
+          parts: [{ number: partNumber, chunk: body }],
+          signal,
+        })
+        return {
+          url: presignedUrls?.[partNumber],
+          headers: headers?.[partNumber],
+        }
+      }
+    }
+
+    /**
+     * Simultaneous upload limiting is shared across all uploads with this plugin.
+     *
+     * @type {RateLimitedQueue}
+     */
+    this.requests =
+      (this.opts as any).rateLimitedQueue ??
+      new RateLimitedQueue(this.opts.limit)
+    this.#companionCommunicationQueue = new HTTPCommunicationQueue(
+      this.requests,
+      this.opts,
+      this.#setS3MultipartState,
+      this.#getFile,
+    )
+
+    this.uploaders = Object.create(null)
+    this.uploaderEvents = Object.create(null)
+    this.uploaderSockets = Object.create(null)
+  }
+
+  private [Symbol.for('uppy test: getClient')]() {
+    return this.#client
+  }
+
+  setOptions(newOptions: Partial<AwsS3MultipartOptions<M, B>>): void {
+    this.#companionCommunicationQueue.setOptions(newOptions)
+    super.setOptions(newOptions)
+    this.#setCompanionHeaders()
+  }
+
+  /**
+   * Clean up all references for a file's upload: the MultipartUploader instance,
+   * any events related to the file, and the Companion WebSocket connection.
+   *
+   * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed.
+   * This should be done when the user cancels the upload, not when the upload is completed or errored.
+   */
+  resetUploaderReferences(fileID: string, opts?: { abort: boolean }): void {
+    if (this.uploaders[fileID]) {
+      this.uploaders[fileID]!.abort({ really: opts?.abort || false })
+      this.uploaders[fileID] = null
+    }
+    if (this.uploaderEvents[fileID]) {
+      this.uploaderEvents[fileID]!.remove()
+      this.uploaderEvents[fileID] = null
+    }
+    if (this.uploaderSockets[fileID]) {
+      // @ts-expect-error TODO: remove this block in the next major
+      this.uploaderSockets[fileID].close()
+      // @ts-expect-error TODO: remove this block in the next major
+      this.uploaderSockets[fileID] = null
+    }
+  }
+
+  // TODO: make this a private method in the next major
+  assertHost(method: string): void {
+    if (!this.opts.companionUrl) {
+      throw new Error(
+        `Expected a \`companionUrl\` option containing a Companion address, or if you are not using Companion, a custom \`${method}\` implementation.`,
+      )
+    }
+  }
+
+  createMultipartUpload(
+    file: UppyFile<M, B>,
+    signal?: AbortSignal,
+  ): Promise<UploadResult> {
+    this.assertHost('createMultipartUpload')
+    throwIfAborted(signal)
+
+    const metadata = getAllowedMetadata({
+      meta: file.meta,
+      allowedMetaFields: this.opts.allowedMetaFields,
+    })
+
+    return this.#client
+      .post<UploadResult>(
+        's3/multipart',
+        {
+          filename: file.name,
+          type: file.type,
+          metadata,
+        },
+        { signal },
+      )
+      .then(assertServerError)
+  }
+
+  listParts(
+    file: UppyFile<M, B>,
+    { key, uploadId, signal }: UploadResultWithSignal,
+    oldSignal?: AbortSignal,
+  ): Promise<AwsS3Part[]> {
+    signal ??= oldSignal // eslint-disable-line no-param-reassign
+    this.assertHost('listParts')
+    throwIfAborted(signal)
+
+    const filename = encodeURIComponent(key)
+    return this.#client
+      .get<AwsS3Part[]>(`s3/multipart/${uploadId}?key=${filename}`, { signal })
+      .then(assertServerError)
+  }
+
+  completeMultipartUpload(
+    file: UppyFile<M, B>,
+    { key, uploadId, parts, signal }: MultipartUploadResultWithSignal,
+    oldSignal?: AbortSignal,
+  ): Promise<B> {
+    signal ??= oldSignal // eslint-disable-line no-param-reassign
+    this.assertHost('completeMultipartUpload')
+    throwIfAborted(signal)
+
+    const filename = encodeURIComponent(key)
+    const uploadIdEnc = encodeURIComponent(uploadId)
+    return this.#client
+      .post<B>(
+        `s3/multipart/${uploadIdEnc}/complete?key=${filename}`,
+        { parts },
+        { signal },
+      )
+      .then(assertServerError)
+  }
+
+  #cachedTemporaryCredentials: MaybePromise<AwsS3STSResponse>
+
+  async #getTemporarySecurityCredentials(options?: RequestOptions) {
+    throwIfAborted(options?.signal)
+
+    if (this.#cachedTemporaryCredentials == null) {
+      // We do not await it just yet, so concurrent calls do not try to override it:
+      if (this.opts.getTemporarySecurityCredentials === true) {
+        this.assertHost('getTemporarySecurityCredentials')
+        this.#cachedTemporaryCredentials = this.#client
+          .get<AwsS3STSResponse>('s3/sts', options)
+          .then(assertServerError)
+      } else {
+        this.#cachedTemporaryCredentials =
+          this.opts.getTemporarySecurityCredentials(options)
+      }
+      this.#cachedTemporaryCredentials = await this.#cachedTemporaryCredentials
+      setTimeout(
+        () => {
+          // At half the time left before expiration, we clear the cache. That's
+          // an arbitrary tradeoff to limit the number of requests made to the
+          // remote while limiting the risk of using an expired token in case the
+          // clocks are not exactly synced.
+          // The HTTP cache should be configured to ensure a client doesn't request
+          // more tokens than it needs, but this timeout provides a second layer of
+          // security in case the HTTP cache is disabled or misconfigured.
+          this.#cachedTemporaryCredentials = null as any
+        },
+        (getExpiry(this.#cachedTemporaryCredentials.credentials) || 0) * 500,
+      )
+    }
+
+    return this.#cachedTemporaryCredentials
+  }
+
+  async createSignedURL(
+    file: UppyFile<M, B>,
+    options: SignPartOptions,
+  ): Promise<AwsS3UploadParameters> {
+    const data = await this.#getTemporarySecurityCredentials(options)
+    const expires = getExpiry(data.credentials) || 604_800 // 604 800 is the max value accepted by AWS.
+
+    const { uploadId, key, partNumber } = options
+
+    // Return an object in the correct shape.
+    return {
+      method: 'PUT',
+      expires,
+      fields: {},
+      url: `${await createSignedURL({
+        accountKey: data.credentials.AccessKeyId,
+        accountSecret: data.credentials.SecretAccessKey,
+        sessionToken: data.credentials.SessionToken,
+        expires,
+        bucketName: data.bucket,
+        Region: data.region,
+        Key: key ?? `${crypto.randomUUID()}-${file.name}`,
+        uploadId,
+        partNumber,
+      })}`,
+      // Provide content type header required by S3
+      headers: {
+        'Content-Type': file.type as string,
+      },
+    }
+  }
+
+  signPart(
+    file: UppyFile<M, B>,
+    { uploadId, key, partNumber, signal }: SignPartOptions,
+  ): Promise<AwsS3UploadParameters> {
+    this.assertHost('signPart')
+    throwIfAborted(signal)
+
+    if (uploadId == null || key == null || partNumber == null) {
+      throw new Error(
+        'Cannot sign without a key, an uploadId, and a partNumber',
+      )
+    }
+
+    const filename = encodeURIComponent(key)
+    return this.#client
+      .get<AwsS3UploadParameters>(
+        `s3/multipart/${uploadId}/${partNumber}?key=${filename}`,
+        { signal },
+      )
+      .then(assertServerError)
+  }
+
+  abortMultipartUpload(
+    file: UppyFile<M, B>,
+    { key, uploadId, signal }: UploadResultWithSignal,
+    // eslint-disable-next-line @typescript-eslint/no-unused-vars
+    oldSignal?: AbortSignal, // TODO: remove in next major
+  ): Promise<void> {
+    signal ??= oldSignal // eslint-disable-line no-param-reassign
+    this.assertHost('abortMultipartUpload')
+
+    const filename = encodeURIComponent(key)
+    const uploadIdEnc = encodeURIComponent(uploadId)
+    return this.#client
+      .delete<void>(`s3/multipart/${uploadIdEnc}?key=${filename}`, undefined, {
+        signal,
+      })
+      .then(assertServerError)
+  }
+
+  getUploadParameters(
+    file: UppyFile<M, B>,
+    options: RequestOptions,
+  ): Promise<AwsS3UploadParameters> {
+    const { meta } = file
+    const { type, name: filename } = meta
+    const metadata = getAllowedMetadata({
+      meta,
+      allowedMetaFields: this.opts.allowedMetaFields,
+      querify: true,
+    })
+
+    const query = new URLSearchParams({ filename, type, ...metadata } as Record<
+      string,
+      string
+    >)
+
+    return this.#client.get(`s3/params?${query}`, options)
+  }
+
+  static async uploadPartBytes({
+    signature: { url, expires, headers, method = 'PUT' },
+    body,
+    size = (body as Blob).size,
+    onProgress,
+    onComplete,
+    signal,
+  }: {
+    signature: AwsS3UploadParameters
+    body: FormData | Blob
+    size?: number
+    onProgress: any
+    onComplete: any
+    signal?: AbortSignal
+  }): Promise<UploadPartBytesResult> {
+    throwIfAborted(signal)
+
+    if (url == null) {
+      throw new Error('Cannot upload to an undefined URL')
+    }
+
+    return new Promise((resolve, reject) => {
+      const xhr = new XMLHttpRequest()
+      xhr.open(method, url, true)
+      if (headers) {
+        Object.keys(headers).forEach((key) => {
+          xhr.setRequestHeader(key, headers[key])
+        })
+      }
+      xhr.responseType = 'text'
+      if (typeof expires === 'number') {
+        xhr.timeout = expires * 1000
+      }
+
+      function onabort() {
+        xhr.abort()
+      }
+      function cleanup() {
+        signal?.removeEventListener('abort', onabort)
+      }
+      signal?.addEventListener('abort', onabort)
+
+      xhr.upload.addEventListener('progress', (ev) => {
+        onProgress(ev)
+      })
+
+      xhr.addEventListener('abort', () => {
+        cleanup()
+
+        reject(createAbortError())
+      })
+
+      xhr.addEventListener('timeout', () => {
+        cleanup()
+
+        const error = new Error('Request has expired')
+        ;(error as any).source = { status: 403 }
+        reject(error)
+      })
+      xhr.addEventListener('load', (ev) => {
+        cleanup()
+
+        if (
+          xhr.status === 403 &&
+          xhr.responseText.includes('<Message>Request has expired</Message>')
+        ) {
+          const error = new Error('Request has expired')
+          ;(error as any).source = xhr
+          reject(error)
+          return
+        }
+        if (xhr.status < 200 || xhr.status >= 300) {
+          const error = new Error('Non 2xx')
+          ;(error as any).source = xhr
+          reject(error)
+          return
+        }
+
+        // todo make a proper onProgress API (breaking change)
+        onProgress?.({ loaded: size, lengthComputable: true })
+
+        // NOTE This must be allowed by CORS.
+        const etag = xhr.getResponseHeader('ETag')
+        const location = xhr.getResponseHeader('Location')
+
+        if (method.toUpperCase() === 'POST' && location === null) {
+          // Not being able to read the Location header is not a fatal error.
+          // eslint-disable-next-line no-console
+          console.warn(
+            'AwsS3/Multipart: Could not read the Location header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.',
+          )
+        }
+        if (etag === null) {
+          reject(
+            new Error(
+              'AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.',
+            ),
+          )
+          return
+        }
+
+        onComplete?.(etag)
+        resolve({
+          ETag: etag,
+          ...(location ? { location } : undefined),
+        })
+      })
+
+      xhr.addEventListener('error', (ev) => {
+        cleanup()
+
+        const error = new Error('Unknown error')
+        ;(error as any).source = ev.target
+        reject(error)
+      })
+
+      xhr.send(body)
+    })
+  }
+
+  #setS3MultipartState = (
+    file: UppyFile<M, B>,
+    { key, uploadId }: UploadResult,
+  ) => {
+    const cFile = this.uppy.getFile(file.id)
+    if (cFile == null) {
+      // file was removed from store
+      return
+    }
+
+    this.uppy.setFileState(file.id, {
+      s3Multipart: {
+        ...(cFile as MultipartFile<M, B>).s3Multipart,
+        key,
+        uploadId,
+      },
+    } as Partial<MultipartFile<M, B>>)
+  }
+
+  #getFile = (file: UppyFile<M, B>) => {
+    return this.uppy.getFile(file.id) || file
+  }
+
+  #uploadLocalFile(file: UppyFile<M, B>) {
+    return new Promise<void | string>((resolve, reject) => {
+      const onProgress = (bytesUploaded: number, bytesTotal: number) => {
+        this.uppy.emit('upload-progress', this.uppy.getFile(file.id), {
+          // @ts-expect-error TODO: figure out if we need this
+          uploader: this,
+          bytesUploaded,
+          bytesTotal,
+        })
+      }
+
+      const onError = (err: unknown) => {
+        this.uppy.log(err as Error)
+        this.uppy.emit('upload-error', file, err as Error)
+
+        this.resetUploaderReferences(file.id)
+        reject(err)
+      }
+
+      const onSuccess = (result: B) => {
+        const uploadResp = {
+          body: {
+            ...result,
+          },
+          status: 200,
+          uploadURL: result.location,
+        }
+
+        this.resetUploaderReferences(file.id)
+
+        this.uppy.emit('upload-success', this.#getFile(file), uploadResp)
+
+        if (result.location) {
+          this.uppy.log(`Download ${file.name} from ${result.location}`)
+        }
+
+        resolve()
+      }
+
+      const upload = new MultipartUploader<M, B>(file.data, {
+        // .bind to pass the file object to each handler.
+        companionComm: this.#companionCommunicationQueue,
+
+        log: (...args: Parameters<Uppy<M, B>['log']>) => this.uppy.log(...args),
+        getChunkSize:
+          this.opts.getChunkSize ? this.opts.getChunkSize.bind(this) : null,
+
+        onProgress,
+        onError,
+        onSuccess,
+        onPartComplete: (part) => {
+          this.uppy.emit(
+            's3-multipart:part-uploaded',
+            this.#getFile(file),
+            part,
+          )
+        },
+
+        file,
+        shouldUseMultipart: this.opts.shouldUseMultipart,
+
+        ...(file as MultipartFile<M, B>).s3Multipart,
+      })
+
+      this.uploaders[file.id] = upload
+      const eventManager = new EventManager(this.uppy)
+      this.uploaderEvents[file.id] = eventManager
+
+      eventManager.onFileRemove(file.id, (removed) => {
+        upload.abort()
+        this.resetUploaderReferences(file.id, { abort: true })
+        resolve(`upload ${removed} was removed`)
+      })
+
+      eventManager.onCancelAll(file.id, (options) => {
+        if (options?.reason === 'user') {
+          upload.abort()
+          this.resetUploaderReferences(file.id, { abort: true })
+        }
+        resolve(`upload ${file.id} was canceled`)
+      })
+
+      eventManager.onFilePause(file.id, (isPaused) => {
+        if (isPaused) {
+          upload.pause()
+        } else {
+          upload.start()
+        }
+      })
+
+      eventManager.onPauseAll(file.id, () => {
+        upload.pause()
+      })
+
+      eventManager.onResumeAll(file.id, () => {
+        upload.start()
+      })
+
+      upload.start()
+    })
+  }
+
+  // eslint-disable-next-line class-methods-use-this
+  #getCompanionClientArgs(file: UppyFile<M, B>) {
+    return {
+      ...file.remote?.body,
+      protocol: 's3-multipart',
+      size: file.data.size,
+      metadata: file.meta,
+    }
+  }
+
+  #upload = async (fileIDs: string[]) => {
+    if (fileIDs.length === 0) return undefined
+
+    const files = this.uppy.getFilesByIds(fileIDs)
+    const filesFiltered = filterNonFailedFiles(files)
+    const filesToEmit = filterFilesToEmitUploadStarted(filesFiltered)
+
+    this.uppy.emit('upload-start', filesToEmit)
+
+    const promises = filesFiltered.map((file) => {
+      if (file.isRemote) {
+        const getQueue = () => this.requests
+        this.#setResumableUploadsCapability(false)
+        const controller = new AbortController()
+
+        const removedHandler = (removedFile: UppyFile<M, B>) => {
+          if (removedFile.id === file.id) controller.abort()
+        }
+        this.uppy.on('file-removed', removedHandler)
+
+        const uploadPromise = this.uppy
+          .getRequestClientForFile<RequestClient<M, B>>(file)
+          .uploadRemoteFile(file, this.#getCompanionClientArgs(file), {
+            signal: controller.signal,
+            getQueue,
+          })
+
+        this.requests.wrapSyncFunction(
+          () => {
+            this.uppy.off('file-removed', removedHandler)
+          },
+          { priority: -1 },
+        )()
+
+        return uploadPromise
+      }
+
+      return this.#uploadLocalFile(file)
+    })
+
+    const upload = await Promise.all(promises)
+    // After the upload is done, another upload may happen with only local files.
+    // We reset the capability so that the next upload can use resumable uploads.
+    this.#setResumableUploadsCapability(true)
+    return upload
+  }
+
+  #setCompanionHeaders = () => {
+    this.#client.setCompanionHeaders(this.opts.companionHeaders)
+  }
+
+  #setResumableUploadsCapability = (boolean: boolean) => {
+    const { capabilities } = this.uppy.getState()
+    this.uppy.setState({
+      capabilities: {
+        ...capabilities,
+        resumableUploads: boolean,
+      },
+    })
+  }
+
+  #resetResumableCapability = () => {
+    this.#setResumableUploadsCapability(true)
+  }
+
+  install(): void {
+    this.#setResumableUploadsCapability(true)
+    this.uppy.addPreProcessor(this.#setCompanionHeaders)
+    this.uppy.addUploader(this.#upload)
+    this.uppy.on('cancel-all', this.#resetResumableCapability)
+  }
+
+  uninstall(): void {
+    this.uppy.removePreProcessor(this.#setCompanionHeaders)
+    this.uppy.removeUploader(this.#upload)
+    this.uppy.off('cancel-all', this.#resetResumableCapability)
+  }
+}
+
+export type uploadPartBytes = (typeof AwsS3Multipart<
+  any,
+  any
+>)['uploadPartBytes']

+ 28 - 0
packages/@uppy/aws-s3-multipart/src/utils.ts

@@ -0,0 +1,28 @@
+import { createAbortError } from '@uppy/utils/lib/AbortController'
+import type { Body as _Body } from '@uppy/utils/lib/UppyFile'
+
+import type { AwsS3Part } from './index'
+
+export function throwIfAborted(signal?: AbortSignal | null): void {
+  if (signal?.aborted) {
+    throw createAbortError('The operation was aborted', {
+      cause: signal.reason,
+    })
+  }
+}
+
+export type UploadResult = { key: string; uploadId: string }
+export type UploadResultWithSignal = UploadResult & { signal?: AbortSignal }
+export type MultipartUploadResult = UploadResult & { parts: AwsS3Part[] }
+export type MultipartUploadResultWithSignal = MultipartUploadResult & {
+  signal?: AbortSignal
+}
+
+export type UploadPartBytesResult = {
+  ETag: string
+  location?: string
+}
+
+export interface Body extends _Body {
+  location: string
+}

+ 30 - 0
packages/@uppy/aws-s3-multipart/tsconfig.build.json

@@ -0,0 +1,30 @@
+{
+  "extends": "../../../tsconfig.shared",
+  "compilerOptions": {
+    "noImplicitAny": false,
+    "outDir": "./lib",
+    "paths": {
+      "@uppy/companion-client": ["../companion-client/src/index.js"],
+      "@uppy/companion-client/lib/*": ["../companion-client/src/*"],
+      "@uppy/utils/lib/*": ["../utils/src/*"],
+      "@uppy/core": ["../core/src/index.js"],
+      "@uppy/core/lib/*": ["../core/src/*"]
+    },
+    "resolveJsonModule": false,
+    "rootDir": "./src",
+    "skipLibCheck": true
+  },
+  "include": ["./src/**/*.*"],
+  "exclude": ["./src/**/*.test.ts"],
+  "references": [
+    {
+      "path": "../companion-client/tsconfig.build.json"
+    },
+    {
+      "path": "../utils/tsconfig.build.json"
+    },
+    {
+      "path": "../core/tsconfig.build.json"
+    }
+  ]
+}

+ 26 - 0
packages/@uppy/aws-s3-multipart/tsconfig.json

@@ -0,0 +1,26 @@
+{
+  "extends": "../../../tsconfig.shared",
+  "compilerOptions": {
+    "emitDeclarationOnly": false,
+    "noEmit": true,
+    "paths": {
+      "@uppy/companion-client": ["../companion-client/src/index.js"],
+      "@uppy/companion-client/lib/*": ["../companion-client/src/*"],
+      "@uppy/utils/lib/*": ["../utils/src/*"],
+      "@uppy/core": ["../core/src/index.js"],
+      "@uppy/core/lib/*": ["../core/src/*"],
+    },
+  },
+  "include": ["./package.json", "./src/**/*.*"],
+  "references": [
+    {
+      "path": "../companion-client/tsconfig.build.json",
+    },
+    {
+      "path": "../utils/tsconfig.build.json",
+    },
+    {
+      "path": "../core/tsconfig.build.json",
+    },
+  ],
+}

+ 4 - 1
packages/@uppy/core/src/Uppy.ts

@@ -49,7 +49,10 @@ import locale from './locale.ts'
 import type BasePlugin from './BasePlugin.ts'
 import type { Restrictions, ValidateableFile } from './Restricter.ts'
 
-type Processor = (fileIDs: string[], uploadID: string) => Promise<void> | void
+type Processor = (
+  fileIDs: string[],
+  uploadID: string,
+) => Promise<unknown> | void
 
 type FileRemoveReason = 'user' | 'cancel-all'