Uploader.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. const tus = require('tus-js-client')
  2. const uuid = require('uuid')
  3. const isObject = require('isobject')
  4. const validator = require('validator')
  5. const request = require('request')
  6. // eslint-disable-next-line no-unused-vars
  7. const { Readable, pipeline: pipelineCb } = require('stream')
  8. const { join } = require('path')
  9. const fs = require('fs')
  10. const { promisify } = require('util')
  11. // TODO move to `require('stream/promises').pipeline` when dropping support for Node.js 14.x.
  12. const pipeline = promisify(pipelineCb)
  13. const { createReadStream, createWriteStream, ReadStream } = fs
  14. const { stat, unlink } = fs.promises
  15. /** @type {any} */
  16. // @ts-ignore - typescript resolves this to a hoisted version of
  17. // serialize-error that ships with a declaration file, we are using a version
  18. // here that does not have a declaration file
  19. const serializeError = require('serialize-error')
  20. const emitter = require('./emitter')
  21. const { jsonStringify, hasMatch } = require('./helpers/utils')
  22. const logger = require('./logger')
  23. const headerSanitize = require('./header-blacklist')
  24. const redis = require('./redis')
  // Upper bound on the file name that is forwarded as upload metadata. Without a cap we can hit
  // "MetadataTooLarge: Your metadata headers exceed the maximum allowed metadata size" in tus / S3
  const MAX_FILENAME_LENGTH = 500

  // Form field name used when the client does not supply one.
  const DEFAULT_FIELD_NAME = 'files[]'

  // Upload protocols understood by the uploader (immutable lookup table).
  const PROTOCOLS = Object.freeze({ multipart: 'multipart', s3Multipart: 's3-multipart', tus: 'tus' })
  /**
   * Tells whether `size` is over the configured limit.
   *
   * A missing/zero limit means "no limit" and a missing/zero size means "unknown size";
   * in both cases the check does not fail.
   *
   * @param {number | null | undefined} maxFileSize configured maximum size in bytes, falsy for no limit
   * @param {number | null | undefined} size size to check in bytes, falsy when unknown
   * @returns {boolean} `true` only when both values are set and `size` is greater than `maxFileSize`
   */
  function exceedsMaxFileSize (maxFileSize, size) {
    // Coerce so the predicate never leaks `undefined` / `0` / `null` from the `&&` chain.
    return Boolean(maxFileSize && size && size > maxFileSize)
  }
  /**
   * Error used to destroy the read stream when the client cancels an upload.
   * `uploadStream` recognises it with `instanceof` and does not report it to the client.
   */
  class AbortError extends Error {
    constructor (...args) {
      super(...args)
      // Without this the error shows up as a plain "Error" in logs and stack traces.
      this.name = 'AbortError'
    }
  }
  /**
   * Takes a readable stream and uploads it to a destination using one of the
   * supported protocols (tus, s3-multipart, multipart), reporting progress,
   * success and errors through the shared emitter under the upload token.
   */
  class Uploader {
    /**
     * Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
     * For tus uploads the upload length is NOT deferred (`uploadLengthDeferred: false`); the size
     * passed to tus is `this.size`, which is replaced by the on-disk size whenever the file is
     * fully downloaded first (see `_needDownloadFirst` / `_downloadStreamAsFile`).
     *
     * If the options do not validate, the constructor returns early: only `_errRespMessage`
     * is set and no token is generated. Callers should check `hasError()` / `getResponse()`.
     *
     * @typedef {object} UploaderOptions
     * @property {string} endpoint
     * @property {string} [uploadUrl]
     * @property {string} protocol
     * @property {number} [size]
     * @property {string} [fieldname]
     * @property {string} pathPrefix
     * @property {any} [s3]
     * @property {any} metadata
     * @property {any} companionOptions
     * @property {any} [storage]
     * @property {any} [headers]
     * @property {string} [httpMethod]
     * @property {boolean} [useFormData]
     * @property {number} [chunkSize]
     *
     * @param {UploaderOptions} options
     */
    constructor (options) {
      if (!this.validateOptions(options)) {
        logger.debug(this._errRespMessage, 'uploader.validator.fail')
        return
      }

      this.options = options
      this.token = uuid.v4()
      this.fileName = `${Uploader.FILE_NAME_PREFIX}-${this.token}`
      this.options.metadata = this.options.metadata || {}
      this.options.fieldname = this.options.fieldname || DEFAULT_FIELD_NAME
      this.size = options.size
      // Prefer the client supplied name (truncated), fall back to the generated one.
      this.uploadFileName = this.options.metadata.name
        ? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH)
        : this.fileName

      this.uploadStopped = false
      this.emittedProgress = {}
      this.storage = options.storage
      this._paused = false
      this.downloadedBytes = 0
      this.readStream = null

      // pause / resume are only wired up for tus uploads
      if (this.options.protocol === PROTOCOLS.tus) {
        emitter().on(`pause:${this.token}`, () => {
          logger.debug('Received from client: pause', 'uploader', this.shortToken)
          this._paused = true
          if (this.tus) {
            this.tus.abort()
          }
        })

        emitter().on(`resume:${this.token}`, () => {
          logger.debug('Received from client: resume', 'uploader', this.shortToken)
          this._paused = false
          if (this.tus) {
            this.tus.start()
          }
        })
      }

      emitter().on(`cancel:${this.token}`, () => {
        logger.debug('Received from client: cancel', 'uploader', this.shortToken)
        this._paused = true
        if (this.tus) {
          // only ask tus to terminate when an upload URL has already been created
          const shouldTerminate = !!this.tus.url
          this.tus.abort(shouldTerminate).catch(() => {})
        }
        this.abortReadStream(new AbortError())
      })
    }

    /**
     * Marks the upload as stopped and destroys the current read stream (if any) with `err`.
     *
     * @param {Error} err error the stream is destroyed with
     */
    abortReadStream (err) {
      this.uploadStopped = true
      if (this.readStream) this.readStream.destroy(err)
    }

    /**
     * Dispatches `this.readStream` to the upload routine matching `options.protocol`
     * (multipart when no protocol is set).
     *
     * @returns {Promise<{url: string | null, extraData?: object}>}
     * @throws {Error} 'Invalid protocol' for an unknown protocol
     */
    async _uploadByProtocol () {
      // @todo a default protocol should not be set. We should ensure that the user specifies their protocol.
      const protocol = this.options.protocol || PROTOCOLS.multipart

      switch (protocol) {
        case PROTOCOLS.multipart:
          return this._uploadMultipart(this.readStream)
        case PROTOCOLS.s3Multipart:
          return this._uploadS3Multipart(this.readStream)
        case PROTOCOLS.tus:
          return this._uploadTus(this.readStream)
        default:
          throw new Error('Invalid protocol')
      }
    }

    /**
     * Writes `stream` to a temp file under `options.pathPrefix`, then replaces
     * `this.readStream` with a file stream of that temp file and sets `this.size`
     * to the size reported by `stat`.
     * Aborts the read stream when the downloaded byte count exceeds `maxFileSize`.
     *
     * @param {Readable} stream
     */
    async _downloadStreamAsFile (stream) {
      this.tmpPath = join(this.options.pathPrefix, this.fileName)

      logger.debug('fully downloading file', 'uploader.download', this.shortToken)

      const writeStream = createWriteStream(this.tmpPath)

      const onData = (chunk) => {
        this.downloadedBytes += chunk.length
        if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) this.abortReadStream(new Error('maxFileSize exceeded'))
        this.onProgress(0, undefined)
      }

      stream.on('data', onData)

      await pipeline(stream, writeStream)
      logger.debug('finished fully downloading file', 'uploader.download', this.shortToken)

      const { size } = await stat(this.tmpPath)

      this.size = size

      const fileStream = createReadStream(this.tmpPath)
      this.readStream = fileStream
    }

    /**
     * @returns {boolean} true when the size is unknown or streaming uploads are disabled
     */
    _needDownloadFirst () {
      return !this.options.size || !this.options.companionOptions.streamingUpload
    }

    /**
     * Runs the whole upload for `stream`: optionally downloads it to disk first, uploads it
     * with the configured protocol and emits success or error. Never rejects: errors are
     * logged and emitted, aborts (AbortError) are only logged. `cleanUp()` always runs at the end.
     *
     * @param {Readable} stream
     */
    async uploadStream (stream) {
      try {
        if (this.uploadStopped) throw new Error('Cannot upload stream after upload stopped')
        if (this.readStream) throw new Error('Already uploading')

        this.readStream = stream
        if (this._needDownloadFirst()) {
          logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
          // Some streams need to be downloaded entirely first, because we don't know their size from the provider
          // This is true for zoom and drive (exported files) or some URL downloads.
          // The stream will then typically come from a "Transfer-Encoding: chunked" response
          await this._downloadStreamAsFile(this.readStream)
        }
        if (this.uploadStopped) return

        const { url, extraData } = await Promise.race([
          this._uploadByProtocol(),
          // If we don't handle stream errors, we get unhandled error in node.
          new Promise((resolve, reject) => this.readStream.on('error', reject)),
        ])
        this.emitSuccess(url, extraData)
      } catch (err) {
        if (err instanceof AbortError) {
          logger.error('Aborted upload', 'uploader.aborted', this.shortToken)
          return
        }
        // console.log(err)
        logger.error(err, 'uploader.error', this.shortToken)
        this.emitError(err)
      } finally {
        this.cleanUp()
      }
    }

    /**
     * returns a substring of the token. Used as traceId for logging
     * we avoid using the entire token because this is meant to be a short term
     * access token between uppy client and companion websocket
     *
     * @param {string} token the token to Shorten
     * @returns {string}
     */
    static shortenToken (token) {
      return token.substring(0, 8)
    }

    /**
     * Builds the uploader options from an incoming request.
     * `useFormData` defaults to `true` when the request body does not carry the property.
     *
     * @param {any} req request with `body` and `companion` populated
     * @param {number} [size] size of the file to upload, when known
     * @returns {UploaderOptions}
     */
    static reqToOptions (req, size) {
      const useFormDataIsSet = Object.prototype.hasOwnProperty.call(req.body, 'useFormData')
      const useFormData = useFormDataIsSet ? req.body.useFormData : true

      return {
        companionOptions: req.companion.options,
        endpoint: req.body.endpoint,
        uploadUrl: req.body.uploadUrl,
        protocol: req.body.protocol,
        metadata: req.body.metadata,
        httpMethod: req.body.httpMethod,
        useFormData,
        size,
        fieldname: req.body.fieldname,
        pathPrefix: `${req.companion.options.filePath}`,
        storage: redis.client(),
        s3: req.companion.s3Client ? {
          client: req.companion.s3Client,
          options: req.companion.options.providerOptions.s3,
        } : null,
        headers: req.body.headers,
        chunkSize: req.companion.options.chunkSize,
      }
    }

    /**
     * Validate the options passed down to the uploader.
     * On failure the reason is stored in `this._errRespMessage`.
     *
     * @param {UploaderOptions} options
     * @returns {boolean}
     */
    validateOptions (options) {
      // validate HTTP Method
      if (options.httpMethod) {
        if (typeof options.httpMethod !== 'string') {
          this._errRespMessage = 'unsupported HTTP METHOD specified'
          return false
        }

        const method = options.httpMethod.toLowerCase()
        if (method !== 'put' && method !== 'post') {
          this._errRespMessage = 'unsupported HTTP METHOD specified'
          return false
        }
      }

      if (exceedsMaxFileSize(options.companionOptions.maxFileSize, options.size)) {
        this._errRespMessage = 'maxFileSize exceeded'
        return false
      }

      // validate fieldname
      if (options.fieldname && typeof options.fieldname !== 'string') {
        this._errRespMessage = 'fieldname must be a string'
        return false
      }

      // validate metadata
      if (options.metadata && !isObject(options.metadata)) {
        this._errRespMessage = 'metadata must be an object'
        return false
      }

      // validate headers
      if (options.headers && !isObject(options.headers)) {
        this._errRespMessage = 'headers must be an object'
        return false
      }

      // validate protocol
      // @todo this validation should not be conditional once the protocol field is mandatory
      if (options.protocol && !Object.values(PROTOCOLS).includes(options.protocol)) {
        this._errRespMessage = 'unsupported protocol specified'
        return false
      }

      // s3 uploads don't require upload destination
      // validation, because the destination is determined
      // by the server's s3 config
      if (options.protocol !== PROTOCOLS.s3Multipart) {
        if (!options.endpoint && !options.uploadUrl) {
          this._errRespMessage = 'no destination specified'
          return false
        }

        // An unset url is accepted here; at least one of the two is known to be set (checked above).
        const validateUrl = (url) => {
          const validatorOpts = { require_protocol: true, require_tld: false }
          if (url && !validator.isURL(url, validatorOpts)) {
            this._errRespMessage = 'invalid destination url'
            return false
          }

          const allowedUrls = options.companionOptions.uploadUrls
          if (allowedUrls && url && !hasMatch(url, allowedUrls)) {
            this._errRespMessage = 'upload destination does not match any allowed destinations'
            return false
          }

          return true
        }

        if (![options.endpoint, options.uploadUrl].every(validateUrl)) return false
      }

      if (options.chunkSize != null && typeof options.chunkSize !== 'number') {
        this._errRespMessage = 'incorrect chunkSize'
        return false
      }

      return true
    }

    /**
     * @returns {boolean} true when option validation failed
     */
    hasError () {
      return this._errRespMessage != null
    }

    /**
     * returns a substring of the token. Used as traceId for logging
     * we avoid using the entire token because this is meant to be a short term
     * access token between uppy client and companion websocket
     */
    get shortToken () {
      return Uploader.shortenToken(this.token)
    }

    /**
     * Resolves once a `connection:<token>` event is emitted for this upload.
     */
    async awaitReady () {
      // TODO timeout after a while? Else we could leak emitters
      logger.debug('waiting for socket connection', 'uploader.socket.wait', this.shortToken)
      await new Promise((resolve) => emitter().once(`connection:${this.token}`, resolve))
      logger.debug('socket connection received', 'uploader.socket.wait', this.shortToken)
    }

    /**
     * Releases everything the upload holds on to: destroys the read stream,
     * removes the temp file (best effort) and drops the pause/resume/cancel listeners.
     */
    cleanUp () {
      // Same (message, tag, traceId) shape as every other log call in this file.
      logger.debug('cleanup', 'uploader.cleanup', this.shortToken)
      if (this.readStream && !this.readStream.destroyed) this.readStream.destroy()

      // Best effort: a failure to remove the temp file must not fail the upload.
      if (this.tmpPath) unlink(this.tmpPath).catch(() => {})

      emitter().removeAllListeners(`pause:${this.token}`)
      emitter().removeAllListeners(`resume:${this.token}`)
      emitter().removeAllListeners(`cancel:${this.token}`)
    }

    /**
     * @returns {{body: object, status: number}} 400 with the validation message,
     * or 200 with the upload token
     */
    getResponse () {
      if (this._errRespMessage) {
        return { body: { message: this._errRespMessage }, status: 400 }
      }
      return { body: { token: this.token }, status: 200 }
    }

    /**
     * Stores the latest state under `<STORAGE_PREFIX>:<token>`; no-op without storage.
     *
     * @typedef {{action: string, payload: object}} State
     * @param {State} state
     */
    saveState (state) {
      if (!this.storage) return
      this.storage.set(`${Uploader.STORAGE_PREFIX}:${this.token}`, jsonStringify(state))
    }

    /**
     * Logs, stores and emits a progress event. Nothing is stored or emitted while the
     * upload is paused or stopped, and identical consecutive payloads are emitted only once.
     *
     * @param {number} [bytesUploaded]
     * @param {number | null} [bytesTotalIn]
     */
    onProgress (bytesUploaded = 0, bytesTotalIn = 0) {
      const bytesTotal = bytesTotalIn || this.size || 0

      // If fully downloading before uploading, combine downloaded and uploaded bytes
      // This will make sure that the user sees half of the progress before upload starts (while downloading)
      let combinedBytes = bytesUploaded
      if (this._needDownloadFirst()) {
        combinedBytes = Math.floor((combinedBytes + (this.downloadedBytes || 0)) / 2)
      }

      // Prevent divide by zero
      let percentage = 0
      if (bytesTotal > 0) percentage = Math.min(Math.max(0, ((combinedBytes / bytesTotal) * 100)), 100)

      const formattedPercentage = percentage.toFixed(2)
      logger.debug(
        `${combinedBytes} ${bytesTotal} ${formattedPercentage}%`,
        'uploader.total.progress',
        this.shortToken,
      )

      if (this._paused || this.uploadStopped) {
        return
      }

      const payload = { progress: formattedPercentage, bytesUploaded: combinedBytes, bytesTotal }
      const dataToEmit = {
        action: 'progress',
        payload,
      }
      this.saveState(dataToEmit)

      const isEqual = (p1, p2) => (p1.progress === p2.progress
        && p1.bytesUploaded === p2.bytesUploaded
        && p1.bytesTotal === p2.bytesTotal)

      // avoid flooding the client with progress events.
      if (!isEqual(this.emittedProgress, payload)) {
        this.emittedProgress = payload
        emitter().emit(this.token, dataToEmit)
      }
    }

    /**
     * Stores and emits the success event for this upload.
     *
     * @param {string} url
     * @param {object} extraData
     */
    emitSuccess (url, extraData = {}) {
      const emitData = {
        action: 'success',
        // Build a fresh payload instead of writing `complete` / `url` into the caller's object.
        payload: { ...extraData, complete: true, url },
      }
      this.saveState(emitData)
      emitter().emit(this.token, emitData)
    }

    /**
     * Stores and emits the error event for this upload. The payload is `err.extraData`
     * (when present) plus the serialized error without its stack.
     *
     * @param {Error} err
     */
    emitError (err) {
      const serializedErr = serializeError(err)
      // delete stack to avoid sending server info to client
      delete serializedErr.stack
      const dataToEmit = {
        action: 'error',
        // Copy rather than assign into `err.extraData`, so the error object is left untouched.
        // @ts-ignore
        payload: { ...(err.extraData || {}), error: serializedErr },
      }
      this.saveState(dataToEmit)
      emitter().emit(this.token, dataToEmit)
    }

    /**
     * start the tus upload
     *
     * @param {any} stream
     * @returns {Promise<{url: string}>} resolves with the tus upload url on success,
     * rejects with the tus error (stripped of its request/response objects)
     */
    async _uploadTus (stream) {
      const uploader = this

      const isFileStream = stream instanceof ReadStream
      // chunkSize needs to be a finite value if the stream is not a file stream (fs.createReadStream)
      // https://github.com/tus/tus-js-client/blob/4479b78032937ac14da9b0542e489ac6fe7e0bc7/lib/node/fileReader.js#L50
      const chunkSize = this.options.chunkSize || (isFileStream ? Infinity : 50e6)

      return new Promise((resolve, reject) => {
        this.tus = new tus.Upload(stream, {
          endpoint: this.options.endpoint,
          uploadUrl: this.options.uploadUrl,
          uploadLengthDeferred: false,
          retryDelays: [0, 1000, 3000, 5000],
          uploadSize: this.size,
          chunkSize,
          headers: headerSanitize(this.options.headers),
          addRequestId: true,
          metadata: {
            // file name and type as required by the tusd tus server
            // https://github.com/tus/tusd/blob/5b376141903c1fd64480c06dde3dfe61d191e53d/unrouted_handler.go#L614-L646
            filename: this.uploadFileName,
            filetype: this.options.metadata.type,
            ...this.options.metadata,
          },
          /**
           *
           * @param {Error} error
           */
          onError (error) {
            logger.error(error, 'uploader.tus.error')
            // deleting tus originalRequest field because it uses the same http-agent
            // as companion, and this agent may contain sensitive request details (e.g headers)
            // previously made to providers. Deleting the field would prevent it from getting leaked
            // to the frontend etc.
            // @ts-ignore
            delete error.originalRequest
            // @ts-ignore
            delete error.originalResponse
            reject(error)
          },
          /**
           *
           * @param {number} [bytesUploaded]
           * @param {number} [bytesTotal]
           */
          onProgress (bytesUploaded, bytesTotal) {
            uploader.onProgress(bytesUploaded, bytesTotal)
          },
          onSuccess () {
            resolve({ url: uploader.tus.url })
          },
        })

        // When paused before the upload was created, the resume handler starts it later.
        if (!this._paused) {
          this.tus.start()
        }
      })
    }

    /**
     * Uploads the stream with a single HTTP request (PUT or POST), either as
     * multipart form data or as the raw request body.
     *
     * @param {any} stream
     * @returns {Promise<{url: null, extraData: {response: object, bytesUploaded: number}}>}
     * @throws {Error} when no endpoint is set, the request fails, the response status is >= 400
     * (the error then carries the response in `extraData`), or fewer bytes than `this.size` were read
     */
    async _uploadMultipart (stream) {
      if (!this.options.endpoint) {
        throw new Error('No multipart endpoint set')
      }

      // upload progress
      let bytesUploaded = 0
      stream.on('data', (data) => {
        bytesUploaded += data.length
        this.onProgress(bytesUploaded, undefined)
      })

      const httpMethod = (this.options.httpMethod || '').toLowerCase() === 'put' ? 'put' : 'post'
      const headers = headerSanitize(this.options.headers)
      const reqOptions = { url: this.options.endpoint, headers, encoding: null }
      const runRequest = request[httpMethod]

      if (this.options.useFormData) {
        reqOptions.formData = {
          ...this.options.metadata,
          [this.options.fieldname]: {
            value: stream,
            options: {
              filename: this.uploadFileName,
              contentType: this.options.metadata.type,
              knownLength: this.size,
            },
          },
        }
      } else {
        reqOptions.headers['content-length'] = this.size
        reqOptions.body = stream
      }

      const { response, body } = await new Promise((resolve, reject) => {
        runRequest(reqOptions, (error, response2, body2) => {
          if (error) {
            logger.error(error, 'upload.multipart.error')
            reject(error)
            return
          }

          resolve({ response: response2, body: body2 })
        })
      })

      // remove browser forbidden headers
      delete response.headers['set-cookie']
      delete response.headers['set-cookie2']

      const respObj = {
        responseText: body.toString(),
        status: response.statusCode,
        statusText: response.statusMessage,
        headers: response.headers,
      }

      if (response.statusCode >= 400) {
        logger.error(`upload failed with status: ${response.statusCode}`, 'upload.multipart.error')
        const err = new Error(response.statusMessage)
        // @ts-ignore
        err.extraData = respObj
        throw err
      }

      if (bytesUploaded !== this.size) {
        const errMsg = `uploaded only ${bytesUploaded} of ${this.size} with status: ${response.statusCode}`
        logger.error(errMsg, 'upload.multipart.mismatch.error')
        throw new Error(errMsg)
      }

      return { url: null, extraData: { response: respObj, bytesUploaded } }
    }

    /**
     * Upload the file to S3 using a Multipart upload.
     *
     * @param {any} stream
     * @returns {Promise<{url: string | null, extraData: object}>} `url` is the `Location`
     * reported by the S3 client, or null when absent
     * @throws {Error} when no S3 client is configured
     */
    async _uploadS3Multipart (stream) {
      if (!this.options.s3) {
        throw new Error('The S3 client is not configured on this companion instance.')
      }

      const filename = this.uploadFileName
      const { client, options } = this.options.s3

      const upload = client.upload({
        Bucket: options.bucket,
        Key: options.getKey(null, filename, this.options.metadata),
        ACL: options.acl,
        ContentType: this.options.metadata.type,
        Metadata: this.options.metadata,
        Body: stream,
      })

      upload.on('httpUploadProgress', ({ loaded, total }) => {
        this.onProgress(loaded, total)
      })

      return new Promise((resolve, reject) => {
        upload.send((error, data) => {
          if (error) {
            reject(error)
            return
          }

          resolve({
            url: data && data.Location ? data.Location : null,
            extraData: {
              response: {
                responseText: JSON.stringify(data),
                headers: {
                  'content-type': 'application/json',
                },
              },
            },
          })
        })
      })
    }
  }

  // Prefix of the generated (temp) file name: `<prefix>-<token>`
  Uploader.FILE_NAME_PREFIX = 'uppy-file'
  // Prefix of the storage key the upload state is saved under: `<prefix>:<token>`
  Uploader.STORAGE_PREFIX = 'companion'
  563. module.exports = Uploader