Browse Source

Use more cancellation-friendly strategy for `limit: N` uploads (#1736)

* xhr-upload: redo limit option using different strategy

* xhr-upload: fix marking requests as done

* Move duplicate createEventTracker definitions to @uppy/utils

* tus: new cancellation for local uploads

* tus: fix cancelling queued uploads

* tus: fix wrong name

* aws-s3-multipart: smol refactor

* aws-s3-multipart: new cancellation style

* aws-s3: new cancellation

* utils: port limitPromises test to RateLimitedQueue

* timing fix?

* tus: new cancellation on websocket

* xhr-upload: implement cancellation for remote uploads

* aws-s3-multipart: port to new cancellation

* utils: remove limitPromises

* xhr-upload: remove pause/resume code

* xhr-upload: clean up event listeners

* xhr-upload: extract progress timer to separate class

* Move ProgressTimeout class to utils

* utils: update typings

* Fix lint

* tus: make pause/resume respect the rate limiting queue

* tus: try to explain some of the tricky things at play in the Tus#upload method

* aws-s3-multipart: add missing { abort: true }

* aws-s3-multipart: make pause/resume respect the rate limiting queue

* eslintrc.json - fixed eslint warnings for jsdoc

* Revert "eslintrc.json - fixed eslint warnings for jsdoc"

This reverts commit 50b24773ceaaafba6414437a3ebc0335681d65a4.

* eslintrc.json - allow indentation in jsdoc comments

* xhr-upload: fix promise return value

* tus: fix promise return value

* tus: add missing `return () => {}`

* utils: typing export type fixes

* tus: add more jsdoc

* companion-client: add missing SocketOptions type

* utils: fix more export typings

* core,companion-client,tus: more typings tweaking

* companion-client: test fix :weary:

* companion-client: add type for Socket#open

* tus: fix emit() call

* add local cancellation fuzz....ish? test
Renée Kooi 5 năm trước cách đây
mục cha
commit
bb8b66d918

+ 2 - 4
.eslintrc.json

@@ -15,17 +15,15 @@
     "jsx-quotes": ["error", "prefer-double"],
     "compat/compat": ["error"],
 
-    // "no-unused-vars": ["warn", { "vars": "all", "args": "after-used", "ignoreRestSiblings": false }],
-
     "jsdoc/check-alignment": ["warn"],
     "jsdoc/check-examples": ["warn"],
-    "jsdoc/check-indentation": ["warn"],
     "jsdoc/check-param-names": ["warn"],
     "jsdoc/check-syntax": ["warn"],
     "jsdoc/check-tag-names": ["warn"],
     "jsdoc/check-types": ["warn"],
     "jsdoc/newline-after-description": ["warn"],
-    "jsdoc/valid-types": ["warn"]
+    "jsdoc/valid-types": ["warn"],
+    "jsdoc/check-indentation": ["off"]
   },
   "settings": {
     "react": {

+ 1 - 1
bin/endtoend-build-tests

@@ -12,7 +12,7 @@ __base="$(basename ${__file} .sh)"
 __root="$(cd "$(dirname "${__dir}")" && pwd)"
 
 # Tests using a simple build setup.
-tests="tus-drag-drop tus-dashboard i18n-drag-drop xhr-limit providers thumbnails transloadit url-plugin"
+tests="chaos-monkey i18n-drag-drop providers thumbnails transloadit tus-drag-drop url-plugin xhr-limit"
 
 for t in $tests; do
   mkdir -p "${__root}/test/endtoend/$t/dist"

+ 61 - 8
package-lock.json

@@ -1304,6 +1304,15 @@
       "resolved": "https://registry.npmjs.org/@iarna/toml/-/toml-2.2.3.tgz",
       "integrity": "sha512-FmuxfCuolpLl0AnQ2NHSzoUKWEJDFl63qXjzdoWBVyFCXzMGm1spBzk7LeHNoVCiWCF7mRVms9e6jEV9+MoPbg=="
     },
+    "@jamen/lorem": {
+      "version": "0.2.0",
+      "resolved": "https://registry.npmjs.org/@jamen/lorem/-/lorem-0.2.0.tgz",
+      "integrity": "sha512-qdJTjlAdAicZUcpJFFEl/tkoQ9t9q/FqhDndIwtRKRNoWG+IwU5zqxV+96RlBWtmzudjw7SYOod76h1FP7PSzg==",
+      "dev": true,
+      "requires": {
+        "bytes": "^3.0.0"
+      }
+    },
     "@jest/console": {
       "version": "24.7.1",
       "resolved": "https://registry.npmjs.org/@jest/console/-/console-24.7.1.tgz",
@@ -9325,6 +9334,43 @@
         }
       }
     },
