Uploader.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. const fs = require('fs')
  2. const stream = require('stream')
  3. const path = require('path')
  4. const tus = require('tus-js-client')
  5. const uuid = require('uuid')
  6. const createTailReadStream = require('@uppy/fs-tail-stream')
  7. const emitter = require('./emitter')
  8. const request = require('request')
  9. const serializeError = require('serialize-error')
  10. const { jsonStringify, hasMatch } = require('./helpers/utils')
  11. const logger = require('./logger')
  12. const validator = require('validator')
  13. const headerSanitize = require('./header-blacklist')
  14. const PROTOCOLS = Object.freeze({
  15. multipart: 'multipart',
  16. s3Multipart: 's3-multipart',
  17. tus: 'tus'
  18. })
  19. class Uploader {
  20. /**
  21. * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
  22. * For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
  23. * for some providers (Instagram particularly)
  24. *
  25. * @typedef {object} UploaderOptions
  26. * @property {string} endpoint
  27. * @property {string=} uploadUrl
  28. * @property {string} protocol
  29. * @property {number} size
  30. * @property {string=} fieldname
  31. * @property {string} pathPrefix
  32. * @property {string=} path
  33. * @property {any=} s3
  34. * @property {any} metadata
  35. * @property {any} uppyOptions
  36. * @property {any=} storage
  37. * @property {any=} headers
  38. *
  39. * @param {UploaderOptions} options
  40. */
  41. constructor (options) {
  42. if (!this.validateOptions(options)) {
  43. logger.debug(this._errRespMessage, 'uploader.validator.fail')
  44. return
  45. }
  46. this.options = options
  47. this.token = uuid.v4()
  48. this.options.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
  49. this.streamsEnded = false
  50. this.duplexStream = null
  51. if (this.options.protocol === PROTOCOLS.tus) {
  52. this.duplexStream = new stream.PassThrough()
  53. .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
  54. }
  55. this.writeStream = fs.createWriteStream(this.options.path, { mode: 0o666 }) // no executable files
  56. .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.write.error'))
  57. /** @type {number} */
  58. this.emittedProgress = 0
  59. this.storage = options.storage
  60. }
  61. /**
  62. * the number of bytes written into the streams
  63. */
  64. get bytesWritten () {
  65. return this.writeStream.bytesWritten
  66. }
  67. /**
  68. * Validate the options passed down to the uplaoder
  69. *
  70. * @param {UploaderOptions} options
  71. * @returns {boolean}
  72. */
  73. validateOptions (options) {
  74. // s3 uploads don't require upload destination
  75. // validation, because the destination is determined
  76. // by the server's s3 config
  77. if (options.protocol === PROTOCOLS.s3Multipart) {
  78. return true
  79. }
  80. if (!options.endpoint && !options.uploadUrl) {
  81. this._errRespMessage = 'No destination specified'
  82. return false
  83. }
  84. const validatorOpts = { require_protocol: true, require_tld: !options.uppyOptions.debug }
  85. return [options.endpoint, options.uploadUrl].every((url) => {
  86. if (url && !validator.isURL(url, validatorOpts)) {
  87. this._errRespMessage = 'Invalid destination url'
  88. return false
  89. }
  90. const allowedUrls = options.uppyOptions.uploadUrls
  91. if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
  92. this._errRespMessage = 'upload destination does not match any allowed destinations'
  93. return false
  94. }
  95. return true
  96. })
  97. }
  98. hasError () {
  99. return this._errRespMessage != null
  100. }
  101. /**
  102. * returns a substring of the token
  103. */
  104. get shortToken () {
  105. return this.token.substring(0, 8)
  106. }
  107. /**
  108. *
  109. * @param {function} callback
  110. */
  111. onSocketReady (callback) {
  112. emitter().once(`connection:${this.token}`, () => callback())
  113. logger.debug(`${this.shortToken} waiting for connection`, 'uploader.socket.wait')
  114. }
  115. cleanUp () {
  116. fs.unlink(this.options.path, (err) => {
  117. if (err) {
  118. logger.error(`cleanup failed for: ${this.options.path} err: ${err}`, 'uploader.cleanup.error')
  119. }
  120. })
  121. emitter().removeAllListeners(`pause:${this.token}`)
  122. emitter().removeAllListeners(`resume:${this.token}`)
  123. }
  124. /**
  125. *
  126. * @param {Buffer | Buffer[]} chunk
  127. */
  128. handleChunk (chunk) {
  129. // @todo a default protocol should not be set. We should ensure that the user specifies her protocol.
  130. const protocol = this.options.protocol || PROTOCOLS.multipart
  131. // The download has completed; close the file and start an upload if necessary.
  132. if (chunk === null) {
  133. this.writeStream.on('finish', () => {
  134. this.streamsEnded = true
  135. if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
  136. this.uploadMultipart()
  137. }
  138. })
  139. return this.endStreams()
  140. }
  141. this.writeToStreams(chunk, () => {
  142. logger.debug(`${this.shortToken} ${this.bytesWritten} bytes`, 'uploader.download.progress')
  143. if (protocol === PROTOCOLS.multipart) {
  144. return this.emitIllusiveProgress()
  145. }
  146. if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
  147. return this.uploadS3()
  148. }
  149. if (!this.options.endpoint) return
  150. if (protocol === PROTOCOLS.tus && !this.tus) {
  151. return this.uploadTus()
  152. }
  153. })
  154. }
  155. /**
  156. * @param {Buffer | Buffer[]} chunk
  157. * @param {function} cb
  158. */
  159. writeToStreams (chunk, cb) {
  160. const done = []
  161. const doneLength = this.duplexStream ? 2 : 1
  162. const onDone = () => {
  163. done.push(true)
  164. if (done.length >= doneLength) {
  165. cb()
  166. }
  167. }
  168. this.writeStream.write(chunk, onDone)
  169. if (this.duplexStream) {
  170. this.duplexStream.write(chunk, onDone)
  171. }
  172. }
  173. endStreams () {
  174. this.writeStream.end()
  175. if (this.duplexStream) {
  176. this.duplexStream.end()
  177. }
  178. }
  179. getResponse () {
  180. if (this._errRespMessage) {
  181. return { body: this._errRespMessage, status: 400 }
  182. }
  183. return { body: { token: this.token }, status: 200 }
  184. }
  185. /**
  186. * @typedef {{action: string, payload: object}} State
  187. * @param {State} state
  188. */
  189. saveState (state) {
  190. if (!this.storage) return
  191. this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
  192. }
  193. /**
  194. * This method emits upload progress but also creates an "upload progress" illusion
  195. * for the waiting period while only download is happening. Hence, it combines both
  196. * download and upload into an upload progress.
  197. * @see emitProgress
  198. * @param {number=} bytesUploaded the bytes actually Uploaded so far
  199. */
  200. emitIllusiveProgress (bytesUploaded) {
  201. const bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
  202. bytesUploaded = bytesUploaded || 0
  203. // for a 10MB file, 10MB of download will account for 5MB upload progress
  204. // and 10MB of actual upload will account for the other 5MB upload progress.
  205. const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
  206. logger.debug(
  207. `${this.shortToken} ${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
  208. 'uploader.illusive.progress'
  209. )
  210. this.emitProgress(illusiveBytesUploaded, bytesTotal)
  211. }
  212. /**
  213. *
  214. * @param {number} bytesUploaded
  215. * @param {number | null} bytesTotal
  216. */
  217. emitProgress (bytesUploaded, bytesTotal) {
  218. bytesTotal = bytesTotal || this.options.size
  219. if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
  220. bytesTotal = this.bytesWritten
  221. }
  222. const percentage = (bytesUploaded / bytesTotal * 100)
  223. const formatPercentage = percentage.toFixed(2)
  224. logger.debug(
  225. `${this.shortToken} ${bytesUploaded} ${bytesTotal} ${formatPercentage}%`,
  226. 'uploader.upload.progress'
  227. )
  228. const dataToEmit = {
  229. action: 'progress',
  230. payload: { progress: formatPercentage, bytesUploaded, bytesTotal }
  231. }
  232. this.saveState(dataToEmit)
  233. // avoid flooding the client with progress events.
  234. const roundedPercentage = Math.floor(percentage)
  235. if (this.emittedProgress !== roundedPercentage) {
  236. this.emittedProgress = roundedPercentage
  237. emitter().emit(this.token, dataToEmit)
  238. }
  239. }
  240. /**
  241. *
  242. * @param {string} url
  243. * @param {object} extraData
  244. */
  245. emitSuccess (url, extraData = {}) {
  246. const emitData = {
  247. action: 'success',
  248. payload: Object.assign(extraData, { complete: true, url })
  249. }
  250. this.saveState(emitData)
  251. emitter().emit(this.token, emitData)
  252. }
  253. /**
  254. *
  255. * @param {Error} err
  256. * @param {object=} extraData
  257. */
  258. emitError (err, extraData = {}) {
  259. const dataToEmit = {
  260. action: 'error',
  261. // TODO: consider removing the stack property
  262. payload: Object.assign(extraData, { error: serializeError(err) })
  263. }
  264. this.saveState(dataToEmit)
  265. emitter().emit(this.token, dataToEmit)
  266. }
  267. /**
  268. * start the tus upload
  269. */
  270. uploadTus () {
  271. const fname = path.basename(this.options.path)
  272. const ftype = this.options.metadata.type
  273. const metadata = Object.assign({ filename: fname, filetype: ftype }, this.options.metadata || {})
  274. const file = this.duplexStream
  275. const uploader = this
  276. const oneGB = 1024 * 1024 * 1024 // 1 GB
  277. // chunk size can't be infinity with deferred length.
  278. // cap value to 1GB to avoid buffer allocation error (RangeError)
  279. const chunkSize = Math.min(this.options.size || oneGB, oneGB)
  280. // @ts-ignore
  281. this.tus = new tus.Upload(file, {
  282. endpoint: this.options.endpoint,
  283. uploadUrl: this.options.uploadUrl,
  284. // @ts-ignore
  285. uploadLengthDeferred: true,
  286. resume: true,
  287. uploadSize: null,
  288. metadata,
  289. chunkSize,
  290. /**
  291. *
  292. * @param {Error} error
  293. */
  294. onError (error) {
  295. logger.error(error, 'uploader.tus.error')
  296. uploader.emitError(error)
  297. },
  298. /**
  299. *
  300. * @param {number} bytesUploaded
  301. * @param {number} bytesTotal
  302. */
  303. onProgress (bytesUploaded, bytesTotal) {
  304. uploader.emitProgress(bytesUploaded, bytesTotal)
  305. },
  306. onSuccess () {
  307. uploader.emitSuccess(uploader.tus.url)
  308. uploader.cleanUp()
  309. }
  310. })
  311. this.tus.start()
  312. emitter().on(`pause:${this.token}`, () => {
  313. this.tus.abort()
  314. })
  315. emitter().on(`resume:${this.token}`, () => {
  316. this.tus.start()
  317. })
  318. }
  319. uploadMultipart () {
  320. const file = fs.createReadStream(this.options.path)
  321. // upload progress
  322. let bytesUploaded = 0
  323. file.on('data', (data) => {
  324. bytesUploaded += data.length
  325. this.emitIllusiveProgress(bytesUploaded)
  326. })
  327. const formData = Object.assign(
  328. {},
  329. this.options.metadata,
  330. { [this.options.fieldname]: file }
  331. )
  332. const headers = headerSanitize(this.options.headers)
  333. request.post({ url: this.options.endpoint, headers, formData, encoding: null }, (error, response, body) => {
  334. if (error) {
  335. logger.error(error, 'upload.multipart.error')
  336. this.emitError(error)
  337. return
  338. }
  339. const headers = response.headers
  340. // remove browser forbidden headers
  341. delete headers['set-cookie']
  342. delete headers['set-cookie2']
  343. const respObj = {
  344. responseText: body.toString(),
  345. status: response.statusCode,
  346. statusText: response.statusMessage,
  347. headers
  348. }
  349. if (response.statusCode >= 400) {
  350. logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipar.error')
  351. this.emitError(new Error(response.statusMessage), respObj)
  352. } else {
  353. this.emitSuccess(null, { response: respObj })
  354. }
  355. this.cleanUp()
  356. })
  357. }
  358. /**
  359. * Upload the file to S3 while it is still being downloaded.
  360. */
  361. uploadS3 () {
  362. const file = createTailReadStream(this.options.path, {
  363. tail: true
  364. })
  365. this.writeStream.on('finish', () => {
  366. file.close()
  367. })
  368. return this._uploadS3(file)
  369. }
  370. /**
  371. * Upload a stream to S3.
  372. */
  373. _uploadS3 (stream) {
  374. if (!this.options.s3) {
  375. this.emitError(new Error('The S3 client is not configured on this companion instance.'))
  376. return
  377. }
  378. const filename = this.options.metadata.filename || path.basename(this.options.path)
  379. const { client, options } = this.options.s3
  380. const upload = client.upload({
  381. Bucket: options.bucket,
  382. Key: options.getKey(null, filename),
  383. ACL: options.acl,
  384. ContentType: this.options.metadata.type,
  385. Body: stream
  386. })
  387. this.s3Upload = upload
  388. upload.on('httpUploadProgress', ({ loaded, total }) => {
  389. this.emitProgress(loaded, total)
  390. })
  391. upload.send((error, data) => {
  392. this.s3Upload = null
  393. if (error) {
  394. this.emitError(error)
  395. } else {
  396. this.emitSuccess(null, {
  397. response: {
  398. responseText: JSON.stringify(data),
  399. headers: {
  400. 'content-type': 'application/json'
  401. }
  402. }
  403. })
  404. }
  405. this.cleanUp()
  406. })
  407. }
  408. }
  409. Uploader.FILE_NAME_PREFIX = 'uppy-file'
  410. Uploader.STORAGE_PREFIX = 'companion'
  411. module.exports = Uploader