ソースを参照

aws-s3: handle upload internally instead of deferring to xhr-up… (#2060)

* aws-s3: handle actual upload internally instead of deferring to XHRUpload

* aws-s3: add a comment explaining the mess

* aws-s3: move XHRUpload code into a separate class

* utils: support prioritization in rate limiting queue

* aws-s3: remove old implementation

* core,status-bar: account for pre/postprocessing files in progress calculations

* changelog: add entry for this PR

* utils: Support cancellation in `RateLimitedQueue#wrapPromiseFunction`

* s3: remove preprocessing stage, only use upload-* events

* utils: remove console.log

* aws-s3: comment out duplicate `upload-started`

* xhr-upload: reorder `ProgressTimeout` creation for readability

* aws-s3: apply 7d972e0 to MiniXHRUpload

* aws-s3: apply 4ed7508 to MiniXHRUpload
Renée Kooi 5 年 前
コミット
ab76448508

+ 1 - 0
CHANGELOG.md

@@ -100,6 +100,7 @@ PRs are welcome! Please do open an issue to discuss first if it's a big feature,
 - [ ] plugins: review & merge screenshot+screencast support similar to Webcam #148 (@arturi)
 - [ ] core: report information about the device --^ (@arturi)
 - [ ] providers: Provider Browser don't handle uppy restrictions, can we hide things that don't match the restrictions in Google Drive and Instagram? #1827 (@arturi)
+- [ ] s3: immediately start uploading files once the signed URL is generated (#2060, @goto-bus-stop)
 
 ## 1.12.1
 

+ 365 - 0
packages/@uppy/aws-s3/src/MiniXHRUpload.js

@@ -0,0 +1,365 @@
+const cuid = require('cuid')
+const { Provider, RequestClient, Socket } = require('@uppy/companion-client')
+const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
+const getSocketHost = require('@uppy/utils/lib/getSocketHost')
+const EventTracker = require('@uppy/utils/lib/EventTracker')
+const ProgressTimeout = require('@uppy/utils/lib/ProgressTimeout')
+
+// See XHRUpload
+function buildResponseError (xhr, error) {
+  // No error message
+  if (!error) error = new Error('Upload error')
+  // Got an error message string
+  if (typeof error === 'string') error = new Error(error)
+  // Got something else
+  if (!(error instanceof Error)) {
+    error = Object.assign(new Error('Upload error'), { data: error })
+  }
+
+  error.request = xhr
+  return error
+}
+
+// See XHRUpload
+function setTypeInBlob (file) {
+  const dataWithUpdatedType = file.data.slice(0, file.data.size, file.meta.type)
+  return dataWithUpdatedType
+}
+
+module.exports = class MiniXHRUpload {
+  constructor (uppy, opts) {
+    this.uppy = uppy
+    this.opts = {
+      validateStatus (status, responseText, response) {
+        return status >= 200 && status < 300
+      },
+      ...opts
+    }
+
+    this.requests = opts.__queue
+    this.uploaderEvents = Object.create(null)
+  }
+
+  _getOptions (file) {
+    const uppy = this.uppy
+
+    const overrides = uppy.getState().xhrUpload
+    const opts = {
+      ...this.opts,
+      ...(overrides || {}),
+      ...(file.xhrUpload || {}),
+      headers: {}
+    }
+    Object.assign(opts.headers, this.opts.headers)
+    if (overrides) {
+      Object.assign(opts.headers, overrides.headers)
+    }
+    if (file.xhrUpload) {
+      Object.assign(opts.headers, file.xhrUpload.headers)
+    }
+
+    return opts
+  }
+
+  uploadFile (id, current, total) {
+    const file = this.uppy.getFile(id)
+    if (file.error) {
+      throw new Error(file.error)
+    } else if (file.isRemote) {
+      return this._uploadRemoteFile(file, current, total)
+    }
+    return this._uploadLocalFile(file, current, total)
+  }
+
+  _addMetadata (formData, meta, opts) {
+    const metaFields = Array.isArray(opts.metaFields)
+      ? opts.metaFields
+      // Send along all fields by default.
+      : Object.keys(meta)
+    metaFields.forEach((item) => {
+      formData.append(item, meta[item])
+    })
+  }
+
+  _createFormDataUpload (file, opts) {
+    const formPost = new FormData()
+
+    this._addMetadata(formPost, file.meta, opts)
+
+    const dataWithUpdatedType = setTypeInBlob(file)
+
+    if (file.name) {
+      formPost.append(opts.fieldName, dataWithUpdatedType, file.meta.name)
+    } else {
+      formPost.append(opts.fieldName, dataWithUpdatedType)
+    }
+
+    return formPost
+  }
+
+  _createBareUpload (file, opts) {
+    return file.data
+  }
+
+  _onFileRemoved (fileID, cb) {
+    this.uploaderEvents[fileID].on('file-removed', (file) => {
+      if (fileID === file.id) cb(file.id)
+    })
+  }
+
+  _onRetry (fileID, cb) {
+    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
+      if (fileID === targetFileID) {
+        cb()
+      }
+    })
+  }
+
+  _onRetryAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  _onCancelAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('cancel-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  _uploadLocalFile (file, current, total) {
+    const opts = this._getOptions(file)
+
+    this.uppy.log(`uploading ${current} of ${total}`)
+    return new Promise((resolve, reject) => {
+      // This is done in index.js in the S3 plugin.
+      // this.uppy.emit('upload-started', file)
+
+      const data = opts.formData
+        ? this._createFormDataUpload(file, opts)
+        : this._createBareUpload(file, opts)
+
+      const xhr = new XMLHttpRequest()
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
+      const timer = new ProgressTimeout(opts.timeout, () => {
+        xhr.abort()
+        queuedRequest.done()
+        const error = new Error(this.i18n('timedOut', { seconds: Math.ceil(opts.timeout / 1000) }))
+        this.uppy.emit('upload-error', file, error)
+        reject(error)
+      })
+
+      const id = cuid()
+
+      xhr.upload.addEventListener('loadstart', (ev) => {
+        this.uppy.log(`[AwsS3/XHRUpload] ${id} started`)
+      })
+
+      xhr.upload.addEventListener('progress', (ev) => {
+        this.uppy.log(`[AwsS3/XHRUpload] ${id} progress: ${ev.loaded} / ${ev.total}`)
+        // Begin checking for timeouts when progress starts, instead of loading,
+        // to avoid timing out requests on browser concurrency queue
+        timer.progress()
+
+        if (ev.lengthComputable) {
+          this.uppy.emit('upload-progress', file, {
+            uploader: this,
+            bytesUploaded: ev.loaded,
+            bytesTotal: ev.total
+          })
+        }
+      })
+
+      xhr.addEventListener('load', (ev) => {
+        this.uppy.log(`[AwsS3/XHRUpload] ${id} finished`)
+        timer.done()
+        queuedRequest.done()
+        if (this.uploaderEvents[file.id]) {
+          this.uploaderEvents[file.id].remove()
+          this.uploaderEvents[file.id] = null
+        }
+
+        if (opts.validateStatus(ev.target.status, xhr.responseText, xhr)) {
+          const body = opts.getResponseData(xhr.responseText, xhr)
+          const uploadURL = body[opts.responseUrlFieldName]
+
+          const uploadResp = {
+            status: ev.target.status,
+            body,
+            uploadURL
+          }
+
+          this.uppy.emit('upload-success', file, uploadResp)
+
+          if (uploadURL) {
+            this.uppy.log(`Download ${file.name} from ${uploadURL}`)
+          }
+
+          return resolve(file)
+        } else {
+          const body = opts.getResponseData(xhr.responseText, xhr)
+          const error = buildResponseError(xhr, opts.getResponseError(xhr.responseText, xhr))
+
+          const response = {
+            status: ev.target.status,
+            body
+          }
+
+          this.uppy.emit('upload-error', file, error, response)
+          return reject(error)
+        }
+      })
+
+      xhr.addEventListener('error', (ev) => {
+        this.uppy.log(`[AwsS3/XHRUpload] ${id} errored`)
+        timer.done()
+        queuedRequest.done()
+        if (this.uploaderEvents[file.id]) {
+          this.uploaderEvents[file.id].remove()
+          this.uploaderEvents[file.id] = null
+        }
+
+        const error = buildResponseError(xhr, opts.getResponseError(xhr.responseText, xhr))
+        this.uppy.emit('upload-error', file, error)
+        return reject(error)
+      })
+
+      xhr.open(opts.method.toUpperCase(), opts.endpoint, true)
+      // IE10 does not allow setting `withCredentials` and `responseType`
+      // before `open()` is called.
+      xhr.withCredentials = opts.withCredentials
+      if (opts.responseType !== '') {
+        xhr.responseType = opts.responseType
+      }
+
+      Object.keys(opts.headers).forEach((header) => {
+        xhr.setRequestHeader(header, opts.headers[header])
+      })
+
+      const queuedRequest = this.requests.run(() => {
+        xhr.send(data)
+        return () => {
+          timer.done()
+          xhr.abort()
+        }
+      }, { priority: 1 })
+
+      this._onFileRemoved(file.id, () => {
+        queuedRequest.abort()
+        reject(new Error('File removed'))
+      })
+
+      this._onCancelAll(file.id, () => {
+        queuedRequest.abort()
+        reject(new Error('Upload cancelled'))
+      })
+    })
+  }
+
+  _uploadRemoteFile (file, current, total) {
+    const opts = this._getOptions(file)
+    return new Promise((resolve, reject) => {
+      // This is done in index.js in the S3 plugin.
+      // this.uppy.emit('upload-started', file)
+
+      const fields = {}
+      const metaFields = Array.isArray(opts.metaFields)
+        ? opts.metaFields
+        // Send along all fields by default.
+        : Object.keys(file.meta)
+
+      metaFields.forEach((name) => {
+        fields[name] = file.meta[name]
+      })
+
+      const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+      const client = new Client(this.uppy, file.remote.providerOptions)
+      client.post(file.remote.url, {
+        ...file.remote.body,
+        endpoint: opts.endpoint,
+        size: file.data.size,
+        fieldname: opts.fieldName,
+        metadata: fields,
+        httpMethod: opts.method,
+        headers: opts.headers
+      }).then((res) => {
+        const token = res.token
+        const host = getSocketHost(file.remote.companionUrl)
+        const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
+        this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
+        this._onFileRemoved(file.id, () => {
+          socket.send('pause', {})
+          queuedRequest.abort()
+          resolve(`upload ${file.id} was removed`)
+        })
+
+        this._onCancelAll(file.id, () => {
+          socket.send('pause', {})
+          queuedRequest.abort()
+          resolve(`upload ${file.id} was canceled`)
+        })
+
+        this._onRetry(file.id, () => {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        })
+
+        this._onRetryAll(file.id, () => {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        })
+
+        socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
+
+        socket.on('success', (data) => {
+          const body = opts.getResponseData(data.response.responseText, data.response)
+          const uploadURL = body[opts.responseUrlFieldName]
+
+          const uploadResp = {
+            status: data.response.status,
+            body,
+            uploadURL
+          }
+
+          this.uppy.emit('upload-success', file, uploadResp)
+          queuedRequest.done()
+          if (this.uploaderEvents[file.id]) {
+            this.uploaderEvents[file.id].remove()
+            this.uploaderEvents[file.id] = null
+          }
+          return resolve()
+        })
+
+        socket.on('error', (errData) => {
+          const resp = errData.response
+          const error = resp
+            ? opts.getResponseError(resp.responseText, resp)
+            : Object.assign(new Error(errData.error.message), { cause: errData.error })
+          this.uppy.emit('upload-error', file, error)
+          queuedRequest.done()
+          if (this.uploaderEvents[file.id]) {
+            this.uploaderEvents[file.id].remove()
+            this.uploaderEvents[file.id] = null
+          }
+          reject(error)
+        })
+
+        const queuedRequest = this.requests.run(() => {
+          socket.open()
+          if (file.isPaused) {
+            socket.send('pause', {})
+          }
+
+          return () => socket.close()
+        })
+      }).catch((err) => {
+        this.uppy.emit('upload-error', file, err)
+        reject(err)
+      })
+    })
+  }
+}

