Преглед изворни кода

queue socket token requests for remote files (#3797)

Queue socket token request and immediately open socket connection. This commit helps reducing the probability of the remote socket timing out, and ensures the number of requests sent to Companion at once doesn't exceed the `limit` option.
Merlijn Vos пре 2 година
родитељ
комит
df15292949

+ 36 - 29
packages/@uppy/aws-s3-multipart/src/index.js

@@ -20,6 +20,8 @@ function assertServerError (res) {
 export default class AwsS3Multipart extends BasePlugin {
   static VERSION = packageJson.version
 
+  #queueRequestSocketToken
+
   #client
 
   constructor (uppy, opts) {
@@ -50,6 +52,8 @@ export default class AwsS3Multipart extends BasePlugin {
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
     this.uploaderSockets = Object.create(null)
+
+    this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
   }
 
   [Symbol.for('uppy test: getClient')] () { return this.#client }
@@ -289,7 +293,26 @@ export default class AwsS3Multipart extends BasePlugin {
     })
   }
 
-  uploadRemote (file) {
+  #requestSocketToken = async (file) => {
+    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+    const client = new Client(this.uppy, file.remote.providerOptions)
+    const opts = { ...this.opts }
+
+    if (file.tus) {
+      // Install file-specific upload overrides.
+      Object.assign(opts, file.tus)
+    }
+
+    const res = await client.post(file.remote.url, {
+      ...file.remote.body,
+      protocol: 's3-multipart',
+      size: file.data.size,
+      metadata: file.meta,
+    })
+    return res.token
+  }
+
+  async uploadRemote (file) {
     this.resetUploaderReferences(file.id)
 
     // Don't double-emit upload-started for Golden Retriever-restored files that were already started
@@ -297,33 +320,18 @@ export default class AwsS3Multipart extends BasePlugin {
       this.uppy.emit('upload-started', file)
     }
 
-    if (file.serverToken) {
-      return this.connectToServerSocket(file)
-    }
-
-    return new Promise((resolve, reject) => {
-      const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-      const client = new Client(this.uppy, file.remote.providerOptions)
-      client.post(
-        file.remote.url,
-        {
-          ...file.remote.body,
-          protocol: 's3-multipart',
-          size: file.data.size,
-          metadata: file.meta,
-        },
-      ).then((res) => {
-        this.uppy.setFileState(file.id, { serverToken: res.token })
-        // eslint-disable-next-line no-param-reassign
-        file = this.uppy.getFile(file.id)
+    try {
+      if (file.serverToken) {
         return this.connectToServerSocket(file)
-      }).then(() => {
-        resolve()
-      }).catch((err) => {
-        this.uppy.emit('upload-error', file, err)
-        reject(err)
-      })
-    })
+      }
+      const serverToken = await this.#queueRequestSocketToken(file)
+
+      this.uppy.setFileState(file.id, { serverToken })
+      return this.connectToServerSocket(this.uppy.getFile(file.id))
+    } catch (err) {
+      this.uppy.emit('upload-error', file, err)
+      throw err
+    }
   }
 
   connectToServerSocket (file) {
@@ -332,7 +340,7 @@ export default class AwsS3Multipart extends BasePlugin {
 
       const token = file.serverToken
       const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
+      const socket = new Socket({ target: `${host}/api/${token}` })
       this.uploaderSockets[file.id] = socket
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
@@ -422,7 +430,6 @@ export default class AwsS3Multipart extends BasePlugin {
       })
 
       queuedRequest = this.requests.run(() => {
-        socket.open()
         if (file.isPaused) {
           socket.send('pause', {})
         }

+ 39 - 13
packages/@uppy/aws-s3/src/MiniXHRUpload.js

@@ -53,6 +53,8 @@ function createFormDataUpload (file, opts) {
 const createBareUpload = file => file.data
 
 export default class MiniXHRUpload {
+  #queueRequestSocketToken
+
   constructor (uppy, opts) {
     this.uppy = uppy
     this.opts = {
@@ -65,6 +67,8 @@ export default class MiniXHRUpload {
     this.requests = opts[internalRateLimitedQueue]
     this.uploaderEvents = Object.create(null)
     this.i18n = opts.i18n
+
+    this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
   }
 
   #getOptions (file) {
@@ -243,19 +247,21 @@ export default class MiniXHRUpload {
     })
   }
 
-  #uploadRemoteFile (file) {
+  #requestSocketToken = async (file) => {
     const opts = this.#getOptions(file)
-    // This is done in index.js in the S3 plugin.
-    // this.uppy.emit('upload-started', file)
-
+    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+    const client = new Client(this.uppy, file.remote.providerOptions)
     const metaFields = Array.isArray(opts.metaFields)
       ? opts.metaFields
-    // Send along all fields by default.
+      // Send along all fields by default.
       : Object.keys(file.meta)
 
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
-    return client.post(file.remote.url, {
+    if (file.tus) {
+      // Install file-specific upload overrides.
+      Object.assign(opts, file.tus)
+    }
+
+    const res = await client.post(file.remote.url, {
       ...file.remote.body,
       endpoint: opts.endpoint,
       size: file.data.size,
@@ -264,14 +270,34 @@ export default class MiniXHRUpload {
       httpMethod: opts.method,
       useFormData: opts.formData,
       headers: opts.headers,
-    }).then(res => new Promise((resolve, reject) => {
-      const { token } = res
+    })
+    return res.token
+  }
+
+  async #uploadRemoteFile (file) {
+    try {
+      if (file.serverToken) {
+        return this.connectToServerSocket(file)
+      }
+      const serverToken = await this.#queueRequestSocketToken(file)
+
+      this.uppy.setFileState(file.id, { serverToken })
+      return this.connectToServerSocket(this.uppy.getFile(file.id))
+    } catch (err) {
+      this.uppy.emit('upload-error', file, err)
+      throw err
+    }
+  }
+
+  connectToServerSocket (file) {
+    return new Promise((resolve, reject) => {
+      const opts = this.#getOptions(file)
+      const token = file.serverToken
       const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
+      const socket = new Socket({ target: `${host}/api/${token}` })
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
       const queuedRequest = this.requests.run(() => {
-        socket.open()
         if (file.isPaused) {
           socket.send('pause', {})
         }
@@ -341,6 +367,6 @@ export default class MiniXHRUpload {
     }).catch((err) => {
       this.uppy.emit('upload-error', file, err)
       return Promise.reject(err)
-    }))
+    })
   }
 }

+ 36 - 29
packages/@uppy/tus/src/index.js

@@ -57,6 +57,8 @@ export default class Tus extends BasePlugin {
 
   #retryDelayIterator
 
+  #queueRequestSocketToken
+
   /**
    * @param {Uppy} uppy
    * @param {TusOptions} opts
@@ -97,6 +99,7 @@ export default class Tus extends BasePlugin {
 
     this.handleResetProgress = this.handleResetProgress.bind(this)
     this.handleUpload = this.handleUpload.bind(this)
+    this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
   }
 
   handleResetProgress () {
@@ -427,43 +430,48 @@ export default class Tus extends BasePlugin {
     })
   }
 
-  /**
-   * @param {UppyFile} file for use with upload
-   * @returns {Promise<void>}
-   */
-  async uploadRemote (file) {
-    this.resetUploaderReferences(file.id)
-
+  #requestSocketToken = async (file) => {
+    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+    const client = new Client(this.uppy, file.remote.providerOptions)
     const opts = { ...this.opts }
+
     if (file.tus) {
       // Install file-specific upload overrides.
       Object.assign(opts, file.tus)
     }
 
-    this.uppy.emit('upload-started', file)
-    this.uppy.log(file.remote.url)
+    const res = await client.post(file.remote.url, {
+      ...file.remote.body,
+      endpoint: opts.endpoint,
+      uploadUrl: opts.uploadUrl,
+      protocol: 'tus',
+      size: file.data.size,
+      headers: opts.headers,
+      metadata: file.meta,
+    })
+    return res.token
+  }
 
-    if (file.serverToken) {
-      await this.connectToServerSocket(file)
-      return
-    }
+  /**
+   * @param {UppyFile} file for use with upload
+   * @returns {Promise<void>}
+   */
+  async uploadRemote (file) {
+    this.resetUploaderReferences(file.id)
 
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
+    // Don't double-emit upload-started for Golden Retriever-restored files that were already started
+    if (!file.progress.uploadStarted || !file.isRestored) {
+      this.uppy.emit('upload-started', file)
+    }
 
     try {
-      // !! cancellation is NOT supported at this stage yet
-      const res = await client.post(file.remote.url, {
-        ...file.remote.body,
-        endpoint: opts.endpoint,
-        uploadUrl: opts.uploadUrl,
-        protocol: 'tus',
-        size: file.data.size,
-        headers: opts.headers,
-        metadata: file.meta,
-      })
-      this.uppy.setFileState(file.id, { serverToken: res.token })
-      await this.connectToServerSocket(this.uppy.getFile(file.id))
+      if (file.serverToken) {
+        return this.connectToServerSocket(file)
+      }
+      const serverToken = await this.#queueRequestSocketToken(file)
+
+      this.uppy.setFileState(file.id, { serverToken })
+      return this.connectToServerSocket(this.uppy.getFile(file.id))
     } catch (err) {
       this.uppy.emit('upload-error', file, err)
       throw err
@@ -482,7 +490,7 @@ export default class Tus extends BasePlugin {
     return new Promise((resolve, reject) => {
       const token = file.serverToken
       const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
+      const socket = new Socket({ target: `${host}/api/${token}` })
       this.uploaderSockets[file.id] = socket
       this.uploaderEvents[file.id] = new EventTracker(this.uppy)
 
@@ -591,7 +599,6 @@ export default class Tus extends BasePlugin {
       })
 
       queuedRequest = this.requests.run(() => {
-        socket.open()
         if (file.isPaused) {
           socket.send('pause', {})
         }