Uploader.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. const fs = require('fs')
  2. const path = require('path')
  3. const tus = require('tus-js-client')
  4. const uuid = require('uuid')
  5. const createTailReadStream = require('@uppy/fs-tail-stream')
  6. const emitter = require('./emitter')
  7. const request = require('request')
  8. const serializeError = require('serialize-error')
  9. const { jsonStringify, hasMatch } = require('./helpers/utils')
  10. const logger = require('./logger')
  11. const validator = require('validator')
  12. const headerSanitize = require('./header-blacklist')
  13. const redis = require('./redis')
  /**
   * Upload protocols supported by the Uploader. The values are the strings compared
   * against `options.protocol` (populated from `req.body.protocol` in reqToOptions).
   * Frozen so the mapping cannot be altered at runtime.
   * @enum {string}
   */
  const PROTOCOLS = Object.freeze({ multipart: 'multipart', s3Multipart: 's3-multipart', tus: 'tus' })
  class Uploader {
    /**
     * Uploads a file to its destination using the supplied protocol (tus, s3-multipart, multipart).
     *
     * The file is first downloaded into a temporary file at `this.path`. For tus and multipart the
     * upload only starts once that download has finished, so the upload size is taken from the
     * number of bytes actually written to disk instead of `options.size` — the size reported by
     * some providers (Instagram particularly) can be unreliable. For s3-multipart the upload starts
     * with the first written chunk and tails the temporary file while it is still being written.
     *
     * @typedef {object} UploaderOptions
     * @property {string} endpoint
     * @property {string=} uploadUrl
     * @property {string} protocol
     * @property {number} size
     * @property {string=} fieldname
     * @property {string} pathPrefix
     * @property {any=} s3
     * @property {any} metadata
     * @property {any} companionOptions
     * @property {any=} storage
     * @property {any=} headers
     *
     * @param {UploaderOptions} options
     */
    constructor (options) {
      if (!this.validateOptions(options)) {
        // the instance is left uninitialized; getResponse()/hasError() report the failure
        logger.debug(this._errRespMessage, 'uploader.validator.fail')
        return
      }

      this.options = options
      this.token = uuid.v4()
      this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
      this.options.metadata = this.options.metadata || {}
      this.uploadFileName = this.options.metadata.name || path.basename(this.path)
      this.streamsEnded = false
      this.uploadStopped = false
      // @TODO parallel uploads and downloads are disabled for now. When re-enabled, tus uploads
      // would read from a stream.PassThrough assigned here and fed by writeToStreams().
      this.duplexStream = null
      this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files
        .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken))
      /** @type {number} */
      this.emittedProgress = 0
      this.storage = options.storage
      this._paused = false

      if (this.options.protocol === PROTOCOLS.tus) {
        this._listenForTusControlEvents()
      }
    }

    /**
     * Subscribes to the `pause:`, `resume:` and `cancel:` events emitted for this upload's token.
     * The listeners are removed again in cleanUp().
     */
    _listenForTusControlEvents () {
      emitter().on(`pause:${this.token}`, () => {
        this._paused = true
        if (this.tus) {
          this.tus.abort()
        }
      })

      emitter().on(`resume:${this.token}`, () => {
        this._paused = false
        if (this.tus) {
          this.tus.start()
        }
      })

      emitter().on(`cancel:${this.token}`, () => {
        this._paused = true
        if (this.tus) {
          // only ask the server to terminate the upload if it has already been created there
          const shouldTerminate = !!this.tus.url
          // @todo remove the ts-ignore when the tus-js-client type definitions
          // have been updated with this change https://github.com/DefinitelyTyped/DefinitelyTyped/pull/40629
          // @ts-ignore
          this.tus.abort(shouldTerminate)
        }
        this.cleanUp()
      })
    }

    /**
     * Returns a substring of the token, used as a traceId for logging.
     * The entire token is avoided because it is meant to be a short-term
     * access token between the uppy client and the companion websocket.
     * @param {string} token the token to shorten
     * @returns {string} the first 8 characters of the token
     */
    static shortenToken (token) {
      return token.substring(0, 8)
    }

    /**
     * Builds UploaderOptions from an incoming request.
     * Reads `req.body` (destination, protocol, metadata, fieldname, headers) and
     * `req.companion` (options and optional s3Client); storage is the redis client.
     * @param {object} req
     * @param {number} size
     * @returns {UploaderOptions}
     */
    static reqToOptions (req, size) {
      return {
        companionOptions: req.companion.options,
        endpoint: req.body.endpoint,
        uploadUrl: req.body.uploadUrl,
        protocol: req.body.protocol,
        metadata: req.body.metadata,
        size: size,
        fieldname: req.body.fieldname,
        pathPrefix: `${req.companion.options.filePath}`,
        storage: redis.client(),
        s3: req.companion.s3Client ? {
          client: req.companion.s3Client,
          options: req.companion.options.providerOptions.s3
        } : null,
        headers: req.body.headers
      }
    }

    /**
     * The number of bytes written into the temporary file so far.
     * @returns {number}
     */
    get bytesWritten () {
      return this.writeStream.bytesWritten
    }

    /**
     * Validates the options passed down to the uploader. On failure the reason
     * is stored in `this._errRespMessage`.
     *
     * @param {UploaderOptions} options
     * @returns {boolean}
     */
    validateOptions (options) {
      // s3 uploads don't require upload destination validation,
      // because the destination is determined by the server's s3 config
      if (options.protocol === PROTOCOLS.s3Multipart) {
        return true
      }

      if (!options.endpoint && !options.uploadUrl) {
        this._errRespMessage = 'No destination specified'
        return false
      }

      // tolerate a missing companion config instead of throwing on `.debug` / `.uploadUrls`
      const companionOptions = options.companionOptions || {}
      const validatorOpts = { require_protocol: true, require_tld: !companionOptions.debug }
      const allowedUrls = companionOptions.uploadUrls

      return [options.endpoint, options.uploadUrl].every((url) => {
        // an absent endpoint/uploadUrl is fine as long as the other one is set (checked above)
        if (!url) {
          return true
        }
        if (!validator.isURL(url, validatorOpts)) {
          this._errRespMessage = 'Invalid destination url'
          return false
        }
        if (allowedUrls && !hasMatch(url, allowedUrls)) {
          this._errRespMessage = 'upload destination does not match any allowed destinations'
          return false
        }
        return true
      })
    }

    /**
     * @returns {boolean} whether option validation failed
     */
    hasError () {
      return this._errRespMessage != null
    }

    /**
     * Returns a substring of the token, used as a traceId for logging.
     * The entire token is avoided because it is meant to be a short-term
     * access token between the uppy client and the companion websocket.
     * @returns {string}
     */
    get shortToken () {
      return Uploader.shortenToken(this.token)
    }

    /**
     * Invokes `callback` once, when the `connection:<token>` event is emitted.
     * @param {function} callback
     */
    onSocketReady (callback) {
      emitter().once(`connection:${this.token}`, () => callback())
      logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken)
    }

    /**
     * Deletes the temporary file, detaches the pause/resume/cancel listeners and marks the
     * upload as stopped. Several paths call this (upload completion, errors, cancel), so it
     * is idempotent: only the first call does any work.
     */
    cleanUp () {
      if (this.uploadStopped) {
        return
      }
      this.uploadStopped = true

      fs.unlink(this.path, (err) => {
        if (err) {
          logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error', this.shortToken)
        }
      })
      emitter().removeAllListeners(`pause:${this.token}`)
      emitter().removeAllListeners(`resume:${this.token}`)
      emitter().removeAllListeners(`cancel:${this.token}`)
    }

    /**
     * Writes a downloaded chunk to the temporary file. A `null` chunk marks the end of the
     * download: the file is closed and, for multipart/tus, the upload is started.
     * @param {Buffer | Buffer[] | null} chunk
     */
    handleChunk (chunk) {
      if (this.uploadStopped) {
        return
      }

      // @todo a default protocol should not be set. We should ensure that the user specifies her protocol.
      const protocol = this.options.protocol || PROTOCOLS.multipart

      // The download has completed; close the file and start an upload if necessary.
      if (chunk === null) {
        this.writeStream.on('finish', () => {
          this.streamsEnded = true
          if (this.options.endpoint && protocol === PROTOCOLS.multipart) {
            this.uploadMultipart()
          }
          if (protocol === PROTOCOLS.tus && !this.tus) {
            this.uploadTus()
          }
        })
        this.endStreams()
        return
      }

      this.writeStream.write(chunk, () => {
        logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken)
        if (protocol === PROTOCOLS.multipart || protocol === PROTOCOLS.tus) {
          this.emitIllusiveProgress()
          return
        }
        // s3 uploads start with the first written chunk and tail the file (see uploadS3)
        if (protocol === PROTOCOLS.s3Multipart && !this.s3Upload) {
          this.uploadS3()
        }
      })
    }

    /**
     * Writes a chunk to the temporary file and, when set, to the duplex stream.
     * `cb` is invoked once every write has completed.
     * @param {Buffer | Buffer[]} chunk
     * @param {function} cb
     */
    writeToStreams (chunk, cb) {
      const done = []
      const doneLength = this.duplexStream ? 2 : 1
      const onDone = () => {
        done.push(true)
        if (done.length >= doneLength) {
          cb()
        }
      }

      this.writeStream.write(chunk, onDone)
      if (this.duplexStream) {
        this.duplexStream.write(chunk, onDone)
      }
    }

    /**
     * Ends the temporary-file stream and, when set, the duplex stream.
     */
    endStreams () {
      this.writeStream.end()
      if (this.duplexStream) {
        this.duplexStream.end()
      }
    }

    /**
     * @returns {{body: any, status: number}} 400 with the validation message, or 200 with the token
     */
    getResponse () {
      if (this._errRespMessage) {
        return { body: this._errRespMessage, status: 400 }
      }
      return { body: { token: this.token }, status: 200 }
    }

    /**
     * Persists the latest upload state under `<STORAGE_PREFIX>:<token>`, if storage is configured.
     * @typedef {{action: string, payload: object}} State
     * @param {State} state
     */
    saveState (state) {
      if (!this.storage) return
      this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
    }

    /**
     * Emits upload progress but also creates an "upload progress" illusion for the waiting
     * period while only the download is happening: download and upload are combined into a
     * single upload progress.
     * @see emitProgress
     * @param {number=} bytesUploaded the bytes actually uploaded so far
     */
    emitIllusiveProgress (bytesUploaded = 0) {
      if (this._paused) {
        return
      }

      // While downloading, the real size may exceed (or lack) the reported size,
      // so never let the total fall below what has already been written.
      const bytesTotal = this.streamsEnded
        ? this.bytesWritten
        : Math.max(this.options.size || 0, this.bytesWritten)

      // for a 10MB file, 10MB of download will account for 5MB upload progress
      // and 10MB of actual upload will account for the other 5MB upload progress.
      const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
      logger.debug(
        `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
        'uploader.illusive.progress',
        this.shortToken
      )
      this.emitProgress(illusiveBytesUploaded, bytesTotal)
    }

    /**
     * Saves and emits a progress state. Events are only emitted when the whole-number
     * percentage changes, to avoid flooding the client.
     * @param {number} bytesUploaded
     * @param {number | null} bytesTotal falls back to `options.size` when falsy
     */
    emitProgress (bytesUploaded, bytesTotal) {
      let total = bytesTotal || this.options.size
      if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
        total = this.bytesWritten
      }

      // guard against an unknown/zero total: NaN would never equal emittedProgress
      // and would make every call emit a "NaN" progress event
      const percentage = total > 0 ? (bytesUploaded / total) * 100 : 0
      const formatPercentage = percentage.toFixed(2)
      logger.debug(
        `${bytesUploaded} ${total} ${formatPercentage}%`,
        'uploader.upload.progress',
        this.shortToken
      )

      const dataToEmit = {
        action: 'progress',
        payload: { progress: formatPercentage, bytesUploaded, bytesTotal: total }
      }
      this.saveState(dataToEmit)

      // avoid flooding the client with progress events.
      const roundedPercentage = Math.floor(percentage)
      if (this.emittedProgress !== roundedPercentage) {
        this.emittedProgress = roundedPercentage
        emitter().emit(this.token, dataToEmit)
      }
    }

    /**
     * Saves and emits a success state. `extraData` is copied, not mutated.
     * @param {string | null} url
     * @param {object} extraData
     */
    emitSuccess (url, extraData = {}) {
      const emitData = {
        action: 'success',
        payload: Object.assign({}, extraData, { complete: true, url })
      }
      this.saveState(emitData)
      emitter().emit(this.token, emitData)
    }

    /**
     * Saves and emits an error state. The stack is stripped before sending.
     * `extraData` is copied, not mutated.
     * @param {Error} err
     * @param {object=} extraData
     */
    emitError (err, extraData = {}) {
      const serializedErr = serializeError(err)
      // delete stack to avoid sending server info to client
      delete serializedErr.stack
      const dataToEmit = {
        action: 'error',
        payload: Object.assign({}, extraData, { error: serializedErr })
      }
      this.saveState(dataToEmit)
      emitter().emit(this.token, dataToEmit)
    }

    /**
     * Starts the tus upload of the (fully downloaded) temporary file.
     */
    uploadTus () {
      const file = fs.createReadStream(this.path)
      const uploader = this

      // @ts-ignore
      this.tus = new tus.Upload(file, {
        endpoint: this.options.endpoint,
        uploadUrl: this.options.uploadUrl,
        // uploadTus is started from the writeStream 'finish' handler in handleChunk,
        // so the final size is known and the length does not need to be deferred.
        // @ts-ignore
        uploadLengthDeferred: false,
        resume: true,
        retryDelays: [0, 1000, 3000, 5000],
        uploadSize: this.bytesWritten,
        metadata: Object.assign(
          {
            // file name and type as required by the tusd tus server
            // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646
            filename: this.uploadFileName,
            filetype: this.options.metadata.type
          },
          this.options.metadata
        ),
        /**
         * @param {Error} error
         */
        onError (error) {
          logger.error(error, 'uploader.tus.error', uploader.shortToken)
          // NOTE(review): tus-js-client errors may carry the underlying HTTP request/response
          // objects; drop them so request details (e.g. headers) are not serialized into the
          // payload emitError sends to the client.
          // @ts-ignore
          delete error.originalRequest
          // @ts-ignore
          delete error.originalResponse
          uploader.emitError(error)
        },
        /**
         * @param {number} bytesUploaded
         */
        onProgress (bytesUploaded) {
          uploader.emitIllusiveProgress(bytesUploaded)
        },
        onSuccess () {
          uploader.emitSuccess(uploader.tus.url)
          uploader.cleanUp()
        }
      })

      if (!this._paused) {
        this.tus.start()
      }
    }

    /**
     * Posts the temporary file as multipart/form-data to `options.endpoint`, together with the
     * metadata fields, then emits success or error and cleans up.
     */
    uploadMultipart () {
      const file = fs.createReadStream(this.path)

      // upload progress: count the bytes read from disk as they are handed to the request
      let bytesUploaded = 0
      file.on('data', (data) => {
        bytesUploaded += data.length
        this.emitIllusiveProgress(bytesUploaded)
      })

      const formData = Object.assign(
        {},
        this.options.metadata,
        {
          [this.options.fieldname]: {
            value: file,
            options: {
              filename: this.uploadFileName,
              contentType: this.options.metadata.type
            }
          }
        }
      )

      const headers = headerSanitize(this.options.headers)
      request.post({ url: this.options.endpoint, headers, formData, encoding: null }, (error, response, body) => {
        if (error) {
          logger.error(error, 'upload.multipart.error', this.shortToken)
          this.emitError(error)
          // nothing else will use the temporary file; don't leave it behind
          this.cleanUp()
          return
        }

        const responseHeaders = response.headers
        // remove browser forbidden headers
        delete responseHeaders['set-cookie']
        delete responseHeaders['set-cookie2']

        const respObj = {
          responseText: body.toString(),
          status: response.statusCode,
          statusText: response.statusMessage,
          headers: responseHeaders
        }

        if (response.statusCode >= 400) {
          logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error', this.shortToken)
          this.emitError(new Error(response.statusMessage), respObj)
        } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) {
          const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}`
          logger.error(errMsg, 'upload.multipart.mismatch.error', this.shortToken)
          this.emitError(new Error(errMsg))
        } else {
          this.emitSuccess(null, { response: respObj })
        }

        this.cleanUp()
      })
    }

    /**
     * Upload the file to S3 while it is still being downloaded.
     */
    uploadS3 () {
      const file = createTailReadStream(this.path, {
        tail: true
      })

      // stop tailing once the download has been fully written
      this.writeStream.on('finish', () => {
        file.close()
      })

      return this._uploadS3(file)
    }

    /**
     * Upload a stream to S3.
     * @param {any} stream readable stream used as the upload Body
     */
    _uploadS3 (stream) {
      if (!this.options.s3) {
        this.emitError(new Error('The S3 client is not configured on this companion instance.'))
        return
      }

      const { client, options } = this.options.s3
      const upload = client.upload({
        Bucket: options.bucket,
        Key: options.getKey(null, this.uploadFileName),
        ACL: options.acl,
        ContentType: this.options.metadata.type,
        Body: stream
      })
      this.s3Upload = upload

      upload.on('httpUploadProgress', ({ loaded, total }) => {
        this.emitProgress(loaded, total)
      })

      upload.send((error, data) => {
        this.s3Upload = null
        if (error) {
          this.emitError(error)
        } else {
          const url = data && data.Location ? data.Location : null
          this.emitSuccess(url, {
            response: {
              responseText: JSON.stringify(data),
              headers: {
                'content-type': 'application/json'
              }
            }
          })
        }
        this.cleanUp()
      })
    }
  }
  // Class-level constants:
  //  - FILE_NAME_PREFIX: prefix of the temporary download file name built in the constructor
  //  - STORAGE_PREFIX: key namespace used by saveState when persisting upload state
  Object.assign(Uploader, {
    FILE_NAME_PREFIX: 'uppy-file',
    STORAGE_PREFIX: 'companion'
  })

  module.exports = Uploader