+ 133 - 119
packages/@uppy/aws-s3/src/index.js

@@ -1,11 +1,40 @@
+/**
+ * This plugin is currently a A Big Hack™! The core reason for that is how this plugin
+ * interacts with Uppy's current pipeline design. The pipeline can handle files in steps,
+ * including preprocessing, uploading, and postprocessing steps. This plugin initially
+ * was designed to do its work in a preprocessing step, and let XHRUpload deal with the
+ * actual file upload as an uploading step. However, Uppy runs steps on all files at once,
+ * sequentially: first, all files go through a preprocessing step, then, once they are all
+ * done, they go through the uploading step.
+ *
+ * For S3, this causes severely broken behaviour when users upload many files. The
+ * preprocessing step will request S3 upload URLs that are valid for a short time only,
+ * but it has to do this for _all_ files, which can take a long time if there are hundreds
+ * or even thousands of files. By the time the uploader step starts, the first URLs may
+ * already have expired. If not, the uploading might take such a long time that later URLs
+ * will expire before some files can be uploaded.
+ *
+ * The long-term solution to this problem is to change the upload pipeline so that files
+ * can be sent to the next step individually. That requires a breakig change, so it is
+ * planned for Uppy v2.
+ *
+ * In the mean time, this plugin is stuck with a hackier approach: the necessary parts
+ * of the XHRUpload implementation were copied into this plugin, as the MiniXHRUpload
+ * class, and this plugin calls into it immediately once it receives an upload URL.
+ * This isn't as nicely modular as we'd like and requires us to maintain two copies of
+ * the XHRUpload code, but at least it's not horrifically broken :)
+ */
+
 // If global `URL` constructor is available, use it
 const URL_ = typeof URL === 'function' ? URL : require('url-parse')
 const { Plugin } = require('@uppy/core')
 const Translator = require('@uppy/utils/lib/Translator')
 const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
