index.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. const { Plugin } = require('@uppy/core')
  2. const { Socket, Provider, RequestClient } = require('@uppy/companion-client')
  3. const EventTracker = require('@uppy/utils/lib/EventTracker')
  4. const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
  5. const getSocketHost = require('@uppy/utils/lib/getSocketHost')
  6. const RateLimitedQueue = require('@uppy/utils/lib/RateLimitedQueue')
  7. const Uploader = require('./MultipartUploader')
  /**
   * Turn an error payload returned by the server into a thrown `Error`.
   *
   * A response counts as a failure when it has a truthy `error` property.
   * When `error` is an object, its own enumerable properties are copied onto
   * the thrown `Error`. When `error` is a string it is used as the message if
   * the response has no `message` of its own (copying a string with
   * `Object.assign` would only add one indexed property per character).
   *
   * @param {object} [res] - Parsed response body.
   * @returns {object} The same `res`, when it does not describe an error.
   * @throws {Error} When `res.error` is truthy.
   */
  function assertServerError (res) {
    if (!res || !res.error) {
      return res
    }

    const details = res.error
    const fallbackMessage = typeof details === 'string' ? details : undefined
    const error = new Error(res.message != null ? res.message : fallbackMessage)
    if (typeof details === 'object') {
      Object.assign(error, details)
    }
    throw error
  }
  16. module.exports = class AwsS3Multipart extends Plugin {
  17. static VERSION = require('../package.json').version
  18. constructor (uppy, opts) {
  19. super(uppy, opts)
  20. this.type = 'uploader'
  21. this.id = this.opts.id || 'AwsS3Multipart'
  22. this.title = 'AWS S3 Multipart'
  23. this.client = new RequestClient(uppy, opts)
  24. const defaultOptions = {
  25. timeout: 30 * 1000,
  26. limit: 0,
  27. retryDelays: [0, 1000, 3000, 5000],
  28. createMultipartUpload: this.createMultipartUpload.bind(this),
  29. listParts: this.listParts.bind(this),
  30. prepareUploadPart: this.prepareUploadPart.bind(this),
  31. abortMultipartUpload: this.abortMultipartUpload.bind(this),
  32. completeMultipartUpload: this.completeMultipartUpload.bind(this),
  33. }
  34. this.opts = { ...defaultOptions, ...opts }
  35. this.upload = this.upload.bind(this)
  36. this.requests = new RateLimitedQueue(this.opts.limit)
  37. this.uploaders = Object.create(null)
  38. this.uploaderEvents = Object.create(null)
  39. this.uploaderSockets = Object.create(null)
  40. }
  41. /**
  42. * Clean up all references for a file's upload: the MultipartUploader instance,
  43. * any events related to the file, and the Companion WebSocket connection.
  44. *
  45. * Set `opts.abort` to tell S3 that the multipart upload is cancelled and must be removed.
  46. * This should be done when the user cancels the upload, not when the upload is completed or errored.
  47. */
  48. resetUploaderReferences (fileID, opts = {}) {
  49. if (this.uploaders[fileID]) {
  50. this.uploaders[fileID].abort({ really: opts.abort || false })
  51. this.uploaders[fileID] = null
  52. }
  53. if (this.uploaderEvents[fileID]) {
  54. this.uploaderEvents[fileID].remove()
  55. this.uploaderEvents[fileID] = null
  56. }
  57. if (this.uploaderSockets[fileID]) {
  58. this.uploaderSockets[fileID].close()
  59. this.uploaderSockets[fileID] = null
  60. }
  61. }
  62. assertHost (method) {
  63. if (!this.opts.companionUrl) {
  64. throw new Error(`Expected a \`companionUrl\` option containing a Companion address, or if you are not using Companion, a custom \`${method}\` implementation.`)
  65. }
  66. }
  67. createMultipartUpload (file) {
  68. this.assertHost('createMultipartUpload')
  69. const metadata = {}
  70. Object.keys(file.meta).map(key => {
  71. if (file.meta[key] != null) {
  72. metadata[key] = file.meta[key].toString()
  73. }
  74. })
  75. return this.client.post('s3/multipart', {
  76. filename: file.name,
  77. type: file.type,
  78. metadata,
  79. }).then(assertServerError)
  80. }
  81. listParts (file, { key, uploadId }) {
  82. this.assertHost('listParts')
  83. const filename = encodeURIComponent(key)
  84. return this.client.get(`s3/multipart/${uploadId}?key=${filename}`)
  85. .then(assertServerError)
  86. }
  87. prepareUploadPart (file, { key, uploadId, number }) {
  88. this.assertHost('prepareUploadPart')
  89. const filename = encodeURIComponent(key)
  90. return this.client.get(`s3/multipart/${uploadId}/${number}?key=${filename}`)
  91. .then(assertServerError)
  92. }
  93. completeMultipartUpload (file, { key, uploadId, parts }) {
  94. this.assertHost('completeMultipartUpload')
  95. const filename = encodeURIComponent(key)
  96. const uploadIdEnc = encodeURIComponent(uploadId)
  97. return this.client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts })
  98. .then(assertServerError)
  99. }
  100. abortMultipartUpload (file, { key, uploadId }) {
  101. this.assertHost('abortMultipartUpload')
  102. const filename = encodeURIComponent(key)
  103. const uploadIdEnc = encodeURIComponent(uploadId)
  104. return this.client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`)
  105. .then(assertServerError)
  106. }
  107. uploadFile (file) {
  108. return new Promise((resolve, reject) => {
  109. const onStart = (data) => {
  110. const cFile = this.uppy.getFile(file.id)
  111. this.uppy.setFileState(file.id, {
  112. s3Multipart: {
  113. ...cFile.s3Multipart,
  114. key: data.key,
  115. uploadId: data.uploadId,
  116. },
  117. })
  118. }
  119. const onProgress = (bytesUploaded, bytesTotal) => {
  120. this.uppy.emit('upload-progress', file, {
  121. uploader: this,
  122. bytesUploaded,
  123. bytesTotal,
  124. })
  125. }
  126. const onError = (err) => {
  127. this.uppy.log(err)
  128. this.uppy.emit('upload-error', file, err)
  129. queuedRequest.done()
  130. this.resetUploaderReferences(file.id)
  131. reject(err)
  132. }
  133. const onSuccess = (result) => {
  134. const uploadResp = {
  135. body: {
  136. ...result,
  137. },
  138. uploadURL: result.location,
  139. }
  140. queuedRequest.done()
  141. this.resetUploaderReferences(file.id)
  142. const cFile = this.uppy.getFile(file.id)
  143. this.uppy.emit('upload-success', cFile || file, uploadResp)
  144. if (result.location) {
  145. this.uppy.log(`Download ${upload.file.name} from ${result.location}`)
  146. }
  147. resolve(upload)
  148. }
  149. const onPartComplete = (part) => {
  150. const cFile = this.uppy.getFile(file.id)
  151. if (!cFile) {
  152. return
  153. }
  154. this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
  155. }
  156. const upload = new Uploader(file.data, {
  157. // .bind to pass the file object to each handler.
  158. createMultipartUpload: this.opts.createMultipartUpload.bind(this, file),
  159. listParts: this.opts.listParts.bind(this, file),
  160. prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
  161. completeMultipartUpload: this.opts.completeMultipartUpload.bind(this, file),
  162. abortMultipartUpload: this.opts.abortMultipartUpload.bind(this, file),
  163. getChunkSize: this.opts.getChunkSize ? this.opts.getChunkSize.bind(this) : null,
  164. onStart,
  165. onProgress,
  166. onError,
  167. onSuccess,
  168. onPartComplete,
  169. limit: this.opts.limit || 5,
  170. retryDelays: this.opts.retryDelays || [],
  171. ...file.s3Multipart,
  172. })
  173. this.uploaders[file.id] = upload
  174. this.uploaderEvents[file.id] = new EventTracker(this.uppy)
  175. let queuedRequest = this.requests.run(() => {
  176. if (!file.isPaused) {
  177. upload.start()
  178. }
  179. // Don't do anything here, the caller will take care of cancelling the upload itself
  180. // using resetUploaderReferences(). This is because resetUploaderReferences() has to be
  181. // called when this request is still in the queue, and has not been started yet, too. At
  182. // that point this cancellation function is not going to be called.
  183. return () => {}
  184. })
  185. this.onFileRemove(file.id, (removed) => {
  186. queuedRequest.abort()
  187. this.resetUploaderReferences(file.id, { abort: true })
  188. resolve(`upload ${removed.id} was removed`)
  189. })
  190. this.onCancelAll(file.id, () => {
  191. queuedRequest.abort()
  192. this.resetUploaderReferences(file.id, { abort: true })
  193. resolve(`upload ${file.id} was canceled`)
  194. })
  195. this.onFilePause(file.id, (isPaused) => {
  196. if (isPaused) {
  197. // Remove this file from the queue so another file can start in its place.
  198. queuedRequest.abort()
  199. upload.pause()
  200. } else {
  201. // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
  202. queuedRequest.abort()
  203. queuedRequest = this.requests.run(() => {
  204. upload.start()
  205. return () => {}
  206. })
  207. }
  208. })
  209. this.onPauseAll(file.id, () => {
  210. queuedRequest.abort()
  211. upload.pause()
  212. })
  213. this.onResumeAll(file.id, () => {
  214. queuedRequest.abort()
  215. if (file.error) {
  216. upload.abort()
  217. }
  218. queuedRequest = this.requests.run(() => {
  219. upload.start()
  220. return () => {}
  221. })
  222. })
  223. // Don't double-emit upload-started for Golden Retriever-restored files that were already started
  224. if (!file.progress.uploadStarted || !file.isRestored) {
  225. this.uppy.emit('upload-started', file)
  226. }
  227. })
  228. }
  229. uploadRemote (file) {
  230. this.resetUploaderReferences(file.id)
  231. // Don't double-emit upload-started for Golden Retriever-restored files that were already started
  232. if (!file.progress.uploadStarted || !file.isRestored) {
  233. this.uppy.emit('upload-started', file)
  234. }
  235. if (file.serverToken) {
  236. return this.connectToServerSocket(file)
  237. }
  238. return new Promise((resolve, reject) => {
  239. const Client = file.remote.providerOptions.provider ? Provider : RequestClient
  240. const client = new Client(this.uppy, file.remote.providerOptions)
  241. client.post(
  242. file.remote.url,
  243. {
  244. ...file.remote.body,
  245. protocol: 's3-multipart',
  246. size: file.data.size,
  247. metadata: file.meta,
  248. }
  249. ).then((res) => {
  250. this.uppy.setFileState(file.id, { serverToken: res.token })
  251. file = this.uppy.getFile(file.id)
  252. return file
  253. }).then((file) => {
  254. return this.connectToServerSocket(file)
  255. }).then(() => {
  256. resolve()
  257. }).catch((err) => {
  258. this.uppy.emit('upload-error', file, err)
  259. reject(err)
  260. })
  261. })
  262. }
  263. connectToServerSocket (file) {
  264. return new Promise((resolve, reject) => {
  265. const token = file.serverToken
  266. const host = getSocketHost(file.remote.companionUrl)
  267. const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
  268. this.uploaderSockets[file.id] = socket
  269. this.uploaderEvents[file.id] = new EventTracker(this.uppy)
  270. this.onFileRemove(file.id, (removed) => {
  271. queuedRequest.abort()
  272. socket.send('pause', {})
  273. this.resetUploaderReferences(file.id, { abort: true })
  274. resolve(`upload ${file.id} was removed`)
  275. })
  276. this.onFilePause(file.id, (isPaused) => {
  277. if (isPaused) {
  278. // Remove this file from the queue so another file can start in its place.
  279. queuedRequest.abort()
  280. socket.send('pause', {})
  281. } else {
  282. // Resuming an upload should be queued, else you could pause and then resume a queued upload to make it skip the queue.
  283. queuedRequest.abort()
  284. queuedRequest = this.requests.run(() => {
  285. socket.send('resume', {})
  286. return () => {}
  287. })
  288. }
  289. })
  290. this.onPauseAll(file.id, () => {
  291. queuedRequest.abort()
  292. socket.send('pause', {})
  293. })
  294. this.onCancelAll(file.id, () => {
  295. queuedRequest.abort()
  296. socket.send('pause', {})
  297. this.resetUploaderReferences(file.id)
  298. resolve(`upload ${file.id} was canceled`)
  299. })
  300. this.onResumeAll(file.id, () => {
  301. queuedRequest.abort()
  302. if (file.error) {
  303. socket.send('pause', {})
  304. }
  305. queuedRequest = this.requests.run(() => {
  306. socket.send('resume', {})
  307. })
  308. })
  309. this.onRetry(file.id, () => {
  310. // Only do the retry if the upload is actually in progress;
  311. // else we could try to send these messages when the upload is still queued.
  312. // We may need a better check for this since the socket may also be closed
  313. // for other reasons, like network failures.
  314. if (socket.isOpen) {
  315. socket.send('pause', {})
  316. socket.send('resume', {})
  317. }
  318. })
  319. this.onRetryAll(file.id, () => {
  320. if (socket.isOpen) {
  321. socket.send('pause', {})
  322. socket.send('resume', {})
  323. }
  324. })
  325. socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
  326. socket.on('error', (errData) => {
  327. this.uppy.emit('upload-error', file, new Error(errData.error))
  328. this.resetUploaderReferences(file.id)
  329. queuedRequest.done()
  330. reject(new Error(errData.error))
  331. })
  332. socket.on('success', (data) => {
  333. const uploadResp = {
  334. uploadURL: data.url,
  335. }
  336. this.uppy.emit('upload-success', file, uploadResp)
  337. this.resetUploaderReferences(file.id)
  338. queuedRequest.done()
  339. resolve()
  340. })
  341. let queuedRequest = this.requests.run(() => {
  342. socket.open()
  343. if (file.isPaused) {
  344. socket.send('pause', {})
  345. }
  346. return () => {}
  347. })
  348. })
  349. }
  350. upload (fileIDs) {
  351. if (fileIDs.length === 0) return Promise.resolve()
  352. const promises = fileIDs.map((id) => {
  353. const file = this.uppy.getFile(id)
  354. if (file.isRemote) {
  355. return this.uploadRemote(file)
  356. }
  357. return this.uploadFile(file)
  358. })
  359. return Promise.all(promises)
  360. }
  361. onFileRemove (fileID, cb) {
  362. this.uploaderEvents[fileID].on('file-removed', (file) => {
  363. if (fileID === file.id) cb(file.id)
  364. })
  365. }
  366. onFilePause (fileID, cb) {
  367. this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
  368. if (fileID === targetFileID) {
  369. // const isPaused = this.uppy.pauseResume(fileID)
  370. cb(isPaused)
  371. }
  372. })
  373. }
  374. onRetry (fileID, cb) {
  375. this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
  376. if (fileID === targetFileID) {
  377. cb()
  378. }
  379. })
  380. }
  381. onRetryAll (fileID, cb) {
  382. this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
  383. if (!this.uppy.getFile(fileID)) return
  384. cb()
  385. })
  386. }
  387. onPauseAll (fileID, cb) {
  388. this.uploaderEvents[fileID].on('pause-all', () => {
  389. if (!this.uppy.getFile(fileID)) return
  390. cb()
  391. })
  392. }
  393. onCancelAll (fileID, cb) {
  394. this.uploaderEvents[fileID].on('cancel-all', () => {
  395. if (!this.uppy.getFile(fileID)) return
  396. cb()
  397. })
  398. }
  399. onResumeAll (fileID, cb) {
  400. this.uploaderEvents[fileID].on('resume-all', () => {
  401. if (!this.uppy.getFile(fileID)) return
  402. cb()
  403. })
  404. }
  405. install () {
  406. const { capabilities } = this.uppy.getState()
  407. this.uppy.setState({
  408. capabilities: {
  409. ...capabilities,
  410. resumableUploads: true,
  411. },
  412. })
  413. this.uppy.addUploader(this.upload)
  414. }
  415. uninstall () {
  416. const { capabilities } = this.uppy.getState()
  417. this.uppy.setState({
  418. capabilities: {
  419. ...capabilities,
  420. resumableUploads: false,
  421. },
  422. })
  423. this.uppy.removeUploader(this.upload)
  424. }
  425. }