Uploader.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. const fs = require('fs')
  2. const path = require('path')
  3. const tus = require('tus-js-client')
  4. const uuid = require('uuid')
  5. const createTailReadStream = require('@uppy/fs-tail-stream')
  6. const emitter = require('./emitter')
  7. const request = require('request')
  8. const serializeError = require('serialize-error')
  9. const { jsonStringify, hasMatch } = require('./helpers/utils')
  10. const logger = require('./logger')
  11. const validator = require('validator')
  12. const headerSanitize = require('./header-blacklist')
  13. const PROTOCOLS = Object.freeze({
  14. multipart: 'multipart',
  15. s3Multipart: 's3-multipart',
  16. tus: 'tus'
  17. })
  18. class Uploader {
  19. /**
  20. * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
  21. * For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
  22. * for some providers (Instagram particularly)
  23. *
  24. * @typedef {object} UploaderOptions
  25. * @property {string} endpoint
  26. * @property {string=} uploadUrl
  27. * @property {string} protocol
  28. * @property {number} size
  29. * @property {string=} fieldname
  30. * @property {string} pathPrefix
  31. * @property {string=} path
  32. * @property {any=} s3
  33. * @property {any} metadata
  34. * @property {any} uppyOptions
  35. * @property {any=} storage
  36. * @property {any=} headers
  37. *
  38. * @param {UploaderOptions} options
  39. */
  40. constructor (options) {
  41. if (!this.validateOptions(options)) {
  42. logger.debug(this._errRespMessage, 'uploader.validator.fail')
  43. return
  44. }
  45. this.options = options
  46. this.token = uuid.v4()
  47. this.options.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
  48. this.streamsEnded = false
  49. this.duplexStream = null
  50. // @TODO disabling parallel uploads and downloads for now
  51. // if (this.options.protocol === PROTOCOLS.tus) {
  52. // this.duplexStream = new stream.PassThrough()
  53. // .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
  54. // }
  55. this.writeStream = fs.createWriteStream(this.options.path, { mode: 0o666 }) // no executable files
  56. .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.write.error'))
  57. /** @type {number} */
  58. this.emittedProgress = 0
  59. this.storage = options.storage
  60. this._paused = false
  61. if (this.options.protocol === PROTOCOLS.tus) {
  62. emitter().on(`pause:${this.token}`, () => {
  63. this._paused = true
  64. if (this.tus) {
  65. this.tus.abort()
  66. }
  67. })
  68. emitter().on(`resume:${this.token}`, () => {
  69. this._paused = false
  70. if (this.tus) {
  71. this.tus.start()
  72. }
  73. })
  74. }
  75. }
  76. /**
  77. * the number of bytes written into the streams
  78. */
  79. get bytesWritten () {
  80. return this.writeStream.bytesWritten
  81. }
  82. /**
  83. * Validate the options passed down to the uplaoder
  84. *
  85. * @param {UploaderOptions} options
  86. * @returns {boolean}
  87. */
  88. validateOptions (options) {
  89. if (!Object.keys(PROTOCOLS).some((key) => PROTOCOLS[key] === options.protocol)) {
  90. this._errRespMessage = 'Invalid upload protocol'
  91. return false
  92. }
  93. // s3 uploads don't require upload destination
  94. // validation, because the destination is determined
  95. // by the server's s3 config
  96. if (options.protocol === PROTOCOLS.s3Multipart) {
  97. return true
  98. }
  99. if (!options.endpoint && !options.uploadUrl) {
  100. this._errRespMessage = 'No destination specified'
  101. return false
  102. }
  103. const validatorOpts = { require_protocol: true, require_tld: !options.uppyOptions.debug }
  104. return [options.endpoint, options.uploadUrl].every((url) => {
  105. if (url && !validator.isURL(url, validatorOpts)) {
  106. this._errRespMessage = 'Invalid destination url'
  107. return false
  108. }
  109. const allowedUrls = options.uppyOptions.uploadUrls
  110. if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
  111. this._errRespMessage = 'upload destination does not match any allowed destinations'
  112. return false
  113. }
  114. return true
  115. })
  116. }
  117. hasError () {
  118. return this._errRespMessage != null
  119. }
  120. /**
  121. * returns a substring of the token
  122. */
  123. get shortToken () {
  124. return this.token.substring(0, 8)
  125. }
  126. /**
  127. *
  128. * @param {function} callback
  129. */
  130. onSocketReady (callback) {
  131. emitter().once(`connection:${this.token}`, () => callback())
  132. logger.debug(`${this.shortToken} waiting for connection`, 'uploader.socket.wait')
  133. }
  134. cleanUp () {
  135. fs.unlink(this.options.path, (err) => {
  136. if (err) {
  137. logger.error(`cleanup failed for: ${this.options.path} err: ${err}`, 'uploader.cleanup.error')
  138. }
  139. })
  140. emitter().removeAllListeners(`pause:${this.token}`)
  141. emitter().removeAllListeners(`resume:${this.token}`)
  142. }
  143. /**
  144. *
  145. * @param {Buffer | Buffer[]} chunk
  146. */
  147. handleChunk (chunk) {
  148. const protocol = this.options.protocol
  149. // The download has completed; close the file and start an upload if necessary.
  150. if (chunk === null) {
  151. this.writeStream.on('finish', () => {
  152. this.streamsEnded = true
  153. if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
  154. this.uploadMultipart()
  155. }
  156. if (protocol === PROTOCOLS.tus && !this.tus) {
  157. return this.uploadTus()
  158. }
  159. })
  160. return this.endStreams()
  161. }
  162. this.writeStream.write(chunk, () => {
  163. logger.debug(`${this.shortToken} ${this.bytesWritten} bytes`, 'uploader.download.progress')
  164. if (protocol === PROTOCOLS.multipart || protocol === PROTOCOLS.tus) {
  165. return this.emitIllusiveProgress()
  166. }
  167. if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
  168. return this.uploadS3()
  169. }
  170. // @TODO disabling parallel uploads and downloads for now
  171. // if (!this.options.endpoint) return
  172. // if (protocol === PROTOCOLS.tus && !this.tus) {
  173. // return this.uploadTus()
  174. // }
  175. })
  176. }
  177. /**
  178. * @param {Buffer | Buffer[]} chunk
  179. * @param {function} cb
  180. */
  181. writeToStreams (chunk, cb) {
  182. const done = []
  183. const doneLength = this.duplexStream ? 2 : 1
  184. const onDone = () => {
  185. done.push(true)
  186. if (done.length >= doneLength) {
  187. cb()
  188. }
  189. }
  190. this.writeStream.write(chunk, onDone)
  191. if (this.duplexStream) {
  192. this.duplexStream.write(chunk, onDone)
  193. }
  194. }
  195. endStreams () {
  196. this.writeStream.end()
  197. if (this.duplexStream) {
  198. this.duplexStream.end()
  199. }
  200. }
  201. getResponse () {
  202. if (this._errRespMessage) {
  203. return { body: this._errRespMessage, status: 400 }
  204. }
  205. return { body: { token: this.token }, status: 200 }
  206. }
  207. /**
  208. * @typedef {{action: string, payload: object}} State
  209. * @param {State} state
  210. */
  211. saveState (state) {
  212. if (!this.storage) return
  213. this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
  214. }
  215. /**
  216. * This method emits upload progress but also creates an "upload progress" illusion
  217. * for the waiting period while only download is happening. Hence, it combines both
  218. * download and upload into an upload progress.
  219. * @see emitProgress
  220. * @param {number=} bytesUploaded the bytes actually Uploaded so far
  221. */
  222. emitIllusiveProgress (bytesUploaded) {
  223. if (this._paused) {
  224. return
  225. }
  226. let bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
  227. if (!this.streamsEnded) {
  228. bytesTotal = Math.max(bytesTotal, this.bytesWritten)
  229. }
  230. bytesUploaded = bytesUploaded || 0
  231. // for a 10MB file, 10MB of download will account for 5MB upload progress
  232. // and 10MB of actual upload will account for the other 5MB upload progress.
  233. const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
  234. logger.debug(
  235. `${this.shortToken} ${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
  236. 'uploader.illusive.progress'
  237. )
  238. this.emitProgress(illusiveBytesUploaded, bytesTotal)
  239. }
  240. /**
  241. *
  242. * @param {number} bytesUploaded
  243. * @param {number | null} bytesTotal
  244. */
  245. emitProgress (bytesUploaded, bytesTotal) {
  246. bytesTotal = bytesTotal || this.options.size
  247. if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
  248. bytesTotal = this.bytesWritten
  249. }
  250. const percentage = (bytesUploaded / bytesTotal * 100)
  251. const formatPercentage = percentage.toFixed(2)
  252. logger.debug(
  253. `${this.shortToken} ${bytesUploaded} ${bytesTotal} ${formatPercentage}%`,
  254. 'uploader.upload.progress'
  255. )
  256. const dataToEmit = {
  257. action: 'progress',
  258. payload: { progress: formatPercentage, bytesUploaded, bytesTotal }
  259. }
  260. this.saveState(dataToEmit)
  261. // avoid flooding the client with progress events.
  262. const roundedPercentage = Math.floor(percentage)
  263. if (this.emittedProgress !== roundedPercentage) {
  264. this.emittedProgress = roundedPercentage
  265. emitter().emit(this.token, dataToEmit)
  266. }
  267. }
  268. /**
  269. *
  270. * @param {string} url
  271. * @param {object} extraData
  272. */
  273. emitSuccess (url, extraData = {}) {
  274. const emitData = {
  275. action: 'success',
  276. payload: Object.assign(extraData, { complete: true, url })
  277. }
  278. this.saveState(emitData)
  279. emitter().emit(this.token, emitData)
  280. }
  281. /**
  282. *
  283. * @param {Error} err
  284. * @param {object=} extraData
  285. */
  286. emitError (err, extraData = {}) {
  287. const dataToEmit = {
  288. action: 'error',
  289. // TODO: consider removing the stack property
  290. payload: Object.assign(extraData, { error: serializeError(err) })
  291. }
  292. this.saveState(dataToEmit)
  293. emitter().emit(this.token, dataToEmit)
  294. }
  295. /**
  296. * start the tus upload
  297. */
  298. uploadTus () {
  299. const fname = path.basename(this.options.path)
  300. const ftype = this.options.metadata.type
  301. const metadata = Object.assign({ filename: fname, filetype: ftype }, this.options.metadata || {})
  302. const file = fs.createReadStream(this.options.path)
  303. const uploader = this
  304. // @ts-ignore
  305. this.tus = new tus.Upload(file, {
  306. endpoint: this.options.endpoint,
  307. uploadUrl: this.options.uploadUrl,
  308. // @ts-ignore
  309. uploadLengthDeferred: false,
  310. resume: true,
  311. retryDelays: [0, 1000, 3000, 5000],
  312. uploadSize: this.bytesWritten,
  313. metadata,
  314. /**
  315. *
  316. * @param {Error} error
  317. */
  318. onError (error) {
  319. logger.error(error, 'uploader.tus.error')
  320. uploader.emitError(error)
  321. },
  322. /**
  323. *
  324. * @param {number} bytesUploaded
  325. * @param {number} bytesTotal
  326. */
  327. onProgress (bytesUploaded, bytesTotal) {
  328. uploader.emitIllusiveProgress(bytesUploaded)
  329. },
  330. onSuccess () {
  331. uploader.emitSuccess(uploader.tus.url)
  332. uploader.cleanUp()
  333. }
  334. })
  335. if (!this._paused) {
  336. this.tus.start()
  337. }
  338. }
  339. uploadMultipart () {
  340. const file = fs.createReadStream(this.options.path)
  341. // upload progress
  342. let bytesUploaded = 0
  343. file.on('data', (data) => {
  344. bytesUploaded += data.length
  345. this.emitIllusiveProgress(bytesUploaded)
  346. })
  347. const formData = Object.assign(
  348. {},
  349. this.options.metadata,
  350. { [this.options.fieldname]: file }
  351. )
  352. const headers = headerSanitize(this.options.headers)
  353. request.post({ url: this.options.endpoint, headers, formData, encoding: null }, (error, response, body) => {
  354. if (error) {
  355. logger.error(error, 'upload.multipart.error')
  356. this.emitError(error)
  357. return
  358. }
  359. const headers = response.headers
  360. // remove browser forbidden headers
  361. delete headers['set-cookie']
  362. delete headers['set-cookie2']
  363. const respObj = {
  364. responseText: body.toString(),
  365. status: response.statusCode,
  366. statusText: response.statusMessage,
  367. headers
  368. }
  369. if (response.statusCode >= 400) {
  370. logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error')
  371. this.emitError(new Error(response.statusMessage), respObj)
  372. } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) {
  373. const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}`
  374. logger.error(errMsg, 'upload.multipart.mismatch.error')
  375. this.emitError(new Error(errMsg))
  376. } else {
  377. this.emitSuccess(null, { response: respObj })
  378. }
  379. this.cleanUp()
  380. })
  381. }
  382. /**
  383. * Upload the file to S3 while it is still being downloaded.
  384. */
  385. uploadS3 () {
  386. const file = createTailReadStream(this.options.path, {
  387. tail: true
  388. })
  389. this.writeStream.on('finish', () => {
  390. file.close()
  391. })
  392. return this._uploadS3(file)
  393. }
  394. /**
  395. * Upload a stream to S3.
  396. */
  397. _uploadS3 (stream) {
  398. if (!this.options.s3) {
  399. this.emitError(new Error('The S3 client is not configured on this companion instance.'))
  400. return
  401. }
  402. const filename = this.options.metadata.filename || path.basename(this.options.path)
  403. const { client, options } = this.options.s3
  404. const upload = client.upload({
  405. Bucket: options.bucket,
  406. Key: options.getKey(null, filename),
  407. ACL: options.acl,
  408. ContentType: this.options.metadata.type,
  409. Body: stream
  410. })
  411. this.s3Upload = upload
  412. upload.on('httpUploadProgress', ({ loaded, total }) => {
  413. this.emitProgress(loaded, total)
  414. })
  415. upload.send((error, data) => {
  416. this.s3Upload = null
  417. if (error) {
  418. this.emitError(error)
  419. } else {
  420. this.emitSuccess(null, {
  421. response: {
  422. responseText: JSON.stringify(data),
  423. headers: {
  424. 'content-type': 'application/json'
  425. }
  426. }
  427. })
  428. }
  429. this.cleanUp()
  430. })
  431. }
  432. }
  433. Uploader.FILE_NAME_PREFIX = 'uppy-file'
  434. Uploader.STORAGE_PREFIX = 'companion'
  435. module.exports = Uploader