Ver código fonte

@uppy/tus: pause all requests in response to server rate limiting (#3394)

* @uppy/tus: pause all requests in response to server rate limiting

When the remote server responds with HTTP 429, all requests are
paused for a while in the hope that it can resolve the rate limiting.
Failed requests are also now queued up after the retry delay. Before
that, they were simply scheduled which would sometimes end up
overflowing the `limit` option.

* Address review comments

* fix requests bypassing queue pause state

* Auto rate limiting

* fix `RateLimitedQueue`
Antoine du Hamel 3 anos atrás
pai
commit
a08ec4e0f7

+ 67 - 22
packages/@uppy/tus/src/index.js

@@ -40,7 +40,7 @@ const tusDefaultOptions = {
   addRequestId: false,
   addRequestId: false,
 
 
   chunkSize: Infinity,
   chunkSize: Infinity,
-  retryDelays: [0, 1000, 3000, 5000],
+  retryDelays: [100, 1000, 3000, 5000],
   parallelUploads: 1,
   parallelUploads: 1,
   removeFingerprintOnSuccess: false,
   removeFingerprintOnSuccess: false,
   uploadLengthDeferred: false,
   uploadLengthDeferred: false,
@@ -51,8 +51,11 @@ const tusDefaultOptions = {
  * Tus resumable file uploader
  * Tus resumable file uploader
  */
  */
 module.exports = class Tus extends BasePlugin {
 module.exports = class Tus extends BasePlugin {
+  // eslint-disable-next-line global-require
   static VERSION = require('../package.json').version
   static VERSION = require('../package.json').version
 
 
+  #retryDelayIterator
+
   /**
   /**
    * @param {Uppy} uppy
    * @param {Uppy} uppy
    * @param {TusOptions} opts
    * @param {TusOptions} opts
@@ -66,8 +69,8 @@ module.exports = class Tus extends BasePlugin {
     // set default options
     // set default options
     const defaultOptions = {
     const defaultOptions = {
       useFastRemoteRetry: true,
       useFastRemoteRetry: true,
-      limit: 5,
-      retryDelays: [0, 1000, 3000, 5000],
+      limit: 20,
+      retryDelays: tusDefaultOptions.retryDelays,
       withCredentials: false,
       withCredentials: false,
     }
     }
 
 
@@ -85,6 +88,7 @@ module.exports = class Tus extends BasePlugin {
      * @type {RateLimitedQueue}
      * @type {RateLimitedQueue}
      */
      */
     this.requests = new RateLimitedQueue(this.opts.limit)
     this.requests = new RateLimitedQueue(this.opts.limit)
+    this.#retryDelayIterator = this.opts.retryDelays?.values()
 
 
     this.uploaders = Object.create(null)
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
     this.uploaderEvents = Object.create(null)
@@ -178,6 +182,9 @@ module.exports = class Tus extends BasePlugin {
 
 
     // Create a new tus upload
     // Create a new tus upload
     return new Promise((resolve, reject) => {
     return new Promise((resolve, reject) => {
+      let queuedRequest
+      let qRequest
+
       this.uppy.emit('upload-started', file)
       this.uppy.emit('upload-started', file)
 
 
       const opts = {
       const opts = {
@@ -219,7 +226,7 @@ module.exports = class Tus extends BasePlugin {
         }
         }
 
 
         this.resetUploaderReferences(file.id)
         this.resetUploaderReferences(file.id)
-        queuedRequest.done()
+        queuedRequest.abort()
 
 
         this.uppy.emit('upload-error', file, err)
         this.uppy.emit('upload-error', file, err)
 
 
@@ -252,6 +259,46 @@ module.exports = class Tus extends BasePlugin {
         resolve(upload)
         resolve(upload)
       }
       }
 
 
+      uploadOptions.onShouldRetry = (err, retryAttempt, options) => {
+        const status = err?.originalResponse?.getStatus()
+        if (status === 429) {
+          // HTTP 429 Too Many Requests => to avoid the whole download to fail, pause all requests.
+          if (!this.requests.isPaused) {
+            const next = this.#retryDelayIterator?.next()
+            if (next == null || next.done) {
+              return false
+            }
+            this.requests.rateLimit(next.value)
+          }
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(qRequest)
+        } else if (status > 400 && status < 500 && status !== 409) {
+          // HTTP 4xx, the server won't send anything, it's doesn't make sense to retry
+          return false
+        } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
+          // The navigator is offline, let's wait for it to come back online.
+          if (!this.requests.isPaused) {
+            this.requests.pause()
+            window.addEventListener('online', () => {
+              this.requests.resume()
+            }, { once: true })
+          }
+          queuedRequest.abort()
+          queuedRequest = this.requests.run(qRequest)
+        } else {
+          // For a non-4xx error, we can re-queue the request.
+          setTimeout(() => {
+            queuedRequest.abort()
+            queuedRequest = this.requests.run(qRequest)
+          }, options.retryDelays[retryAttempt])
+        }
+        // Aborting the timeout set by tus-js-client to not short-circuit the rate limiting.
+        // eslint-disable-next-line no-underscore-dangle
+        queueMicrotask(() => clearTimeout(queuedRequest._retryTimeout))
+        // We need to return true here so tus-js-client increments the retryAttempt and do not emit an error event.
+        return true
+      }
+
       const copyProp = (obj, srcProp, destProp) => {
       const copyProp = (obj, srcProp, destProp) => {
         if (hasProperty(obj, srcProp) && !hasProperty(obj, destProp)) {
         if (hasProperty(obj, srcProp) && !hasProperty(obj, destProp)) {
           obj[destProp] = obj[srcProp]
           obj[destProp] = obj[srcProp]
@@ -278,15 +325,7 @@ module.exports = class Tus extends BasePlugin {
       this.uploaders[file.id] = upload
       this.uploaders[file.id] = upload
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
 
-      upload.findPreviousUploads().then((previousUploads) => {
-        const previousUpload = previousUploads[0]
-        if (previousUpload) {
-          this.uppy.log(`[Tus] Resuming upload of ${file.id} started at ${previousUpload.creationTime}`)
-          upload.resumeFromPreviousUpload(previousUpload)
-        }
-      })
-
-      let queuedRequest = this.requests.run(() => {
+      qRequest = () => {
         if (!file.isPaused) {
         if (!file.isPaused) {
           upload.start()
           upload.start()
         }
         }
@@ -297,8 +336,18 @@ module.exports = class Tus extends BasePlugin {
         // Also, we need to remove the request from the queue _without_ destroying everything
         // Also, we need to remove the request from the queue _without_ destroying everything
         // related to this upload to handle pauses.
         // related to this upload to handle pauses.
         return () => {}
         return () => {}
+      }
+
+      upload.findPreviousUploads().then((previousUploads) => {
+        const previousUpload = previousUploads[0]
+        if (previousUpload) {
+          this.uppy.log(`[Tus] Resuming upload of ${file.id} started at ${previousUpload.creationTime}`)
+          upload.resumeFromPreviousUpload(previousUpload)
+        }
       })
       })
 
 
+      queuedRequest = this.requests.run(qRequest)
+
       this.onFileRemove(file.id, (targetFileID) => {
       this.onFileRemove(file.id, (targetFileID) => {
         queuedRequest.abort()
         queuedRequest.abort()
         this.resetUploaderReferences(file.id, { abort: !!upload.url })
         this.resetUploaderReferences(file.id, { abort: !!upload.url })
@@ -314,10 +363,7 @@ module.exports = class Tus extends BasePlugin {
           // Resuming an upload should be queued, else you could pause and then
           // Resuming an upload should be queued, else you could pause and then
           // resume a queued upload to make it skip the queue.
           // resume a queued upload to make it skip the queue.
           queuedRequest.abort()
           queuedRequest.abort()
-          queuedRequest = this.requests.run(() => {
-            upload.start()
-            return () => {}
-          })
+          queuedRequest = this.requests.run(qRequest)
         }
         }
       })
       })
 
 
@@ -337,10 +383,7 @@ module.exports = class Tus extends BasePlugin {
         if (file.error) {
         if (file.error) {
           upload.abort()
           upload.abort()
         }
         }
-        queuedRequest = this.requests.run(() => {
-          upload.start()
-          return () => {}
-        })
+        queuedRequest = this.requests.run(qRequest)
       })
       })
     }).catch((err) => {
     }).catch((err) => {
       this.uppy.emit('upload-error', file, err)
       this.uppy.emit('upload-error', file, err)
@@ -412,6 +455,8 @@ module.exports = class Tus extends BasePlugin {
       this.uploaderSockets[file.id] = socket
       this.uploaderSockets[file.id] = socket
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
 
+      let queuedRequest
+
       this.onFileRemove(file.id, () => {
       this.onFileRemove(file.id, () => {
         queuedRequest.abort()
         queuedRequest.abort()
         socket.send('cancel', {})
         socket.send('cancel', {})
@@ -512,7 +557,7 @@ module.exports = class Tus extends BasePlugin {
         resolve()
         resolve()
       })
       })
 
 
-      let queuedRequest = this.requests.run(() => {
+      queuedRequest = this.requests.run(() => {
         socket.open()
         socket.open()
         if (file.isPaused) {
         if (file.isPaused) {
           socket.send('pause', {})
           socket.send('pause', {})

+ 75 - 2
packages/@uppy/utils/src/RateLimitedQueue.js

@@ -7,6 +7,16 @@ class RateLimitedQueue {
 
 
   #queuedHandlers = []
   #queuedHandlers = []
 
 
+  #paused = false
+
+  #pauseTimer
+
+  #downLimit = 1
+
+  #upperLimit
+
+  #rateLimitingTimer
+
   constructor (limit) {
   constructor (limit) {
     if (typeof limit !== 'number' || limit === 0) {
     if (typeof limit !== 'number' || limit === 0) {
       this.limit = Infinity
       this.limit = Infinity
@@ -54,7 +64,7 @@ class RateLimitedQueue {
   }
   }
 
 
   #next () {
   #next () {
-    if (this.#activeRequests >= this.limit) {
+    if (this.#paused || this.#activeRequests >= this.limit) {
       return
       return
     }
     }
     if (this.#queuedHandlers.length === 0) {
     if (this.#queuedHandlers.length === 0) {
@@ -101,7 +111,7 @@ class RateLimitedQueue {
   }
   }
 
 
   run (fn, queueOptions) {
   run (fn, queueOptions) {
-    if (this.#activeRequests < this.limit) {
+    if (!this.#paused && this.#activeRequests < this.limit) {
       return this.#call(fn)
       return this.#call(fn)
     }
     }
     return this.#queue(fn, queueOptions)
     return this.#queue(fn, queueOptions)
@@ -149,6 +159,69 @@ class RateLimitedQueue {
       return outerPromise
       return outerPromise
     }
     }
   }
   }
+
+  resume () {
+    this.#paused = false
+    clearTimeout(this.#pauseTimer)
+    for (let i = 0; i < this.limit; i++) {
+      this.#queueNext()
+    }
+  }
+
+  #resume = () => this.resume()
+
+  /**
+   * Freezes the queue for a while or indefinitely.
+   *
+   * @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
+   *                                    If omitted, the queue won't resume automatically.
+   */
+  pause (duration = null) {
+    this.#paused = true
+    clearTimeout(this.#pauseTimer)
+    if (duration != null) {
+      this.#pauseTimer = setTimeout(this.#resume, duration)
+    }
+  }
+
+  /**
+   * Pauses the queue for a duration, and lower the limit of concurrent requests
+   * when the queue resumes. When the queue resumes, it tries to progressively
+   * increase the limit in `this.#increaseLimit` until another call is made to
+   * `this.rateLimit`.
+   * Call this function when using the RateLimitedQueue for network requests and
+   * the remote server responds with 429 HTTP code.
+   *
+   * @param {number} duration in milliseconds.
+   */
+  rateLimit (duration) {
+    clearTimeout(this.#rateLimitingTimer)
+    this.pause(duration)
+    if (this.limit > 1 && Number.isFinite(this.limit)) {
+      this.#upperLimit = this.limit - 1
+      this.limit = this.#downLimit
+      this.#rateLimitingTimer = setTimeout(this.#increaseLimit, duration)
+    }
+  }
+
+  #increaseLimit = () => {
+    if (this.#paused) {
+      this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 0)
+      return
+    }
+    this.#downLimit = this.limit
+    this.limit = Math.ceil((this.#upperLimit + this.#downLimit) / 2)
+    for (let i = this.#downLimit; i <= this.limit; i++) {
+      this.#queueNext()
+    }
+    if (this.#upperLimit - this.#downLimit > 3) {
+      this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 2000)
+    } else {
+      this.#downLimit = Math.floor(this.#downLimit / 2)
+    }
+  }
+
+  get isPaused () { return this.#paused }
 }
 }
 
 
 module.exports = {
 module.exports = {