Quellcode durchsuchen

Companion server upload events (#3544)

* modernise code

* pull out config related functions and middleware

* make test more readable

* Expose companion emitter

to allow the consumer to subscribe to upload success/failure events

fixes #3435

* disable client socket timeout for tests

or jest will wait for them to time out
also fix broken test 'uploader respects maxFileSize with unknown size'

* document the event emitter usage
Mikael Finstad vor 3 Jahren
Ursprung
Commit
0033b9fd5e

+ 5 - 2
packages/@uppy/companion/src/companion.js

@@ -11,7 +11,7 @@ const providerManager = require('./server/provider')
 const controllers = require('./server/controllers')
 const s3 = require('./server/controllers/s3')
 const url = require('./server/controllers/url')
-const emitter = require('./server/emitter')
+const createEmitter = require('./server/emitter')
 const redis = require('./server/redis')
 const { getURLBuilder } = require('./server/helpers/utils')
 const jobs = require('./server/jobs')
@@ -79,7 +79,7 @@ module.exports.app = (optionsArg = {}) => {
   if (options.redisUrl) {
     redis.client(merge({ url: options.redisUrl }, options.redisOptions || {}))
   }
-  emitter(options.redisUrl, options.redisPubSubScope)
+  const emitter = createEmitter(options.redisUrl, options.redisPubSubScope)
 
   const app = express()
 
@@ -152,5 +152,8 @@ module.exports.app = (optionsArg = {}) => {
     processId,
   })
 
+  // todo split emitter from app in next major
+  // @ts-ignore
+  app.companionEmitter = emitter
   return app
 }

+ 1 - 0
packages/@uppy/companion/src/config/companion.js

@@ -22,6 +22,7 @@ const defaultOptions = {
   logClientVersion: true,
   periodicPingUrls: [],
   streamingUpload: false,
+  clientSocketConnectTimeout: 60000,
 }
 
 /**

+ 32 - 3
packages/@uppy/companion/src/server/Uploader.js

@@ -290,6 +290,8 @@ class Uploader {
    */
   async tryUploadStream (stream) {
     try {
+      emitter().emit('upload-start', { token: this.token })
+
       const ret = await this.uploadStream(stream)
       if (!ret) return
       const { url, extraData } = ret
@@ -358,10 +360,37 @@ class Uploader {
     return Uploader.shortenToken(this.token)
   }
 
-  async awaitReady () {
-    // TODO timeout after a while? Else we could leak emitters
+  async awaitReady (timeout) {
     logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken)
-    await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve))
+
+    // TODO: replace the Promise constructor call when dropping support for Node.js <16 with
+    // await once(emitter, eventName, timeout && { signal: AbortSignal.timeout(timeout) })
+    await new Promise((resolve, reject) => {
+      const eventName = `connection:${this.token}`
+      let timer
+      let onEvent
+
+      function cleanup () {
+        emitter().removeListener(eventName, onEvent)
+        clearTimeout(timer)
+      }
+
+      if (timeout) {
+        // Need to timeout after a while, or we could leak emitters
+        timer = setTimeout(() => {
+          cleanup()
+          reject(new Error('Timed out waiting for socket connection'))
+        }, timeout)
+      }
+
+      onEvent = () => {
+        cleanup()
+        resolve()
+      }
+
+      emitter().once(eventName, onEvent)
+    })
+
     logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken)
   }
 

+ 3 - 1
packages/@uppy/companion/src/server/helpers/upload.js

