|
@@ -12,6 +12,12 @@ const logger = require('./logger')
|
|
|
const validator = require('validator')
|
|
|
const headerSanitize = require('./header-blacklist')
|
|
|
|
|
|
+const PROTOCOLS = Object.freeze({
|
|
|
+ multipart: 'multipart',
|
|
|
+ s3Multipart: 's3-multipart',
|
|
|
+ tus: 'tus'
|
|
|
+})
|
|
|
+
|
|
|
class Uploader {
|
|
|
/**
|
|
|
* Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
|
|
@@ -44,8 +50,11 @@ class Uploader {
|
|
|
this.token = uuid.v4()
|
|
|
this.options.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
|
|
|
this.streamsEnded = false
|
|
|
- this.duplexStream = new stream.PassThrough()
|
|
|
- .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
|
|
|
+ this.duplexStream = null
|
|
|
+ if (this.options.protocol === PROTOCOLS.tus) {
|
|
|
+ this.duplexStream = new stream.PassThrough()
|
|
|
+ .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
|
|
|
+ }
|
|
|
this.writeStream = fs.createWriteStream(this.options.path, { mode: 0o666 }) // no executable files
|
|
|
.on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.write.error'))
|
|
|
/** @type {number} */
|
|
@@ -70,7 +79,7 @@ class Uploader {
|
|
|
// s3 uploads don't require upload destination
|
|
|
// validation, because the destination is determined
|
|
|
// by the server's s3 config
|
|
|
- if (options.protocol === 's3-multipart') {
|
|
|
+ if (options.protocol === PROTOCOLS.s3Multipart) {
|
|
|
return true
|
|
|
}
|
|
|
|
|
@@ -131,56 +140,34 @@ class Uploader {
|
|
|
* @param {Buffer | Buffer[]} chunk
|
|
|
*/
|
|
|
handleChunk (chunk) {
|
|
|
- logger.debug(`${this.shortToken} ${this.bytesWritten} bytes`, 'uploader.download.progress')
|
|
|
-
|
|
|
- const protocol = this.options.protocol || 'multipart'
|
|
|
+ // @todo a default protocol should not be set. We should ensure that the user specifies her protocol.
|
|
|
+ const protocol = this.options.protocol || PROTOCOLS.multipart
|
|
|
|
|
|
// The download has completed; close the file and start an upload if necessary.
|
|
|
if (chunk === null) {
|
|
|
this.writeStream.on('finish', () => {
|
|
|
this.streamsEnded = true
|
|
|
- if (this.options.endpoint && protocol === 'multipart') {
|
|
|
+ if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
|
|
|
this.uploadMultipart()
|
|
|
}
|
|
|
})
|
|
|
|
|
|
- this.duplexStream.end()
|
|
|
- return this.writeStream.end()
|
|
|
+ return this.endStreams()
|
|
|
}
|
|
|
|
|
|
this.writeToStreams(chunk, () => {
|
|
|
- if (protocol === 's3-multipart' && !this.s3Upload) {
|
|
|
- return this.uploadS3Streaming()
|
|
|
+ logger.debug(`${this.shortToken} ${this.bytesWritten} bytes`, 'uploader.download.progress')
|
|
|
+ if (protocol === PROTOCOLS.multipart) {
|
|
|
+ return this.emitIllusiveProgress()
|
|
|
}
|
|
|
- if (!this.options.endpoint) return
|
|
|
-
|
|
|
- if (protocol === 'tus' && !this.tus) {
|
|
|
- return this.uploadTus(true)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
|
|
|
- /**
|
|
|
- *
|
|
|
- * @param {object} resp
|
|
|
- */
|
|
|
- handleResponse (resp) {
|
|
|
- resp.pipe(this.writeStream)
|
|
|
-
|
|
|
- const protocol = this.options.protocol || 'multipart'
|
|
|
-
|
|
|
- this.writeStream.on('finish', () => {
|
|
|
- if (protocol === 's3-multipart') {
|
|
|
- this.uploadS3Full()
|
|
|
+ if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
|
|
|
+ return this.uploadS3()
|
|
|
}
|
|
|
-
|
|
|
if (!this.options.endpoint) return
|
|
|
|
|
|
- if (protocol === 'tus') {
|
|
|
- this.uploadTus(false)
|
|
|
- }
|
|
|
- if (protocol === 'multipart') {
|
|
|
- this.uploadMultipart()
|
|
|
+ if (protocol === PROTOCOLS.tus && !this.tus) {
|
|
|
+ return this.uploadTus()
|
|
|
}
|
|
|
})
|
|
|
}
|
|
@@ -191,15 +178,25 @@ class Uploader {
|
|
|
*/
|
|
|
writeToStreams (chunk, cb) {
|
|
|
const done = []
|
|
|
+ const doneLength = this.duplexStream ? 2 : 1
|
|
|
const onDone = () => {
|
|
|
done.push(true)
|
|
|
- if (done.length >= 2) {
|
|
|
+ if (done.length >= doneLength) {
|
|
|
cb()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- this.duplexStream.write(chunk, onDone)
|
|
|
this.writeStream.write(chunk, onDone)
|
|
|
+ if (this.duplexStream) {
|
|
|
+ this.duplexStream.write(chunk, onDone)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ endStreams () {
|
|
|
+ this.writeStream.end()
|
|
|
+ if (this.duplexStream) {
|
|
|
+ this.duplexStream.end()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
getResponse () {
|
|
@@ -218,6 +215,27 @@ class Uploader {
|
|
|
this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This method emits upload progress but also creates an "upload progress" illusion
|
|
|
+ * for the waiting period while only download is happening. Hence, it combines both
|
|
|
+ * download and upload into an upload progress.
|
|
|
+ * @see emitProgress
|
|
|
+ * @param {number=} bytesUploaded the bytes actually Uploaded so far
|
|
|
+ */
|
|
|
+ emitIllusiveProgress (bytesUploaded) {
|
|
|
+ const bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
|
|
|
+ bytesUploaded = bytesUploaded || 0
|
|
|
+ // for a 10MB file, 10MB of download will account for 5MB upload progress
|
|
|
+ // and 10MB of actual upload will account for the other 5MB upload progress.
|
|
|
+ const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
|
|
|
+
|
|
|
+ logger.debug(
|
|
|
+ `${this.shortToken} ${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
|
|
|
+ 'uploader.illusive.progress'
|
|
|
+ )
|
|
|
+ this.emitProgress(illusiveBytesUploaded, bytesTotal)
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
*
|
|
|
* @param {number} bytesUploaded
|
|
@@ -279,14 +297,13 @@ class Uploader {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- *
|
|
|
- * @param {boolean} deferLength
|
|
|
+ * start the tus upload
|
|
|
*/
|
|
|
- uploadTus (deferLength) {
|
|
|
+ uploadTus () {
|
|
|
const fname = path.basename(this.options.path)
|
|
|
const ftype = this.options.metadata.type
|
|
|
const metadata = Object.assign({ filename: fname, filetype: ftype }, this.options.metadata || {})
|
|
|
- const file = deferLength ? this.duplexStream : fs.createReadStream(this.options.path)
|
|
|
+ const file = this.duplexStream
|
|
|
const uploader = this
|
|
|
const oneGB = 1024 * 1024 * 1024 // 1 GB
|
|
|
// chunk size can't be infinity with deferred length.
|
|
@@ -298,9 +315,9 @@ class Uploader {
|
|
|
endpoint: this.options.endpoint,
|
|
|
uploadUrl: this.options.uploadUrl,
|
|
|
// @ts-ignore
|
|
|
- uploadLengthDeferred: deferLength,
|
|
|
+ uploadLengthDeferred: true,
|
|
|
resume: true,
|
|
|
- uploadSize: deferLength ? null : (this.options.size || fs.statSync(this.options.path).size),
|
|
|
+ uploadSize: null,
|
|
|
metadata,
|
|
|
chunkSize,
|
|
|
/**
|
|
@@ -343,7 +360,7 @@ class Uploader {
|
|
|
let bytesUploaded = 0
|
|
|
file.on('data', (data) => {
|
|
|
bytesUploaded += data.length
|
|
|
- this.emitProgress(bytesUploaded, null)
|
|
|
+ this.emitIllusiveProgress(bytesUploaded)
|
|
|
})
|
|
|
|
|
|
const formData = Object.assign(
|
|
@@ -384,7 +401,7 @@ class Uploader {
|
|
|
/**
|
|
|
* Upload the file to S3 while it is still being downloaded.
|
|
|
*/
|
|
|
- uploadS3Streaming () {
|
|
|
+ uploadS3 () {
|
|
|
const file = createTailReadStream(this.options.path, {
|
|
|
tail: true
|
|
|
})
|
|
@@ -396,14 +413,6 @@ class Uploader {
|
|
|
return this._uploadS3(file)
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Upload the file to S3 after it has been fully downloaded.
|
|
|
- */
|
|
|
- uploadS3Full () {
|
|
|
- const file = fs.createReadStream(this.options.path)
|
|
|
- return this._uploadS3(file)
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Upload a stream to S3.
|
|
|
*/
|