Uploader.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. const fs = require('fs')
  2. const path = require('path')
  3. const tus = require('tus-js-client')
  4. const uuid = require('uuid')
  5. const isObject = require('isobject')
  6. const validator = require('validator')
  7. const request = require('request')
  8. const serializeError = require('serialize-error')
  9. const emitter = require('./emitter')
  10. const { jsonStringify, hasMatch } = require('./helpers/utils')
  11. const logger = require('./logger')
  12. const headerSanitize = require('./header-blacklist')
  13. const redis = require('./redis')
  14. const DEFAULT_FIELD_NAME = 'files[]'
  15. const PROTOCOLS = Object.freeze({
  16. multipart: 'multipart',
  17. s3Multipart: 's3-multipart',
  18. tus: 'tus',
  19. })
  20. class Uploader {
  21. /**
  22. * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
  23. * For tus uploads, the deferredLength option is enabled, because file size value can be unreliable
  24. * for some providers (Instagram particularly)
  25. *
  26. * @typedef {object} UploaderOptions
  27. * @property {string} endpoint
  28. * @property {string=} uploadUrl
  29. * @property {string} protocol
  30. * @property {number} size
  31. * @property {string=} fieldname
  32. * @property {string} pathPrefix
  33. * @property {any=} s3
  34. * @property {any} metadata
  35. * @property {any} companionOptions
  36. * @property {any=} storage
  37. * @property {any=} headers
  38. * @property {string=} httpMethod
  39. * @property {boolean=} useFormData
  40. *
  41. * @param {UploaderOptions} options
  42. */
  43. constructor (options) {
  44. if (!this.validateOptions(options)) {
  45. logger.debug(this._errRespMessage, 'uploader.validator.fail')
  46. return
  47. }
  48. this.options = options
  49. this.token = uuid.v4()
  50. this.path = `${this.options.pathPrefix}/${Uploader.FILE_NAME_PREFIX}-${this.token}`
  51. this.options.metadata = this.options.metadata || {}
  52. this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME
  53. this.uploadFileName = this.options.metadata.name || path.basename(this.path)
  54. this.streamsEnded = false
  55. this.uploadStopped = false
  56. this.writeStream = fs.createWriteStream(this.path, { mode: 0o666 }) // no executable files
  57. .on('error', (err) => logger.error(`${err}`, 'uploader.write.error', this.shortToken))
  58. /** @type {number} */
  59. this.emittedProgress = 0
  60. this.storage = options.storage
  61. this._paused = false
  62. if (this.options.protocol === PROTOCOLS.tus) {
  63. emitter().on(`pause:${this.token}`, () => {
  64. this._paused = true
  65. if (this.tus) {
  66. this.tus.abort()
  67. }
  68. })
  69. emitter().on(`resume:${this.token}`, () => {
  70. this._paused = false
  71. if (this.tus) {
  72. this.tus.start()
  73. }
  74. })
  75. emitter().on(`cancel:${this.token}`, () => {
  76. this._paused = true
  77. if (this.tus) {
  78. const shouldTerminate = !!this.tus.url
  79. this.tus.abort(shouldTerminate).catch(() => {})
  80. }
  81. this.cleanUp()
  82. })
  83. }
  84. }
  85. /**
  86. * returns a substring of the token. Used as traceId for logging
  87. * we avoid using the entire token because this is meant to be a short term
  88. * access token between uppy client and companion websocket
  89. *
  90. * @param {string} token the token to Shorten
  91. * @returns {string}
  92. */
  93. static shortenToken (token) {
  94. return token.substring(0, 8)
  95. }
  96. static reqToOptions (req, size) {
  97. const useFormDataIsSet = Object.prototype.hasOwnProperty.call(req.body, 'useFormData')
  98. const useFormData = useFormDataIsSet ? req.body.useFormData : true
  99. return {
  100. companionOptions: req.companion.options,
  101. endpoint: req.body.endpoint,
  102. uploadUrl: req.body.uploadUrl,
  103. protocol: req.body.protocol,
  104. metadata: req.body.metadata,
  105. httpMethod: req.body.httpMethod,
  106. useFormData,
  107. size,
  108. fieldname: req.body.fieldname,
  109. pathPrefix: `${req.companion.options.filePath}`,
  110. storage: redis.client(),
  111. s3: req.companion.s3Client ? {
  112. client: req.companion.s3Client,
  113. options: req.companion.options.providerOptions.s3,
  114. } : null,
  115. headers: req.body.headers,
  116. }
  117. }
  118. /**
  119. * the number of bytes written into the streams
  120. */
  121. get bytesWritten () {
  122. return this.writeStream.bytesWritten
  123. }
  124. /**
  125. * Validate the options passed down to the uplaoder
  126. *
  127. * @param {UploaderOptions} options
  128. * @returns {boolean}
  129. */
  130. validateOptions (options) {
  131. // validate HTTP Method
  132. if (options.httpMethod) {
  133. if (typeof options.httpMethod !== 'string') {
  134. this._errRespMessage = 'unsupported HTTP METHOD specified'
  135. return false
  136. }
  137. const method = options.httpMethod.toLowerCase()
  138. if (method !== 'put' && method !== 'post') {
  139. this._errRespMessage = 'unsupported HTTP METHOD specified'
  140. return false
  141. }
  142. }
  143. // validate fieldname
  144. if (options.fieldname && typeof options.fieldname !== 'string') {
  145. this._errRespMessage = 'fieldname must be a string'
  146. return false
  147. }
  148. // validate metadata
  149. if (options.metadata && !isObject(options.metadata)) {
  150. this._errRespMessage = 'metadata must be an object'
  151. return false
  152. }
  153. // validate headers
  154. if (options.headers && !isObject(options.headers)) {
  155. this._errRespMessage = 'headers must be an object'
  156. return false
  157. }
  158. // validate protocol
  159. // @todo this validation should not be conditional once the protocol field is mandatory
  160. if (options.protocol && !Object.keys(PROTOCOLS).some((key) => PROTOCOLS[key] === options.protocol)) {
  161. this._errRespMessage = 'unsupported protocol specified'
  162. return false
  163. }
  164. // s3 uploads don't require upload destination
  165. // validation, because the destination is determined
  166. // by the server's s3 config
  167. if (options.protocol === PROTOCOLS.s3Multipart) {
  168. return true
  169. }
  170. if (!options.endpoint && !options.uploadUrl) {
  171. this._errRespMessage = 'no destination specified'
  172. return false
  173. }
  174. const validatorOpts = { require_protocol: true, require_tld: !options.companionOptions.debug }
  175. return [options.endpoint, options.uploadUrl].every((url) => {
  176. if (url && !validator.isURL(url, validatorOpts)) {
  177. this._errRespMessage = 'invalid destination url'
  178. return false
  179. }
  180. const allowedUrls = options.companionOptions.uploadUrls
  181. if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
  182. this._errRespMessage = 'upload destination does not match any allowed destinations'
  183. return false
  184. }
  185. return true
  186. })
  187. }
  188. hasError () {
  189. return this._errRespMessage != null
  190. }
  191. /**
  192. * returns a substring of the token. Used as traceId for logging
  193. * we avoid using the entire token because this is meant to be a short term
  194. * access token between uppy client and companion websocket
  195. */
  196. get shortToken () {
  197. return Uploader.shortenToken(this.token)
  198. }
  199. /**
  200. *
  201. * @param {Function} callback
  202. */
  203. onSocketReady (callback) {
  204. emitter().once(`connection:${this.token}`, () => callback())
  205. logger.debug('waiting for connection', 'uploader.socket.wait', this.shortToken)
  206. }
  207. cleanUp () {
  208. fs.unlink(this.path, (err) => {
  209. if (err) {
  210. logger.error(`cleanup failed for: ${this.path} err: ${err}`, 'uploader.cleanup.error')
  211. }
  212. })
  213. emitter().removeAllListeners(`pause:${this.token}`)
  214. emitter().removeAllListeners(`resume:${this.token}`)
  215. emitter().removeAllListeners(`cancel:${this.token}`)
  216. this.uploadStopped = true
  217. }
  218. /**
  219. *
  220. * @param {Error} err
  221. * @param {string | Buffer | Buffer[]} chunk
  222. */
  223. handleChunk (err, chunk) {
  224. if (this.uploadStopped) {
  225. return
  226. }
  227. if (err) {
  228. logger.error(err, 'uploader.download.error', this.shortToken)
  229. this.emitError(err)
  230. this.cleanUp()
  231. return
  232. }
  233. // @todo a default protocol should not be set. We should ensure that the user specifies their protocol.
  234. const protocol = this.options.protocol || PROTOCOLS.multipart
  235. // The download has completed; close the file and start an upload if necessary.
  236. if (chunk === null) {
  237. this.writeStream.on('finish', () => {
  238. this.streamsEnded = true
  239. switch (protocol) {
  240. case PROTOCOLS.multipart:
  241. if (this.options.endpoint) {
  242. this.uploadMultipart()
  243. }
  244. break
  245. case PROTOCOLS.s3Multipart:
  246. if (!this.s3Upload) {
  247. this.uploadS3Multipart()
  248. } else {
  249. logger.warn('handleChunk() called multiple times', 'uploader.s3.duplicate', this.shortToken)
  250. }
  251. break
  252. case PROTOCOLS.tus:
  253. if (!this.tus) {
  254. this.uploadTus()
  255. } else {
  256. logger.warn('handleChunk() called multiple times', 'uploader.tus.duplicate', this.shortToken)
  257. }
  258. break
  259. }
  260. })
  261. return this.endStreams()
  262. }
  263. this.writeStream.write(chunk, () => {
  264. logger.debug(`${this.bytesWritten} bytes`, 'uploader.download.progress', this.shortToken)
  265. return this.emitIllusiveProgress()
  266. })
  267. }
  268. endStreams () {
  269. this.writeStream.end()
  270. }
  271. getResponse () {
  272. if (this._errRespMessage) {
  273. return { body: { message: this._errRespMessage }, status: 400 }
  274. }
  275. return { body: { token: this.token }, status: 200 }
  276. }
  277. /**
  278. * @typedef {{action: string, payload: object}} State
  279. * @param {State} state
  280. */
  281. saveState (state) {
  282. if (!this.storage) return
  283. this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
  284. }
  285. /**
  286. * This method emits upload progress but also creates an "upload progress" illusion
  287. * for the waiting period while only download is happening. Hence, it combines both
  288. * download and upload into an upload progress.
  289. *
  290. * @see emitProgress
  291. * @param {number=} bytesUploaded the bytes actually Uploaded so far
  292. */
  293. emitIllusiveProgress (bytesUploaded = 0) {
  294. if (this._paused) {
  295. return
  296. }
  297. let bytesTotal = this.streamsEnded ? this.bytesWritten : this.options.size
  298. if (!this.streamsEnded) {
  299. bytesTotal = Math.max(bytesTotal, this.bytesWritten)
  300. }
  301. // for a 10MB file, 10MB of download will account for 5MB upload progress
  302. // and 10MB of actual upload will account for the other 5MB upload progress.
  303. const illusiveBytesUploaded = (this.bytesWritten / 2) + (bytesUploaded / 2)
  304. logger.debug(
  305. `${bytesUploaded} ${illusiveBytesUploaded} ${bytesTotal}`,
  306. 'uploader.illusive.progress',
  307. this.shortToken
  308. )
  309. this.emitProgress(illusiveBytesUploaded, bytesTotal)
  310. }
  311. /**
  312. *
  313. * @param {number} bytesUploaded
  314. * @param {number | null} bytesTotal
  315. */
  316. emitProgress (bytesUploaded, bytesTotal) {
  317. bytesTotal = bytesTotal || this.options.size
  318. if (this.tus && this.tus.options.uploadLengthDeferred && this.streamsEnded) {
  319. bytesTotal = this.bytesWritten
  320. }
  321. const percentage = (bytesUploaded / bytesTotal * 100)
  322. const formatPercentage = percentage.toFixed(2)
  323. logger.debug(
  324. `${bytesUploaded} ${bytesTotal} ${formatPercentage}%`,
  325. 'uploader.upload.progress',
  326. this.shortToken
  327. )
  328. const dataToEmit = {
  329. action: 'progress',
  330. payload: { progress: formatPercentage, bytesUploaded, bytesTotal },
  331. }
  332. this.saveState(dataToEmit)
  333. // avoid flooding the client with progress events.
  334. const roundedPercentage = Math.floor(percentage)
  335. if (this.emittedProgress !== roundedPercentage) {
  336. this.emittedProgress = roundedPercentage
  337. emitter().emit(this.token, dataToEmit)
  338. }
  339. }
  340. /**
  341. *
  342. * @param {string} url
  343. * @param {object} extraData
  344. */
  345. emitSuccess (url, extraData = {}) {
  346. const emitData = {
  347. action: 'success',
  348. payload: Object.assign(extraData, { complete: true, url }),
  349. }
  350. this.saveState(emitData)
  351. emitter().emit(this.token, emitData)
  352. }
  353. /**
  354. *
  355. * @param {Error} err
  356. * @param {object=} extraData
  357. */
  358. emitError (err, extraData = {}) {
  359. const serializedErr = serializeError(err)
  360. // delete stack to avoid sending server info to client
  361. delete serializedErr.stack
  362. const dataToEmit = {
  363. action: 'error',
  364. payload: Object.assign(extraData, { error: serializedErr }),
  365. }
  366. this.saveState(dataToEmit)
  367. emitter().emit(this.token, dataToEmit)
  368. }
  369. /**
  370. * start the tus upload
  371. */
  372. uploadTus () {
  373. const file = fs.createReadStream(this.path)
  374. const uploader = this
  375. this.tus = new tus.Upload(file, {
  376. endpoint: this.options.endpoint,
  377. uploadUrl: this.options.uploadUrl,
  378. uploadLengthDeferred: false,
  379. retryDelays: [0, 1000, 3000, 5000],
  380. uploadSize: this.bytesWritten,
  381. headers: headerSanitize(this.options.headers),
  382. addRequestId: true,
  383. metadata: {
  384. // file name and type as required by the tusd tus server
  385. // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646
  386. filename: this.uploadFileName,
  387. filetype: this.options.metadata.type,
  388. ...this.options.metadata,
  389. },
  390. /**
  391. *
  392. * @param {Error} error
  393. */
  394. onError (error) {
  395. logger.error(error, 'uploader.tus.error')
  396. // deleting tus originalRequest field because it uses the same http-agent
  397. // as companion, and this agent may contain sensitive request details (e.g headers)
  398. // previously made to providers. Deleting the field would prevent it from getting leaked
  399. // to the frontend etc.
  400. // @ts-ignore
  401. delete error.originalRequest
  402. // @ts-ignore
  403. delete error.originalResponse
  404. uploader.emitError(error)
  405. },
  406. /**
  407. *
  408. * @param {number} bytesUploaded
  409. * @param {number} bytesTotal
  410. */
  411. onProgress (bytesUploaded, bytesTotal) {
  412. uploader.emitIllusiveProgress(bytesUploaded)
  413. },
  414. onSuccess () {
  415. uploader.emitSuccess(uploader.tus.url)
  416. uploader.cleanUp()
  417. },
  418. })
  419. if (!this._paused) {
  420. this.tus.start()
  421. }
  422. }
  423. uploadMultipart () {
  424. const file = fs.createReadStream(this.path)
  425. // upload progress
  426. let bytesUploaded = 0
  427. file.on('data', (data) => {
  428. bytesUploaded += data.length
  429. this.emitIllusiveProgress(bytesUploaded)
  430. })
  431. const httpMethod = (this.options.httpMethod || '').toLowerCase() === 'put' ? 'put' : 'post'
  432. const headers = headerSanitize(this.options.headers)
  433. const reqOptions = { url: this.options.endpoint, headers, encoding: null }
  434. const httpRequest = request[httpMethod]
  435. if (this.options.useFormData) {
  436. reqOptions.formData = {
  437. ...this.options.metadata,
  438. [this.options.fieldname]: {
  439. value: file,
  440. options: {
  441. filename: this.uploadFileName,
  442. contentType: this.options.metadata.type,
  443. },
  444. },
  445. }
  446. httpRequest(reqOptions, (error, response, body) => {
  447. this._onMultipartComplete(error, response, body, bytesUploaded)
  448. })
  449. } else {
  450. reqOptions.headers['content-length'] = this.bytesWritten
  451. reqOptions.body = file
  452. httpRequest(reqOptions, (error, response, body) => {
  453. this._onMultipartComplete(error, response, body, bytesUploaded)
  454. })
  455. }
  456. }
  457. _onMultipartComplete (error, response, body, bytesUploaded) {
  458. if (error) {
  459. logger.error(error, 'upload.multipart.error')
  460. this.emitError(error)
  461. return
  462. }
  463. const { headers } = response
  464. // remove browser forbidden headers
  465. delete headers['set-cookie']
  466. delete headers['set-cookie2']
  467. const respObj = {
  468. responseText: body.toString(),
  469. status: response.statusCode,
  470. statusText: response.statusMessage,
  471. headers,
  472. }
  473. if (response.statusCode >= 400) {
  474. logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error')
  475. this.emitError(new Error(response.statusMessage), respObj)
  476. } else if (bytesUploaded !== this.bytesWritten && bytesUploaded !== this.options.size) {
  477. const errMsg = `uploaded only ${bytesUploaded} of ${this.bytesWritten} with status: ${response.statusCode}`
  478. logger.error(errMsg, 'upload.multipart.mismatch.error')
  479. this.emitError(new Error(errMsg))
  480. } else {
  481. this.emitSuccess(null, { response: respObj })
  482. }
  483. this.cleanUp()
  484. }
  485. /**
  486. * Upload the file to S3 using a Multipart upload.
  487. */
  488. uploadS3Multipart () {
  489. const file = fs.createReadStream(this.path)
  490. return this._uploadS3MultipartStream(file)
  491. }
  492. /**
  493. * Upload a stream to S3.
  494. */
  495. _uploadS3MultipartStream (stream) {
  496. if (!this.options.s3) {
  497. this.emitError(new Error('The S3 client is not configured on this companion instance.'))
  498. return
  499. }
  500. const filename = this.options.metadata.name || path.basename(this.path)
  501. const { client, options } = this.options.s3
  502. const upload = client.upload({
  503. Bucket: options.bucket,
  504. Key: options.getKey(null, filename, this.options.metadata),
  505. ACL: options.acl,
  506. ContentType: this.options.metadata.type,
  507. Metadata: this.options.metadata,
  508. Body: stream,
  509. })
  510. this.s3Upload = upload
  511. upload.on('httpUploadProgress', ({ loaded, total }) => {
  512. this.emitProgress(loaded, total)
  513. })
  514. upload.send((error, data) => {
  515. this.s3Upload = null
  516. if (error) {
  517. this.emitError(error)
  518. } else {
  519. const url = data && data.Location ? data.Location : null
  520. this.emitSuccess(url, {
  521. response: {
  522. responseText: JSON.stringify(data),
  523. headers: {
  524. 'content-type': 'application/json',
  525. },
  526. },
  527. })
  528. }
  529. this.cleanUp()
  530. })
  531. }
  532. }
  533. Uploader.FILE_NAME_PREFIX = 'uppy-file'
  534. Uploader.STORAGE_PREFIX = 'companion'
  535. module.exports = Uploader