Browse Source

@uppy/aws-s3-multipart: add `shouldUseMultipart` option (#4205)

* @uppy/aws-s3-multipart: add support for non-multipart uploads

* Set threshold at 100 MiB

* add `shouldUseMultipart` option

* makes the change semver-minor, the breaking change should be done in the next major

* fix merge conflict

* fix lint

* fix crash

I think it can be considered a breaking change, because I'm changing the signature of uploadPartBytes, which is an option. However, I don't see uploadPartBytes documented in our docs, so maybe we can get away with a non-major release:

- onProgress now gets passed number of bytes instead of `ev`
- a new `size` argument (previously size was on body)

* fix `shouldUseMultipart` confusing behavior

* handle zero-size files as it was before

* remove breaking change in `uploadPartBytes`

* fix `allowedMetaFields` for non-multipart

* make backwards compatible and fix meta

* nits

* fix metadata for non-multipart

(inside multipart)

* Remove second `fileSize` argument from shouldUseMultipart, because users can use file.size

* console.log

---------

Co-authored-by: Mikael Finstad <finstaden@gmail.com>
Co-authored-by: Artur Paikin <artur@arturpaikin.com>
Antoine du Hamel 1 year ago
parent
commit
8e15f27a97

+ 21 - 14
packages/@uppy/aws-s3-multipart/src/MultipartUploader.js

@@ -43,6 +43,8 @@ class MultipartUploader {
 
   #onSuccess
 
+  #shouldUseMultipart
+
   #onReject = (err) => (err?.cause === pausingUploadReason ? null : this.#onError(err))
 
   constructor (data, options) {
@@ -57,25 +59,23 @@ class MultipartUploader {
     this.#file = options.file
     this.#onSuccess = this.options.onSuccess
     this.#onError = this.options.onError
+    this.#shouldUseMultipart = this.options.shouldUseMultipart
 
     this.#initChunks()
   }
 
   #initChunks () {
-    const desiredChunkSize = this.options.getChunkSize(this.#data)
-    // at least 5MB per request, at most 10k requests
     const fileSize = this.#data.size
-    const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
-    const chunkSize = Math.max(desiredChunkSize, minChunkSize)
+    const shouldUseMultipart = typeof this.#shouldUseMultipart === 'function'
+      ? this.#shouldUseMultipart(this.#file)
+      : Boolean(this.#shouldUseMultipart)
+
+    if (shouldUseMultipart) {
+      const desiredChunkSize = this.options.getChunkSize(this.#data)
+      // at least 5MB per request, at most 10k requests
+      const minChunkSize = Math.max(5 * MB, Math.ceil(fileSize / 10000))
+      const chunkSize = Math.max(desiredChunkSize, minChunkSize)
 
-    // Upload zero-sized files in one zero-sized chunk
-    if (this.#data.size === 0) {
-      this.#chunks = [{
-        getData: () => this.#data,
-        onProgress: this.#onPartProgress(0),
-        onComplete: this.#onPartComplete(0),
-      }]
-    } else {
       const arraySize = Math.ceil(fileSize / chunkSize)
       this.#chunks = Array(arraySize)
 
@@ -92,8 +92,16 @@ class MultipartUploader {
           getData,
           onProgress: this.#onPartProgress(j),
           onComplete: this.#onPartComplete(j),
+          shouldUseMultipart,
         }
       }
+    } else {
+      this.#chunks = [{
+        getData: () => this.#data,
+        onProgress: this.#onPartProgress(0),
+        onComplete: this.#onPartComplete(0),
+        shouldUseMultipart,
+      }]
     }
 
     this.#chunkState = this.#chunks.map(() => ({ uploaded: 0 }))
@@ -114,8 +122,7 @@ class MultipartUploader {
   #onPartProgress = (index) => (ev) => {
     if (!ev.lengthComputable) return
 
-    const sent = ev.loaded
-    this.#chunkState[index].uploaded = ensureInt(sent)
+    this.#chunkState[index].uploaded = ensureInt(ev.loaded)
 
     const totalUploaded = this.#chunkState.reduce((n, c) => n + c.uploaded, 0)
     this.options.onProgress(totalUploaded, this.#data.size)

+ 78 - 10
packages/@uppy/aws-s3-multipart/src/index.js

@@ -18,6 +18,22 @@ function assertServerError (res) {
   return res
 }
 
+function getAllowedMetadata ({ meta, allowedMetaFields, querify = false }) {
+  const metaFields = allowedMetaFields ?? Object.keys(meta)
+
+  if (!meta) return {}
+
+  return Object.fromEntries(
+    metaFields
+      .filter(key => meta[key] != null)
+      .map((key) => {
+        const realKey = querify ? `metadata[${key}]` : key
+        const value = String(meta[key])
+        return [realKey, value]
+      }),
+  )
+}
+
 function throwIfAborted (signal) {
   if (signal?.aborted) { throw createAbortError('The operation was aborted', { cause: signal.reason }) }
 }
@@ -25,12 +41,16 @@ function throwIfAborted (signal) {
 class HTTPCommunicationQueue {
   #abortMultipartUpload
 
+  #allowedMetaFields
+
   #cache = new WeakMap()
 
   #createMultipartUpload
 
   #fetchSignature
 
+  #getUploadParameters
+
   #listParts
 
   #previousRetryDelay
@@ -57,6 +77,9 @@ class HTTPCommunicationQueue {
     if ('abortMultipartUpload' in options) {
       this.#abortMultipartUpload = requests.wrapPromiseFunction(options.abortMultipartUpload)
     }
+    if ('allowedMetaFields' in options) {
+      this.#allowedMetaFields = options.allowedMetaFields
+    }
     if ('createMultipartUpload' in options) {
       this.#createMultipartUpload = requests.wrapPromiseFunction(options.createMultipartUpload, { priority:-1 })
     }
@@ -75,6 +98,9 @@ class HTTPCommunicationQueue {
     if ('uploadPartBytes' in options) {
       this.#uploadPartBytes = requests.wrapPromiseFunction(options.uploadPartBytes, { priority:Infinity })
     }
+    if ('getUploadParameters' in options) {
+      this.#getUploadParameters = requests.wrapPromiseFunction(options.getUploadParameters)
+    }
   }
 
   async #shouldRetry (err) {
@@ -190,8 +216,41 @@ class HTTPCommunicationQueue {
     await this.#abortMultipartUpload(file, awaitedResult)
   }
 
+  async #nonMultipartUpload (file, chunk, signal) {
+    const { meta } = file
+    const { type, name: filename } = meta
+    const metadata = getAllowedMetadata({ meta, allowedMetaFields: this.#allowedMetaFields, querify: true })
+
+    const query = new URLSearchParams({ filename, type, ...metadata })
+    const {
+      method = 'post',
+      url,
+      fields,
+      headers,
+    } = await this.#getUploadParameters(`s3/params?${query}`, { signal }).abortOn(signal)
+
+    const formData = new FormData()
+    Object.entries(fields).forEach(([key, value]) => formData.set(key, value))
+    const data = chunk.getData()
+    formData.set('file', data)
+
+    const { onProgress, onComplete } = chunk
+
+    return this.#uploadPartBytes({
+      signature: { url, headers, method },
+      body: formData,
+      size: data.size,
+      onProgress,
+      onComplete,
+      signal,
+    }).abortOn(signal)
+  }
+
   async uploadFile (file, chunks, signal) {
     throwIfAborted(signal)
+    if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
+      return this.#nonMultipartUpload(file, chunks[0], signal)
+    }
     const { uploadId, key } = await this.getUploadId(file, signal)
     throwIfAborted(signal)
     try {
@@ -206,6 +265,9 @@ class HTTPCommunicationQueue {
 
   async resumeUploadFile (file, chunks, signal) {
     throwIfAborted(signal)
+    if (chunks.length === 1) {
+      return this.#nonMultipartUpload(file, chunks[0], signal)
+    }
     const { uploadId, key } = await this.getUploadId(file, signal)
     throwIfAborted(signal)
     const alreadyUploadedParts = await this.#listParts(file, { uploadId, key, signal }).abortOn(signal)
@@ -240,7 +302,9 @@ class HTTPCommunicationQueue {
       try {
         return {
           PartNumber: partNumber,
-          ...await this.#uploadPartBytes({ signature, body: chunkData, onProgress, onComplete, signal }).abortOn(signal),
+          ...await this.#uploadPartBytes({
+            signature, body: chunkData, size: chunkData.size, onProgress, onComplete, signal,
+          }).abortOn(signal),
         }
       } catch (err) {
         if (!await this.#shouldRetry(err)) throw err
@@ -269,6 +333,9 @@ export default class AwsS3Multipart extends BasePlugin {
       // TODO: this is currently opt-in for backward compat, switch to opt-out in the next major
       allowedMetaFields: null,
       limit: 6,
+      shouldUseMultipart: (file) => file.size !== 0, // TODO: Switch default to:
+      // eslint-disable-next-line no-bitwise
+      // shouldUseMultipart: (file) => file.size >> 10 >> 10 > 100,
       retryDelays: [0, 1000, 3000, 5000],
       createMultipartUpload: this.createMultipartUpload.bind(this),
       listParts: this.listParts.bind(this),
@@ -276,6 +343,7 @@ export default class AwsS3Multipart extends BasePlugin {
       completeMultipartUpload: this.completeMultipartUpload.bind(this),
       signPart: this.signPart.bind(this),
       uploadPartBytes: AwsS3Multipart.uploadPartBytes,
+      getUploadParameters: (...args) => this.#client.get(...args),
       companionHeaders: {},
     }
 
@@ -343,11 +411,7 @@ export default class AwsS3Multipart extends BasePlugin {
     this.assertHost('createMultipartUpload')
     throwIfAborted(signal)
 
-    const metadata = file.meta ? Object.fromEntries(
-      (this.opts.allowedMetaFields ?? Object.keys(file.meta))
-        .filter(key => file.meta[key] != null)
-        .map(key => [key, String(file.meta[key])]),
-    ) : {}
+    const metadata = getAllowedMetadata({ meta: file.meta, allowedMetaFields: this.opts.allowedMetaFields })
 
     return this.#client.post('s3/multipart', {
       filename: file.name,
@@ -397,7 +461,7 @@ export default class AwsS3Multipart extends BasePlugin {
       .then(assertServerError)
   }
 
-  static async uploadPartBytes ({ signature: { url, expires, headers }, body, onProgress, onComplete, signal }) {
+  static async uploadPartBytes ({ signature: { url, expires, headers, method = 'PUT' }, body, size = body.size, onProgress, onComplete, signal }) {
     throwIfAborted(signal)
 
     if (url == null) {
@@ -406,7 +470,7 @@ export default class AwsS3Multipart extends BasePlugin {
 
     return new Promise((resolve, reject) => {
       const xhr = new XMLHttpRequest()
-      xhr.open('PUT', url, true)
+      xhr.open(method, url, true)
       if (headers) {
         Object.keys(headers).forEach((key) => {
           xhr.setRequestHeader(key, headers[key])
@@ -425,7 +489,9 @@ export default class AwsS3Multipart extends BasePlugin {
       }
       signal.addEventListener('abort', onabort)
 
-      xhr.upload.addEventListener('progress', onProgress)
+      xhr.upload.addEventListener('progress', (ev) => {
+        onProgress(ev)
+      })
 
       xhr.addEventListener('abort', () => {
         cleanup()
@@ -455,7 +521,8 @@ export default class AwsS3Multipart extends BasePlugin {
           return
         }
 
-        onProgress?.(body.size)
+        // todo make a proper onProgress API (breaking change)
+        onProgress?.({ loaded: size, lengthComputable: true })
 
         // NOTE This must be allowed by CORS.
         const etag = ev.target.getResponseHeader('ETag')
@@ -550,6 +617,7 @@ export default class AwsS3Multipart extends BasePlugin {
         onPartComplete,
 
         file,
+        shouldUseMultipart: this.opts.shouldUseMultipart,
 
         ...file.s3Multipart,
       })

+ 1 - 0
packages/@uppy/aws-s3-multipart/types/index.d.ts

@@ -43,6 +43,7 @@ export interface AwsS3MultipartOptions extends PluginOptions {
       opts: { uploadId: string; key: string; parts: AwsS3Part[]; signal: AbortSignal }
     ) => MaybePromise<{ location?: string }>
     limit?: number
+    shouldUseMultipart?: boolean | ((file: UppyFile) => boolean)
     retryDelays?: number[] | null
 }
 

+ 1 - 0
packages/@uppy/aws-s3/src/index.js

@@ -101,6 +101,7 @@ function defaultGetResponseError (content, xhr) {
 // warning deduplication flag: see `getResponseData()` XHRUpload option definition
 let warnedSuccessActionStatus = false
 
+// TODO deprecate this, will use s3-multipart instead
 export default class AwsS3 extends BasePlugin {
   static VERSION = packageJson.version