Uploader.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. const fs = require('fs')
  2. const path = require('path')
  3. const tus = require('tus-js-client')
  4. const uuid = require('uuid')
  5. const createTailReadStream = require('@uppy/fs-tail-stream')
  6. const emitter = require('./emitter')
  7. const request = require('request')
  8. const serializeError = require('serialize-error')
  9. const { jsonStringify, hasMatch } = require('./helpers/utils')
  10. const logger = require('./logger')
  11. const validator = require('validator')
  12. const headerSanitize = require('./header-blacklist')
  13. const redis = require('./redis')
  // Upload protocols accepted in `options.protocol`; the values are the strings clients send.
  const PROTOCOLS = Object.freeze({ multipart: 'multipart', s3Multipart: 's3-multipart', tus: 'tus' })
  class Uploader {
    /**
     * Downloads a file to a temporary location and uploads it to its destination using the
     * supplied protocol (tus, s3-multipart, multipart).
     *
     * For tus and multipart, the upload only starts once the download has finished. The tus
     * upload size is then taken from the bytes actually written to disk rather than from
     * `size`, because file size values reported by some providers (Instagram particularly)
     * can be unreliable. For s3-multipart, the upload starts on the first written chunk and
     * tails the temporary file while the download is still in progress.
     *
     * If `options` fail validation the constructor returns early and leaves the instance
     * uninitialised. Check `hasError()` / `getResponse()` before using it.
     *
     * @typedef {object} UploaderOptions
     * @property {string} endpoint
     * @property {string=} uploadUrl
     * @property {string} protocol
     * @property {number} size
     * @property {string=} fieldname
     * @property {string} pathPrefix
     * @property {any=} s3
     * @property {any} metadata
     * @property {any} companionOptions
     * @property {any=} storage
     * @property {any=} headers
     *
     * @param {UploaderOptions} options
     */
    constructor (options) {
      if (!this.validateOptions(options)) {
        logger.debug(this._errRespMessage, 'uploader.validator.fail')
        return
      }

      this.options = options
      this.token = uuid.v4()
      // temporary download target: <pathPrefix>/<FILE_NAME_PREFIX>-<token>
      this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
      this.options.metadata = this.options.metadata || {}
      this.uploadFileName = this.options.metadata.name || path.basename(this.path)
      this.streamsEnded = false
      this.uploadStopped = false
      // @TODO uploading while downloading (through a PassThrough duplex stream) is disabled for
      // now, so duplexStream stays null and only writeStream is written to / ended.
      this.duplexStream = null

      this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files
        .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken))

      /** @type {number} */
      this.emittedProgress = 0
      this.storage = options.storage
      this._paused = false

      // pause/resume/cancel are only supported for tus uploads
      if (this.options.protocol === PROTOCOLS.tus) {
        emitter().on(`pause:${this.token}`, () => {
          this._paused = true
          if (this.tus) {
            this.tus.abort()
          }
        })

        emitter().on(`resume:${this.token}`, () => {
          this._paused = false
          if (this.tus) {
            this.tus.start()
          }
        })

        emitter().on(`cancel:${this.token}`, () => {
          this._paused = true
          if (this.tus) {
            // only ask the tus server to terminate an upload it actually knows about
            const shouldTerminate = !!this.tus.url
            this.tus.abort(shouldTerminate)
          }
          this.cleanUp()
        })
      }
    }

    /**
     * returns a substring of the token. Used as traceId for logging
     * we avoid using the entire token because this is meant to be a short term
     * access token between uppy client and companion websocket
     * @param {string} token the token to Shorten
     * @returns {string}
     */
    static shortenToken (token) {
      return token.substring(0, 8)
    }

    /**
     * Builds UploaderOptions from an incoming request (`req.body` and `req.companion`).
     *
     * @param {object} req
     * @param {number} size expected file size, as reported by the caller
     * @returns {UploaderOptions}
     */
    static reqToOptions (req, size) {
      return {
        companionOptions: req.companion.options,
        endpoint: req.body.endpoint,
        uploadUrl: req.body.uploadUrl,
        protocol: req.body.protocol,
        metadata: req.body.metadata,
        size: size,
        fieldname: req.body.fieldname,
        pathPrefix: `${req.companion.options.filePath}`,
        storage: redis.client(),
        s3: req.companion.s3Client ? {
          client: req.companion.s3Client,
          options: req.companion.options.providerOptions.s3
        } : null,
        headers: req.body.headers
      }
    }

    /**
     * the number of bytes written into the temporary file so far
     */
    get bytesWritten () {
      return this.writeStream.bytesWritten
    }

    /**
     * Validate the options passed down to the uploader.
     * On failure, the reason is stored in `this._errRespMessage`.
     *
     * @param {UploaderOptions} options
     * @returns {boolean}
     */
    validateOptions (options) {
      // s3 uploads don't require upload destination
      // validation, because the destination is determined
      // by the server's s3 config
      if (options.protocol === PROTOCOLS.s3Multipart) {
        return true
      }

      if (!options.endpoint && !options.uploadUrl) {
        this._errRespMessage = 'No destination specified'
        return false
      }

      const validatorOpts = { require_protocol: true, require_tld: !options.companionOptions.debug }
      return [options.endpoint, options.uploadUrl].every((url) => {
        if (url && !validator.isURL(url, validatorOpts)) {
          this._errRespMessage = 'Invalid destination url'
          return false
        }

        const allowedUrls = options.companionOptions.uploadUrls
        if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
          this._errRespMessage = 'upload destination does not match any allowed destinations'
          return false
        }

        return true
      })
    }

    /**
     * @returns {boolean} true if option validation failed
     */
    hasError () {
      return this._errRespMessage != null
    }

    /**
     * returns a substring of the token. Used as traceId for logging
     * we avoid using the entire token because this is meant to be a short term
     * access token between uppy client and companion websocket
     */
    get shortToken () {
      return Uploader.shortenToken(this.token)
    }

    /**
     * Calls `callback` once, the first time `connection:<token>` is emitted.
     *
     * @param {function} callback
     */
    onSocketReady (callback) {
      emitter().once(`connection:${this.token}`, () => callback())
      logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken)
    }

    /**
     * Deletes the temporary file (a failure is logged, not thrown) and removes this upload's
     * pause/resume/cancel listeners. It also marks the upload as stopped, so further chunks
     * and pending write callbacks are ignored.
     */
    cleanUp () {
      fs.unlink(this.path, (err) => {
        if (err) {
          logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error', this.shortToken)
        }
      })
      emitter().removeAllListeners(`pause:${this.token}`)
      emitter().removeAllListeners(`resume:${this.token}`)
      emitter().removeAllListeners(`cancel:${this.token}`)
      this.uploadStopped = true
    }

    /**
     * Writes a downloaded chunk to the temporary file. A `null` chunk marks the end of the
     * download: the file is closed, and tus/multipart uploads start once it is flushed.
     *
     * @param {Buffer | Buffer[] | null} chunk
     */
    handleChunk (chunk) {
      if (this.uploadStopped) {
        return
      }

      // @todo a default protocol should not be set. We should ensure that the user specifies her protocol.
      const protocol = this.options.protocol || PROTOCOLS.multipart

      // The download has completed; close the file and start an upload if necessary.
      if (chunk === null) {
        this.writeStream.on('finish', () => {
          this.streamsEnded = true
          if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
            this.uploadMultipart()
          }
          if (protocol === PROTOCOLS.tus && !this.tus) {
            this.uploadTus()
          }
        })
        return this.endStreams()
      }

      this.writeStream.write(chunk, () => {
        // The upload may have been cancelled or may have failed while this chunk was being
        // written. Without this check a stopped s3 upload would be restarted here.
        if (this.uploadStopped) {
          return
        }

        logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken)
        if (protocol === PROTOCOLS.multipart || protocol === PROTOCOLS.tus) {
          return this.emitIllusiveProgress()
        }

        // s3 uploads start on the first written chunk and tail the file from there
        if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
          return this.uploadS3()
        }
      })
    }

    /**
     * Writes a chunk to the temporary file and, if present, the duplex stream, then calls `cb`
     * once every write has completed.
     *
     * @param {Buffer | Buffer[]} chunk
     * @param {function} cb
     */
    writeToStreams (chunk, cb) {
      const done = []
      const doneLength = this.duplexStream ? 2 : 1
      const onDone = () => {
        done.push(true)
        if (done.length >= doneLength) {
          cb()
        }
      }

      this.writeStream.write(chunk, onDone)
      if (this.duplexStream) {
        this.duplexStream.write(chunk, onDone)
      }
    }

    /**
     * Ends the temporary file stream and, if present, the duplex stream.
     */
    endStreams () {
      this.writeStream.end()
      if (this.duplexStream) {
        this.duplexStream.end()
      }
    }

    /**
     * @returns {{body: any, status: number}} 400 with the validation message, or 200 with the upload token
     */
    getResponse () {
      if (this._errRespMessage) {
        return { body: this._errRespMessage, status: 400 }
      }
      return { body: { token: this.token }, status: 200 }
    }

    /**
     * Persists the latest upload state under `<STORAGE_PREFIX>:<token>` when storage is configured.
     *
     * @typedef {{action: string, payload: object}} State
     * @param {State} state
     */
    saveState (state) {
      if (!this.storage) return
      this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
    }

    /**
     * This method emits upload progress but also creates an "upload progress" illusion
     * for the waiting period while only download is happening. Hence, it combines both
     * download and upload into an upload progress.
     * @see emitProgress
     * @param {number=} bytesUploaded the bytes actually Uploaded so far
     */
    emitIllusiveProgress (bytesUploaded = 0) {
      if (this._paused) {
        return
      }

      // Once the download has finished, the size on disk is authoritative. Until then, use the
      // expected size, but never less than what has already been written. A missing `size` is
      // treated as 0 so the total cannot become NaN.
      const bytesTotal = this.streamsEnded
        ? this.bytesWritten
        : Math.max(this.options.size || 0, this.bytesWritten)

      // for a 10MB file, 10MB of download will account for 5MB upload progress
      // and 10MB of actual upload will account for the other 5MB upload progress.
      const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)

      logger.debug(
        `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
        'uploader.illusive.progress',
        this.shortToken
      )
      this.emitProgress(illusiveBytesUploaded, bytesTotal)
    }

    /**
     * Saves and emits a progress event. The client is only notified when the whole-number
     * percentage changes.
     *
     * @param {number} bytesUploaded
     * @param {number | null} bytesTotal falls back to `options.size` when falsy
     */
    emitProgress (bytesUploaded, bytesTotal) {
      let total = bytesTotal || this.options.size
      if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
        total = this.bytesWritten
      }

      // an unknown or zero total would otherwise produce NaN/Infinity percentages
      const percentage = total > 0 ? (bytesUploaded / total * 100) : 0
      const formatPercentage = percentage.toFixed(2)
      logger.debug(
        `${bytesUploaded} ${total} ${formatPercentage}%`,
        'uploader.upload.progress',
        this.shortToken
      )

      const dataToEmit = {
        action: 'progress',
        payload: { progress: formatPercentage, bytesUploaded, bytesTotal: total }
      }
      this.saveState(dataToEmit)

      // avoid flooding the client with progress events.
      const roundedPercentage = Math.floor(percentage)
      if (this.emittedProgress !== roundedPercentage) {
        this.emittedProgress = roundedPercentage
        emitter().emit(this.token, dataToEmit)
      }
    }

    /**
     * Saves and emits a success event.
     *
     * @param {string} url
     * @param {object} extraData merged into the payload; not mutated
     */
    emitSuccess (url, extraData = {}) {
      const emitData = {
        action: 'success',
        payload: Object.assign({}, extraData, { complete: true, url })
      }
      this.saveState(emitData)
      emitter().emit(this.token, emitData)
    }

    /**
     * Saves and emits an error event. The stack trace is removed before sending.
     *
     * @param {Error} err
     * @param {object=} extraData merged into the payload; not mutated
     */
    emitError (err, extraData = {}) {
      const serializedErr = serializeError(err)
      // delete stack to avoid sending server info to client
      delete serializedErr.stack
      const dataToEmit = {
        action: 'error',
        payload: Object.assign({}, extraData, { error: serializedErr })
      }
      this.saveState(dataToEmit)
      emitter().emit(this.token, dataToEmit)
    }

    /**
     * start the tus upload of the (fully downloaded) temporary file
     */
    uploadTus () {
      const file = fs.createReadStream(this.path)
      const uploader = this

      // @ts-ignore
      this.tus = new tus.Upload(file, {
        endpoint: this.options.endpoint,
        uploadUrl: this.options.uploadUrl,
        // The tus upload starts only after the download has finished, so the exact size on
        // disk is known and passed as uploadSize. The length does not need to be deferred.
        // @ts-ignore
        uploadLengthDeferred: false,
        resume: true,
        retryDelays: [0, 1000, 3000, 5000],
        uploadSize: this.bytesWritten,
        metadata: Object.assign(
          {
            // file name and type as required by the tusd tus server
            // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646
            filename: this.uploadFileName,
            filetype: this.options.metadata.type
          },
          this.options.metadata
        ),
        /**
         *
         * @param {Error} error
         */
        onError (error) {
          logger.error(error, 'uploader.tus.error', uploader.shortToken)
          uploader.emitError(error)
        },
        /**
         *
         * @param {number} bytesUploaded
         * @param {number} bytesTotal
         */
        onProgress (bytesUploaded, bytesTotal) {
          uploader.emitIllusiveProgress(bytesUploaded)
        },
        onSuccess () {
          uploader.emitSuccess(uploader.tus.url)
          uploader.cleanUp()
        }
      })

      // If a pause arrived before the upload was created, the resume handler starts it later.
      if (!this._paused) {
        this.tus.start()
      }
    }

    /**
     * POSTs the temporary file as multipart form data to `options.endpoint`, then emits the
     * outcome and cleans up on every path (success, HTTP error, size mismatch, request error).
     */
    uploadMultipart () {
      const file = fs.createReadStream(this.path)

      // upload progress: bytes read from the temporary file and handed to the request
      let bytesUploaded = 0
      file.on('data', (data) => {
        bytesUploaded += data.length
        this.emitIllusiveProgress(bytesUploaded)
      })

      const formData = Object.assign(
        {},
        this.options.metadata,
        {
          [this.options.fieldname]: {
            value: file,
            options: {
              filename: this.uploadFileName,
              contentType: this.options.metadata.type
            }
          }
        }
      )

      const headers = headerSanitize(this.options.headers)
      request.post({ url: this.options.endpoint, headers, formData, encoding: null }, (error, response, body) => {
        if (error) {
          logger.error(error, 'upload.multipart.error', this.shortToken)
          this.emitError(error)
          // the upload is over either way; release the temporary file and listeners
          this.cleanUp()
          return
        }

        const headers = response.headers
        // remove browser forbidden headers
        delete headers['set-cookie']
        delete headers['set-cookie2']

        const respObj = {
          responseText: body.toString(),
          status: response.statusCode,
          statusText: response.statusMessage,
          headers
        }

        if (response.statusCode >= 400) {
          logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error', this.shortToken)
          this.emitError(new Error(response.statusMessage), respObj)
        } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) {
          const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}`
          logger.error(errMsg, 'upload.multipart.mismatch.error', this.shortToken)
          this.emitError(new Error(errMsg))
        } else {
          this.emitSuccess(null, { response: respObj })
        }

        this.cleanUp()
      })
    }

    /**
     * Upload the file to S3 while it is still being downloaded.
     */
    uploadS3 () {
      // Check the configuration before opening a tail stream that nothing would consume or close.
      if (this._rejectMissingS3()) {
        return
      }

      const file = createTailReadStream(this.path, {
        tail: true
      })

      // stop tailing once the download has been fully written
      this.writeStream.on('finish', () => {
        file.close()
      })

      return this._uploadS3(file)
    }

    /**
     * Upload a stream to S3.
     *
     * @param {any} stream readable body for the S3 upload
     */
    _uploadS3 (stream) {
      if (this._rejectMissingS3()) {
        return
      }

      const { client, options } = this.options.s3
      const upload = client.upload({
        Bucket: options.bucket,
        Key: options.getKey(null, this.uploadFileName),
        ACL: options.acl,
        ContentType: this.options.metadata.type,
        Body: stream
      })

      this.s3Upload = upload

      upload.on('httpUploadProgress', ({ loaded, total }) => {
        this.emitProgress(loaded, total)
      })

      upload.send((error, data) => {
        this.s3Upload = null
        if (error) {
          this.emitError(error)
        } else {
          const url = data && data.Location ? data.Location : null
          this.emitSuccess(url, {
            response: {
              responseText: JSON.stringify(data),
              headers: {
                'content-type': 'application/json'
              }
            }
          })
        }
        this.cleanUp()
      })
    }

    /**
     * When no S3 client is configured, emits a configuration error and stops the upload
     * (via cleanUp) so it is not retried on every written chunk.
     *
     * @returns {boolean} true if the S3 client is missing and the upload was stopped
     */
    _rejectMissingS3 () {
      if (this.options.s3) {
        return false
      }
      this.emitError(new Error('The S3 client is not configured on this companion instance.'))
      this.cleanUp()
      return true
    }
  }
  // Class-level constants: FILE_NAME_PREFIX names the temporary download files
  // (`<pathPrefix>/<FILE_NAME_PREFIX>-<token>`), and STORAGE_PREFIX namespaces the upload
  // state keys written by saveState (`<STORAGE_PREFIX>:<token>`).
  Object.assign(Uploader, {
    FILE_NAME_PREFIX: 'uppy-file',
    STORAGE_PREFIX: 'companion'
  })

  module.exports = Uploader