// Uploader.js (17 KB)
  1. const fs = require('fs')
  2. const path = require('path')
  3. const tus = require('tus-js-client')
  4. const uuid = require('uuid')
  5. const isObject = require('isobject')
  6. const validator = require('validator')
  7. const request = require('request')
  8. const emitter = require('./emitter')
  9. const serializeError = require('serialize-error')
  10. const { jsonStringify, hasMatch } = require('./helpers/utils')
  11. const logger = require('./logger')
  12. const headerSanitize = require('./header-blacklist')
  13. const redis = require('./redis')
  14. const DEFAULT_FIELD_NAME = 'files[]'
  15. const PROTOCOLS = Object.freeze({
  16. multipart: 'multipart',
  17. s3Multipart: 's3-multipart',
  18. tus: 'tus'
  19. })
  20. class Uploader {
  21. /**
  22. * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
  23. * For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
  24. * for some providers (Instagram particularly)
  25. *
  26. * @typedef {object} UploaderOptions
  27. * @property {string} endpoint
  28. * @property {string=} uploadUrl
  29. * @property {string} protocol
  30. * @property {number} size
  31. * @property {string=} fieldname
  32. * @property {string} pathPrefix
  33. * @property {any=} s3
  34. * @property {any} metadata
  35. * @property {any} companionOptions
  36. * @property {any=} storage
  37. * @property {any=} headers
  38. * @property {string=} httpMethod
  39. *
  40. * @param {UploaderOptions} options
  41. */
  42. constructor (options) {
  43. if (!this.validateOptions(options)) {
  44. logger.debug(this._errRespMessage, 'uploader.validator.fail')
  45. return
  46. }
  47. this.options = options
  48. this.token = uuid.v4()
  49. this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
  50. this.options.metadata = this.options.metadata || {}
  51. this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME
  52. this.uploadFileName = this.options.metadata.name || path.basename(this.path)
  53. this.streamsEnded = false
  54. this.uploadStopped = false
  55. this.duplexStream = null
  56. // @TODO disabling parallel uploads and downloads for now
  57. // if (this.options.protocol === PROTOCOLS.tus) {
  58. // this.duplexStream = new stream.PassThrough()
  59. // .on('error', (err) => logger.error(`${this.shortToken} ${err}`, 'uploader.duplex.error'))
  60. // }
  61. this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files
  62. .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken))
  63. /** @type {number} */
  64. this.emittedProgress = 0
  65. this.storage = options.storage
  66. this._paused = false
  67. if (this.options.protocol === PROTOCOLS.tus) {
  68. emitter().on(`pause:${this.token}`, () => {
  69. this._paused = true
  70. if (this.tus) {
  71. this.tus.abort()
  72. }
  73. })
  74. emitter().on(`resume:${this.token}`, () => {
  75. this._paused = false
  76. if (this.tus) {
  77. this.tus.start()
  78. }
  79. })
  80. emitter().on(`cancel:${this.token}`, () => {
  81. this._paused = true
  82. if (this.tus) {
  83. const shouldTerminate = !!this.tus.url
  84. this.tus.abort(shouldTerminate)
  85. }
  86. this.cleanUp()
  87. })
  88. }
  89. }
  90. /**
  91. * returns a substring of the token. Used as traceId for logging
  92. * we avoid using the entire token because this is meant to be a short term
  93. * access token between uppy client and companion websocket
  94. * @param {string} token the token to Shorten
  95. * @returns {string}
  96. */
  97. static shortenToken (token) {
  98. return token.substring(0, 8)
  99. }
  100. static reqToOptions (req, size) {
  101. return {
  102. companionOptions: req.companion.options,
  103. endpoint: req.body.endpoint,
  104. uploadUrl: req.body.uploadUrl,
  105. protocol: req.body.protocol,
  106. metadata: req.body.metadata,
  107. httpMethod: req.body.httpMethod,
  108. size: size,
  109. fieldname: req.body.fieldname,
  110. pathPrefix: `${req.companion.options.filePath}`,
  111. storage: redis.client(),
  112. s3: req.companion.s3Client ? {
  113. client: req.companion.s3Client,
  114. options: req.companion.options.providerOptions.s3
  115. } : null,
  116. headers: req.body.headers
  117. }
  118. }
  119. /**
  120. * the number of bytes written into the streams
  121. */
  122. get bytesWritten () {
  123. return this.writeStream.bytesWritten
  124. }
  125. /**
  126. * Validate the options passed down to the uplaoder
  127. *
  128. * @param {UploaderOptions} options
  129. * @returns {boolean}
  130. */
  131. validateOptions (options) {
  132. // validate HTTP Method
  133. if (options.httpMethod) {
  134. if (typeof options.httpMethod !== 'string') {
  135. this._errRespMessage = 'unsupported HTTP METHOD specified'
  136. return false
  137. }
  138. const method = options.httpMethod.toLowerCase()
  139. if (method !== 'put' && method !== 'post') {
  140. this._errRespMessage = 'unsupported HTTP METHOD specified'
  141. return false
  142. }
  143. }
  144. // validate fieldname
  145. if (options.fieldname && typeof options.fieldname !== 'string') {
  146. this._errRespMessage = 'fieldname must be a string'
  147. return false
  148. }
  149. // validate metadata
  150. if (options.metadata && !isObject(options.metadata)) {
  151. this._errRespMessage = 'metadata must be an object'
  152. return false
  153. }
  154. // validate headers
  155. if (options.headers && !isObject(options.headers)) {
  156. this._errRespMessage = 'headers must be an object'
  157. return false
  158. }
  159. // validate protocol
  160. // @todo this validation should not be conditional once the protocol field is mandatory
  161. if (options.protocol && !Object.prototype.hasOwnProperty.call(PROTOCOLS, options.protocol)) {
  162. this._errRespMessage = 'unsupported protocol specified'
  163. return false
  164. }
  165. // s3 uploads don't require upload destination
  166. // validation, because the destination is determined
  167. // by the server's s3 config
  168. if (options.protocol === PROTOCOLS.s3Multipart) {
  169. return true
  170. }
  171. if (!options.endpoint && !options.uploadUrl) {
  172. this._errRespMessage = 'no destination specified'
  173. return false
  174. }
  175. const validatorOpts = { require_protocol: true, require_tld: !options.companionOptions.debug }
  176. return [options.endpoint, options.uploadUrl].every((url) => {
  177. if (url && !validator.isURL(url, validatorOpts)) {
  178. this._errRespMessage = 'invalid destination url'
  179. return false
  180. }
  181. const allowedUrls = options.companionOptions.uploadUrls
  182. if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
  183. this._errRespMessage = 'upload destination does not match any allowed destinations'
  184. return false
  185. }
  186. return true
  187. })
  188. }
  189. hasError () {
  190. return this._errRespMessage != null
  191. }
  192. /**
  193. * returns a substring of the token. Used as traceId for logging
  194. * we avoid using the entire token because this is meant to be a short term
  195. * access token between uppy client and companion websocket
  196. */
  197. get shortToken () {
  198. return Uploader.shortenToken(this.token)
  199. }
  200. /**
  201. *
  202. * @param {function} callback
  203. */
  204. onSocketReady (callback) {
  205. emitter().once(`connection:${this.token}`, () => callback())
  206. logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken)
  207. }
  208. cleanUp () {
  209. fs.unlink(this.path, (err) => {
  210. if (err) {
  211. logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error')
  212. }
  213. })
  214. emitter().removeAllListeners(`pause:${this.token}`)
  215. emitter().removeAllListeners(`resume:${this.token}`)
  216. emitter().removeAllListeners(`cancel:${this.token}`)
  217. this.uploadStopped = true
  218. }
  219. /**
  220. *
  221. * @param {Error} err
  222. * @param {string | Buffer | Buffer[]} chunk
  223. */
  224. handleChunk (err, chunk) {
  225. if (this.uploadStopped) {
  226. return
  227. }
  228. if (err) {
  229. logger.error(err, 'uploader.download.error', this.shortToken)
  230. this.emitError(err)
  231. this.cleanUp()
  232. return
  233. }
  234. // @todo a default protocol should not be set. We should ensure that the user specifies their protocol.
  235. const protocol = this.options.protocol || PROTOCOLS.multipart
  236. // The download has completed; close the file and start an upload if necessary.
  237. if (chunk === null) {
  238. this.writeStream.on('finish', () => {
  239. this.streamsEnded = true
  240. switch (protocol) {
  241. case PROTOCOLS.multipart:
  242. if (this.options.endpoint) {
  243. this.uploadMultipart()
  244. }
  245. break
  246. case PROTOCOLS.s3Multipart:
  247. if (!this.s3Upload) {
  248. this.uploadS3Multipart()
  249. } else {
  250. logger.warn('handleChunk() called multiple times', 'uploader.s3.duplicate', this.shortToken)
  251. }
  252. break
  253. case PROTOCOLS.tus:
  254. if (!this.tus) {
  255. this.uploadTus()
  256. } else {
  257. logger.warn('handleChunk() called multiple times', 'uploader.tus.duplicate', this.shortToken)
  258. }
  259. break
  260. }
  261. })
  262. return this.endStreams()
  263. }
  264. this.writeStream.write(chunk, () => {
  265. logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken)
  266. return this.emitIllusiveProgress()
  267. })
  268. }
  269. /**
  270. * @param {Buffer | Buffer[]} chunk
  271. * @param {function} cb
  272. */
  273. writeToStreams (chunk, cb) {
  274. const done = []
  275. const doneLength = this.duplexStream ? 2 : 1
  276. const onDone = () => {
  277. done.push(true)
  278. if (done.length >= doneLength) {
  279. cb()
  280. }
  281. }
  282. this.writeStream.write(chunk, onDone)
  283. if (this.duplexStream) {
  284. this.duplexStream.write(chunk, onDone)
  285. }
  286. }
  287. endStreams () {
  288. this.writeStream.end()
  289. if (this.duplexStream) {
  290. this.duplexStream.end()
  291. }
  292. }
  293. getResponse () {
  294. if (this._errRespMessage) {
  295. return { body: { error: this._errRespMessage }, status: 400 }
  296. }
  297. return { body: { token: this.token }, status: 200 }
  298. }
  299. /**
  300. * @typedef {{action: string, payload: object}} State
  301. * @param {State} state
  302. */
  303. saveState (state) {
  304. if (!this.storage) return
  305. this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
  306. }
  307. /**
  308. * This method emits upload progress but also creates an "upload progress" illusion
  309. * for the waiting period while only download is happening. Hence, it combines both
  310. * download and upload into an upload progress.
  311. * @see emitProgress
  312. * @param {number=} bytesUploaded the bytes actually Uploaded so far
  313. */
  314. emitIllusiveProgress (bytesUploaded = 0) {
  315. if (this._paused) {
  316. return
  317. }
  318. let bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
  319. if (!this.streamsEnded) {
  320. bytesTotal = Math.max(bytesTotal, this.bytesWritten)
  321. }
  322. // for a 10MB file, 10MB of download will account for 5MB upload progress
  323. // and 10MB of actual upload will account for the other 5MB upload progress.
  324. const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
  325. logger.debug(
  326. `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
  327. 'uploader.illusive.progress',
  328. this.shortToken
  329. )
  330. this.emitProgress(illusiveBytesUploaded, bytesTotal)
  331. }
  332. /**
  333. *
  334. * @param {number} bytesUploaded
  335. * @param {number | null} bytesTotal
  336. */
  337. emitProgress (bytesUploaded, bytesTotal) {
  338. bytesTotal = bytesTotal || this.options.size
  339. if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
  340. bytesTotal = this.bytesWritten
  341. }
  342. const percentage = (bytesUploaded / bytesTotal * 100)
  343. const formatPercentage = percentage.toFixed(2)
  344. logger.debug(
  345. `${bytesUploaded} ${bytesTotal} ${formatPercentage}%`,
  346. 'uploader.upload.progress',
  347. this.shortToken
  348. )
  349. const dataToEmit = {
  350. action: 'progress',
  351. payload: { progress: formatPercentage, bytesUploaded, bytesTotal }
  352. }
  353. this.saveState(dataToEmit)
  354. // avoid flooding the client with progress events.
  355. const roundedPercentage = Math.floor(percentage)
  356. if (this.emittedProgress !== roundedPercentage) {
  357. this.emittedProgress = roundedPercentage
  358. emitter().emit(this.token, dataToEmit)
  359. }
  360. }
  361. /**
  362. *
  363. * @param {string} url
  364. * @param {object} extraData
  365. */
  366. emitSuccess (url, extraData = {}) {
  367. const emitData = {
  368. action: 'success',
  369. payload: Object.assign(extraData, { complete: true, url })
  370. }
  371. this.saveState(emitData)
  372. emitter().emit(this.token, emitData)
  373. }
  374. /**
  375. *
  376. * @param {Error} err
  377. * @param {object=} extraData
  378. */
  379. emitError (err, extraData = {}) {
  380. const serializedErr = serializeError(err)
  381. // delete stack to avoid sending server info to client
  382. delete serializedErr.stack
  383. const dataToEmit = {
  384. action: 'error',
  385. payload: Object.assign(extraData, { error: serializedErr })
  386. }
  387. this.saveState(dataToEmit)
  388. emitter().emit(this.token, dataToEmit)
  389. }
  390. /**
  391. * start the tus upload
  392. */
  393. uploadTus () {
  394. const file = fs.createReadStream(this.path)
  395. const uploader = this
  396. // @ts-ignore
  397. this.tus = new tus.Upload(file, {
  398. endpoint: this.options.endpoint,
  399. uploadUrl: this.options.uploadUrl,
  400. // @ts-ignore
  401. uploadLengthDeferred: false,
  402. resume: true,
  403. retryDelays: [0, 1000, 3000, 5000],
  404. uploadSize: this.bytesWritten,
  405. metadata: Object.assign(
  406. {
  407. // file name and type as required by the tusd tus server
  408. // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646
  409. filename: this.uploadFileName,
  410. filetype: this.options.metadata.type
  411. }, this.options.metadata
  412. ),
  413. /**
  414. *
  415. * @param {Error} error
  416. */
  417. onError (error) {
  418. logger.error(error, 'uploader.tus.error')
  419. uploader.emitError(error)
  420. },
  421. /**
  422. *
  423. * @param {number} bytesUploaded
  424. * @param {number} bytesTotal
  425. */
  426. onProgress (bytesUploaded, bytesTotal) {
  427. uploader.emitIllusiveProgress(bytesUploaded)
  428. },
  429. onSuccess () {
  430. uploader.emitSuccess(uploader.tus.url)
  431. uploader.cleanUp()
  432. }
  433. })
  434. if (!this._paused) {
  435. this.tus.start()
  436. }
  437. }
  438. uploadMultipart () {
  439. const file = fs.createReadStream(this.path)
  440. // upload progress
  441. let bytesUploaded = 0
  442. file.on('data', (data) => {
  443. bytesUploaded += data.length
  444. this.emitIllusiveProgress(bytesUploaded)
  445. })
  446. const formData = Object.assign(
  447. {},
  448. this.options.metadata,
  449. {
  450. [this.options.fieldname]: {
  451. value: file,
  452. options: {
  453. filename: this.uploadFileName,
  454. contentType: this.options.metadata.type
  455. }
  456. }
  457. }
  458. )
  459. const httpMethod = (this.options.httpMethod || '').toLowerCase() === 'put' ? 'put' : 'post'
  460. const headers = headerSanitize(this.options.headers)
  461. request[httpMethod]({ url: this.options.endpoint, headers, formData, encoding: null }, (error, response, body) => {
  462. if (error) {
  463. logger.error(error, 'upload.multipart.error')
  464. this.emitError(error)
  465. return
  466. }
  467. const headers = response.headers
  468. // remove browser forbidden headers
  469. delete headers['set-cookie']
  470. delete headers['set-cookie2']
  471. const respObj = {
  472. responseText: body.toString(),
  473. status: response.statusCode,
  474. statusText: response.statusMessage,
  475. headers
  476. }
  477. if (response.statusCode >= 400) {
  478. logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error')
  479. this.emitError(new Error(response.statusMessage), respObj)
  480. } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) {
  481. const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}`
  482. logger.error(errMsg, 'upload.multipart.mismatch.error')
  483. this.emitError(new Error(errMsg))
  484. } else {
  485. this.emitSuccess(null, { response: respObj })
  486. }
  487. this.cleanUp()
  488. })
  489. }
  490. /**
  491. * Upload the file to S3 using a Multipart upload.
  492. */
  493. uploadS3Multipart () {
  494. const file = fs.createReadStream(this.path)
  495. return this._uploadS3MultipartStream(file)
  496. }
  497. /**
  498. * Upload a stream to S3.
  499. */
  500. _uploadS3MultipartStream (stream) {
  501. if (!this.options.s3) {
  502. this.emitError(new Error('The S3 client is not configured on this companion instance.'))
  503. return
  504. }
  505. const filename = this.options.metadata.name || path.basename(this.path)
  506. const { client, options } = this.options.s3
  507. const upload = client.upload({
  508. Bucket: options.bucket,
  509. Key: options.getKey(null, filename, this.options.metadata),
  510. ACL: options.acl,
  511. ContentType: this.options.metadata.type,
  512. Body: stream
  513. })
  514. this.s3Upload = upload
  515. upload.on('httpUploadProgress', ({ loaded, total }) => {
  516. this.emitProgress(loaded, total)
  517. })
  518. upload.send((error, data) => {
  519. this.s3Upload = null
  520. if (error) {
  521. this.emitError(error)
  522. } else {
  523. const url = data && data.Location ? data.Location : null
  524. this.emitSuccess(url, {
  525. response: {
  526. responseText: JSON.stringify(data),
  527. headers: {
  528. 'content-type': 'application/json'
  529. }
  530. }
  531. })
  532. }
  533. this.cleanUp()
  534. })
  535. }
  536. }
  537. Uploader.FILE_NAME_PREFIX = 'uppy-file'
  538. Uploader.STORAGE_PREFIX = 'companion'
  539. module.exports = Uploader