Jelajahi Sumber

Move remote file upload logic into companion-client (#4573)

Merlijn Vos 1 tahun lalu
induk
melakukan
ef613e6a9f

+ 28 - 200
packages/@uppy/aws-s3-multipart/src/index.js

@@ -1,8 +1,6 @@
-import UploaderPlugin from '@uppy/core/lib/UploaderPlugin.js'
-import { Socket, Provider, RequestClient } from '@uppy/companion-client'
+import BasePlugin from '@uppy/core/lib/BasePlugin.js'
+import { Provider, RequestClient } from '@uppy/companion-client'
 import EventManager from '@uppy/utils/lib/EventManager'
-import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
-import getSocketHost from '@uppy/utils/lib/getSocketHost'
 import { RateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
 import { filterNonFailedFiles, filterFilesToEmitUploadStarted } from '@uppy/utils/lib/fileFilters'
 import { createAbortError } from '@uppy/utils/lib/AbortController'
@@ -374,7 +372,7 @@ class HTTPCommunicationQueue {
   }
 }
 
-export default class AwsS3Multipart extends UploaderPlugin {
+export default class AwsS3Multipart extends BasePlugin {
   static VERSION = packageJson.version
 
   #companionCommunicationQueue
@@ -435,8 +433,6 @@ export default class AwsS3Multipart extends UploaderPlugin {
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
     this.uploaderSockets = Object.create(null)
-
-    this.setQueueRequestSocketToken(this.requests.wrapPromiseFunction(this.#requestSocketToken, { priority: -1 }))
   }
 
   [Symbol.for('uppy test: getClient')] () { return this.#client }
@@ -717,7 +713,7 @@ export default class AwsS3Multipart extends UploaderPlugin {
     return this.uppy.getFile(file.id) || file
   }
 
-  #uploadFile (file) {
+  #uploadLocalFile (file) {
     return new Promise((resolve, reject) => {
       const onProgress = (bytesUploaded, bytesTotal) => {
         this.uppy.emit('upload-progress', file, {
@@ -777,15 +773,16 @@ export default class AwsS3Multipart extends UploaderPlugin {
       })
 
       this.uploaders[file.id] = upload
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
+      const eventManager = new EventManager(this.uppy)
+      this.uploaderEvents[file.id] = eventManager
 
-      this.onFileRemove(file.id, (removed) => {
+      eventManager.onFileRemove(file.id, (removed) => {
         upload.abort()
         this.resetUploaderReferences(file.id, { abort: true })
         resolve(`upload ${removed.id} was removed`)
       })
 
-      this.onCancelAll(file.id, ({ reason } = {}) => {
+      eventManager.onCancelAll(file.id, ({ reason } = {}) => {
         if (reason === 'user') {
           upload.abort()
           this.resetUploaderReferences(file.id, { abort: true })
@@ -793,7 +790,7 @@ export default class AwsS3Multipart extends UploaderPlugin {
         resolve(`upload ${file.id} was canceled`)
       })
 
-      this.onFilePause(file.id, (isPaused) => {
+      eventManager.onFilePause(file.id, (isPaused) => {
         if (isPaused) {
           upload.pause()
         } else {
@@ -801,11 +798,11 @@ export default class AwsS3Multipart extends UploaderPlugin {
         }
       })
 
-      this.onPauseAll(file.id, () => {
+      eventManager.onPauseAll(file.id, () => {
         upload.pause()
       })
 
-      this.onResumeAll(file.id, () => {
+      eventManager.onResumeAll(file.id, () => {
         upload.start()
       })
 
@@ -813,155 +810,32 @@ export default class AwsS3Multipart extends UploaderPlugin {
     })
   }
 
-  #requestSocketToken = async (file, options) => {
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
-    const opts = { ...this.opts }
-
-    if (file.tus) {
-      // Install file-specific upload overrides.
-      Object.assign(opts, file.tus)
-    }
-
-    if (file.remote.url == null) {
-      throw new Error('Cannot connect to an undefined URL')
-    }
-
-    const res = await client.post(file.remote.url, {
+  // eslint-disable-next-line class-methods-use-this
+  #getCompanionClientArgs (file) {
+    return {
       ...file.remote.body,
       protocol: 's3-multipart',
       size: file.data.size,
       metadata: file.meta,
-    }, options)
-    return res.token
-  }
-
-  async connectToServerSocket (file) {
-    return new Promise((resolve, reject) => {
-      let queuedRequest
-
-      const token = file.serverToken
-      const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
-      this.uploaderSockets[file.id] = socket
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
-
-      this.onFileRemove(file.id, () => {
-        socket.send('cancel', {})
-        queuedRequest.abort()
-        this.resetUploaderReferences(file.id, { abort: true })
-        resolve(`upload ${file.id} was removed`)
-      })
-
-      this.onFilePause(file.id, (isPaused) => {
-        if (isPaused) {
-          // Remove this file from the queue so another file can start in its place.
-          socket.send('pause', {})
-          queuedRequest.abort()
-        } else {
-          // Resuming an upload should be queued, else you could pause and then
-          // resume a queued upload to make it skip the queue.
-          queuedRequest.abort()
-          queuedRequest = this.requests.run(() => {
-            socket.open()
-            socket.send('resume', {})
-            return () => {}
-          })
-        }
-      })
-
-      this.onPauseAll(file.id, () => {
-        // First send the message, then call .abort,
-        // just to make sure socket is not closed, which .abort used to do
-        socket.send('pause', {})
-        queuedRequest.abort()
-      })
-
-      this.onCancelAll(file.id, ({ reason } = {}) => {
-        if (reason === 'user') {
-          socket.send('cancel', {})
-          queuedRequest.abort()
-          this.resetUploaderReferences(file.id)
-        }
-        resolve(`upload ${file.id} was canceled`)
-      })
-
-      this.onResumeAll(file.id, () => {
-        queuedRequest.abort()
-        if (file.error) {
-          socket.send('pause', {})
-        }
-        queuedRequest = this.requests.run(() => {
-          socket.open()
-          socket.send('resume', {})
-
-          return () => {}
-        })
-      })
-
-      this.onRetry(file.id, () => {
-        // Only do the retry if the upload is actually in progress;
-        // else we could try to send these messages when the upload is still queued.
-        // We may need a better check for this since the socket may also be closed
-        // for other reasons, like network failures.
-        if (socket.isOpen) {
-          socket.send('pause', {})
-          socket.send('resume', {})
-        }
-      })
-
-      this.onRetryAll(file.id, () => {
-        if (socket.isOpen) {
-          socket.send('pause', {})
-          socket.send('resume', {})
-        }
-      })
-
-      socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
-
-      socket.on('error', (errData) => {
-        this.uppy.emit('upload-error', file, new Error(errData.error))
-        this.resetUploaderReferences(file.id)
-        socket.close()
-        queuedRequest.done()
-        reject(new Error(errData.error))
-      })
-
-      socket.on('success', (data) => {
-        const uploadResp = {
-          uploadURL: data.url,
-        }
-
-        this.uppy.emit('upload-success', file, uploadResp)
-        this.resetUploaderReferences(file.id)
-        socket.close()
-        queuedRequest.done()
-        resolve()
-      })
-
-      queuedRequest = this.requests.run(() => {
-        if (file.isPaused) {
-          socket.send('pause', {})
-        } else {
-          socket.open()
-        }
-
-        return () => {}
-      })
-    })
+    }
   }
 
   #upload = async (fileIDs) => {
     if (fileIDs.length === 0) return undefined
 
     const files = this.uppy.getFilesByIds(fileIDs)
-
     const filesFiltered = filterNonFailedFiles(files)
     const filesToEmit = filterFilesToEmitUploadStarted(filesFiltered)
+
     this.uppy.emit('upload-start', filesToEmit)
 
     const promises = filesFiltered.map((file) => {
       if (file.isRemote) {
+        // INFO: the url plugin needs to use RequestClient,
+        // while others use Provider
+        const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+        const getQueue = () => this.requests
+        const client = new Client(this.uppy, file.remote.providerOptions, getQueue)
         this.#setResumableUploadsCapability(false)
         const controller = new AbortController()
 
@@ -970,8 +844,11 @@ export default class AwsS3Multipart extends UploaderPlugin {
         }
         this.uppy.on('file-removed', removedHandler)
 
-        this.resetUploaderReferences(file.id)
-        const uploadPromise = this.uploadRemoteFile(file, { signal: controller.signal })
+        const uploadPromise = client.uploadRemoteFile(
+          file,
+          this.#getCompanionClientArgs(file),
+          { signal: controller.signal },
+        )
 
         this.requests.wrapSyncFunction(() => {
           this.uppy.off('file-removed', removedHandler)
@@ -979,7 +856,8 @@ export default class AwsS3Multipart extends UploaderPlugin {
 
         return uploadPromise
       }
-      return this.#uploadFile(file)
+
+      return this.#uploadLocalFile(file)
     })
 
     const upload = await Promise.all(promises)
@@ -993,56 +871,6 @@ export default class AwsS3Multipart extends UploaderPlugin {
     this.#client.setCompanionHeaders(this.opts.companionHeaders)
   }
 
-  onFileRemove (fileID, cb) {
-    this.uploaderEvents[fileID].on('file-removed', (file) => {
-      if (fileID === file.id) cb(file.id)
-    })
-  }
-
-  onFilePause (fileID, cb) {
-    this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
-      if (fileID === targetFileID) {
-        cb(isPaused)
-      }
-    })
-  }
-
-  onRetry (fileID, cb) {
-    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
-      if (fileID === targetFileID) {
-        cb()
-      }
-    })
-  }
-
-  onRetryAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('retry-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
-
-  onPauseAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('pause-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
-
-  onCancelAll (fileID, eventHandler) {
-    this.uploaderEvents[fileID].on('cancel-all', (...args) => {
-      if (!this.uppy.getFile(fileID)) return
-      eventHandler(...args)
-    })
-  }
-
-  onResumeAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('resume-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
-
   #setResumableUploadsCapability = (boolean) => {
     const { capabilities } = this.uppy.getState()
     this.uppy.setState({

+ 0 - 105
packages/@uppy/aws-s3/src/MiniXHRUpload.js

@@ -1,7 +1,4 @@
 import { nanoid } from 'nanoid/non-secure'
-import { Socket } from '@uppy/companion-client'
-import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
-import getSocketHost from '@uppy/utils/lib/getSocketHost'
 import EventManager from '@uppy/utils/lib/EventManager'
 import ProgressTimeout from '@uppy/utils/lib/ProgressTimeout'
 import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'
@@ -235,106 +232,4 @@ export default class MiniXHRUpload {
       })
     })
   }
-
-  async connectToServerSocket (file) {
-    return new Promise((resolve, reject) => {
-      const opts = this.getOptions(file)
-      const token = file.serverToken
-      const host = getSocketHost(file.remote.companionUrl)
-      let socket
-
-      const createSocket = () => {
-        if (socket != null) return
-
-        socket = new Socket({ target: `${host}/api/${token}` })
-
-        socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
-
-        socket.on('success', (data) => {
-          const body = opts.getResponseData(data.response.responseText, data.response)
-          const uploadURL = body[opts.responseUrlFieldName]
-
-          const uploadResp = {
-            status: data.response.status,
-            body,
-            uploadURL,
-            bytesUploaded: data.bytesUploaded,
-          }
-
-          this.uppy.emit('upload-success', file, uploadResp)
-          queuedRequest.done() // eslint-disable-line no-use-before-define
-          socket.close()
-          if (this.uploaderEvents[file.id]) {
-            this.uploaderEvents[file.id].remove()
-            this.uploaderEvents[file.id] = null
-          }
-          return resolve()
-        })
-
-        socket.on('error', (errData) => {
-          const resp = errData.response
-          const error = resp
-            ? opts.getResponseError(resp.responseText, resp)
-            : new ErrorWithCause(errData.error.message, { cause: errData.error })
-          this.uppy.emit('upload-error', file, error)
-          queuedRequest.done() // eslint-disable-line no-use-before-define
-          if (this.uploaderEvents[file.id]) {
-            this.uploaderEvents[file.id].remove()
-            this.uploaderEvents[file.id] = null
-          }
-          reject(error)
-        })
-      }
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
-
-      let queuedRequest = this.requests.run(() => {
-        if (file.isPaused) {
-          socket?.send('pause', {})
-        } else {
-          createSocket()
-        }
-
-        return () => socket.close()
-      })
-
-      this.#addEventHandlerForFile('file-removed', file.id, () => {
-        socket?.send('cancel', {})
-        queuedRequest.abort()
-        resolve(`upload ${file.id} was removed`)
-      })
-
-      this.#addEventHandlerIfFileStillExists('cancel-all', file.id, ({ reason } = {}) => {
-        if (reason === 'user') {
-          socket?.send('cancel', {})
-          queuedRequest.abort()
-        }
-        resolve(`upload ${file.id} was canceled`)
-      })
-
-      const onRetryRequest = () => {
-        if (socket == null) {
-          queuedRequest.abort()
-        } else {
-          socket.send('pause', {})
-          queuedRequest.done()
-        }
-        queuedRequest = this.requests.run(() => {
-          if (!file.isPaused) {
-            if (socket == null) {
-              createSocket()
-            } else {
-              socket.send('resume', {})
-            }
-          }
-
-          return () => socket.close()
-        })
-      }
-      this.#addEventHandlerForFile('upload-retry', file.id, onRetryRequest)
-      this.#addEventHandlerIfFileStillExists('retry-all', file.id, onRetryRequest)
-    }).catch((err) => {
-      this.uppy.emit('upload-error', file, err)
-      return Promise.reject(err)
-    })
-  }
 }

+ 15 - 21
packages/@uppy/aws-s3/src/index.js

@@ -25,7 +25,7 @@
  * the XHRUpload code, but at least it's not horrifically broken :)
  */
 
-import UploaderPlugin from '@uppy/core/lib/UploaderPlugin.js'
+import BasePlugin from '@uppy/core/lib/BasePlugin.js'
 import AwsS3Multipart from '@uppy/aws-s3-multipart'
 import { RateLimitedQueue, internalRateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
 import { RequestClient, Provider } from '@uppy/companion-client'
@@ -103,7 +103,7 @@ function defaultGetResponseError (content, xhr) {
 let warnedSuccessActionStatus = false
 
 // TODO deprecate this, will use s3-multipart instead
-export default class AwsS3 extends UploaderPlugin {
+export default class AwsS3 extends BasePlugin {
   static VERSION = packageJson.version
 
   #client
@@ -144,8 +144,6 @@ export default class AwsS3 extends UploaderPlugin {
 
     this.#client = new RequestClient(uppy, opts)
     this.#requests = new RateLimitedQueue(this.opts.limit)
-
-    this.setQueueRequestSocketToken(this.#requests.wrapPromiseFunction(this.#requestSocketToken, { priority: -1 }))
   }
 
   [Symbol.for('uppy test: getClient')] () { return this.#client }
@@ -249,25 +247,13 @@ export default class AwsS3 extends UploaderPlugin {
     return Promise.resolve()
   }
 
-  connectToServerSocket (file) {
-    return this.#uploader.connectToServerSocket(file)
-  }
-
-  #requestSocketToken = async (file) => {
+  #getCompanionClientArgs = (file) => {
     const opts = this.#uploader.getOptions(file)
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
     const allowedMetaFields = Array.isArray(opts.allowedMetaFields)
       ? opts.allowedMetaFields
       // Send along all fields by default.
       : Object.keys(file.meta)
-
-    if (file.tus) {
-      // Install file-specific upload overrides.
-      Object.assign(opts, file.tus)
-    }
-
-    const res = await client.post(file.remote.url, {
+    return {
       ...file.remote.body,
       protocol: 'multipart',
       endpoint: opts.endpoint,
@@ -277,8 +263,7 @@ export default class AwsS3 extends UploaderPlugin {
       httpMethod: opts.method,
       useFormData: opts.formData,
       headers: typeof opts.headers === 'function' ? opts.headers(file) : opts.headers,
-    })
-    return res.token
+    }
   }
 
   uploadFile (id, current, total) {
@@ -288,6 +273,11 @@ export default class AwsS3 extends UploaderPlugin {
     if (file.error) throw new Error(file.error)
 
     if (file.isRemote) {
+      // INFO: the url plugin needs to use RequestClient,
+      // while others use Provider
+      const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+      const getQueue = () => this.#requests
+      const client = new Client(this.uppy, file.remote.providerOptions, getQueue)
       const controller = new AbortController()
 
       const removedHandler = (removedFile) => {
@@ -295,7 +285,11 @@ export default class AwsS3 extends UploaderPlugin {
       }
       this.uppy.on('file-removed', removedHandler)
 
-      const uploadPromise = this.uploadRemoteFile(file, { signal: controller.signal })
+      const uploadPromise = client.uploadRemoteFile(
+        file,
+        this.#getCompanionClientArgs(file),
+        { signal: controller.signal },
+      )
 
       this.#requests.wrapSyncFunction(() => {
         this.uppy.off('file-removed', removedHandler)

+ 2 - 2
packages/@uppy/companion-client/src/Provider.js

@@ -30,8 +30,8 @@ function isOriginAllowed (origin, allowedOrigin) {
 export default class Provider extends RequestClient {
   #refreshingTokenPromise
 
-  constructor (uppy, opts) {
-    super(uppy, opts)
+  constructor (uppy, opts, getQueue) {
+    super(uppy, opts, getQueue)
     this.provider = opts.provider
     this.id = this.provider
     this.name = this.opts.name || getName(this.id)

+ 225 - 17
packages/@uppy/companion-client/src/RequestClient.js

@@ -2,7 +2,12 @@
 
 import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'
 import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'
+import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
+import getSocketHost from '@uppy/utils/lib/getSocketHost'
+import EventManager from '@uppy/utils/lib/EventManager'
+
 import AuthError from './AuthError.js'
+import Socket from './Socket.js'
 
 import packageJson from '../package.json'
 
@@ -25,8 +30,12 @@ async function handleJSONResponse (res) {
   try {
     const errData = await jsonPromise
     errMsg = errData.message ? `${errMsg} message: ${errData.message}` : errMsg
-    errMsg = errData.requestId ? `${errMsg} request-Id: ${errData.requestId}` : errMsg
-  } catch { /* if the response contains invalid JSON, let's ignore the error */ }
+    errMsg = errData.requestId
+      ? `${errMsg} request-Id: ${errData.requestId}`
+      : errMsg
+  } catch {
+    /* if the response contains invalid JSON, let's ignore the error */
+  }
   throw new Error(errMsg)
 }
 
@@ -38,9 +47,10 @@ export default class RequestClient {
 
   #companionHeaders
 
-  constructor (uppy, opts) {
+  constructor (uppy, opts, getQueue) {
     this.uppy = uppy
     this.opts = opts
+    this.getQueue = getQueue
     this.onReceiveResponse = this.onReceiveResponse.bind(this)
     this.#companionHeaders = opts?.companionHeaders
   }
@@ -49,7 +59,9 @@ export default class RequestClient {
     this.#companionHeaders = headers
   }
 
-  [Symbol.for('uppy test: getCompanionHeaders')] () { return this.#companionHeaders }
+  [Symbol.for('uppy test: getCompanionHeaders')] () {
+    return this.#companionHeaders
+  }
 
   get hostname () {
     const { companion } = this.uppy.getState()
@@ -108,7 +120,11 @@ export default class RequestClient {
     const allowedHeadersCached = allowedHeadersCache.get(this.hostname)
     if (allowedHeadersCached != null) return allowedHeadersCached
 
-    const fallbackAllowedHeaders = ['accept', 'content-type', 'uppy-auth-token']
+    const fallbackAllowedHeaders = [
+      'accept',
+      'content-type',
+      'uppy-auth-token',
+    ]
 
     const promise = (async () => {
       try {
@@ -120,13 +136,20 @@ export default class RequestClient {
           return fallbackAllowedHeaders
         }
 
-        this.uppy.log(`[CompanionClient] adding allowed preflight headers to companion cache: ${this.hostname} ${header}`)
+        this.uppy.log(
+          `[CompanionClient] adding allowed preflight headers to companion cache: ${this.hostname} ${header}`,
+        )
 
-        const allowedHeaders = header.split(',').map((headerName) => headerName.trim().toLowerCase())
+        const allowedHeaders = header
+          .split(',')
+          .map((headerName) => headerName.trim().toLowerCase())
         allowedHeadersCache.set(this.hostname, allowedHeaders)
         return allowedHeaders
       } catch (err) {
-        this.uppy.log(`[CompanionClient] unable to make preflight request ${err}`, 'warning')
+        this.uppy.log(
+          `[CompanionClient] unable to make preflight request ${err}`,
+          'warning',
+        )
         // If the user gets a network error or similar, we should try preflight
         // again next time, or else we might get incorrect behaviour.
         allowedHeadersCache.delete(this.hostname) // re-fetch next time
@@ -139,15 +162,22 @@ export default class RequestClient {
   }
 
   async preflightAndHeaders (path) {
-    const [allowedHeaders, headers] = await Promise.all([this.preflight(path), this.headers()])
+    const [allowedHeaders, headers] = await Promise.all([
+      this.preflight(path),
+      this.headers(),
+    ])
     // filter to keep only allowed Headers
-    return Object.fromEntries(Object.entries(headers).filter(([header]) => {
-      if (!allowedHeaders.includes(header.toLowerCase())) {
-        this.uppy.log(`[CompanionClient] excluding disallowed header ${header}`)
-        return false
-      }
-      return true
-    }))
+    return Object.fromEntries(
+      Object.entries(headers).filter(([header]) => {
+        if (!allowedHeaders.includes(header.toLowerCase())) {
+          this.uppy.log(
+            `[CompanionClient] excluding disallowed header ${header}`,
+          )
+          return false
+        }
+        return true
+      }),
+    )
   }
 
   /** @protected */
@@ -165,7 +195,9 @@ export default class RequestClient {
       return handleJSONResponse(response)
     } catch (err) {
       if (err?.isAuthError) throw err
-      throw new ErrorWithCause(`Could not ${method} ${this.#getUrl(path)}`, { cause: err })
+      throw new ErrorWithCause(`Could not ${method} ${this.#getUrl(path)}`, {
+        cause: err,
+      })
     }
   }
 
@@ -189,4 +221,180 @@ export default class RequestClient {
     if (typeof options === 'boolean') options = { skipPostResponse: options }
     return this.request({ ...options, path, method: 'DELETE', data })
   }
+
+  async uploadRemoteFile (file, reqBody, options = {}) {
+    try {
+      if (file.serverToken) {
+        return await this.connectToServerSocket(file, this.getQueue())
+      }
+      const queueRequestSocketToken = this.getQueue().wrapPromiseFunction(
+        this.#requestSocketToken,
+        { priority: -1 },
+      )
+      const serverToken = await queueRequestSocketToken(file, reqBody).abortOn(
+        options.signal,
+      )
+
+      if (!this.uppy.getState().files[file.id]) return undefined
+
+      this.uppy.setFileState(file.id, { serverToken })
+      return await this.connectToServerSocket(
+        this.uppy.getFile(file.id),
+        this.getQueue(),
+      )
+    } catch (err) {
+      if (err?.cause?.name === 'AbortError') {
+        // The file upload was aborted, it’s not an error
+        return undefined
+      }
+
+      this.uppy.setFileState(file.id, { serverToken: undefined })
+      this.uppy.emit('upload-error', file, err)
+      throw err
+    }
+  }
+
+  #requestSocketToken = async (file, postBody) => {
+    if (file.remote.url == null) {
+      throw new Error('Cannot connect to an undefined URL')
+    }
+
+    const res = await this.post(file.remote.url, {
+      ...file.remote.body,
+      ...postBody,
+    })
+
+    return res.token
+  }
+
+  /**
+   * @param {UppyFile} file
+   */
+  async connectToServerSocket (file, queue) {
+    return new Promise((resolve, reject) => {
+      const token = file.serverToken
+      const host = getSocketHost(file.remote.companionUrl)
+      const socket = new Socket({
+        target: `${host}/api/${token}`,
+        autoOpen: false,
+      })
+      const eventManager = new EventManager(this.uppy)
+
+      let queuedRequest
+
+      eventManager.onFileRemove(file.id, () => {
+        socket.send('cancel', {})
+        queuedRequest.abort()
+        resolve(`upload ${file.id} was removed`)
+      })
+
+      eventManager.onPause(file.id, (isPaused) => {
+        if (isPaused) {
+          // Remove this file from the queue so another file can start in its place.
+          socket.send('pause', {})
+          queuedRequest.abort()
+        } else {
+          // Resuming an upload should be queued, else you could pause and then
+          // resume a queued upload to make it skip the queue.
+          queuedRequest.abort()
+          queuedRequest = queue.run(() => {
+            socket.open()
+            socket.send('resume', {})
+
+            return () => {}
+          })
+        }
+      })
+
+      eventManager.onPauseAll(file.id, () => {
+        socket.send('pause', {})
+        queuedRequest.abort()
+      })
+
+      eventManager.onCancelAll(file.id, ({ reason } = {}) => {
+        if (reason === 'user') {
+          socket.send('cancel', {})
+          queuedRequest.abort()
+        }
+        resolve(`upload ${file.id} was canceled`)
+      })
+
+      eventManager.onResumeAll(file.id, () => {
+        queuedRequest.abort()
+        if (file.error) {
+          socket.send('pause', {})
+        }
+        queuedRequest = queue.run(() => {
+          socket.open()
+          socket.send('resume', {})
+
+          return () => {}
+        })
+      })
+
+      eventManager.onRetry(file.id, () => {
+        // Only do the retry if the upload is actually in progress;
+        // else we could try to send these messages when the upload is still queued.
+        // We may need a better check for this since the socket may also be closed
+        // for other reasons, like network failures.
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
+      })
+
+      eventManager.onRetryAll(file.id, () => {
+        // See the comment in the onRetry() call
+        if (socket.isOpen) {
+          socket.send('pause', {})
+          socket.send('resume', {})
+        }
+      })
+
+      socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
+
+      socket.on('error', (errData) => {
+        const { message } = errData.error
+        const error = Object.assign(new Error(message), {
+          cause: errData.error,
+        })
+
+        // If the remote retry optimisation should not be used,
+        // close the socket—this will tell companion to clear state and delete the file.
+        if (!this.opts.useFastRemoteRetry) {
+          // Remove the serverToken so that a new one will be created for the retry.
+          this.uppy.setFileState(file.id, {
+            serverToken: null,
+          })
+        } else {
+          socket.close()
+        }
+
+        this.uppy.emit('upload-error', file, error)
+        queuedRequest.done()
+        reject(error)
+      })
+
+      socket.on('success', (data) => {
+        const uploadResp = {
+          uploadURL: data.url,
+        }
+
+        this.uppy.emit('upload-success', file, uploadResp)
+        queuedRequest.done()
+        socket.close()
+        resolve()
+      })
+
+      queuedRequest = queue.run(() => {
+        if (file.isPaused) {
+          socket.send('pause', {})
+        } else {
+          socket.open()
+        }
+
+        return () => {}
+      })
+    })
+  }
 }

+ 0 - 34
packages/@uppy/core/src/UploaderPlugin.js

@@ -1,34 +0,0 @@
-import BasePlugin from './BasePlugin.js'
-
-export default class UploaderPlugin extends BasePlugin {
-  #queueRequestSocketToken
-
-  /** @protected */
-  setQueueRequestSocketToken (fn) {
-    this.#queueRequestSocketToken = fn
-  }
-
-  async uploadRemoteFile (file, options = {}) {
-    // TODO: we could rewrite this to use server-sent events instead of creating WebSockets.
-    try {
-      if (file.serverToken) {
-        return await this.connectToServerSocket(file)
-      }
-      const serverToken = await this.#queueRequestSocketToken(file).abortOn(options.signal)
-
-      if (!this.uppy.getState().files[file.id]) return undefined
-
-      this.uppy.setFileState(file.id, { serverToken })
-      return await this.connectToServerSocket(this.uppy.getFile(file.id))
-    } catch (err) {
-      if (err?.cause?.name === 'AbortError') {
-        // The file upload was aborted, it’s not an error
-        return undefined
-      }
-
-      this.uppy.setFileState(file.id, { serverToken: undefined })
-      this.uppy.emit('upload-error', file, err)
-      throw err
-    }
-  }
-}

+ 38 - 260
packages/@uppy/tus/src/index.js

@@ -1,8 +1,6 @@
-import UploaderPlugin from '@uppy/core/lib/UploaderPlugin.js'
+import BasePlugin from '@uppy/core/lib/BasePlugin.js'
 import * as tus from 'tus-js-client'
-import { Provider, RequestClient, Socket } from '@uppy/companion-client'
-import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
-import getSocketHost from '@uppy/utils/lib/getSocketHost'
+import { Provider, RequestClient } from '@uppy/companion-client'
 import EventManager from '@uppy/utils/lib/EventManager'
 import NetworkError from '@uppy/utils/lib/NetworkError'
 import isNetworkError from '@uppy/utils/lib/isNetworkError'
@@ -52,7 +50,7 @@ const tusDefaultOptions = {
 /**
  * Tus resumable file uploader
  */
-export default class Tus extends UploaderPlugin {
+export default class Tus extends BasePlugin {
   static VERSION = packageJson.version
 
   #retryDelayIterator
@@ -97,10 +95,8 @@ export default class Tus extends UploaderPlugin {
 
     this.uploaders = Object.create(null)
     this.uploaderEvents = Object.create(null)
-    this.uploaderSockets = Object.create(null)
 
     this.handleResetProgress = this.handleResetProgress.bind(this)
-    this.setQueueRequestSocketToken(this.requests.wrapPromiseFunction(this.#requestSocketToken, { priority: -1 }))
   }
 
   handleResetProgress () {
@@ -139,10 +135,6 @@ export default class Tus extends UploaderPlugin {
       this.uploaderEvents[fileID].remove()
       this.uploaderEvents[fileID] = null
     }
-    if (this.uploaderSockets[fileID]) {
-      this.uploaderSockets[fileID].close()
-      this.uploaderSockets[fileID] = null
-    }
   }
 
   /**
@@ -180,7 +172,7 @@ export default class Tus extends UploaderPlugin {
    * @param {UppyFile} file for use with upload
    * @returns {Promise<void>}
    */
-  #upload (file) {
+  #uploadLocalFile (file) {
     this.resetUploaderReferences(file.id)
 
     // Create a new tus upload
@@ -361,7 +353,8 @@ export default class Tus extends UploaderPlugin {
 
       upload = new tus.Upload(file.data, uploadOptions)
       this.uploaders[file.id] = upload
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
+      const eventManager = new EventManager(this.uppy)
+      this.uploaderEvents[file.id] = eventManager
 
       // eslint-disable-next-line prefer-const
       qRequest = () => {
@@ -387,13 +380,13 @@ export default class Tus extends UploaderPlugin {
 
       queuedRequest = this.requests.run(qRequest)
 
-      this.onFileRemove(file.id, (targetFileID) => {
+      eventManager.onFileRemove(file.id, (targetFileID) => {
         queuedRequest.abort()
         this.resetUploaderReferences(file.id, { abort: !!upload.url })
         resolve(`upload ${targetFileID} was removed`)
       })
 
-      this.onPause(file.id, (isPaused) => {
+      eventManager.onPause(file.id, (isPaused) => {
         queuedRequest.abort()
         if (isPaused) {
           // Remove this file from the queue so another file can start in its place.
@@ -405,12 +398,12 @@ export default class Tus extends UploaderPlugin {
         }
       })
 
-      this.onPauseAll(file.id, () => {
+      eventManager.onPauseAll(file.id, () => {
         queuedRequest.abort()
         upload.abort()
       })
 
-      this.onCancelAll(file.id, ({ reason } = {}) => {
+      eventManager.onCancelAll(file.id, ({ reason } = {}) => {
         if (reason === 'user') {
           queuedRequest.abort()
           this.resetUploaderReferences(file.id, { abort: !!upload.url })
@@ -418,7 +411,7 @@ export default class Tus extends UploaderPlugin {
         resolve(`upload ${file.id} was canceled`)
       })
 
-      this.onResumeAll(file.id, () => {
+      eventManager.onResumeAll(file.id, () => {
         queuedRequest.abort()
         if (file.error) {
           upload.abort()
@@ -431,170 +424,6 @@ export default class Tus extends UploaderPlugin {
     })
   }
 
-  #requestSocketToken = async (file, options) => {
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
-    const opts = { ...this.opts }
-
-    if (file.tus) {
-      // Install file-specific upload overrides.
-      Object.assign(opts, file.tus)
-    }
-
-    const res = await client.post(file.remote.url, {
-      ...file.remote.body,
-      endpoint: opts.endpoint,
-      uploadUrl: opts.uploadUrl,
-      protocol: 'tus',
-      size: file.data.size,
-      headers: (typeof opts.headers === 'function') ? opts.headers(file) : opts.headers,
-      metadata: file.meta,
-    }, options)
-    return res.token
-  }
-
-  /**
-   * See the comment on the upload() method.
-   *
-   * Additionally, when an upload is removed, completed, or cancelled, we need to close the WebSocket connection. This is
-   * handled by the resetUploaderReferences() function, so the same guidelines apply as in upload().
-   *
-   * @param {UppyFile} file
-   */
-  async connectToServerSocket (file) {
-    return new Promise((resolve, reject) => {
-      const token = file.serverToken
-      const host = getSocketHost(file.remote.companionUrl)
-      const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
-      this.uploaderSockets[file.id] = socket
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
-
-      let queuedRequest
-
-      this.onFileRemove(file.id, () => {
-        socket.send('cancel', {})
-        queuedRequest.abort()
-        this.resetUploaderReferences(file.id)
-        resolve(`upload ${file.id} was removed`)
-      })
-
-      this.onPause(file.id, (isPaused) => {
-        if (isPaused) {
-          // Remove this file from the queue so another file can start in its place.
-          socket.send('pause', {})
-          queuedRequest.abort()
-        } else {
-          // Resuming an upload should be queued, else you could pause and then
-          // resume a queued upload to make it skip the queue.
-          queuedRequest.abort()
-          queuedRequest = this.requests.run(() => {
-            socket.open()
-            socket.send('resume', {})
-
-            return () => {}
-          })
-        }
-      })
-
-      this.onPauseAll(file.id, () => {
-        socket.send('pause', {})
-        queuedRequest.abort()
-      })
-
-      this.onCancelAll(file.id, ({ reason } = {}) => {
-        if (reason === 'user') {
-          socket.send('cancel', {})
-          queuedRequest.abort()
-          this.resetUploaderReferences(file.id)
-        }
-        resolve(`upload ${file.id} was canceled`)
-      })
-
-      this.onResumeAll(file.id, () => {
-        queuedRequest.abort()
-        if (file.error) {
-          socket.send('pause', {})
-        }
-        queuedRequest = this.requests.run(() => {
-          socket.open()
-          socket.send('resume', {})
-
-          return () => {}
-        })
-      })
-
-      this.onRetry(file.id, () => {
-        // Only do the retry if the upload is actually in progress;
-        // else we could try to send these messages when the upload is still queued.
-        // We may need a better check for this since the socket may also be closed
-        // for other reasons, like network failures.
-        if (socket.isOpen) {
-          socket.send('pause', {})
-          socket.send('resume', {})
-        }
-      })
-
-      this.onRetryAll(file.id, () => {
-        // See the comment in the onRetry() call
-        if (socket.isOpen) {
-          socket.send('pause', {})
-          socket.send('resume', {})
-        }
-      })
-
-      socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
-
-      socket.on('error', (errData) => {
-        const { message } = errData.error
-        const error = Object.assign(new Error(message), { cause: errData.error })
-
-        // If the remote retry optimisation should not be used,
-        // close the socket—this will tell companion to clear state and delete the file.
-        if (!this.opts.useFastRemoteRetry) {
-          this.resetUploaderReferences(file.id)
-          // Remove the serverToken so that a new one will be created for the retry.
-          this.uppy.setFileState(file.id, {
-            serverToken: null,
-          })
-        } else {
-          socket.close()
-        }
-
-        this.uppy.emit('upload-error', file, error)
-        queuedRequest.done()
-        reject(error)
-      })
-
-      socket.on('success', (data) => {
-        const uploadResp = {
-          uploadURL: data.url,
-        }
-
-        this.uppy.emit('upload-success', file, uploadResp)
-        this.resetUploaderReferences(file.id)
-        queuedRequest.done()
-        socket.close()
-        resolve()
-      })
-
-      queuedRequest = this.requests.run(() => {
-        if (file.isPaused) {
-          socket.send('pause', {})
-        } else {
-          socket.open()
-        }
-
-        // Just close the socket here, the caller will take care of cancelling the upload itself
-        // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
-        // called when this request is still in the queue, and has not been started yet, too. At
-        // that point this cancellation function is not going to be called.
-        // Also, we need to remove the request from the queue _without_ destroying everything
-        // related to this upload to handle pauses.
-        return () => {}
-      })
-    })
-  }
-
   /**
    * Store the uploadUrl on the file options, so that when Golden Retriever
    * restores state, we will continue uploading to the correct URL.
@@ -614,83 +443,23 @@ export default class Tus extends UploaderPlugin {
     }
   }
 
-  /**
-   * @param {string} fileID
-   * @param {function(string): void} cb
-   */
-  onFileRemove (fileID, cb) {
-    this.uploaderEvents[fileID].on('file-removed', (file) => {
-      if (fileID === file.id) cb(file.id)
-    })
-  }
-
-  /**
-   * @param {string} fileID
-   * @param {function(boolean): void} cb
-   */
-  onPause (fileID, cb) {
-    this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
-      if (fileID === targetFileID) {
-        // const isPaused = this.uppy.pauseResume(fileID)
-        cb(isPaused)
-      }
-    })
-  }
-
-  /**
-   * @param {string} fileID
-   * @param {function(): void} cb
-   */
-  onRetry (fileID, cb) {
-    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
-      if (fileID === targetFileID) {
-        cb()
-      }
-    })
-  }
-
-  /**
-   * @param {string} fileID
-   * @param {function(): void} cb
-   */
-  onRetryAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('retry-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
-
-  /**
-   * @param {string} fileID
-   * @param {function(): void} cb
-   */
-  onPauseAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('pause-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
+  #getCompanionClientArgs (file) {
+    const opts = { ...this.opts }
 
-  /**
-   * @param {string} fileID
-   * @param {function(): void} eventHandler
-   */
-  onCancelAll (fileID, eventHandler) {
-    this.uploaderEvents[fileID].on('cancel-all', (...args) => {
-      if (!this.uppy.getFile(fileID)) return
-      eventHandler(...args)
-    })
-  }
+    if (file.tus) {
+      // Install file-specific upload overrides.
+      Object.assign(opts, file.tus)
+    }
 
-  /**
-   * @param {string} fileID
-   * @param {function(): void} cb
-   */
-  onResumeAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('resume-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
+    return {
+      ...file.remote.body,
+      endpoint: opts.endpoint,
+      uploadUrl: opts.uploadUrl,
+      protocol: 'tus',
+      size: file.data.size,
+      headers: opts.headers,
+      metadata: file.meta,
+    }
   }
 
   /**
@@ -706,6 +475,11 @@ export default class Tus extends UploaderPlugin {
       const total = files.length
 
       if (file.isRemote) {
+        // INFO: the url plugin needs to use RequestClient,
+        // while others use Provider
+        const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+        const getQueue = () => this.requests
+        const client = new Client(this.uppy, file.remote.providerOptions, getQueue)
         const controller = new AbortController()
 
         const removedHandler = (removedFile) => {
@@ -713,8 +487,11 @@ export default class Tus extends UploaderPlugin {
         }
         this.uppy.on('file-removed', removedHandler)
 
-        this.resetUploaderReferences(file.id)
-        const uploadPromise = this.uploadRemoteFile(file, { signal: controller.signal })
+        const uploadPromise = client.uploadRemoteFile(
+          file,
+          this.#getCompanionClientArgs(file),
+          { signal: controller.signal },
+        )
 
         this.requests.wrapSyncFunction(() => {
           this.uppy.off('file-removed', removedHandler)
@@ -722,7 +499,8 @@ export default class Tus extends UploaderPlugin {
 
         return uploadPromise
       }
-      return this.#upload(file, current, total)
+
+      return this.#uploadLocalFile(file, current, total)
     }))
   }
 

+ 64 - 5
packages/@uppy/utils/src/EventManager.js

@@ -3,22 +3,81 @@
  * all events that were added using the wrapped emitter.
  */
 export default class EventManager {
-  #emitter
+  #uppy
 
   #events = []
 
-  constructor (emitter) {
-    this.#emitter = emitter
+  constructor (uppy) {
+    this.#uppy = uppy
   }
 
   on (event, fn) {
     this.#events.push([event, fn])
-    return this.#emitter.on(event, fn)
+    return this.#uppy.on(event, fn)
   }
 
   remove () {
     for (const [event, fn] of this.#events.splice(0)) {
-      this.#emitter.off(event, fn)
+      this.#uppy.off(event, fn)
     }
   }
+
+  onFilePause (fileID, cb) {
+    this.on('upload-pause', (targetFileID, isPaused) => {
+      if (fileID === targetFileID) {
+        cb(isPaused)
+      }
+    })
+  }
+
+  onFileRemove (fileID, cb) {
+    this.on('file-removed', (file) => {
+      if (fileID === file.id) cb(file.id)
+    })
+  }
+
+  onPause (fileID, cb) {
+    this.on('upload-pause', (targetFileID, isPaused) => {
+      if (fileID === targetFileID) {
+        // const isPaused = this.#uppy.pauseResume(fileID)
+        cb(isPaused)
+      }
+    })
+  }
+
+  onRetry (fileID, cb) {
+    this.on('upload-retry', (targetFileID) => {
+      if (fileID === targetFileID) {
+        cb()
+      }
+    })
+  }
+
+  onRetryAll (fileID, cb) {
+    this.on('retry-all', () => {
+      if (!this.#uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onPauseAll (fileID, cb) {
+    this.on('pause-all', () => {
+      if (!this.#uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onCancelAll (fileID, eventHandler) {
+    this.on('cancel-all', (...args) => {
+      if (!this.#uppy.getFile(fileID)) return
+      eventHandler(...args)
+    })
+  }
+
+  onResumeAll (fileID, cb) {
+    this.on('resume-all', () => {
+      if (!this.#uppy.getFile(fileID)) return
+      cb()
+    })
+  }
 }

+ 40 - 161
packages/@uppy/xhr-upload/src/index.js

@@ -1,8 +1,6 @@
-import UploaderPlugin from '@uppy/core/lib/UploaderPlugin.js'
+import BasePlugin from '@uppy/core/lib/BasePlugin.js'
 import { nanoid } from 'nanoid/non-secure'
-import { Provider, RequestClient, Socket } from '@uppy/companion-client'
-import emitSocketProgress from '@uppy/utils/lib/emitSocketProgress'
-import getSocketHost from '@uppy/utils/lib/getSocketHost'
+import { Provider, RequestClient } from '@uppy/companion-client'
 import EventManager from '@uppy/utils/lib/EventManager'
 import ProgressTimeout from '@uppy/utils/lib/ProgressTimeout'
 import { RateLimitedQueue, internalRateLimitedQueue } from '@uppy/utils/lib/RateLimitedQueue'
@@ -46,7 +44,7 @@ function setTypeInBlob (file) {
   return dataWithUpdatedType
 }
 
-export default class XHRUpload extends UploaderPlugin {
+export default class XHRUpload extends BasePlugin {
   // eslint-disable-next-line global-require
   static VERSION = packageJson.version
 
@@ -62,7 +60,7 @@ export default class XHRUpload extends UploaderPlugin {
     const defaultOptions = {
       formData: true,
       fieldName: opts.bundle ? 'files[]' : 'file',
-      method: 'POST',
+      method: 'post',
       allowedMetaFields: null,
       responseUrlFieldName: 'url',
       bundle: false,
@@ -127,7 +125,6 @@ export default class XHRUpload extends UploaderPlugin {
     }
 
     this.uploaderEvents = Object.create(null)
-    this.setQueueRequestSocketToken(this.requests.wrapPromiseFunction(this.#requestSocketToken, { priority: -1 }))
   }
 
   getOptions (file) {
@@ -216,7 +213,7 @@ export default class XHRUpload extends UploaderPlugin {
     return formPost
   }
 
-  async #upload (file, current, total) {
+  async #uploadLocalFile (file, current, total) {
     const opts = this.getOptions(file)
 
     this.uppy.log(`uploading ${current} of ${total}`)
@@ -226,7 +223,8 @@ export default class XHRUpload extends UploaderPlugin {
         : file.data
 
       const xhr = new XMLHttpRequest()
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
+      const eventManager = new EventManager(this.uppy)
+      this.uploaderEvents[file.id] = eventManager
       let queuedRequest
 
       const timer = new ProgressTimeout(opts.timeout, () => {
@@ -335,12 +333,12 @@ export default class XHRUpload extends UploaderPlugin {
         }
       })
 
-      this.onFileRemove(file.id, () => {
+      eventManager.onFileRemove(file.id, () => {
         queuedRequest.abort()
         reject(new Error('File removed'))
       })
 
-      this.onCancelAll(file.id, ({ reason }) => {
+      eventManager.onCancelAll(file.id, ({ reason }) => {
         if (reason === 'user') {
           queuedRequest.abort()
         }
@@ -349,126 +347,6 @@ export default class XHRUpload extends UploaderPlugin {
     })
   }
 
-  #requestSocketToken = async (file, options) => {
-    const opts = this.getOptions(file)
-    const Client = file.remote.providerOptions.provider ? Provider : RequestClient
-    const client = new Client(this.uppy, file.remote.providerOptions)
-    const allowedMetaFields = Array.isArray(opts.allowedMetaFields)
-      ? opts.allowedMetaFields
-      // Send along all fields by default.
-      : Object.keys(file.meta)
-    const res = await client.post(file.remote.url, {
-      ...file.remote.body,
-      protocol: 'multipart',
-      endpoint: opts.endpoint,
-      size: file.data.size,
-      fieldname: opts.fieldName,
-      metadata: Object.fromEntries(allowedMetaFields.map(name => [name, file.meta[name]])),
-      httpMethod: opts.method,
-      useFormData: opts.formData,
-      headers: typeof opts.headers === 'function' ? opts.headers(file) : opts.headers,
-    }, options)
-    return res.token
-  }
-
-  async connectToServerSocket (file) {
-    return new Promise((resolve, reject) => {
-      const opts = this.getOptions(file)
-      const token = file.serverToken
-      const host = getSocketHost(file.remote.companionUrl)
-      let socket
-
-      const createSocket = () => {
-        if (socket != null) return
-
-        socket = new Socket({ target: `${host}/api/${token}` })
-
-        socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
-
-        socket.on('success', (data) => {
-          const body = opts.getResponseData(data.response.responseText, data.response)
-          const uploadURL = body[opts.responseUrlFieldName]
-
-          const uploadResp = {
-            status: data.response.status,
-            body,
-            uploadURL,
-          }
-
-          this.uppy.emit('upload-success', file, uploadResp)
-          queuedRequest.done() // eslint-disable-line no-use-before-define
-          socket.close()
-          if (this.uploaderEvents[file.id]) {
-            this.uploaderEvents[file.id].remove()
-            this.uploaderEvents[file.id] = null
-          }
-          return resolve()
-        })
-
-        socket.on('error', (errData) => {
-          const resp = errData.response
-          const error = resp
-            ? opts.getResponseError(resp.responseText, resp)
-            : Object.assign(new Error(errData.error.message), { cause: errData.error })
-          this.uppy.emit('upload-error', file, error)
-          queuedRequest.done() // eslint-disable-line no-use-before-define
-          socket.close()
-          if (this.uploaderEvents[file.id]) {
-            this.uploaderEvents[file.id].remove()
-            this.uploaderEvents[file.id] = null
-          }
-          reject(error)
-        })
-      }
-      this.uploaderEvents[file.id] = new EventManager(this.uppy)
-
-      let queuedRequest = this.requests.run(() => {
-        if (file.isPaused) {
-          socket?.send('pause', {})
-        } else {
-          createSocket()
-        }
-
-        return () => {}
-      })
-
-      this.onFileRemove(file.id, () => {
-        socket?.send('cancel', {})
-        socket.close()
-        queuedRequest.abort()
-        resolve(`upload ${file.id} was removed`)
-      })
-
-      this.onCancelAll(file.id, ({ reason } = {}) => {
-        if (reason === 'user') {
-          socket?.send('cancel', {})
-          queuedRequest.abort()
-          // socket.close()
-        }
-        resolve(`upload ${file.id} was canceled`)
-      })
-
-      const onRetryRequest = () => {
-        if (socket == null) {
-          queuedRequest.abort()
-        } else {
-          queuedRequest.done()
-        }
-        queuedRequest = this.requests.run(() => {
-          if (socket == null) {
-            createSocket()
-          }
-          return () => {}
-        })
-      }
-      this.onRetry(file.id, onRetryRequest)
-      this.onRetryAll(file.id, onRetryRequest)
-    }).catch((err) => {
-      this.uppy.emit('upload-error', file, err)
-      return Promise.reject(err)
-    })
-  }
-
   #uploadBundle (files) {
     return new Promise((resolve, reject) => {
       const { endpoint } = this.opts
@@ -563,12 +441,36 @@ export default class XHRUpload extends UploaderPlugin {
     })
   }
 
+  #getCompanionClientArgs (file) {
+    const opts = this.getOptions(file)
+    const allowedMetaFields = Array.isArray(opts.allowedMetaFields)
+      ? opts.allowedMetaFields
+      // Send along all fields by default.
+      : Object.keys(file.meta)
+    return {
+      ...file.remote.body,
+      protocol: 'multipart',
+      endpoint: opts.endpoint,
+      size: file.data.size,
+      fieldname: opts.fieldName,
+      metadata: Object.fromEntries(allowedMetaFields.map(name => [name, file.meta[name]])),
+      httpMethod: opts.method,
+      useFormData: opts.formData,
+      headers: opts.headers,
+    }
+  }
+
   async #uploadFiles (files) {
     await Promise.allSettled(files.map((file, i) => {
       const current = parseInt(i, 10) + 1
       const total = files.length
 
       if (file.isRemote) {
+        // INFO: the url plugin needs to use RequestClient,
+        // while others use Provider
+        const Client = file.remote.providerOptions.provider ? Provider : RequestClient
+        const getQueue = () => this.requests
+        const client = new Client(this.uppy, file.remote.providerOptions, getQueue)
         const controller = new AbortController()
 
         const removedHandler = (removedFile) => {
@@ -576,7 +478,11 @@ export default class XHRUpload extends UploaderPlugin {
         }
         this.uppy.on('file-removed', removedHandler)
 
-        const uploadPromise = this.uploadRemoteFile(file, { signal: controller.signal })
+        const uploadPromise = client.uploadRemoteFile(
+          file,
+          this.#getCompanionClientArgs(file),
+          { signal: controller.signal },
+        )
 
         this.requests.wrapSyncFunction(() => {
           this.uppy.off('file-removed', removedHandler)
@@ -584,36 +490,9 @@ export default class XHRUpload extends UploaderPlugin {
 
         return uploadPromise
       }
-      return this.#upload(file, current, total)
-    }))
-  }
 
-  onFileRemove (fileID, cb) {
-    this.uploaderEvents[fileID].on('file-removed', (file) => {
-      if (fileID === file.id) cb(file.id)
-    })
-  }
-
-  onRetry (fileID, cb) {
-    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
-      if (fileID === targetFileID) {
-        cb()
-      }
-    })
-  }
-
-  onRetryAll (fileID, cb) {
-    this.uploaderEvents[fileID].on('retry-all', () => {
-      if (!this.uppy.getFile(fileID)) return
-      cb()
-    })
-  }
-
-  onCancelAll (fileID, eventHandler) {
-    this.uploaderEvents[fileID].on('cancel-all', (...args) => {
-      if (!this.uppy.getFile(fileID)) return
-      eventHandler(...args)
-    })
+      return this.#uploadLocalFile(file, current, total)
+    }))
   }
 
   #handleUpload = async (fileIDs) => {

+ 1 - 1
private/dev/Dashboard.js

@@ -125,7 +125,7 @@ export default () => {
       uppyDashboard.use(AwsS3Multipart, { companionUrl: COMPANION_URL })
       break
     case 'xhr':
-      uppyDashboard.use(XHRUpload, { endpoint: XHR_ENDPOINT, limit: 6, bundle: true })
+      uppyDashboard.use(XHRUpload, { endpoint: XHR_ENDPOINT, limit: 6, bundle: false })
       break
     case 'transloadit':
       uppyDashboard.use(Transloadit, {