Prechádzať zdrojové kódy

disable parallel (down/up)loads temporarily

Ifedapo Olarewaju 6 rokov pred
rodič
commit
814bf5c11f
1 zmenil súbory, kde vykonal 41 pridanie a 28 odobranie
  1. 41 28
      packages/@uppy/companion/src/server/Uploader.js

+ 41 - 28
packages/@uppy/companion/src/server/Uploader.js

@@ -1,5 +1,4 @@
 const fs = require('fs')
-const stream = require('stream')
 const path = require('path')
 const tus = require('tus-js-client')
 const uuid = require('uuid')
@@ -51,15 +50,33 @@ class Uploader {
     this.options.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
     this.streamsEnded = false
     this.duplexStream = null
-    if (this.options.protocol === PROTOCOLS.tus) {
-      this.duplexStream = new stream.PassThrough()
-        .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
-    }
+    // @TODO disabling parallel uploads and downloads for now
+    // if (this.options.protocol === PROTOCOLS.tus) {
+    //   this.duplexStream = new stream.PassThrough()
+    //     .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
+    // }
     this.writeStream = fs.createWriteStream(this.options.path, { mode: 0o666 }) // no executable files
       .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.write.error'))
     /** @type {number} */
     this.emittedProgress = 0
     this.storage = options.storage
+    this._paused = false
+
+    if (this.options.protocol === PROTOCOLS.tus) {
+      emitter().on(`pause:${this.token}`, () => {
+        this._paused = true
+        if (this.tus) {
+          this.tus.abort()
+        }
+      })
+
+      emitter().on(`resume:${this.token}`, () => {
+        this._paused = false
+        if (this.tus) {
+          this.tus.start()
+        }
+      })
+    }
   }
 
   /**
@@ -150,25 +167,30 @@ class Uploader {
         if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
           this.uploadMultipart()
         }
+
+        if (protocol === PROTOCOLS.tus && !this.tus) {
+          return this.uploadTus()
+        }
       })
 
       return this.endStreams()
     }
 
-    this.writeToStreams(chunk, () => {
+    this.writeStream.write(chunk, () => {
       logger.debug(`${this.shortToken} ${this.bytesWritten} bytes`, 'uploader.download.progress')
-      if (protocol === PROTOCOLS.multipart) {
+      if (protocol === PROTOCOLS.multipart || protocol === PROTOCOLS.tus) {
         return this.emitIllusiveProgress()
       }
 
       if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
         return this.uploadS3()
       }
-      if (!this.options.endpoint) return
+      // @TODO disabling parallel uploads and downloads for now
+      // if (!this.options.endpoint) return
 
-      if (protocol === PROTOCOLS.tus && !this.tus) {
-        return this.uploadTus()
-      }
+      // if (protocol === PROTOCOLS.tus && !this.tus) {
+      //   return this.uploadTus()
+      // }
     })
   }
 
@@ -223,6 +245,10 @@ class Uploader {
    * @param {number=} bytesUploaded the bytes actually Uploaded so far
    */
   emitIllusiveProgress (bytesUploaded) {
+    if (this._paused) {
+      return
+    }
+
     const bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
     bytesUploaded = bytesUploaded || 0
     // for a 10MB file, 10MB of download will account for 5MB upload progress
@@ -303,23 +329,18 @@ class Uploader {
     const fname = path.basename(this.options.path)
     const ftype = this.options.metadata.type
     const metadata = Object.assign({ filename: fname, filetype: ftype }, this.options.metadata || {})
-    const file = this.duplexStream
+    const file = fs.createReadStream(this.options.path)
     const uploader = this
-    const oneGB = 1024 * 1024 * 1024  // 1 GB
-    // chunk size can't be infinity with deferred length.
-    // cap value to 1GB to avoid buffer allocation error (RangeError)
-    const chunkSize = Math.min(this.options.size || oneGB, oneGB)
 
     // @ts-ignore
     this.tus = new tus.Upload(file, {
       endpoint: this.options.endpoint,
       uploadUrl: this.options.uploadUrl,
       // @ts-ignore
-      uploadLengthDeferred: true,
+      uploadLengthDeferred: false,
       resume: true,
-      uploadSize: null,
+      uploadSize: this.bytesWritten,
       metadata,
-      chunkSize,
       /**
        *
        * @param {Error} error
@@ -334,7 +355,7 @@ class Uploader {
        * @param {number} bytesTotal
        */
       onProgress (bytesUploaded, bytesTotal) {
-        uploader.emitProgress(bytesUploaded, bytesTotal)
+        uploader.emitIllusiveProgress(bytesUploaded)
       },
       onSuccess () {
         uploader.emitSuccess(uploader.tus.url)
@@ -343,14 +364,6 @@ class Uploader {
     })
 
     this.tus.start()
-
-    emitter().on(`pause:${this.token}`, () => {
-      this.tus.abort()
-    })
-
-    emitter().on(`resume:${this.token}`, () => {
-      this.tus.start()
-    })
   }
 
   uploadMultipart () {