+const settle = require('@uppy/utils/lib/settle')
+const hasProperty = require('@uppy/utils/lib/hasProperty')
 const { RequestClient } = require('@uppy/companion-client')
-const XHRUpload = require('@uppy/xhr-upload')
 const qsStringify = require('qs-stringify')
+const MiniXHRUpload = require('./MiniXHRUpload')
 
 function resolveUrl (origin, link) {
   return new URL_(link, origin).toString()
@@ -50,6 +79,9 @@ function assertServerError (res) {
   return res
 }
 
+// warning deduplication flag: see `getResponseData()` XHRUpload option definition
+let warnedSuccessActionStatus = false
+
 module.exports = class AwsS3 extends Plugin {
   static VERSION = require('../package.json').version
 
@@ -77,7 +109,7 @@ module.exports = class AwsS3 extends Plugin {
     this.i18nInit()
 
     this.client = new RequestClient(uppy, opts)
-    this.prepareUpload = this.prepareUpload.bind(this)
+    this.handleUpload = this.handleUpload.bind(this)
     this.requests = new RateLimitedQueue(this.opts.limit)
   }
 
@@ -97,8 +129,8 @@ module.exports = class AwsS3 extends Plugin {
       throw new Error('Expected a `companionUrl` option containing a Companion address.')
     }
 
-    const filename = file.meta.name
-    const type = file.meta.type
+    const filename = encodeURIComponent(file.meta.name)
+    const type = encodeURIComponent(file.meta.type)
     const metadata = {}
     this.opts.metaFields.forEach((key) => {
       if (file.meta[key] != null) {
@@ -122,18 +154,27 @@ module.exports = class AwsS3 extends Plugin {
       console.error(err)
       throw err
     }
-
-    return params
   }
 
-  prepareUpload (fileIDs) {
+  handleUpload (fileIDs) {
+    /**
+     * keep track of `getUploadParameters()` responses
+     * so we can cancel the calls individually using just a file ID
+     * @type {Object.<string, Promise>}
+     */
+    const paramsPromises = Object.create(null)
+
+    function onremove (file) {
+      const { id } = file
+      if (hasProperty(paramsPromises, id)) {
+        paramsPromises[id].abort()
+      }
+    }
+    this.uppy.on('file-removed', onremove)
+
     fileIDs.forEach((id) => {
       const file = this.uppy.getFile(id)
-      this.uppy.emit('preprocess-progress', file, {
-        mode: 'determinate',
-        message: this.i18n('preparingUpload'),
-        value: 0
-      })
+      this.uppy.emit('upload-started', file)
     })
 
     // Wrapping rate-limited opts.getUploadParameters in a Promise takes some boilerplate!
@@ -141,39 +182,21 @@ module.exports = class AwsS3 extends Plugin {
       return this.opts.getUploadParameters(file)
     })
 
-    return Promise.all(
-      fileIDs.map((id) => {
-        const file = this.uppy.getFile(id)
-        return getUploadParameters(file)
-          .then((params) => {
-            return this.validateParameters(file, params)
-          })
-          .then((params) => {
-            this.uppy.emit('preprocess-progress', file, {
-              mode: 'determinate',
-              message: this.i18n('preparingUpload'),
-              value: 1
-            })
-            return params
-          })
-          .catch((error) => {
-            this.uppy.emit('upload-error', file, error)
-          })
-      })
-    ).then((responses) => {
-      const updatedFiles = {}
-      fileIDs.forEach((id, index) => {
-        const file = this.uppy.getFile(id)
-        if (!file || file.error) {
-          return
-        }
+    const numberOfFiles = fileIDs.length
+
+    return settle(fileIDs.map((id, index) => {
+      const file = this.uppy.getFile(id)
+      paramsPromises[id] = getUploadParameters(file)
+      return paramsPromises[id].then((params) => {
+        delete paramsPromises[id]
+        this.validateParameters(file, params)
 
         const {
           method = 'post',
           url,
           fields,
           headers
-        } = responses[index]
+        } = params
         const xhrOpts = {
           method,
           formData: method.toLowerCase() === 'post',
@@ -185,105 +208,96 @@ module.exports = class AwsS3 extends Plugin {
           xhrOpts.headers = headers
         }
 
-        const updatedFile = {
-          ...file,
+        this.uppy.setFileState(file.id, {
           meta: { ...file.meta, ...fields },
           xhrUpload: xhrOpts
-        }
-
-        updatedFiles[id] = updatedFile
-      })
-
-      const { files } = this.uppy.getState()
-      this.uppy.setState({
-        files: {
-          ...files,
-          ...updatedFiles
-        }
-      })
+        })
 
-      fileIDs.forEach((id) => {
-        const file = this.uppy.getFile(id)
-        this.uppy.emit('preprocess-complete', file)
+        return this._uploader.uploadFile(file.id, index, numberOfFiles)
+      }).catch((error) => {
+        delete paramsPromises[id]
+        this.uppy.emit('upload-error', file, error)
       })
+    })).then((settled) => {
+      // cleanup.
+      this.uppy.off('file-removed', onremove)
+      return settled
     })
   }
 
   install () {
-    const { log } = this.uppy
-    this.uppy.addPreProcessor(this.prepareUpload)
-
-    let warnedSuccessActionStatus = false
-    const xhrUploadOpts = {
-      fieldName: 'file',
-      responseUrlFieldName: 'location',
-      timeout: this.opts.timeout,
-      __queue: this.requests,
-      responseType: 'text',
-      // Get the response data from a successful XMLHttpRequest instance.
-      // `content` is the S3 response as a string.
-      // `xhr` is the XMLHttpRequest instance.
-      getResponseData (content, xhr) {
-        const opts = this
-
-        // If no response, we've hopefully done a PUT request to the file
-        // in the bucket on its full URL.
-        if (!isXml(content, xhr)) {
-          if (opts.method.toUpperCase() === 'POST') {
-            if (!warnedSuccessActionStatus) {
-              log('[AwsS3] No response data found, make sure to set the success_action_status AWS SDK option to 201. See https://uppy.io/docs/aws-s3/#POST-Uploads', 'warning')
-              warnedSuccessActionStatus = true
-            }
-            // The responseURL won't contain the object key. Give up.
-            return { location: null }
-          }
-
-          // responseURL is not available in older browsers.
-          if (!xhr.responseURL) {
-            return { location: null }
+    const uppy = this.uppy
+    this.uppy.addUploader(this.handleUpload)
+
+    // Get the response data from a successful XMLHttpRequest instance.
+    // `content` is the S3 response as a string.
+    // `xhr` is the XMLHttpRequest instance.
+    function defaultGetResponseData (content, xhr) {
+      const opts = this
+
+      // If no response, we've hopefully done a PUT request to the file
+      // in the bucket on its full URL.
+      if (!isXml(content, xhr)) {
+        if (opts.method.toUpperCase() === 'POST') {
+          if (!warnedSuccessActionStatus) {
+            uppy.log('[AwsS3] No response data found, make sure to set the success_action_status AWS SDK option to 201. See https://uppy.io/docs/aws-s3/#POST-Uploads', 'warning')
+            warnedSuccessActionStatus = true
           }
-
-          // Trim the query string because it's going to be a bunch of presign
-          // parameters for a PUT request—doing a GET request with those will
-          // always result in an error
-          return { location: xhr.responseURL.replace(/\?.*$/, '') }
+          // The responseURL won't contain the object key. Give up.
+          return { location: null }
         }
 
-        return {
-          // Some S3 alternatives do not reply with an absolute URL.
-          // Eg DigitalOcean Spaces uses /$bucketName/xyz
-          location: resolveUrl(xhr.responseURL, getXmlValue(content, 'Location')),
-          bucket: getXmlValue(content, 'Bucket'),
-          key: getXmlValue(content, 'Key'),
-          etag: getXmlValue(content, 'ETag')
-        }
-      },
-
-      // Get the error data from a failed XMLHttpRequest instance.
-      // `content` is the S3 response as a string.
-      // `xhr` is the XMLHttpRequest instance.
-      getResponseError (content, xhr) {
-        // If no response, we don't have a specific error message, use the default.
-        if (!isXml(content, xhr)) {
-          return
+        // responseURL is not available in older browsers.
+        if (!xhr.responseURL) {
+          return { location: null }
         }
-        const error = getXmlValue(content, 'Message')
-        return new Error(error)
+
+        // Trim the query string because it's going to be a bunch of presign
+        // parameters for a PUT request—doing a GET request with those will
+        // always result in an error
+        return { location: xhr.responseURL.replace(/\?.*$/, '') }
+      }
+
+      return {
+        // Some S3 alternatives do not reply with an absolute URL.
+        // Eg DigitalOcean Spaces uses /$bucketName/xyz
+        location: resolveUrl(xhr.responseURL, getXmlValue(content, 'Location')),
+        bucket: getXmlValue(content, 'Bucket'),
+        key: getXmlValue(content, 'Key'),
+        etag: getXmlValue(content, 'ETag')
       }
     }
 
-    // Replace getResponseData() with overwritten version.
-    if (this.opts.getResponseData) {
-      xhrUploadOpts.getResponseData = this.opts.getResponseData
+    // Get the error data from a failed XMLHttpRequest instance.
+    // `content` is the S3 response as a string.
+    // `xhr` is the XMLHttpRequest instance.
+    function defaultGetResponseError (content, xhr) {
+      // If no response, we don't have a specific error message, use the default.
+      if (!isXml(content, xhr)) {
+        return
+      }
+      const error = getXmlValue(content, 'Message')
+      return new Error(error)
     }
 
-    this.uppy.use(XHRUpload, xhrUploadOpts)
+    const xhrOptions = {
+      fieldName: 'file',
+      responseUrlFieldName: 'location',
+      timeout: this.opts.timeout,
+      // Share the rate limiting queue with XHRUpload.
+      __queue: this.requests,
+      responseType: 'text',
+      getResponseData: this.opts.getResponseData || defaultGetResponseData,
+      getResponseError: defaultGetResponseError
+    }
+
+    // Revert to `this.uppy.use(XHRUpload)` once the big comment block at the top of
+    // this file is solved
+    this._uploader = new MiniXHRUpload(this.uppy, xhrOptions)
+    this._uploader.i18n = this.i18n
   }
 
   uninstall () {
-    const uploader = this.uppy.getPlugin('XHRUpload')
-    this.uppy.removePlugin(uploader)
-
-    this.uppy.removePreProcessor(this.prepareUpload)
+    this.uppy.removePreProcessor(this.handleUpload)
   }
 }

+ 6 - 3
packages/@uppy/core/src/index.js

@@ -902,7 +902,8 @@ class Uppy {
     // bytesTotal may be null or zero; in that case we can't divide by it
     const canHavePercentage = isFinite(data.bytesTotal) && data.bytesTotal > 0
     this.setFileState(file.id, {
-      progress: Object.assign({}, this.getFile(file.id).progress, {
+      progress: {
+        ...this.getFile(file.id).progress,
         bytesUploaded: data.bytesUploaded,
         bytesTotal: data.bytesTotal,
         percentage: canHavePercentage
@@ -910,7 +911,7 @@ class Uppy {
           // we get more accurate calculations if we don't round this at all.
           ? Math.round(data.bytesUploaded / data.bytesTotal * 100)
           : 0
-      })
+      }
     })
 
     this._calculateTotalProgress()
@@ -922,7 +923,9 @@ class Uppy {
     const files = this.getFiles()
 
     const inProgress = files.filter((file) => {
-      return file.progress.uploadStarted
+      return file.progress.uploadStarted ||
+        file.progress.preprocess ||
+        file.progress.postprocess
     })
 
     if (inProgress.length === 0) {

+ 2 - 2
packages/@uppy/status-bar/src/index.js

@@ -182,12 +182,12 @@ module.exports = class StatusBar extends Plugin {
 
     let totalSize = 0
     let totalUploadedSize = 0
-    uploadStartedFiles.forEach((file) => {
+    startedFiles.forEach((file) => {
       totalSize = totalSize + (file.progress.bytesTotal || 0)
       totalUploadedSize = totalUploadedSize + (file.progress.bytesUploaded || 0)
     })
 
-    const isUploadStarted = uploadStartedFiles.length > 0
+    const isUploadStarted = startedFiles.length > 0
 
     const isAllComplete = totalProgress === 100 &&
       completeFiles.length === Object.keys(files).length &&

+ 65 - 33
packages/@uppy/utils/src/RateLimitedQueue.js

@@ -1,3 +1,17 @@
+/**
+ * Array.prototype.findIndex ponyfill for old browsers.
+ */
+function findIndex (array, predicate) {
+  for (let i = 0; i < array.length; i++) {
+    if (predicate(array[i])) return i
+  }
+  return -1
+}
+
+function createCancelError () {
+  return new Error('Cancelled')
+}
+
 module.exports = class RateLimitedQueue {
   constructor (limit) {
     if (typeof limit !== 'number' || limit === 0) {
@@ -67,9 +81,10 @@ module.exports = class RateLimitedQueue {
     next.done = handler.done
   }
 
-  _queue (fn) {
+  _queue (fn, options = {}) {
     const handler = {
       fn,
+      priority: options.priority || 0,
       abort: () => {
         this._dequeue(handler)
       },
@@ -77,7 +92,15 @@ module.exports = class RateLimitedQueue {
         throw new Error('Cannot mark a queued request as done: this indicates a bug')
       }
     }
-    this.queuedHandlers.push(handler)
+
+    const index = findIndex(this.queuedHandlers, (other) => {
+      return handler.priority > other.priority
+    })
+    if (index === -1) {
+      this.queuedHandlers.push(handler)
+    } else {
+      this.queuedHandlers.splice(index, 0, handler)
+    }
     return handler
   }
 
@@ -88,44 +111,53 @@ module.exports = class RateLimitedQueue {
     }
   }
 
-  run (fn) {
+  run (fn, queueOptions) {
     if (this.activeRequests < this.limit) {
       return this._call(fn)
     }
-    return this._queue(fn)
+    return this._queue(fn, queueOptions)
   }
 
-  wrapPromiseFunction (fn) {
-    return (...args) => new Promise((resolve, reject) => {
-      const queuedRequest = this.run(() => {
-        let cancelError
-        let promise
-        try {
-          promise = Promise.resolve(fn(...args))
-        } catch (err) {
-          promise = Promise.reject(err)
-        }
-
-        promise.then((result) => {
-          if (cancelError) {
-            reject(cancelError)
-          } else {
-            queuedRequest.done()
-            resolve(result)
+  wrapPromiseFunction (fn, queueOptions) {
+    return (...args) => {
+      let queuedRequest
+      const outerPromise = new Promise((resolve, reject) => {
+        queuedRequest = this.run(() => {
+          let cancelError
+          let innerPromise
+          try {
+            innerPromise = Promise.resolve(fn(...args))
+          } catch (err) {
+            innerPromise = Promise.reject(err)
           }
-        }, (err) => {
-          if (cancelError) {
-            reject(cancelError)
-          } else {
-            queuedRequest.done()
-            reject(err)
-          }
-        })
 
-        return () => {
-          cancelError = new Error('Cancelled')
-        }
+          innerPromise.then((result) => {
+            if (cancelError) {
+              reject(cancelError)
+            } else {
+              queuedRequest.done()
+              resolve(result)
+            }
+          }, (err) => {
+            if (cancelError) {
+              reject(cancelError)
+            } else {
+              queuedRequest.done()
+              reject(err)
+            }
+          })
+
+          return () => {
+            cancelError = createCancelError()
+          }
+        }, queueOptions)
       })
-    })
+
+      outerPromise.abort = () => {
+        queuedRequest.abort()
+      }
+
+      return outerPromise
+    }
   }
 }

+ 13 - 6
packages/@uppy/utils/types/index.d.ts

@@ -49,16 +49,23 @@ declare module '@uppy/utils/lib/RateLimitedQueue' {
     export type AbortFunction = () => void
     export type PromiseFunction = (...args: any[]) => Promise<any>
     export type QueueEntry = {
-      abort: () => void
-      done: () => void
+      abort: () => void,
+      done: () => void,
+    }
+    export type QueueOptions = {
+      priority?: number
     }
   }
 
   class RateLimitedQueue {
-    constructor (limit: number)
-    run (fn: () => RateLimitedQueue.AbortFunction): RateLimitedQueue.QueueEntry
+    constructor(limit: number)
+    run(
+      fn: () => RateLimitedQueue.AbortFunction,
+      queueOptions?: RateLimitedQueue.QueueOptions
+    ): RateLimitedQueue.QueueEntry
     wrapPromiseFunction(
-      fn: () => RateLimitedQueue.PromiseFunction
+      fn: () => RateLimitedQueue.PromiseFunction,
+      queueOptions?: RateLimitedQueue.QueueOptions
     ): RateLimitedQueue.PromiseFunction
   }
 
@@ -140,7 +147,7 @@ declare module '@uppy/utils/lib/getETA' {
 declare module '@uppy/utils/lib/getFileNameAndExtension' {
   function getFileNameAndExtension(
     filename: string
-  ): { name: string, extension: string | undefined };
+  ): { name: string, extension: string | undefined }
   export = getFileNameAndExtension
 }
 

+ 4 - 4
packages/@uppy/xhr-upload/src/index.js

@@ -217,6 +217,9 @@ module.exports = class XHRUpload extends Plugin {
         ? this.createFormDataUpload(file, opts)
         : this.createBareUpload(file, opts)
 
+      const xhr = new XMLHttpRequest()
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
       const timer = new ProgressTimeout(opts.timeout, () => {
         xhr.abort()
         queuedRequest.done()
@@ -225,9 +228,6 @@ module.exports = class XHRUpload extends Plugin {
         reject(error)
       })
 
-      const xhr = new XMLHttpRequest()
-      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
-
       const id = cuid()
 
       xhr.upload.addEventListener('loadstart', (ev) => {
@@ -358,7 +358,7 @@ module.exports = class XHRUpload extends Plugin {
         size: file.data.size,
         fieldname: opts.fieldName,
         metadata: fields,
-        httpMethod: this.opts.method,
+        httpMethod: opts.method,
         headers: opts.headers
       }).then((res) => {
         const token = res.token