Browse Source

@uppy/transloadit: add rate limiting for assembly creation and status polling (#3718)

Share one instance of `RateLimitedQueue` between Tus plugin and
Transloadit plugin, ensure all requests to transloadit.com go through
the `RateLimitedQueue` to handle rate limiting response globally.
Antoine du Hamel 2 years ago
parent
commit
1413672f7c

+ 18 - 5
packages/@uppy/transloadit/src/Assembly.js

@@ -41,7 +41,13 @@ function isStatus (status, test) {
 }
 
 class TransloaditAssembly extends Emitter {
-  constructor (assembly) {
+  #rateLimitedQueue
+
+  #fetchWithNetworkError
+
+  #previousFetchStatusStillPending = false
+
+  constructor (assembly, rateLimitedQueue) {
     super()
 
     // The current assembly status.
@@ -52,6 +58,9 @@ class TransloaditAssembly extends Emitter {
     this.pollInterval = null
     // Whether this assembly has been closed (finished or errored)
     this.closed = false
+
+    this.#rateLimitedQueue = rateLimitedQueue
+    this.#fetchWithNetworkError = rateLimitedQueue.wrapPromiseFunction(fetchWithNetworkError)
   }
 
   connect () {
@@ -145,15 +154,19 @@ class TransloaditAssembly extends Emitter {
    * 'status'.
    */
   async #fetchStatus ({ diff = true } = {}) {
-    if (this.closed) return
+    if (this.closed || this.#rateLimitedQueue.isPaused || this.#previousFetchStatusStillPending) return
 
     try {
-      const response = await fetchWithNetworkError(this.status.assembly_ssl_url)
+      this.#previousFetchStatusStillPending = true
+      const response = await this.#fetchWithNetworkError(this.status.assembly_ssl_url)
+      this.#previousFetchStatusStillPending = false
 
       if (this.closed) return
 
-      // In case of rate-limiting, ignore the error.
-      if (response.status === 429) return
+      if (response.status === 429) {
+        this.#rateLimitedQueue.rateLimit(2_000)
+        return
+      }
 
       if (!response.ok) {
         this.#onError(new NetworkError(response.statusText))

+ 2 - 1
packages/@uppy/transloadit/src/Assembly.test.js

@@ -1,9 +1,10 @@
+const { RateLimitedQueue } = require('@uppy/utils/lib/RateLimitedQueue')
 const Assembly = require('./Assembly')
 
 describe('Transloadit/Assembly', () => {
   describe('status diffing', () => {
     function attemptDiff (prev, next) {
-      const assembly = new Assembly(prev)
+      const assembly = new Assembly(prev, new RateLimitedQueue())
       const events = []
       assembly.emit = jest.fn((name, ...args) => {
         events.push([name, ...args])

+ 45 - 44
packages/@uppy/transloadit/src/Client.js

@@ -2,56 +2,57 @@ const fetchWithNetworkError = require('@uppy/utils/lib/fetchWithNetworkError')
 
 const ASSEMBLIES_ENDPOINT = '/assemblies'
 
-function fetchJSON (...args) {
-  return fetchWithNetworkError(...args).then(response => {
-    if (response.status === 429) {
-      // If the server asks the client to rate limit, reschedule the request 2s later.
-      // TODO: there are several instances of rate limiting accross the code base, having one global one could be useful.
-      return new Promise((resolve, reject) => {
-        setTimeout(() => fetchJSON(...args).then(resolve, reject), 2_000)
-      })
-    }
-
-    if (!response.ok) {
-      const serverError = new Error(response.statusText)
-      serverError.statusCode = response.status
-
-      if (!`${args[0]}`.endsWith(ASSEMBLIES_ENDPOINT)) return Promise.reject(serverError)
-
-      // Failed assembly requests should return a more detailed error in JSON.
-      return response.json().then(assembly => {
-        if (!assembly.error) throw serverError
-
-        const error = new Error(assembly.error)
-        error.details = assembly.message
-        error.assembly = assembly
-        if (assembly.assembly_id) {
-          error.details += ` Assembly ID: ${assembly.assembly_id}`
-        }
-        throw error
-      }, err => {
-        // eslint-disable-next-line no-param-reassign
-        err.cause = serverError
-        throw err
-      })
-    }
-
-    return response.json()
-  })
-}
-
 /**
  * A Barebones HTTP API client for Transloadit.
  */
 module.exports = class Client {
   #headers = {}
 
+  #fetchWithNetworkError
+
   constructor (opts = {}) {
     this.opts = opts
 
     if (this.opts.client != null) {
       this.#headers['Transloadit-Client'] = this.opts.client
     }
+
+    this.#fetchWithNetworkError = this.opts.rateLimitedQueue.wrapPromiseFunction(fetchWithNetworkError)
+  }
+
+  #fetchJSON (...args) {
+    return this.#fetchWithNetworkError(...args).then(response => {
+      if (response.status === 429) {
+        this.opts.rateLimitedQueue.rateLimit(2_000)
+        return this.#fetchJSON(...args)
+      }
+
+      if (!response.ok) {
+        const serverError = new Error(response.statusText)
+        serverError.statusCode = response.status
+
+        if (!`${args[0]}`.endsWith(ASSEMBLIES_ENDPOINT)) return Promise.reject(serverError)
+
+        // Failed assembly requests should return a more detailed error in JSON.
+        return response.json().then(assembly => {
+          if (!assembly.error) throw serverError
+
+          const error = new Error(assembly.error)
+          error.details = assembly.message
+          error.assembly = assembly
+          if (assembly.assembly_id) {
+            error.details += ` Assembly ID: ${assembly.assembly_id}`
+          }
+          throw error
+        }, err => {
+          // eslint-disable-next-line no-param-reassign
+          err.cause = serverError
+          throw err
+        })
+      }
+
+      return response.json()
+    })
   }
 
   /**
@@ -83,7 +84,7 @@ module.exports = class Client {
     data.append('num_expected_upload_files', expectedFiles)
 
     const url = new URL(ASSEMBLIES_ENDPOINT, `${this.opts.service}`).href
-    return fetchJSON(url, {
+    return this.#fetchJSON(url, {
       method: 'post',
       headers: this.#headers,
       body: data,
@@ -100,7 +101,7 @@ module.exports = class Client {
   reserveFile (assembly, file) {
     const size = encodeURIComponent(file.size)
     const url = `${assembly.assembly_ssl_url}/reserve_file?size=${size}`
-    return fetchJSON(url, { method: 'post', headers: this.#headers })
+    return this.#fetchJSON(url, { method: 'post', headers: this.#headers })
       .catch((err) => this.#reportError(err, { assembly, file, url, type: 'API_ERROR' }))
   }
 
@@ -121,7 +122,7 @@ module.exports = class Client {
 
     const qs = `size=${size}&filename=${filename}&fieldname=${fieldname}&s3Url=${uploadUrl}`
     const url = `${assembly.assembly_ssl_url}/add_file?${qs}`
-    return fetchJSON(url, { method: 'post', headers: this.#headers })
+    return this.#fetchJSON(url, { method: 'post', headers: this.#headers })
       .catch((err) => this.#reportError(err, { assembly, file, url, type: 'API_ERROR' }))
   }
 
@@ -132,7 +133,7 @@ module.exports = class Client {
    */
   cancelAssembly (assembly) {
     const url = assembly.assembly_ssl_url
-    return fetchJSON(url, { method: 'delete', headers: this.#headers })
+    return this.#fetchJSON(url, { method: 'delete', headers: this.#headers })
       .catch((err) => this.#reportError(err, { url, type: 'API_ERROR' }))
   }
 
@@ -142,7 +143,7 @@ module.exports = class Client {
    * @param {string} url The status endpoint of the assembly.
    */
   getAssemblyStatus (url) {
-    return fetchJSON(url, { headers: this.#headers })
+    return this.#fetchJSON(url, { headers: this.#headers })
       .catch((err) => this.#reportError(err, { url, type: 'STATUS_ERROR' }))
   }
 
@@ -151,7 +152,7 @@ module.exports = class Client {
       ? `${err.message} (${err.details})`
       : err.message
 
-    return fetchJSON('https://transloaditstatus.com/client_error', {
+    return this.#fetchJSON('https://transloaditstatus.com/client_error', {
       method: 'post',
       body: JSON.stringify({
         endpoint,

+ 8 - 2
packages/@uppy/transloadit/src/index.js

@@ -1,5 +1,6 @@
 const hasProperty = require('@uppy/utils/lib/hasProperty')
 const ErrorWithCause = require('@uppy/utils/lib/ErrorWithCause')
+const { RateLimitedQueue } = require('@uppy/utils/lib/RateLimitedQueue')
 const BasePlugin = require('@uppy/core/lib/BasePlugin')
 const Tus = require('@uppy/tus')
 const Assembly = require('./Assembly')
@@ -35,6 +36,8 @@ const TL_COMPANION = /https?:\/\/api2(?:-\w+)?\.transloadit\.com\/companion/
 module.exports = class Transloadit extends BasePlugin {
   static VERSION = require('../package.json').version // eslint-disable-line global-require
 
+  #rateLimitedQueue
+
   constructor (uppy, opts) {
     super(uppy, opts)
     this.type = 'uploader'
@@ -59,6 +62,7 @@ module.exports = class Transloadit extends BasePlugin {
     }
 
     this.opts = { ...defaultOptions, ...opts }
+    this.#rateLimitedQueue = new RateLimitedQueue(this.opts.limit)
 
     this.i18nInit()
 
@@ -75,6 +79,7 @@ module.exports = class Transloadit extends BasePlugin {
       service: this.opts.service,
       client: this.#getClientVersion(),
       errorReporting: this.opts.errorReporting,
+      rateLimitedQueue: this.#rateLimitedQueue,
     })
     // Contains Assembly instances for in-progress Assemblies.
     this.activeAssemblies = {}
@@ -185,7 +190,7 @@ module.exports = class Transloadit extends BasePlugin {
       expectedFiles: fileIDs.length,
       signature: options.signature,
     }).then((newAssembly) => {
-      const assembly = new Assembly(newAssembly)
+      const assembly = new Assembly(newAssembly, this.#rateLimitedQueue)
       const { status } = assembly
       const assemblyID = status.assembly_id
 
@@ -492,7 +497,7 @@ module.exports = class Transloadit extends BasePlugin {
 
       const allAssemblyIDs = Object.keys(assemblies)
       allAssemblyIDs.forEach((id) => {
-        const assembly = new Assembly(assemblies[id])
+        const assembly = new Assembly(assemblies[id], this.#rateLimitedQueue)
         this.#connectAssembly(assembly)
       })
     }
@@ -762,6 +767,7 @@ module.exports = class Transloadit extends BasePlugin {
         metaFields: ['assembly_url', 'filename', 'fieldname'],
         // Pass the limit option to @uppy/tus
         limit: this.opts.limit,
+        rateLimitedQueue: this.#rateLimitedQueue,
         retryDelays: this.opts.retryDelays,
       })
     }

+ 1 - 1
packages/@uppy/tus/src/index.js

@@ -87,7 +87,7 @@ module.exports = class Tus extends BasePlugin {
      *
      * @type {RateLimitedQueue}
      */
-    this.requests = new RateLimitedQueue(this.opts.limit)
+    this.requests = this.opts.rateLimitedQueue ?? new RateLimitedQueue(this.opts.limit)
     this.#retryDelayIterator = this.opts.retryDelays?.values()
 
     this.uploaders = Object.create(null)