@@ -7,10 +7,12 @@ const { ValidationError } = Uploader
 async function startDownUpload ({ req, res, getSize, download, onUnhandledError }) {
   try {
     const size = await getSize()
+    const { clientSocketConnectTimeout } = req.companion.options
 
     logger.debug('Instantiating uploader.', null, req.id)
     const uploader = new Uploader(Uploader.reqToOptions(req, size))
 
+    logger.debug('Starting download stream.', null, req.id)
     const stream = await download()
 
     // "Forking" off the upload operation to background, so we can return the http request:
@@ -18,7 +20,7 @@ async function startDownUpload ({ req, res, getSize, download, onUnhandledError
       // wait till the client has connected to the socket, before starting
       // the download, so that the client can receive all download/upload progress.
       logger.debug('Waiting for socket connection before beginning remote download/upload.', null, req.id)
-      await uploader.awaitReady()
+      await uploader.awaitReady(clientSocketConnectTimeout)
       logger.debug('Socket connection received. Starting remote download/upload.', null, req.id)
 
       await uploader.tryUploadStream(stream)

+ 3 - 2
packages/@uppy/companion/src/server/s3-client.js

@@ -1,3 +1,6 @@
+const S3 = require('aws-sdk/clients/s3')
+const AWS = require('aws-sdk')
+
 /**
  * instantiates the aws-sdk s3 client that will be used for s3 uploads.
  *
@@ -6,8 +9,6 @@
 module.exports = (companionOptions) => {
   let s3Client = null
   if (companionOptions.providerOptions.s3) {
-    const S3 = require('aws-sdk/clients/s3')
-    const AWS = require('aws-sdk')
     const s3ProviderOptions = companionOptions.providerOptions.s3
 
     if (s3ProviderOptions.accessKeyId || s3ProviderOptions.secretAccessKey) {

+ 2 - 0
packages/@uppy/companion/src/standalone/helper.js

@@ -112,6 +112,8 @@ const getConfigFromEnv = () => {
     streamingUpload: process.env.COMPANION_STREAMING_UPLOAD === 'true',
     maxFileSize: process.env.COMPANION_MAX_FILE_SIZE ? parseInt(process.env.COMPANION_MAX_FILE_SIZE, 10) : undefined,
     chunkSize: process.env.COMPANION_CHUNK_SIZE ? parseInt(process.env.COMPANION_CHUNK_SIZE, 10) : undefined,
+    clientSocketConnectTimeout: process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT
+      ? parseInt(process.env.COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT, 10) : undefined,
   }
 }
 

+ 2 - 1
packages/@uppy/companion/src/standalone/index.js

@@ -13,6 +13,7 @@ const companion = require('../companion')
 const helper = require('./helper')
 const middlewares = require('../server/middlewares')
 const { getURLBuilder } = require('../server/helpers/utils')
+const connectRedis = require('connect-redis')
 
 /**
  * Configures an Express app for running Companion standalone
@@ -139,7 +140,7 @@ module.exports = function server (inputCompanionOptions = {}) {
   }
 
   if (companionOptions.redisUrl) {
-    const RedisStore = require('connect-redis')(session)
+    const RedisStore = connectRedis(session)
     const redisClient = redis.client(
       merge({ url: companionOptions.redisUrl }, companionOptions.redisOptions),
     )

+ 2 - 1
packages/@uppy/companion/test/__tests__/companion.js

@@ -15,7 +15,8 @@ const request = require('supertest')
 const tokenService = require('../../src/server/helpers/jwt')
 const { getServer } = require('../mockserver')
 
-const authServer = getServer()
+// todo don't share server between tests. rewrite to not use env variables
+const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })
 const authData = {
   dropbox: 'token value',
   box: 'token value',

+ 2 - 1
packages/@uppy/companion/test/__tests__/providers.js

@@ -17,7 +17,8 @@ const tokenService = require('../../src/server/helpers/jwt')
 const { getServer } = require('../mockserver')
 const defaults = require('../fixtures/constants')
 
-const authServer = getServer()
+// todo don't share server between tests. rewrite to not use env variables
+const authServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })
 const OAUTH_STATE = 'some-cool-nice-encrytpion'
 const providers = require('../../src/server/provider').getDefaultProviders()
 

+ 40 - 27
packages/@uppy/companion/test/__tests__/uploader.js

@@ -9,6 +9,7 @@ const nock = require('nock')
 const Uploader = require('../../src/server/Uploader')
 const socketClient = require('../mocksocket')
 const standalone = require('../../src/standalone')
+const Emitter = require('../../src/server/emitter')
 
 afterAll(() => {
   nock.cleanAll()
@@ -64,33 +65,45 @@ describe('uploader with tus protocol', () => {
     const uploadToken = uploader.token
     expect(uploadToken).toBeTruthy()
 
-    return new Promise((resolve, reject) => {
-      // validate that the test is resolved on socket connection
-      uploader.awaitReady().then(() => {
-        uploader.tryUploadStream(stream).then(() => resolve())
-      })
+    let progressReceived = 0
 
-      let progressReceived = 0
-      // emulate socket connection
-      socketClient.connect(uploadToken)
-      socketClient.onProgress(uploadToken, (message) => {
-        progressReceived = message.payload.bytesUploaded
-        try {
-          expect(message.payload.bytesTotal).toBe(fileContent.length)
-        } catch (err) {
-          reject(err)
-        }
-      })
-      socketClient.onUploadSuccess(uploadToken, (message) => {
-        try {
-          expect(progressReceived).toBe(fileContent.length)
-          // see __mocks__/tus-js-client.js
-          expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar')
-        } catch (err) {
-          reject(err)
-        }
-      })
+    const onProgress = jest.fn()
+    const onUploadSuccess = jest.fn()
+    const onBeginUploadEvent = jest.fn()
+    const onUploadEvent = jest.fn()
+
+    const emitter = Emitter()
+    emitter.on('upload-start', onBeginUploadEvent)
+    emitter.on(uploadToken, onUploadEvent)
+
+    const promise = uploader.awaitReady(60000)
+    // emulate socket connection
+    socketClient.connect(uploadToken)
+    socketClient.onProgress(uploadToken, (message) => {
+      progressReceived = message.payload.bytesUploaded
+      onProgress(message)
+    })
+    socketClient.onUploadSuccess(uploadToken, onUploadSuccess)
+    await promise
+    await uploader.tryUploadStream(stream)
+
+    expect(progressReceived).toBe(fileContent.length)
+
+    expect(onProgress).toHaveBeenLastCalledWith(expect.objectContaining({
+      payload: expect.objectContaining({
+        bytesTotal: fileContent.length,
+      }),
+    }))
+    const expectedPayload = expect.objectContaining({
+      // see __mocks__/tus-js-client.js
+      url: 'https://tus.endpoint/files/foo-bar',
     })
+    expect(onUploadSuccess).toHaveBeenCalledWith(expect.objectContaining({
+      payload: expectedPayload,
+    }))
+
+    expect(onBeginUploadEvent).toHaveBeenCalledWith({ token: uploadToken })
+    expect(onUploadEvent).toHaveBeenLastCalledWith({ action: 'success', payload: expectedPayload })
   })
 
   test('upload functions with tus protocol without size', async () => {
@@ -110,7 +123,7 @@ describe('uploader with tus protocol', () => {
 
     return new Promise((resolve, reject) => {
       // validate that the test is resolved on socket connection
-      uploader.awaitReady().then(() => {
+      uploader.awaitReady(60000).then(() => {
         uploader.tryUploadStream(stream).then(() => {
           try {
             expect(fs.existsSync(uploader.path)).toBe(false)
@@ -257,7 +270,7 @@ describe('uploader with tus protocol', () => {
     const uploadToken = uploader.token
 
     // validate that the test is resolved on socket connection
-    uploader.awaitReady().then(uploader.tryUploadStream(stream))
+    uploader.awaitReady(60000).then(() => uploader.tryUploadStream(stream))
     socketClient.connect(uploadToken)
 
     return new Promise((resolve, reject) => {

+ 1 - 1
packages/@uppy/companion/test/__tests__/url.js

@@ -14,7 +14,7 @@ jest.mock('../../src/server/helpers/request', () => {
 })
 const { getServer } = require('../mockserver')
 
-const mockServer = getServer()
+const mockServer = getServer({ COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '0' })
 
 beforeAll(() => {
   nock('http://url.myendpoint.com').get('/files').reply(200, () => '')

+ 5 - 2
packages/@uppy/companion/test/mockserver.js

@@ -1,4 +1,3 @@
-/* global jest:false */
 const express = require('express')
 const session = require('express-session')
 
@@ -36,6 +35,8 @@ const defaultEnv = {
   COMPANION_PATH: '',
 
   COMPANION_PERIODIC_PING_URLS: '',
+
+  COMPANION_CLIENT_SOCKET_CONNECT_TIMEOUT: '',
 }
 
 function updateEnv (env) {
@@ -54,7 +55,9 @@ module.exports.getServer = (extraEnv) => {
 
   updateEnv(env)
 
-  // delete from cache to force the server to reload companionOptions from the new env vars
+  // companion stores certain global state like emitter, metrics, logger (frozen object), so we need to reset modules
+  // todo rewrite companion to not use global state
+  // https://github.com/transloadit/uppy/issues/3284
   jest.resetModules()
   const standalone = require('../src/standalone')
   const authServer = express()

+ 33 - 0
website/src/docs/companion.md

@@ -103,6 +103,39 @@ companion.socket(server)
 
 This takes your `server` instance as an argument.
 
+#### Events
+
+The object returned by `companion.app()` also has a property `companionEmitter` which is an `EventEmitter` that emits the following events:
+
+* `upload-start` - When an upload starts, this event is emitted with an object containing the property `token`, which is a unique ID for the upload.
+* **token** - The event name is the token from `upload-start`. The event has an object with the following properties:
+  * `action` - One of the following strings:
+    * `success` - When the upload succeeds.
+    * `error` - When the upload fails with an error.
+  * `payload` - the error or success payload.
+
+Example code for using the `EventEmitter` to handle a finished file upload:
+
+```js
+const companionApp = companion.app(options)
+const { companionEmitter: emitter } = companionApp
+
+emitter.on('upload-start', ({ token }) => {
+  console.log('Upload started', token)
+
+  function onUploadEvent ({ action, payload }) {
+    if (action === 'success') {
+      emitter.off(token, onUploadEvent) // avoid listener leak
+      console.log('Upload finished', token, payload.url)
+    } else if (action === 'error') {
+      emitter.off(token, onUploadEvent) // avoid listener leak
+      console.error('Upload failed', payload)
+    }
+  }
+  emitter.on(token, onUploadEvent)
+})
+```
+
 ### Running as a standalone server
 
 > Please make sure that the required environment variables are set before running/using Companion as a standalone server. See [Configure Standalone](#Configuring-a-standalone-server) for the variables required.