Parcourir la source

Companion throttle progress by time (#4101)

* throttle progress events by time

instead of value
when uploading large files on fast connections, sometimes
a LOT of events will be emitted. (with distinct byte values)
this causes logs to flood and a lot of events to be sent to the client
slowing down everything

* fix tests

final progress is no longer guaranteed to be emitted
Mikael Finstad il y a 2 ans
Parent
commit
1b5c0d7d4b

+ 1 - 0
packages/@uppy/companion/package.json

@@ -50,6 +50,7 @@
     "ipaddr.js": "^2.0.1",
     "jsonwebtoken": "8.5.1",
     "lodash.merge": "^4.6.2",
+    "lodash.throttle": "^4.1.1",
     "mime-types": "2.1.35",
     "moment": "^2.29.2",
     "moment-timezone": "^0.5.31",

+ 20 - 17
packages/@uppy/companion/src/server/Uploader.js

@@ -8,6 +8,7 @@ const { join } = require('node:path')
 const fs = require('node:fs')
 const { promisify } = require('node:util')
 const FormData = require('form-data')
+const throttle = require('lodash.throttle')
 
 // TODO move to `require('streams/promises').pipeline` when dropping support for Node.js 14.x.
 const pipeline = promisify(pipelineCb)
@@ -165,7 +166,6 @@ class Uploader {
 
     this.uploadStopped = false
 
-    this.emittedProgress = {}
     this.storage = options.storage
     this._paused = false
 
@@ -281,10 +281,14 @@ class Uploader {
     } finally {
       logger.debug('cleanup', this.shortToken)
       if (this.readStream && !this.readStream.destroyed) this.readStream.destroy()
-      if (this.tmpPath) unlink(this.tmpPath).catch(() => {})
+      await this.tryDeleteTmpPath()
     }
   }
 
+  tryDeleteTmpPath () {
+    if (this.tmpPath) unlink(this.tmpPath).catch(() => {})
+  }
+
   /**
    *
    * @param {import('stream').Readable} stream
@@ -408,6 +412,17 @@ class Uploader {
     this.storage.set(redisKey, jsonStringify(state), 'EX', keyExpirySec)
   }
 
+  throttledEmitProgress = throttle((dataToEmit) => {
+    const { bytesUploaded, bytesTotal, progress } = dataToEmit.payload
+    logger.debug(
+      `${bytesUploaded} ${bytesTotal} ${progress}%`,
+      'uploader.total.progress',
+      this.shortToken,
+    )
+    this.saveState(dataToEmit)
+    emitter().emit(this.token, dataToEmit)
+  }, 1000, { trailing: false })
+
   /**
    *
    * @param {number} [bytesUploaded]
@@ -428,11 +443,6 @@ class Uploader {
     if (bytesTotal > 0) percentage = Math.min(Math.max(0, ((combinedBytes / bytesTotal) * 100)), 100)
 
     const formattedPercentage = percentage.toFixed(2)
-    logger.debug(
-      `${combinedBytes} ${bytesTotal} ${formattedPercentage}%`,
-      'uploader.total.progress',
-      this.shortToken,
-    )
 
     if (this._paused || this.uploadStopped) {
       return
@@ -443,17 +453,10 @@ class Uploader {
       action: 'progress',
       payload,
     }
-    this.saveState(dataToEmit)
 
-    const isEqual = (p1, p2) => (p1.progress === p2.progress
-      && p1.bytesUploaded === p2.bytesUploaded
-      && p1.bytesTotal === p2.bytesTotal)
-
-    // avoid flooding the client with progress events.
-    if (!isEqual(this.emittedProgress, payload)) {
-      this.emittedProgress = payload
-      emitter().emit(this.token, dataToEmit)
-    }
+    // avoid flooding the client (and log) with progress events.
+    // flooding will cause reduced performance and possibly network issues
+    this.throttledEmitProgress(dataToEmit)
   }
 
   /**

+ 18 - 18
packages/@uppy/companion/test/__tests__/uploader.js

@@ -65,7 +65,7 @@ describe('uploader with tus protocol', () => {
     const uploadToken = uploader.token
     expect(uploadToken).toBeTruthy()
 
-    let progressReceived = 0
+    let firstReceivedProgress
 
     const onProgress = jest.fn()
     const onUploadSuccess = jest.fn()
@@ -80,14 +80,14 @@ describe('uploader with tus protocol', () => {
     // emulate socket connection
     socketClient.connect(uploadToken)
     socketClient.onProgress(uploadToken, (message) => {
-      progressReceived = message.payload.bytesUploaded
+      if (firstReceivedProgress == null) firstReceivedProgress = message.payload.bytesUploaded
       onProgress(message)
     })
     socketClient.onUploadSuccess(uploadToken, onUploadSuccess)
     await promise
     await uploader.tryUploadStream(stream)
 
-    expect(progressReceived).toBe(fileContent.length)
+    expect(firstReceivedProgress).toBe(8)
 
     expect(onProgress).toHaveBeenLastCalledWith(expect.objectContaining({
       payload: expect.objectContaining({
@@ -118,6 +118,15 @@ describe('uploader with tus protocol', () => {
     }
 
     const uploader = new Uploader(opts)
+    uploader.tryDeleteTmpPath = async () => {
+      // validate that the tmp file has been downloaded and saved into the file path
+      // must do it before it gets deleted
+      const fileInfo = fs.statSync(uploader.tmpPath)
+      expect(fileInfo.isFile()).toBe(true)
+      expect(fileInfo.size).toBe(fileContent.length)
+
+      return uploader.tryDeleteTmpPath()
+    }
     const uploadToken = uploader.token
     expect(uploadToken).toBeTruthy()
 
@@ -134,27 +143,18 @@ describe('uploader with tus protocol', () => {
         })
       })
 
-      let progressReceived = 0
+      let firstReceivedProgress
+
       // emulate socket connection
       socketClient.connect(uploadToken)
       socketClient.onProgress(uploadToken, (message) => {
-        // validate that the file has been downloaded and saved into the file path
-        try {
-          progressReceived = message.payload.bytesUploaded
-
-          if (progressReceived === fileContent.length) {
-            const fileInfo = fs.statSync(uploader.tmpPath)
-            expect(fileInfo.isFile()).toBe(true)
-            expect(fileInfo.size).toBe(fileContent.length)
-            expect(message.payload.bytesTotal).toBe(fileContent.length)
-          }
-        } catch (err) {
-          reject(err)
-        }
+        if (firstReceivedProgress == null) firstReceivedProgress = message.payload.bytesUploaded
       })
       socketClient.onUploadSuccess(uploadToken, (message) => {
         try {
-          expect(progressReceived).toBe(fileContent.length)
+          expect(message.payload.bytesTotal).toBe(fileContent.length)
+
+          expect(firstReceivedProgress).toBe(8192)
           // see __mocks__/tus-js-client.js
           expect(message.payload.url).toBe('https://tus.endpoint/files/foo-bar')
         } catch (err) {

+ 1 - 0
yarn.lock

@@ -8387,6 +8387,7 @@ __metadata:
     jest: ^29.0.0
     jsonwebtoken: 8.5.1
     lodash.merge: ^4.6.2
+    lodash.throttle: ^4.1.1
     mime-types: 2.1.35
     moment: ^2.29.2
     moment-timezone: ^0.5.31