+    "brake": {
+      "version": "1.0.1",
+      "resolved": "https://registry.npmjs.org/brake/-/brake-1.0.1.tgz",
+      "integrity": "sha1-V8SFTboeChjJnRX08qqEx3E0c9s=",
+      "dev": true,
+      "requires": {
+        "inherits": "~2.0.1",
+        "minimist": "~0.1.0",
+        "readable-stream": "~1.0.27-1"
+      },
+      "dependencies": {
+        "minimist": {
+          "version": "0.1.0",
+          "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.1.0.tgz",
+          "integrity": "sha1-md9lelJXTCHJBXSX33QnkLK0wN4=",
+          "dev": true
+        },
+        "readable-stream": {
+          "version": "1.0.34",
+          "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.34.tgz",
+          "integrity": "sha1-Elgg40vIQtLyqq+v5MKRbuMsFXw=",
+          "dev": true,
+          "requires": {
+            "core-util-is": "~1.0.0",
+            "inherits": "~2.0.1",
+            "isarray": "0.0.1",
+            "string_decoder": "~0.10.x"
+          }
+        },
+        "string_decoder": {
+          "version": "0.10.31",
+          "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
+          "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=",
+          "dev": true
+        }
+      }
+    },
     "brfs": {
       "version": "1.6.1",
       "resolved": "https://registry.npmjs.org/brfs/-/brfs-1.6.1.tgz",
@@ -15797,11 +15843,11 @@
       "dev": true
     },
     "follow-redirects": {
-      "version": "1.7.0",
-      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.7.0.tgz",
-      "integrity": "sha512-m/pZQy4Gj287eNy94nivy5wchN3Kp+Q5WgUPNy5lJSZ3sgkVKSYV/ZChMAQVIgx1SqfZ2zBZtPA2YlXIWxxJOQ==",
+      "version": "1.9.0",
+      "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.9.0.tgz",
+      "integrity": "sha512-CRcPzsSIbXyVDl0QI01muNDu69S8trU4jArW9LpOt2WtC6LyUJetcIrmfHsRBx7/Jb6GHJUiuqyYxPooFfNt6A==",
       "requires": {
-        "debug": "^3.2.6"
+        "debug": "^3.0.0"
       },
       "dependencies": {
         "debug": {
@@ -18347,13 +18393,20 @@
       }
     },
     "http-proxy": {
-      "version": "1.17.0",
-      "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.17.0.tgz",
-      "integrity": "sha512-Taqn+3nNvYRfJ3bGvKfBSRwy1v6eePlm3oc/aWVxZp57DQr5Eq3xhKJi7Z4hZpS8PC3H4qI+Yly5EmFacGuA/g==",
+      "version": "1.18.0",
+      "resolved": "https://registry.npmjs.org/http-proxy/-/http-proxy-1.18.0.tgz",
+      "integrity": "sha512-84I2iJM/n1d4Hdgc6y2+qY5mDaz2PUVjlg9znE9byl+q0uC3DeByqBGReQu5tpLK0TAqTIXScRUV+dg7+bUPpQ==",
       "requires": {
-        "eventemitter3": "^3.0.0",
+        "eventemitter3": "^4.0.0",
         "follow-redirects": "^1.0.0",
         "requires-port": "^1.0.0"
+      },
+      "dependencies": {
+        "eventemitter3": {
+          "version": "4.0.0",
+          "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.0.tgz",
+          "integrity": "sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg=="
+        }
       }
     },
     "http-proxy-agent": {

+ 3 - 0
package.json

@@ -78,6 +78,7 @@
     "@babel/polyfill": "^7.4.4",
     "@babel/preset-env": "^7.4.5",
     "@babel/register": "^7.4.4",
+    "@jamen/lorem": "^0.2.0",
     "@octokit/rest": "^16.25.0",
     "@size-limit/preset-big-lib": "^2.1.1",
     "@types/aws-serverless-express": "^3.0.1",
@@ -107,6 +108,7 @@
     "babel-jest": "^24.8.0",
     "babel-plugin-inline-package-json": "^2.0.0",
     "babelify": "^10.0.0",
+    "brake": "^1.0.1",
     "browser-resolve": "^1.11.3",
     "browser-sync": "^2.26.5",
     "browserify": "^16.2.3",
@@ -138,6 +140,7 @@
     "glob": "^7.1.3",
     "globby": "^9.2.0",
     "gzip-size": "^5.0.0",
+    "http-proxy": "^1.18.0",
     "isomorphic-fetch": "2.2.1",
     "jest": "24.8.0",
     "json3": "^3.3.2",

+ 196 - 124
packages/@uppy/aws-s3-multipart/src/index.js

@@ -1,29 +1,11 @@
 const { Plugin } = require('@uppy/core')
 const { Socket, Provider, RequestClient } = require('@uppy/companion-client')
+const EventTracker = require('@uppy/utils/lib/EventTracker')
 const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
 const getSocketHost = require('@uppy/utils/lib/getSocketHost')
-const limitPromises = require('@uppy/utils/lib/limitPromises')
+const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
 const Uploader = require('./MultipartUploader')
 
-/**
- * Create a wrapper around an event emitter with a `remove` method to remove
- * all events that were added using the wrapped emitter.
- */
-function createEventTracker (emitter) {
-  const events = []
-  return {
-    on (event, fn) {
-      events.push([event, fn])
-      return emitter.on(event, fn)
-    },
-    remove () {
-      events.forEach(([event, fn]) => {
-        emitter.off(event, fn)
-      })
-    }
-  }
-}
-
 function assertServerError (res) {
   if (res && res.error) {
     const error = new Error(res.message)
@@ -53,15 +35,11 @@ module.exports = class AwsS3Multipart extends Plugin {
       completeMultipartUpload: this.completeMultipartUpload.bind(this)
     }
 
-    this.opts = Object.assign({}, defaultOptions, opts)
+    this.opts = { ...defaultOptions, ...opts }
 
     this.upload = this.upload.bind(this)
 
-    if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
-      this.limitRequests = limitPromises(this.opts.limit)
-    } else {
-      this.limitRequests = (fn) => fn
-    }
+    this.requests = new RateLimitedQueue(this.opts.limit)
 
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
@@ -71,6 +49,9 @@ module.exports = class AwsS3Multipart extends Plugin {
   /**
    * Clean up all references for a file's upload: the MultipartUploader instance,
    * any events related to the file, and the Companion WebSocket connection.
+   *
+   * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed.
+   * This should be done when the user cancels the upload, not when the upload is completed or errored.
    */
   resetUploaderReferences (fileID, opts = {}) {
     if (this.uploaders[fileID]) {
@@ -147,102 +128,149 @@ module.exports = class AwsS3Multipart extends Plugin {
 
   uploadFile (file) {
     return new Promise((resolve, reject) => {
-      const upload = new Uploader(file.data, Object.assign({
-        // .bind to pass the file object to each handler.
-        createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
-        listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
-        prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
-        completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
-        abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
-
-        limit: this.opts.limit || 5,
-        onStart: (data) => {
-          const cFile = this.uppy.getFile(file.id)
-          this.uppy.setFileState(file.id, {
-            s3Multipart: Object.assign({}, cFile.s3Multipart, {
-              key: data.key,
-              uploadId: data.uploadId,
-              parts: []
-            })
-          })
-        },
-        onProgress: (bytesUploaded, bytesTotal) => {
-          this.uppy.emit('upload-progress', file, {
-            uploader: this,
-            bytesUploaded: bytesUploaded,
-            bytesTotal: bytesTotal
-          })
-        },
-        onError: (err) => {
-          this.uppy.log(err)
-          this.uppy.emit('upload-error', file, err)
-          err.message = `Failed because: ${err.message}`
-
-          this.resetUploaderReferences(file.id)
-          reject(err)
-        },
-        onSuccess: (result) => {
-          const uploadResp = {
-            uploadURL: result.location
+      const onStart = (data) => {
+        const cFile = this.uppy.getFile(file.id)
+        this.uppy.setFileState(file.id, {
+          s3Multipart: {
+            ...cFile.s3Multipart,
+            key: data.key,
+            uploadId: data.uploadId,
+            parts: []
           }
+        })
+      }
 
-          this.resetUploaderReferences(file.id)
+      const onProgress = (bytesUploaded, bytesTotal) => {
+        this.uppy.emit('upload-progress', file, {
+          uploader: this,
+          bytesUploaded: bytesUploaded,
+          bytesTotal: bytesTotal
+        })
+      }
 
-          this.uppy.emit('upload-success', file, uploadResp)
+      const onError = (err) => {
+        this.uppy.log(err)
+        this.uppy.emit('upload-error', file, err)
+        err.message = `Failed because: ${err.message}`
 
-          if (result.location) {
-            this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
-          }
+        this.resetUploaderReferences(file.id)
+        reject(err)
+      }
 
-          resolve(upload)
-        },
-        onPartComplete: (part) => {
-          // Store completed parts in state.
-          const cFile = this.uppy.getFile(file.id)
-          if (!cFile) {
-            return
-          }
-          this.uppy.setFileState(file.id, {
-            s3Multipart: Object.assign({}, cFile.s3Multipart, {
-              parts: [
-                ...cFile.s3Multipart.parts,
-                part
-              ]
-            })
-          })
+      const onSuccess = (result) => {
+        const uploadResp = {
+          uploadURL: result.location
+        }
+
+        this.resetUploaderReferences(file.id)
+
+        this.uppy.emit('upload-success', file, uploadResp)
+
+        if (result.location) {
+          this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
+        }
+
+        resolve(upload)
+      }
 
-          this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
+      const onPartComplete = (part) => {
+        // Store completed parts in state.
+        const cFile = this.uppy.getFile(file.id)
+        if (!cFile) {
+          return
         }
-      }, file.s3Multipart))
+        this.uppy.setFileState(file.id, {
+          s3Multipart: {
+            ...cFile.s3Multipart,
+            parts: [
+              ...cFile.s3Multipart.parts,
+              part
+            ]
+          }
+        })
+
+        this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
+      }
+
+      const upload = new Uploader(file.data, {
+        // .bind to pass the file object to each handler.
+        createMultipartUpload: this.requests.wrapPromiseFunction(
+          this.opts.createMultipartUpload.bind(this, file)),
+        listParts: this.requests.wrapPromiseFunction(
+          this.opts.listParts.bind(this, file)),
+        prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
+        completeMultipartUpload: this.requests.wrapPromiseFunction(
+          this.opts.completeMultipartUpload.bind(this, file)),
+        abortMultipartUpload: this.requests.wrapPromiseFunction(
+          this.opts.abortMultipartUpload.bind(this, file)),
+
+        onStart,
+        onProgress,
+        onError,
+        onSuccess,
+        onPartComplete,
+
+        limit: this.opts.limit || 5,
+        ...file.s3Multipart
+      })
 
       this.uploaders[file.id] = upload
-      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
+      let queuedRequest = this.requests.run(() => {
+        if (!file.isPaused) {
+          upload.start()
+        }
+        // Don't do anything here, the caller will take care of cancelling the upload itself
+        // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
+        // called when this request is still in the queue, and has not been started yet, too. At
+        // that point this cancellation function is not going to be called.
+        return () => {}
+      })
 
       this.onFileRemove(file.id, (removed) => {
+        queuedRequest.abort()
         this.resetUploaderReferences(file.id, { abort: true })
         resolve(`upload ${removed.id} was removed`)
       })
 
+      this.onCancelAll(file.id, () => {
+        queuedRequest.abort()
+        this.resetUploaderReferences(file.id, { abort: true })
+        resolve(`upload ${file.id} was canceled`)
+      })
+
       this.onFilePause(file.id, (isPaused) => {
         if (isPaused) {
+          // Remove this file from the queue so another file can start in its place.
+          queuedRequest.abort()
           upload.pause()
         } else {
-          upload.start()
+          // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(() => {
+            upload.start()
+            return () => {}
+          })
         }
       })
 
       this.onPauseAll(file.id, () => {
+        queuedRequest.abort()
         upload.pause()
       })
 
       this.onResumeAll(file.id, () => {
-        upload.start()
+        queuedRequest.abort()
+        if (file.error) {
+          upload.abort()
+        }
+        queuedRequest = this.requests.run(() => {
+          upload.start()
+          return () => {}
+        })
       })
 
-      if (!file.isPaused) {
-        upload.start()
-      }
-
       if (!file.isRestored) {
         this.uppy.emit('upload-started', file, upload)
       }
@@ -252,24 +280,22 @@ module.exports = class AwsS3Multipart extends Plugin {
   uploadRemote (file) {
     this.resetUploaderReferences(file.id)
 
-    return new Promise((resolve, reject) => {
-      if (file.serverToken) {
-        return this.connectToServerSocket(file)
-          .then(() => resolve())
-          .catch(reject)
-      }
-
-      this.uppy.emit('upload-started', file)
+    this.uppy.emit('upload-started', file)
+    if (file.serverToken) {
+      return this.connectToServerSocket(file)
+    }
 
+    return new Promise((resolve, reject) => {
       const Client = file.remote.providerOptions.provider ? Provider : RequestClient
       const client = new Client(this.uppy, file.remote.providerOptions)
       client.post(
         file.remote.url,
-        Object.assign({}, file.remote.body, {
+        {
+          ...file.remote.body,
           protocol: 's3-multipart',
           size: file.data.size,
           metadata: file.meta
-        })
+        }
       ).then((res) => {
         this.uppy.setFileState(file.id, { serverToken: res.token })
         file = this.uppy.getFile(file.id)
@@ -288,46 +314,78 @@ module.exports = class AwsS3Multipart extends Plugin {
     return new Promise((resolve, reject) => {
       const token = file.serverToken
       const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}` })
+      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
       this.uploaderSockets[socket] = socket
-      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
       this.onFileRemove(file.id, (removed) => {
+        queuedRequest.abort()
+        socket.send('pause', {})
         this.resetUploaderReferences(file.id, { abort: true })
         resolve(`upload ${file.id} was removed`)
       })
 
       this.onFilePause(file.id, (isPaused) => {
-        socket.send(isPaused ? 'pause' : 'resume', {})
+        if (isPaused) {
+          // Remove this file from the queue so another file can start in its place.
+          queuedRequest.abort()
+          socket.send('pause', {})
+        } else {
+          // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(() => {
+            socket.send('resume', {})
+            return () => {}
+          })
+        }
       })
 
-      this.onPauseAll(file.id, () => socket.send('pause', {}))
+      this.onPauseAll(file.id, () => {
+        queuedRequest.abort()
+        socket.send('pause', {})
+      })
+
+      this.onCancelAll(file.id, () => {
+        queuedRequest.abort()
+        socket.send('pause', {})
+        this.resetUploaderReferences(file.id)
+        resolve(`upload ${file.id} was canceled`)
+      })
 
       this.onResumeAll(file.id, () => {
+        queuedRequest.abort()
         if (file.error) {
           socket.send('pause', {})
         }
-        socket.send('resume', {})
+        queuedRequest = this.requests.run(() => {
+          socket.send('resume', {})
+        })
       })
 
       this.onRetry(file.id, () => {
-        socket.send('pause', {})
-        socket.send('resume', {})
+        // Only do the retry if the upload is actually in progress;
+        // else we could try to send these messages when the upload is still queued.
+        // We may need a better check for this since the socket may also be closed
+        // for other reasons, like network failures.
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
       })
 
       this.onRetryAll(file.id, () => {
-        socket.send('pause', {})
-        socket.send('resume', {})
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
       })
 
-      if (file.isPaused) {
-        socket.send('pause', {})
-      }
-
       socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
 
       socket.on('error', (errData) => {
         this.uppy.emit('upload-error', file, new Error(errData.error))
+        this.resetUploaderReferences(file.id)
+        queuedRequest.done()
         reject(new Error(errData.error))
       })
 
@@ -337,8 +395,19 @@ module.exports = class AwsS3Multipart extends Plugin {
         }
 
         this.uppy.emit('upload-success', file, uploadResp)
+        this.resetUploaderReferences(file.id)
+        queuedRequest.done()
         resolve()
       })
+
+      let queuedRequest = this.requests.run(() => {
+        socket.open()
+        if (file.isPaused) {
+          socket.send('pause', {})
+        }
+
+        return () => {}
+      })
     })
   }
 
@@ -393,6 +462,13 @@ module.exports = class AwsS3Multipart extends Plugin {
     })
   }
 
+  onCancelAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('cancel-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
   onResumeAll (fileID, cb) {
     this.uploaderEvents[fileID].on('resume-all', () => {
       if (!this.uppy.getFile(fileID)) return
@@ -409,19 +485,15 @@ module.exports = class AwsS3Multipart extends Plugin {
       }
     })
     this.uppy.addUploader(this.upload)
-
-    this.uppy.on('cancel-all', () => {
-      this.uppy.getFiles().forEach((file) => {
-        this.resetUploaderReferences(file.id, { abort: true })
-      })
-    })
   }
 
   uninstall () {
+    const { capabilities } = this.uppy.getState()
     this.uppy.setState({
-      capabilities: Object.assign({}, this.uppy.getState().capabilities, {
+      capabilities: {
+        ...capabilities,
         resumableUploads: false
-      })
+      }
     })
     this.uppy.removeUploader(this.upload)
   }

+ 30 - 25
packages/@uppy/aws-s3/src/index.js

@@ -1,7 +1,7 @@
 const resolveUrl = require('resolve-url')
 const { Plugin } = require('@uppy/core')
 const Translator = require('@uppy/utils/lib/Translator')
-const limitPromises = require('@uppy/utils/lib/limitPromises')
+const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
 const { RequestClient } = require('@uppy/companion-client')
 const XHRUpload = require('@uppy/xhr-upload')
 
@@ -59,11 +59,7 @@ module.exports = class AwsS3 extends Plugin {
 
     this.prepareUpload = this.prepareUpload.bind(this)
 
-    if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
-      this.limitRequests = limitPromises(this.opts.limit)
-    } else {
-      this.limitRequests = (fn) => fn
-    }
+    this.requests = new RateLimitedQueue(this.opts.limit)
   }
 
   getUploadParameters (file) {
@@ -102,25 +98,29 @@ module.exports = class AwsS3 extends Plugin {
       })
     })
 
-    const getUploadParameters = this.limitRequests(this.opts.getUploadParameters)
+    // Wrapping rate-limited opts.getUploadParameters in a Promise takes some boilerplate!
+    const getUploadParameters = this.requests.wrapPromiseFunction((file) => {
+      return this.opts.getUploadParameters(file)
+    })
 
     return Promise.all(
       fileIDs.map((id) => {
         const file = this.uppy.getFile(id)
-        const paramsPromise = Promise.resolve()
-          .then(() => getUploadParameters(file))
-        return paramsPromise.then((params) => {
-          return this.validateParameters(file, params)
-        }).then((params) => {
-          this.uppy.emit('preprocess-progress', file, {
-            mode: 'determinate',
-            message: this.i18n('preparingUpload'),
-            value: 1
+        return getUploadParameters(file)
+          .then((params) => {
+            return this.validateParameters(file, params)
+          })
+          .then((params) => {
+            this.uppy.emit('preprocess-progress', file, {
+              mode: 'determinate',
+              message: this.i18n('preparingUpload'),
+              value: 1
+            })
+            return params
+          })
+          .catch((error) => {
+            this.uppy.emit('upload-error', file, error)
           })
-          return params
-        }).catch((error) => {
-          this.uppy.emit('upload-error', file, error)
-        })
       })
     ).then((responses) => {
       const updatedFiles = {}
@@ -147,16 +147,21 @@ module.exports = class AwsS3 extends Plugin {
           xhrOpts.headers = headers
         }
 
-        const updatedFile = Object.assign({}, file, {
-          meta: Object.assign({}, file.meta, fields),
+        const updatedFile = {
+          ...file,
+          meta: { ...file.meta, ...fields },
           xhrUpload: xhrOpts
-        })
+        }
 
         updatedFiles[id] = updatedFile
       })
 
+      const { files } = this.uppy.getState()
       this.uppy.setState({
-        files: Object.assign({}, this.uppy.getState().files, updatedFiles)
+        files: {
+          ...files,
+          ...updatedFiles
+        }
       })
 
       fileIDs.forEach((id) => {
@@ -175,7 +180,7 @@ module.exports = class AwsS3 extends Plugin {
       fieldName: 'file',
       responseUrlFieldName: 'location',
       timeout: this.opts.timeout,
-      limit: this.opts.limit,
+      __queue: this.requests,
       responseType: 'text',
       // Get the response data from a successful XMLHttpRequest instance.
       // `content` is the S3 response as a string.

+ 25 - 15
packages/@uppy/companion-client/src/Socket.js

@@ -2,18 +2,34 @@ const ee = require('namespace-emitter')
 
 module.exports = class UppySocket {
   constructor (opts) {
-    this.queued = []
+    this.opts = opts
+    this._queued = []
     this.isOpen = false
-    this.socket = new WebSocket(opts.target)
     this.emitter = ee()
 
+    this._handleMessage = this._handleMessage.bind(this)
+
+    this.close = this.close.bind(this)
+    this.emit = this.emit.bind(this)
+    this.on = this.on.bind(this)
+    this.once = this.once.bind(this)
+    this.send = this.send.bind(this)
+
+    if (!opts || opts.autoOpen !== false) {
+      this.open()
+    }
+  }
+
+  open () {
+    this.socket = new WebSocket(this.opts.target)
+
     this.socket.onopen = (e) => {
       this.isOpen = true
 
-      while (this.queued.length > 0 && this.isOpen) {
-        const first = this.queued[0]
+      while (this._queued.length > 0 && this.isOpen) {
+        const first = this._queued[0]
         this.send(first.action, first.payload)
-        this.queued = this.queued.slice(1)
+        this._queued = this._queued.slice(1)
       }
     }
 
@@ -21,26 +37,20 @@ module.exports = class UppySocket {
       this.isOpen = false
     }
 
-    this._handleMessage = this._handleMessage.bind(this)
-
     this.socket.onmessage = this._handleMessage
-
-    this.close = this.close.bind(this)
-    this.emit = this.emit.bind(this)
-    this.on = this.on.bind(this)
-    this.once = this.once.bind(this)
-    this.send = this.send.bind(this)
   }
 
   close () {
-    return this.socket.close()
+    if (this.socket) {
+      this.socket.close()
+    }
   }
 
   send (action, payload) {
     // attach uuid
 
     if (!this.isOpen) {
-      this.queued.push({ action, payload })
+      this._queued.push({ action, payload })
       return
     }
 

+ 5 - 5
packages/@uppy/companion-client/src/Socket.test.js

@@ -66,7 +66,7 @@ describe('Socket', () => {
     const uppySocket = new UppySocket({ target: 'foo' })
 
     uppySocket.send('bar', 'boo')
-    expect(uppySocket.queued).toEqual([{ action: 'bar', payload: 'boo' }])
+    expect(uppySocket._queued).toEqual([{ action: 'bar', payload: 'boo' }])
     expect(webSocketSendSpy.mock.calls.length).toEqual(0)
   })
 
@@ -76,7 +76,7 @@ describe('Socket', () => {
 
     uppySocket.send('bar', 'boo')
     uppySocket.send('moo', 'baa')
-    expect(uppySocket.queued).toEqual([
+    expect(uppySocket._queued).toEqual([
       { action: 'bar', payload: 'boo' },
       { action: 'moo', payload: 'baa' }
     ])
@@ -84,7 +84,7 @@ describe('Socket', () => {
 
     webSocketInstance.triggerOpen()
 
-    expect(uppySocket.queued).toEqual([])
+    expect(uppySocket._queued).toEqual([])
     expect(webSocketSendSpy.mock.calls.length).toEqual(2)
     expect(webSocketSendSpy.mock.calls[0]).toEqual([
       JSON.stringify({ action: 'bar', payload: 'boo' })
@@ -99,11 +99,11 @@ describe('Socket', () => {
     const webSocketInstance = uppySocket.socket
     webSocketInstance.triggerOpen()
     uppySocket.send('bar', 'boo')
-    expect(uppySocket.queued).toEqual([])
+    expect(uppySocket._queued).toEqual([])
 
     webSocketInstance.triggerClose()
     uppySocket.send('bar', 'boo')
-    expect(uppySocket.queued).toEqual([{ action: 'bar', payload: 'boo' }])
+    expect(uppySocket._queued).toEqual([{ action: 'bar', payload: 'boo' }])
   })
 
   it('should close the websocket when it is force closed', () => {

+ 4 - 0
packages/@uppy/companion-client/types/index.d.ts

@@ -28,10 +28,14 @@ export class Provider extends RequestClient {
 
 export interface SocketOptions {
   target: string;
+  autoOpen?: boolean;
 }
 
 export class Socket {
+  isOpen: boolean;
+
   constructor (opts: SocketOptions);
+  open (): void;
   close (): void;
   send (action: string, payload: any): void;
   on (action: string, handler: (param: any) => void): void;

+ 4 - 0
packages/@uppy/core/types/index.d.ts

@@ -105,6 +105,10 @@ declare module Uppy {
     on(event: 'complete', callback: (result: UploadResult) => void): Uppy;
     on(event: string, callback: (...args: any[]) => void): Uppy;
     off(event: string, callback: any): Uppy;
+    /**
+     * For use by plugins only!
+     */
+    emit(event: string, ...args: any[]): void;
     updateAll(state: object): void;
     setState(patch: object): void;
     getState<TMeta extends IndexedObject<any> = {}>(): State<TMeta>;

+ 230 - 95
packages/@uppy/tus/src/index.js

@@ -4,11 +4,21 @@ const { Provider, RequestClient, Socket } = require('@uppy/companion-client')
 const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
 const getSocketHost = require('@uppy/utils/lib/getSocketHost')
 const settle = require('@uppy/utils/lib/settle')
-const limitPromises = require('@uppy/utils/lib/limitPromises')
+const EventTracker = require('@uppy/utils/lib/EventTracker')
+const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
 const getFingerprint = require('./getFingerprint')
 
-// Extracted from https://github.com/tus/tus-js-client/blob/master/lib/upload.js#L13
-// excepted we removed 'fingerprint' key to avoid adding more dependencies
+/** @typedef {import('..').TusOptions} TusOptions */
+/** @typedef {import('@uppy/core').Uppy} Uppy */
+/** @typedef {import('@uppy/core').UppyFile} UppyFile */
+/** @typedef {import('@uppy/core').FailedUppyFile<{}>} FailedUppyFile */
+
+/**
+ * Extracted from https://github.com/tus/tus-js-client/blob/master/lib/upload.js#L13
+ * excepted we removed 'fingerprint' key to avoid adding more dependencies
+ *
+ * @type {TusOptions}
+ */
 const tusDefaultOptions = {
   endpoint: '',
   resume: true,
@@ -25,32 +35,16 @@ const tusDefaultOptions = {
   retryDelays: null
 }
 
-/**
- * Create a wrapper around an event emitter with a `remove` method to remove
- * all events that were added using the wrapped emitter.
- */
-function createEventTracker (emitter) {
-  const events = []
-  return {
-    on (event, fn) {
-      events.push([event, fn])
-      return emitter.on(event, fn)
-    },
-    remove () {
-      events.forEach(([event, fn]) => {
-        emitter.off(event, fn)
-      })
-    }
-  }
-}
-
 /**
  * Tus resumable file uploader
- *
  */
 module.exports = class Tus extends Plugin {
   static VERSION = require('../package.json').version
 
+  /**
+   * @param {Uppy} uppy
+   * @param {TusOptions} opts
+   */
   constructor (uppy, opts) {
     super(uppy, opts)
     this.type = 'uploader'
@@ -67,14 +61,14 @@ module.exports = class Tus extends Plugin {
     }
 
     // merge default options with the ones set by user
+    /** @type {import("..").TusOptions} */
     this.opts = Object.assign({}, defaultOptions, opts)
 
-    // Simultaneous upload limiting is shared across all uploads with this plugin.
-    if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
-      this.limitUploads = limitPromises(this.opts.limit)
-    } else {
-      this.limitUploads = (fn) => fn
-    }
+    /**
+     * Simultaneous upload limiting is shared across all uploads with this plugin.
+     * @type {RateLimitedQueue}
+     */
+    this.requests = new RateLimitedQueue(this.opts.limit)
 
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
@@ -101,6 +95,8 @@ module.exports = class Tus extends Plugin {
   /**
    * Clean up all references for a file's upload: the tus.Upload instance,
    * any events related to the file, and the Companion WebSocket connection.
+   *
+   * @param {string} fileID
    */
   resetUploaderReferences (fileID) {
     if (this.uploaders[fileID]) {
@@ -118,18 +114,43 @@ module.exports = class Tus extends Plugin {
   }
 
   /**
-   * Create a new Tus upload
+   * Create a new Tus upload.
    *
-   * @param {object} file for use with upload
-   * @param {integer} current file in a queue
-   * @param {integer} total number of files in a queue
-   * @returns {Promise}
+   * A lot can happen during an upload, so this is quite hard to follow!
+   * - First, the upload is started. If the file was already paused by the time the upload starts, nothing should happen.
+   *   If the `limit` option is used, the upload must be queued onto the `this.requests` queue.
+   *   When an upload starts, we store the tus.Upload instance, and an EventTracker instance that manages the event listeners
+   *   for pausing, cancellation, removal, etc.
+   * - While the upload is in progress, it may be paused or cancelled.
+   *   Pausing aborts the underlying tus.Upload, and removes the upload from the `this.requests` queue. All other state is
+   *   maintained.
+   *   Cancelling removes the upload from the `this.requests` queue, and completely aborts the upload--the tus.Upload instance
+   *   is aborted and discarded, the EventTracker instance is destroyed (removing all listeners).
+   *   Resuming the upload uses the `this.requests` queue as well, to prevent selectively pausing and resuming uploads from
+   *   bypassing the limit.
+   * - After completing an upload, the tus.Upload and EventTracker instances are cleaned up, and the upload is marked as done
+   *   in the `this.requests` queue.
+   * - When an upload completed with an error, the same happens as on successful completion, but the `upload()` promise is rejected.
+   *
+   * When working on this function, keep in mind:
+   *  - When an upload is completed or cancelled for any reason, the tus.Upload and EventTracker instances need to be cleaned up using this.resetUploaderReferences().
+   *  - When an upload is cancelled or paused, for any reason, it needs to be removed from the `this.requests` queue using `queuedRequest.abort()`.
+   *  - When an upload is completed for any reason, including errors, it needs to be marked as such using `queuedRequest.done()`.
+   *  - When an upload is started or resumed, it needs to go through the `this.requests` queue. The `queuedRequest` variable must be updated so the other uses of it are valid.
+   *  - Before replacing the `queuedRequest` variable, the previous `queuedRequest` must be aborted, else it will keep taking up a spot in the queue.
+   *
+   * @param {UppyFile} file for use with upload
+   * @param {number} current file in a queue
+   * @param {number} total number of files in a queue
+   * @returns {Promise<void>}
    */
   upload (file, current, total) {
     this.resetUploaderReferences(file.id)
 
     // Create a new tus upload
     return new Promise((resolve, reject) => {
+      this.uppy.emit('upload-started', file)
+
       const optsTus = Object.assign(
         {},
         tusDefaultOptions,
@@ -150,6 +171,7 @@ module.exports = class Tus extends Plugin {
         err.message = `Failed because: ${err.message}`
 
         this.resetUploaderReferences(file.id)
+        queuedRequest.done()
         reject(err)
       }
 
@@ -174,6 +196,7 @@ module.exports = class Tus extends Plugin {
         }
 
         this.resetUploaderReferences(file.id)
+        queuedRequest.done()
         resolve(upload)
       }
 
@@ -203,78 +226,106 @@ module.exports = class Tus extends Plugin {
 
       const upload = new tus.Upload(file.data, optsTus)
       this.uploaders[file.id] = upload
-      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
+      let queuedRequest = this.requests.run(() => {
+        if (!file.isPaused) {
+          upload.start()
+        }
+        // Don't do anything here, the caller will take care of cancelling the upload itself
+        // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
+        // called when this request is still in the queue, and has not been started yet, too. At
+        // that point this cancellation function is not going to be called.
+        // Also, we need to remove the request from the queue _without_ destroying everything
+        // related to this upload to handle pauses.
+        return () => {}
+      })
 
       this.onFileRemove(file.id, (targetFileID) => {
+        queuedRequest.abort()
         this.resetUploaderReferences(file.id)
         resolve(`upload ${targetFileID} was removed`)
       })
 
       this.onPause(file.id, (isPaused) => {
         if (isPaused) {
+          // Remove this file from the queue so another file can start in its place.
+          queuedRequest.abort()
           upload.abort()
         } else {
-          upload.start()
+          // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(() => {
+            upload.start()
+            return () => {}
+          })
         }
       })
 
       this.onPauseAll(file.id, () => {
+        queuedRequest.abort()
         upload.abort()
       })
 
       this.onCancelAll(file.id, () => {
+        queuedRequest.abort()
         this.resetUploaderReferences(file.id)
         resolve(`upload ${file.id} was canceled`)
       })
 
       this.onResumeAll(file.id, () => {
+        queuedRequest.abort()
         if (file.error) {
           upload.abort()
         }
-        upload.start()
+        queuedRequest = this.requests.run(() => {
+          upload.start()
+          return () => {}
+        })
       })
-
-      if (!file.isPaused) {
-        upload.start()
-      }
+    }).catch((err) => {
+      this.uppy.emit('upload-error', file, err)
+      throw err
     })
   }
 
+  /**
+   * @param {UppyFile} file for use with upload
+   * @param {number} current file in a queue
+   * @param {number} total number of files in a queue
+   * @return {Promise<void>}
+   */
   uploadRemote (file, current, total) {
     this.resetUploaderReferences(file.id)
 
-    const opts = Object.assign(
-      {},
-      this.opts,
+    const opts = { ...this.opts }
+    if (file.tus) {
       // Install file-specific upload overrides.
-      file.tus || {}
-    )
+      Object.assign(opts, file.tus)
+    }
 
-    return new Promise((resolve, reject) => {
-      this.uppy.log(file.remote.url)
-      if (file.serverToken) {
-        return this.connectToServerSocket(file)
-          .then(() => resolve())
-          .catch(reject)
-      }
+    this.uppy.emit('upload-started', file)
+    this.uppy.log(file.remote.url)
 
-      this.uppy.emit('upload-started', file)
+    if (file.serverToken) {
+      return this.connectToServerSocket(file)
+    }
+
+    return new Promise((resolve, reject) => {
       const Client = file.remote.providerOptions.provider ? Provider : RequestClient
       const client = new Client(this.uppy, file.remote.providerOptions)
-      client.post(
-        file.remote.url,
-        Object.assign({}, file.remote.body, {
-          endpoint: opts.endpoint,
-          uploadUrl: opts.uploadUrl,
-          protocol: 'tus',
-          size: file.data.size,
-          metadata: file.meta
-        })
-      ).then((res) => {
+
+      // !! cancellation is NOT supported at this stage yet
+      client.post(file.remote.url, {
+        ...file.remote.body,
+        endpoint: opts.endpoint,
+        uploadUrl: opts.uploadUrl,
+        protocol: 'tus',
+        size: file.data.size,
+        metadata: file.meta
+      }).then((res) => {
         this.uppy.setFileState(file.id, { serverToken: res.token })
         file = this.uppy.getFile(file.id)
-        return file
-      }).then((file) => {
         return this.connectToServerSocket(file)
       }).then(() => {
         resolve()
@@ -284,48 +335,85 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * See the comment on the upload() method.
+   *
+   * Additionally, when an upload is removed, completed, or cancelled, we need to close the WebSocket connection. This is handled by the resetUploaderReferences() function, so the same guidelines apply as in upload().
+   *
+   * @param {UppyFile} file
+   */
   connectToServerSocket (file) {
     return new Promise((resolve, reject) => {
       const token = file.serverToken
       const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}` })
+      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
       this.uploaderSockets[file.id] = socket
-      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
       this.onFileRemove(file.id, () => {
+        queuedRequest.abort()
         socket.send('pause', {})
+        this.resetUploaderReferences(file.id)
         resolve(`upload ${file.id} was removed`)
       })
 
       this.onPause(file.id, (isPaused) => {
-        isPaused ? socket.send('pause', {}) : socket.send('resume', {})
+        if (isPaused) {
+          // Remove this file from the queue so another file can start in its place.
+          queuedRequest.abort()
+          socket.send('pause', {})
+        } else {
+          // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(() => {
+            socket.send('resume', {})
+            return () => {}
+          })
+        }
       })
 
-      this.onPauseAll(file.id, () => socket.send('pause', {}))
+      this.onPauseAll(file.id, () => {
+        queuedRequest.abort()
+        socket.send('pause', {})
+      })
 
-      this.onCancelAll(file.id, () => socket.send('pause', {}))
+      this.onCancelAll(file.id, () => {
+        queuedRequest.abort()
+        socket.send('pause', {})
+        this.resetUploaderReferences(file.id)
+        resolve(`upload ${file.id} was canceled`)
+      })
 
       this.onResumeAll(file.id, () => {
+        queuedRequest.abort()
         if (file.error) {
           socket.send('pause', {})
         }
-        socket.send('resume', {})
+        queuedRequest = this.requests.run(() => {
+          socket.send('resume', {})
+          return () => {}
+        })
       })
 
       this.onRetry(file.id, () => {
-        socket.send('pause', {})
-        socket.send('resume', {})
+        // Only do the retry if the upload is actually in progress;
+        // else we could try to send these messages when the upload is still queued.
+        // We may need a better check for this since the socket may also be closed
+        // for other reasons, like network failures.
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
       })
 
       this.onRetryAll(file.id, () => {
-        socket.send('pause', {})
-        socket.send('resume', {})
+        // See the comment in the onRetry() call
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
       })
 
-      if (file.isPaused) {
-        socket.send('pause', {})
-      }
-
       socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
 
       socket.on('error', (errData) => {
@@ -340,9 +428,12 @@ module.exports = class Tus extends Plugin {
           this.uppy.setFileState(file.id, {
             serverToken: null
           })
+        } else {
+          socket.close()
         }
 
         this.uppy.emit('upload-error', file, error)
+        queuedRequest.done()
         reject(error)
       })
 
@@ -353,14 +444,33 @@ module.exports = class Tus extends Plugin {
 
         this.uppy.emit('upload-success', file, uploadResp)
         this.resetUploaderReferences(file.id)
+        queuedRequest.done()
         resolve()
       })
+
+      let queuedRequest = this.requests.run(() => {
+        socket.open()
+        if (file.isPaused) {
+          socket.send('pause', {})
+        }
+
+        // Don't do anything here, the caller will take care of cancelling the upload itself
+        // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
+        // called when this request is still in the queue, and has not been started yet, too. At
+        // that point this cancellation function is not going to be called.
+        // Also, we need to remove the request from the queue _without_ destroying everything
+        // related to this upload to handle pauses.
+        return () => {}
+      })
     })
   }
 
   /**
    * Store the uploadUrl on the file options, so that when Golden Retriever
    * restores state, we will continue uploading to the correct URL.
+   *
+   * @param {UppyFile} file
+   * @param {string} uploadURL
    */
   onReceiveUploadUrl (file, uploadURL) {
     const currentFile = this.uppy.getFile(file.id)
@@ -376,12 +486,20 @@ module.exports = class Tus extends Plugin {
     }
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(string): void} cb
+   */
   onFileRemove (fileID, cb) {
     this.uploaderEvents[fileID].on('file-removed', (file) => {
       if (fileID === file.id) cb(file.id)
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(boolean): void} cb
+   */
   onPause (fileID, cb) {
     this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
       if (fileID === targetFileID) {
@@ -391,6 +509,10 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(): void} cb
+   */
   onRetry (fileID, cb) {
     this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
       if (fileID === targetFileID) {
@@ -399,6 +521,10 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(): void} cb
+   */
   onRetryAll (fileID, cb) {
     this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
       if (!this.uppy.getFile(fileID)) return
@@ -406,6 +532,10 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(): void} cb
+   */
   onPauseAll (fileID, cb) {
     this.uploaderEvents[fileID].on('pause-all', () => {
       if (!this.uppy.getFile(fileID)) return
@@ -413,6 +543,10 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(): void} cb
+   */
   onCancelAll (fileID, cb) {
     this.uploaderEvents[fileID].on('cancel-all', () => {
       if (!this.uppy.getFile(fileID)) return
@@ -420,6 +554,10 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {string} fileID
+   * @param {function(): void} cb
+   */
   onResumeAll (fileID, cb) {
     this.uploaderEvents[fileID].on('resume-all', () => {
       if (!this.uppy.getFile(fileID)) return
@@ -427,32 +565,29 @@ module.exports = class Tus extends Plugin {
     })
   }
 
+  /**
+   * @param {(UppyFile | FailedUppyFile)[]} files
+   */
   uploadFiles (files) {
-    const actions = files.map((file, i) => {
-      const current = parseInt(i, 10) + 1
+    const promises = files.map((file, i) => {
+      const current = i + 1
       const total = files.length
 
-      if (file.error) {
-        return () => Promise.reject(new Error(file.error))
+      if ('error' in file && file.error) {
+        return Promise.reject(new Error(file.error))
       } else if (file.isRemote) {
-        // We emit upload-started here, so that it's also emitted for files
-        // that have to wait due to the `limit` option.
-        this.uppy.emit('upload-started', file)
-        return this.uploadRemote.bind(this, file, current, total)
+        return this.uploadRemote(file, current, total)
       } else {
-        this.uppy.emit('upload-started', file)
-        return this.upload.bind(this, file, current, total)
+        return this.upload(file, current, total)
       }
     })
 
-    const promises = actions.map((action) => {
-      const limitedAction = this.limitUploads(action)
-      return limitedAction()
-    })
-
     return settle(promises)
   }
 
+  /**
+   * @param {string[]} fileIDs
+   */
   handleUpload (fileIDs) {
     if (fileIDs.length === 0) {
       this.uppy.log('[Tus] No files to upload')

+ 21 - 0
packages/@uppy/utils/src/EventTracker.js

@@ -0,0 +1,21 @@
+/**
+ * Create a wrapper around an event emitter with a `remove` method to remove
+ * all events that were added using the wrapped emitter.
+ */
+module.exports = class EventTracker {
+  constructor (emitter) {
+    this._events = []
+    this._emitter = emitter
+  }
+
+  on (event, fn) {
+    this._events.push([event, fn])
+    return this._emitter.on(event, fn)
+  }
+
+  remove () {
+    this._events.forEach(([event, fn]) => {
+      this._emitter.off(event, fn)
+    })
+  }
+}

+ 37 - 0
packages/@uppy/utils/src/ProgressTimeout.js

@@ -0,0 +1,37 @@
+/**
+ * Helper to abort upload requests if there has not been any progress for `timeout` ms.
+ * Create an instance using `timer = new ProgressTimeout(10000, onTimeout)`
+ * Call `timer.progress()` to signal that there has been progress of any kind.
+ * Call `timer.done()` when the upload has completed.
+ */
+class ProgressTimeout {
+  constructor (timeout, timeoutHandler) {
+    this._timeout = timeout
+    this._onTimedOut = timeoutHandler
+    this._isDone = false
+    this._aliveTimer = null
+    this._onTimedOut = this._onTimedOut.bind(this)
+  }
+
+  progress () {
+    // Some browsers fire another progress event when the upload is
+    // cancelled, so we have to ignore progress after the timer was
+    // told to stop.
+    if (this._isDone) return
+
+    if (this._timeout > 0) {
+      if (this._aliveTimer) clearTimeout(this._aliveTimer)
+      this._aliveTimer = setTimeout(this._onTimedOut, this._timeout)
+    }
+  }
+
+  done () {
+    if (this._aliveTimer) {
+      clearTimeout(this._aliveTimer)
+      this._aliveTimer = null
+    }
+    this._isDone = true
+  }
+}
+
+module.exports = ProgressTimeout

+ 115 - 0
packages/@uppy/utils/src/RateLimitedQueue.js

@@ -0,0 +1,115 @@
+module.exports = class RateLimitedQueue {
+  constructor (limit) {
+    if (typeof limit !== 'number' || limit === 0) {
+      this.limit = Infinity
+    } else {
+      this.limit = limit
+    }
+
+    this.activeRequests = 0
+    this.queuedHandlers = []
+  }
+
+  _call (fn) {
+    this.activeRequests += 1
+
+    let done = false
+
+    let cancelActive
+    try {
+      cancelActive = fn()
+    } catch (err) {
+      this.activeRequests -= 1
+      throw err
+    }
+
+    return {
+      abort: () => {
+        if (done) return
+        done = true
+        this.activeRequests -= 1
+        cancelActive()
+        this._next()
+      },
+
+      done: () => {
+        if (done) return
+        done = true
+        this.activeRequests -= 1
+        this._next()
+      }
+    }
+  }
+
+  _next () {
+    if (this.activeRequests >= this.limit) {
+      return
+    }
+    if (this.queuedHandlers.length === 0) {
+      return
+    }
+
+    // Dispatch the next request, and update the abort/done handlers
+    // so that cancelling it does the Right Thing (and doesn't just try
+    // to dequeue an already-running request).
+    const next = this.queuedHandlers.shift()
+    const handler = this._call(next.fn)
+    next.abort = handler.abort
+    next.done = handler.done
+  }
+
+  _queue (fn) {
+    const handler = {
+      fn,
+      abort: () => {
+        this._dequeue(handler)
+      },
+      done: () => {
+        throw new Error('Cannot mark a queued request as done: this indicates a bug')
+      }
+    }
+    this.queuedHandlers.push(handler)
+    return handler
+  }
+
+  _dequeue (handler) {
+    const index = this.queuedHandlers.indexOf(handler)
+    if (index !== -1) {
+      this.queuedHandlers.splice(index, 1)
+    }
+  }
+
+  run (fn) {
+    if (this.activeRequests < this.limit) {
+      return this._call(fn)
+    }
+    return this._queue(fn)
+  }
+
+  wrapPromiseFunction (fn) {
+    return (...args) => new Promise((resolve, reject) => {
+      const queuedRequest = this.run(() => {
+        let cancelError
+        fn(...args).then((result) => {
+          if (cancelError) {
+            reject(cancelError)
+          } else {
+            queuedRequest.done()
+            resolve(result)
+          }
+        }, (err) => {
+          if (cancelError) {
+            reject(cancelError)
+          } else {
+            queuedRequest.done()
+            reject(err)
+          }
+        })
+
+        return () => {
+          cancelError = new Error('Cancelled')
+        }
+      })
+    })
+  }
+}

+ 47 - 0
packages/@uppy/utils/src/RateLimitedQueue.test.js

@@ -0,0 +1,47 @@
+const RateLimitedQueue = require('./RateLimitedQueue')
+
+const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
+
+describe('RateLimitedQueue', () => {
+  let pending = 0
+  function fn () {
+    pending++
+    return delay(15).then(() => pending--)
+  }
+
+  it('should run at most N promises at the same time', async () => {
+    const queue = new RateLimitedQueue(4)
+    const fn2 = queue.wrapPromiseFunction(fn)
+
+    const result = Promise.all([
+      fn2(), fn2(), fn2(), fn2(),
+      fn2(), fn2(), fn2(), fn2(),
+      fn2(), fn2()
+    ])
+
+    expect(pending).toBe(4)
+
+    await delay(10)
+    expect(pending).toBe(4)
+
+    await result
+    expect(pending).toBe(0)
+  })
+
+  it('should accept Infinity as limit', () => {
+    const queue = new RateLimitedQueue(Infinity)
+    const fn2 = queue.wrapPromiseFunction(fn)
+
+    const result = Promise.all([
+      fn2(), fn2(), fn2(), fn2(),
+      fn2(), fn2(), fn2(), fn2(),
+      fn2(), fn2()
+    ])
+
+    expect(pending).toBe(10)
+
+    return result.then(() => {
+      expect(pending).toBe(0)
+    })
+  })
+})

+ 0 - 36
packages/@uppy/utils/src/limitPromises.js

@@ -1,36 +0,0 @@
-/**
- * Limit the amount of simultaneously pending Promises.
- * Returns a function that, when passed a function `fn`,
- * will make sure that at most `limit` calls to `fn` are pending.
- *
- * @param {number} limit
- * @returns {function()}
- */
-module.exports = function limitPromises (limit) {
-  let pending = 0
-  const queue = []
-  return (fn) => {
-    return (...args) => {
-      const call = () => {
-        pending++
-        const promise = fn(...args)
-        promise.then(onfinish, onfinish)
-        return promise
-      }
-
-      if (pending >= limit) {
-        return new Promise((resolve, reject) => {
-          queue.push(() => {
-            call().then(resolve, reject)
-          })
-        })
-      }
-      return call()
-    }
-  }
-  function onfinish () {
-    pending--
-    const next = queue.shift()
-    if (next) next()
-  }
-}

+ 0 - 47
packages/@uppy/utils/src/limitPromises.test.js

@@ -1,47 +0,0 @@
-const limitPromises = require('./limitPromises')
-
-describe('limitPromises', () => {
-  let pending = 0
-  function fn () {
-    pending++
-    return new Promise((resolve) => setTimeout(resolve, 10))
-      .then(() => pending--)
-  }
-
-  it('should run at most N promises at the same time', () => {
-    const limit = limitPromises(4)
-    const fn2 = limit(fn)
-
-    const result = Promise.all([
-      fn2(), fn2(), fn2(), fn2(),
-      fn2(), fn2(), fn2(), fn2(),
-      fn2(), fn2()
-    ])
-
-    expect(pending).toBe(4)
-    setTimeout(() => {
-      expect(pending).toBe(4)
-    }, 10)
-
-    return result.then(() => {
-      expect(pending).toBe(0)
-    })
-  })
-
-  it('should accept Infinity as limit', () => {
-    const limit = limitPromises(Infinity)
-    const fn2 = limit(fn)
-
-    const result = Promise.all([
-      fn2(), fn2(), fn2(), fn2(),
-      fn2(), fn2(), fn2(), fn2(),
-      fn2(), fn2()
-    ])
-
-    expect(pending).toBe(10)
-
-    return result.then(() => {
-      expect(pending).toBe(0)
-    })
-  })
-})

+ 107 - 41
packages/@uppy/utils/types/index.d.ts

@@ -1,28 +1,81 @@
 declare module '@uppy/utils/lib/Translator' {
-  export interface TranslatorOptions {
-    locale: {
-      strings: {
-        [key: string]: string | { [plural: number]: string };
+  namespace Translator {
+    export interface TranslatorOptions {
+      locale: {
+        strings: {
+          [key: string]: string | { [plural: number]: string };
+        };
+        pluralize: (n: number) => number;
       };
-      pluralize: (n: number) => number;
+    }
+  }
+
+  class Translator {
+    constructor(opts: Translator.TranslatorOptions);
+  }
+
+  export = Translator
+}
+
+declare module '@uppy/utils/lib/EventTracker' {
+  namespace EventTracker {
+    export type EventHandler = (...args: any[]) => void;
+    export interface Emitter {
+      on: (event: string, handler: EventHandler) => void;
+      off: (event: string, handler: EventHandler) => void;
+    }
+  }
+
+  class EventTracker {
+    constructor(emitter: EventTracker.Emitter);
+    on(event: string, handler: EventTracker.EventHandler): void;
+    remove(): void;
+  }
+
+  export = EventTracker
+}
+
+declare module '@uppy/utils/lib/ProgressTimeout' {
+  class ProgressTimeout {
+    constructor(timeout: number, timeoutHandler: () => void);
+    progress(): void;
+    done(): void;
+  }
+  export = ProgressTimeout
+}
+
+declare module '@uppy/utils/lib/RateLimitedQueue' {
+  namespace RateLimitedQueue {
+    export type AbortFunction = () => void;
+    export type PromiseFunction = (...args: any[]) => Promise<any>;
+    export type QueueEntry = {
+      abort: () => void,
+      done: () => void,
     };
   }
 
-  export default class Translator {
-    constructor(opts: TranslatorOptions);
+  class RateLimitedQueue {
+    constructor(limit: number);
+    run(fn: () => RateLimitedQueue.AbortFunction): RateLimitedQueue.QueueEntry;
+    wrapPromiseFunction(fn: () => RateLimitedQueue.PromiseFunction): RateLimitedQueue.PromiseFunction;
   }
+
+  export = RateLimitedQueue
 }
 
 declare module '@uppy/utils/lib/canvasToBlob' {
-  export default function canvasToBlob(canvas: HTMLCanvasElement, type: string, quality?: number): Promise<Blob>;
+  function canvasToBlob(canvas: HTMLCanvasElement, type: string, quality?: number): Promise<Blob>;
+  export = canvasToBlob
 }
 
 declare module '@uppy/utils/lib/dataURItoBlob' {
-  export default function dataURItoBlob(dataURI: string, opts: { mimeType?: string, name?: string }): Blob;
+  function dataURItoBlob(dataURI: string, opts: { mimeType?: string, name?: string }): Blob;
+  export = dataURItoBlob
 }
 
 declare module '@uppy/utils/lib/dataURItoFile' {
-  export default function dataURItoFile(dataURI: string, opts: { mimeType?: string, name?: string }): File;
+  function dataURItoFile(dataURI: string, opts: { mimeType?: string, name?: string }): File;
+  export = dataURItoFile
 }
 
 declare module '@uppy/utils/lib/emitSocketProgress' {
@@ -34,104 +87,117 @@ declare module '@uppy/utils/lib/emitSocketProgress' {
     bytesTotal: number;
   }
 
-  export default function emitSocketProgress(uploader: object, progressData: ProgressData, file: UppyUtils.UppyFile): void;
+  function emitSocketProgress(uploader: object, progressData: ProgressData, file: UppyUtils.UppyFile): void;
+  export = emitSocketProgress
 }
 
 declare module '@uppy/utils/lib/findAllDOMElements' {
-  export default function findAllDOMElements(element: string | HTMLElement): HTMLElement[];
+  function findAllDOMElements(element: string | HTMLElement): HTMLElement[];
+  export = findAllDOMElements
 }
 
 declare module '@uppy/utils/lib/findDOMElement' {
-  export default function findDOMElement(element: string | HTMLElement): HTMLElement | null;
+  function findDOMElement(element: string | HTMLElement): HTMLElement | null;
+  export = findDOMElement
 }
 
 declare module '@uppy/utils/lib/generateFileID' {
   import UppyUtils = require('@uppy/utils');
 
-  export default function generateFileID(file: UppyUtils.UppyFile): string;
+  function generateFileID(file: UppyUtils.UppyFile): string;
+  export = generateFileID
 }
 
 declare module '@uppy/utils/lib/getBytesRemaining' {
-  export default function getBytesRemaining(progress: { bytesTotal: number, bytesUploaded: number }): number;
+  function getBytesRemaining(progress: { bytesTotal: number, bytesUploaded: number }): number;
+  export = getBytesRemaining
 }
 
 declare module '@uppy/utils/lib/getETA' {
-  export default function getETA(progress: object): number;
+  function getETA(progress: object): number;
+  export = getETA
 }
 
 declare module '@uppy/utils/lib/getFileNameAndExtension' {
-  export default function getFileNameAndExtension(filename: string): { name: string, extension: string };
+  function getFileNameAndExtension(filename: string): { name: string, extension: string };
+  export = getFileNameAndExtension
 }
 
 declare module '@uppy/utils/lib/getFileType' {
   import UppyUtils = require('@uppy/utils');
 
-  export default function getFileType(file: UppyUtils.UppyFile): string | null;
+  function getFileType(file: UppyUtils.UppyFile): string | null;
+  export = getFileType
 }
 
 declare module '@uppy/utils/lib/getFileTypeExtension' {
-  export default function getFileTypeExtension(mime: string): string;
+  function getFileTypeExtension(mime: string): string;
+  export = getFileTypeExtension
 }
 
 declare module '@uppy/utils/lib/getSocketHost' {
-  export default function getSocketHost(url: string): string;
+  function getSocketHost(url: string): string;
+  export = getSocketHost
 }
 
 declare module '@uppy/utils/lib/getSpeed' {
-  export default function getSpeed(progress: { bytesTotal: number, bytesUploaded: number }): number;
+  function getSpeed(progress: { bytesTotal: number, bytesUploaded: number }): number;
+  export = getSpeed
 }
 
 declare module '@uppy/utils/lib/getTimeStamp' {
-  export default function getTimeStamp(): string;
+  function getTimeStamp(): string;
+  export = getTimeStamp
 }
 
 declare module '@uppy/utils/lib/isDOMElement' {
-  export default function isDOMElement(element: any): boolean;
+  function isDOMElement(element: any): boolean;
+  export = isDOMElement
 }
 
 declare module '@uppy/utils/lib/isObjectURL' {
-  export default function isObjectURL(url: string): boolean;
+  function isObjectURL(url: string): boolean;
+  export = isObjectURL
 }
 
 declare module '@uppy/utils/lib/isDragDropSupported' {
-  export default function isDragDropSupported(): boolean;
+  function isDragDropSupported(): boolean;
+  export = isDragDropSupported
 }
 
 declare module '@uppy/utils/lib/isPreviewSupported' {
-  export default function isPreviewSupported(mime: string): boolean;
+  function isPreviewSupported(mime: string): boolean;
+  export = isPreviewSupported
 }
 
 declare module '@uppy/utils/lib/isTouchDevice' {
-  export default function isTouchDevice(): boolean;
-}
-
-declare module '@uppy/utils/lib/limitPromises' {
-  // TODO guess this could be generic but it's probably fine this way
-  // because it's mostly for internal use
-  type LimitedFunction<T> = (...args: any[]) => Promise<T>;
-  type LimitedFunctionFactory<T> = (fn: (...args: any[]) => Promise<T>) => LimitedFunction<T>;
-
-  export default function limitPromises<T>(limit: number): LimitedFunctionFactory<T>;
+  function isTouchDevice(): boolean;
+  export = isTouchDevice
 }
 
 declare module '@uppy/utils/lib/prettyETA' {
-  export default function prettyETA(seconds: number): string;
+  function prettyETA(seconds: number): string;
+  export = prettyETA
 }
 
 declare module '@uppy/utils/lib/secondsToTime' {
-  export default function secondsToTime(seconds: number): string;
+  function secondsToTime(seconds: number): string;
+  export = secondsToTime
 }
 
 declare module '@uppy/utils/lib/settle' {
-  export default function settle<T>(promises: Promise<T>[]): Promise<{ successful: T[], failed: any[] }>;
+  function settle<T>(promises: Promise<T>[]): Promise<{ successful: T[], failed: any[] }>;
+  export = settle
 }
 
 declare module '@uppy/utils/lib/toArray' {
-  export default function toArray(list: any): any[];
+  function toArray(list: any): any[];
+  export = toArray
 }
 
 declare module '@uppy/utils/lib/getDroppedFiles' {
-  export default function getDroppedFiles(dataTransfer: DataTransfer, options?: object): Promise<File[]>;
+  function getDroppedFiles(dataTransfer: DataTransfer, options?: object): Promise<File[]>;
+  export = getDroppedFiles
 }
 
 declare module '@uppy/utils' {

+ 112 - 71
packages/@uppy/xhr-upload/src/index.js

@@ -5,7 +5,9 @@ const { Provider, RequestClient, Socket } = require('@uppy/companion-client')
 const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
 const getSocketHost = require('@uppy/utils/lib/getSocketHost')
 const settle = require('@uppy/utils/lib/settle')
-const limitPromises = require('@uppy/utils/lib/limitPromises')
+const EventTracker = require('@uppy/utils/lib/EventTracker')
+const ProgressTimeout = require('@uppy/utils/lib/ProgressTimeout')
+const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
 
 function buildResponseError (xhr, error) {
   // No error message
@@ -111,15 +113,18 @@ module.exports = class XHRUpload extends Plugin {
     this.handleUpload = this.handleUpload.bind(this)
 
     // Simultaneous upload limiting is shared across all uploads with this plugin.
-    if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
-      this.limitUploads = limitPromises(this.opts.limit)
+    // __queue is for internal Uppy use only!
+    if (this.opts.__queue instanceof RateLimitedQueue) {
+      this.requests = this.opts.__queue
     } else {
-      this.limitUploads = (fn) => fn
+      this.requests = new RateLimitedQueue(this.opts.limit)
     }
 
     if (this.opts.bundle && !this.opts.formData) {
       throw new Error('`opts.formData` must be true when `opts.bundle` is enabled.')
     }
+
+    this.uploaderEvents = Object.create(null)
   }
 
   getOptions (file) {
@@ -141,49 +146,6 @@ module.exports = class XHRUpload extends Plugin {
     return opts
   }
 
-  // Helper to abort upload requests if there has not been any progress for `timeout` ms.
-  // Create an instance using `timer = createProgressTimeout(10000, onTimeout)`
-  // Call `timer.progress()` to signal that there has been progress of any kind.
-  // Call `timer.done()` when the upload has completed.
-  createProgressTimeout (timeout, timeoutHandler) {
-    const uppy = this.uppy
-    const self = this
-    let isDone = false
-
-    function onTimedOut () {
-      uppy.log(`[XHRUpload] timed out`)
-      const error = new Error(self.i18n('timedOut', { seconds: Math.ceil(timeout / 1000) }))
-      timeoutHandler(error)
-    }
-
-    let aliveTimer = null
-    function progress () {
-      // Some browsers fire another progress event when the upload is
-      // cancelled, so we have to ignore progress after the timer was
-      // told to stop.
-      if (isDone) return
-
-      if (timeout > 0) {
-        if (aliveTimer) clearTimeout(aliveTimer)
-        aliveTimer = setTimeout(onTimedOut, timeout)
-      }
-    }
-
-    function done () {
-      uppy.log(`[XHRUpload] timer done`)
-      if (aliveTimer) {
-        clearTimeout(aliveTimer)
-        aliveTimer = null
-      }
-      isDone = true
-    }
-
-    return {
-      progress,
-      done
-    }
-  }
-
   addMetadata (formData, meta, opts) {
     const metaFields = Array.isArray(opts.metaFields)
       ? opts.metaFields
@@ -240,17 +202,21 @@ module.exports = class XHRUpload extends Plugin {
 
     this.uppy.log(`uploading ${current} of ${total}`)
     return new Promise((resolve, reject) => {
+      this.uppy.emit('upload-started', file)
+
       const data = opts.formData
         ? this.createFormDataUpload(file, opts)
         : this.createBareUpload(file, opts)
 
-      const timer = this.createProgressTimeout(opts.timeout, (error) => {
+      const timer = new ProgressTimeout(opts.timeout, () => {
         xhr.abort()
+        const error = new Error(this.i18n('timedOut', { seconds: Math.ceil(opts.timeout / 1000) }))
         this.uppy.emit('upload-error', file, error)
         reject(error)
       })
 
       const xhr = new XMLHttpRequest()
+      this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
       const id = cuid()
 
@@ -276,6 +242,11 @@ module.exports = class XHRUpload extends Plugin {
       xhr.addEventListener('load', (ev) => {
         this.uppy.log(`[XHRUpload] ${id} finished`)
         timer.done()
+        queuedRequest.done()
+        if (this.uploaderEvents[file.id]) {
+          this.uploaderEvents[file.id].remove()
+          this.uploaderEvents[file.id] = null
+        }
 
         if (opts.validateStatus(ev.target.status, xhr.responseText, xhr)) {
           const body = opts.getResponseData(xhr.responseText, xhr)
@@ -311,6 +282,11 @@ module.exports = class XHRUpload extends Plugin {
       xhr.addEventListener('error', (ev) => {
         this.uppy.log(`[XHRUpload] ${id} errored`)
         timer.done()
+        queuedRequest.done()
+        if (this.uploaderEvents[file.id]) {
+          this.uploaderEvents[file.id].remove()
+          this.uploaderEvents[file.id] = null
+        }
 
         const error = buildResponseError(xhr, opts.getResponseError(xhr.responseText, xhr))
         this.uppy.emit('upload-error', file, error)
@@ -329,19 +305,21 @@ module.exports = class XHRUpload extends Plugin {
         xhr.setRequestHeader(header, opts.headers[header])
       })
 
-      xhr.send(data)
-
-      this.uppy.on('file-removed', (removedFile) => {
-        if (removedFile.id === file.id) {
+      const queuedRequest = this.requests.run(() => {
+        xhr.send(data)
+        return () => {
           timer.done()
           xhr.abort()
-          reject(new Error('File removed'))
         }
       })
 
-      this.uppy.on('cancel-all', () => {
-        timer.done()
-        xhr.abort()
+      this.onFileRemove(file.id, () => {
+        queuedRequest.abort()
+        reject(new Error('File removed'))
+      })
+
+      this.onCancelAll(file.id, () => {
+        queuedRequest.abort()
         reject(new Error('Upload cancelled'))
       })
     })
@@ -350,6 +328,8 @@ module.exports = class XHRUpload extends Plugin {
   uploadRemote (file, current, total) {
     const opts = this.getOptions(file)
     return new Promise((resolve, reject) => {
+      this.uppy.emit('upload-started', file)
+
       const fields = {}
       const metaFields = Array.isArray(opts.metaFields)
         ? opts.metaFields
@@ -372,7 +352,30 @@ module.exports = class XHRUpload extends Plugin {
       }).then((res) => {
         const token = res.token
         const host = getSocketHost(file.remote.companionUrl)
-        const socket = new Socket({ target: `${host}/api/${token}` })
+        const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
+        this.uploaderEvents[file.id] = new EventTracker(this.uppy)
+
+        this.onFileRemove(file.id, () => {
+          socket.send('pause', {})
+          queuedRequest.abort()
+          resolve(`upload ${file.id} was removed`)
+        })
+
+        this.onCancelAll(file.id, () => {
+          socket.send('pause', {})
+          queuedRequest.abort()
+          resolve(`upload ${file.id} was canceled`)
+        })
+
+        this.onRetry(file.id, () => {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        })
+
+        this.onRetryAll(file.id, () => {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        })
 
         socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
 
@@ -387,7 +390,11 @@ module.exports = class XHRUpload extends Plugin {
           }
 
           this.uppy.emit('upload-success', file, uploadResp)
-          socket.close()
+          queuedRequest.done()
+          if (this.uploaderEvents[file.id]) {
+            this.uploaderEvents[file.id].remove()
+            this.uploaderEvents[file.id] = null
+          }
           return resolve()
         })
 
@@ -397,8 +404,22 @@ module.exports = class XHRUpload extends Plugin {
             ? opts.getResponseError(resp.responseText, resp)
             : Object.assign(new Error(errData.error.message), { cause: errData.error })
           this.uppy.emit('upload-error', file, error)
+          queuedRequest.done()
+          if (this.uploaderEvents[file.id]) {
+            this.uploaderEvents[file.id].remove()
+            this.uploaderEvents[file.id] = null
+          }
           reject(error)
         })
+
+        const queuedRequest = this.requests.run(() => {
+          socket.open()
+          if (file.isPaused) {
+            socket.send('pause', {})
+          }
+
+          return () => socket.close()
+        })
       })
     })
   }
@@ -416,8 +437,9 @@ module.exports = class XHRUpload extends Plugin {
 
       const xhr = new XMLHttpRequest()
 
-      const timer = this.createProgressTimeout(this.opts.timeout, (error) => {
+      const timer = new ProgressTimeout(this.opts.timeout, () => {
         xhr.abort()
+        const error = new Error(this.i18n('timedOut', { seconds: Math.ceil(this.opts.timeout / 1000) }))
         emitError(error)
         reject(error)
       })
@@ -502,29 +524,48 @@ module.exports = class XHRUpload extends Plugin {
   }
 
   uploadFiles (files) {
-    const actions = files.map((file, i) => {
+    const promises = files.map((file, i) => {
       const current = parseInt(i, 10) + 1
       const total = files.length
 
       if (file.error) {
-        return () => Promise.reject(new Error(file.error))
+        return Promise.reject(new Error(file.error))
       } else if (file.isRemote) {
-        // We emit upload-started here, so that it's also emitted for files
-        // that have to wait due to the `limit` option.
-        this.uppy.emit('upload-started', file)
-        return this.uploadRemote.bind(this, file, current, total)
+        return this.uploadRemote(file, current, total)
       } else {
-        this.uppy.emit('upload-started', file)
-        return this.upload.bind(this, file, current, total)
+        return this.upload(file, current, total)
       }
     })
 
-    const promises = actions.map((action) => {
-      const limitedAction = this.limitUploads(action)
-      return limitedAction()
+    return settle(promises)
+  }
+
+  onFileRemove (fileID, cb) {
+    this.uploaderEvents[fileID].on('file-removed', (file) => {
+      if (fileID === file.id) cb(file.id)
     })
+  }
 
-    return settle(promises)
+  onRetry (fileID, cb) {
+    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
+      if (fileID === targetFileID) {
+        cb()
+      }
+    })
+  }
+
+  onRetryAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onCancelAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('cancel-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
   }
 
   handleUpload (fileIDs) {

+ 37 - 0
test/endtoend/chaos-monkey/index.html

@@ -0,0 +1,37 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <meta charset="utf-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <title>Uppy test page</title>
+  </head>
+  <body>
+    <style>
+      #main { display: flex; } /* display side by side */
+    </style>
+    <main>
+      <h2>Chaos monkey</h2>
+      <div id="main">
+        <div id="dash"></div>
+        <div>
+          <ol id="log"></ol>
+        </div>
+      </div>
+    </main>
+
+    <link href="uppy.min.css" rel="stylesheet">
+    <script>
+      function addLogMessage (message) {
+        var log = document.querySelector('#log')
+        while (log.childNodes.length >= 15) {
+          log.removeChild(log.firstChild)
+          log.start = (parseInt(log.start, 10) || 0) + 1
+        }
+        var li = document.createElement('li')
+        li.appendChild(document.createTextNode(message))
+        log.appendChild(li)
+      }
+    </script>
+    <script src="bundle.js"></script>
+  </body>
+</html>

+ 39 - 0
test/endtoend/chaos-monkey/main.js

@@ -0,0 +1,39 @@
+require('es6-promise/auto')
+require('whatwg-fetch')
+const Uppy = require('@uppy/core')
+const Dashboard = require('@uppy/dashboard')
+const Tus = require('@uppy/tus')
+
+const isOnTravis = !!(process.env.TRAVIS && process.env.CI)
+const endpoint = isOnTravis ? 'http://companion.test:1081' : 'http://localhost:1081'
+
+let id = 0
+window.setup = function (options) {
+  id += 1
+
+  // Initialise Uppy with Drag & Drop
+  const uppy = Uppy({ id: `uppy${id}`, debug: true })
+
+  uppy.use(Dashboard, { inline: true, target: '#dash' })
+  uppy.use(Tus, {
+    endpoint: `${endpoint}/files/`,
+    limit: options.limit
+  })
+  uppy.on('file-added', (file) => {
+    randomColorImage(function (blob) {
+      uppy.setFileState(file.id, { preview: URL.createObjectURL(blob) })
+    })
+  })
+
+  return uppy
+}
+
+function randomColorImage (callback) {
+  const canvas = document.createElement('canvas')
+  canvas.width = 140
+  canvas.height = 140
+  const context = canvas.getContext('2d')
+  context.fillStyle = '#xxxxxx'.replace(/x/g, () => '0123456789ABCDEF'[Math.floor(Math.random() * 16)])
+  context.fillRect(0, 0, 140, 140)
+  canvas.toBlob(callback)
+}

+ 103 - 0
test/endtoend/chaos-monkey/test.js

@@ -0,0 +1,103 @@
+/* global browser, expect  */
+const crypto = require('crypto')
+const lorem = require('@jamen/lorem')
+const { selectFakeFile } = require('../utils')
+
+const testURL = 'http://localhost:4567/chaos-monkey'
+
+describe('Chaos monkey', function () {
+  this.timeout(5 * 60 * 1000) // 5 minutes
+
+  beforeEach(async () => {
+    await browser.url(testURL)
+  })
+
+  it('Add and cancel a bunch', async () => {
+    await browser.execute(function () {
+      window.currentUppy = window.setup({ limit: 3 })
+      window.onerror = function (message) {
+        window.anyError = message
+      }
+    })
+
+    const types = ['application/octet-stream', 'text/plain']
+    const generate = {
+      'application/octet-stream' () {
+        const len = Math.round(Math.random() * 5000000)
+        return crypto.randomBytes(len)
+      },
+      'text/plain' () {
+        const len = Math.round(Math.random() * 5000000)
+        return Buffer.from(lorem(len))
+      }
+    }
+
+    async function addFile () {
+      await browser.execute(function () {
+        window.addLogMessage('Adding a file')
+      })
+      const type = types[Math.floor(Math.random() * types.length)]
+      const data = generate[type]().toString('base64')
+
+      const name = `${Math.random().toString(32).slice(2)}-file`
+      await browser.execute(selectFakeFile, 'currentUppy', name, type, data)
+    }
+
+    function cancelFile () {
+      return browser.execute(function () {
+        window.addLogMessage('Cancelling a file')
+        // prefer deleting a file that is uploading right now
+        var selector = Math.random() <= 0.7
+          ? '.is-inprogress .uppy-DashboardItem-action--remove'
+          : '.uppy-DashboardItem-action--remove'
+        var buttons = document.querySelectorAll(selector)
+        var del = buttons[Math.floor(Math.random() * buttons.length)]
+        if (del) del.click()
+      })
+    }
+
+    function startUploadIfAnyWaitingFiles () {
+      return browser.execute(function () {
+        window.addLogMessage('Starting upload')
+        var start = document.querySelector('.uppy-StatusBar-actionBtn--upload')
+        if (start) start.click()
+      })
+    }
+
+    function cancelAll () {
+      return browser.execute(function () {
+        window.addLogMessage('Cancelling everything')
+        var button = document.querySelector('.uppy-DashboardContent-back')
+        if (button) button.click()
+      })
+    }
+
+    await addFile()
+    await addFile()
+    await addFile()
+
+    for (let i = 0; i < 300; i++) {
+      await browser.pause(50 + Math.floor(Math.random() * 300))
+      const v = Math.floor(Math.random() * 100)
+      if (v < 45) {
+        await addFile()
+      } else if (v < 55) {
+        await cancelFile()
+      } else if (v === 55) {
+        await cancelAll()
+      } else if (v < 75) {
+        await startUploadIfAnyWaitingFiles()
+      } else {
+        // wait
+      }
+    }
+
+    await cancelAll()
+
+    const errorMessage = await browser.execute(function () {
+      return window.anyError
+    })
+    // yikes chai, why can this not be a function call
+    expect(errorMessage).to.not.exist // eslint-disable-line no-unused-expressions
+  })
+})

+ 24 - 0
test/endtoend/utils.js

@@ -140,6 +140,9 @@ const tus = require('tus-node-server')
 const os = require('os')
 const rimraf = promisify(require('rimraf'))
 const { randomBytes } = require('crypto')
+const http = require('http')
+const httpProxy = require('http-proxy')
+const brake = require('brake')
 class TusService {
   constructor ({ tusServerPort = 1080 }) {
     this.port = tusServerPort
@@ -153,16 +156,37 @@ class TusService {
       directory: this.path
     })
 
+    const proxy = httpProxy.createProxyServer()
+    this.slowServer = http.createServer((req, res) => {
+      proxy.web(req, res, {
+        target: `http://localhost:1080`,
+        // 200 kbps max upload, checking the rate limit every 20ms
+        buffer: req.pipe(brake({
+          period: 20,
+          rate: 200 * 1024 / 50
+        }))
+      }, (err) => { // eslint-disable-line handle-callback-err
+        // ignore, typically a cancelled request
+      })
+    })
+
     const listen = promisify(this.tusServer.listen.bind(this.tusServer))
     this.server = await listen({ host: '0.0.0.0', port: this.port })
+    const listen2 = promisify(this.slowServer.listen.bind(this.slowServer))
+    await listen2(this.port + 1)
   }
 
   async onComplete () {
+    if (this.slowServer) {
+      const close = promisify(this.slowServer.close.bind(this.slowServer))
+      await close()
+    }
     if (this.server) {
       const close = promisify(this.server.close.bind(this.server))
       await close()
     }
     await rimraf(this.path)
+    this.slowServer = null
     this.tusServer = null
   }
 }

+ 4 - 3
test/endtoend/wdio.base.conf.js

@@ -89,15 +89,16 @@ exports.config = {
     [CompanionService],
     [StaticServerService, {
       folders: [
+        { mount: '/chaos-monkey', path: './test/endtoend/chaos-monkey/dist' },
+        { mount: '/create-react-app', path: './test/endtoend/create-react-app/build' },
         { mount: '/i18n-drag-drop', path: './test/endtoend/i18n-drag-drop/dist' },
-        { mount: '/tus-drag-drop', path: './test/endtoend/tus-drag-drop/dist' },
-        { mount: '/xhr-limit', path: './test/endtoend/xhr-limit/dist' },
         { mount: '/providers', path: './test/endtoend/providers/dist' },
         { mount: '/thumbnails', path: './test/endtoend/thumbnails/dist' },
         { mount: '/transloadit', path: './test/endtoend/transloadit/dist' },
+        { mount: '/tus-drag-drop', path: './test/endtoend/tus-drag-drop/dist' },
         { mount: '/typescript', path: './test/endtoend/typescript/dist' },
         { mount: '/url-plugin', path: './test/endtoend/url-plugin/dist' },
-        { mount: '/create-react-app', path: './test/endtoend/create-react-app/build' }
+        { mount: '/xhr-limit', path: './test/endtoend/xhr-limit/dist' }
       ]
     }],
     [TusService]

+ 1 - 0
test/endtoend/wdio.remote.conf.js

@@ -31,6 +31,7 @@ exports.config = {
 
   // Patterns to exclude.
   exclude: [
+    'test/endtoend/chaos-monkey/*',
     'test/endtoend/url-plugin/*',
     'test/endtoend/transloadit/*'
   ],

+ 2 - 0
tsconfig.json

@@ -6,6 +6,8 @@
       "dom",
       "esnext"
     ],
+    "resolveJsonModule": true,
+    "allowJs": true,
     "noImplicitAny": true,
     "noImplicitThis": true,
     "strictNullChecks": true,