Multipart.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. const Plugin = require('../../core/Plugin')
  2. const RequestClient = require('../../server/RequestClient')
  3. const UppySocket = require('../../core/UppySocket')
  4. const emitSocketProgress = require('../../utils/emitSocketProgress')
  5. const getSocketHost = require('../../utils/getSocketHost')
  6. const limitPromises = require('../../utils/limitPromises')
  7. const Uploader = require('./MultipartUploader')
  8. /**
  9. * Create a wrapper around an event emitter with a `remove` method to remove
  10. * all events that were added using the wrapped emitter.
  11. */
  12. function createEventTracker (emitter) {
  13. const events = []
  14. return {
  15. on (event, fn) {
  16. events.push([ event, fn ])
  17. return emitter.on(event, fn)
  18. },
  19. remove () {
  20. events.forEach(([ event, fn ]) => {
  21. emitter.off(event, fn)
  22. })
  23. }
  24. }
  25. }
  26. function assertServerError (res) {
  27. if (res && res.error) {
  28. const error = new Error(res.message)
  29. Object.assign(error, res.error)
  30. throw error
  31. }
  32. return res
  33. }
  34. module.exports = class AwsS3Multipart extends Plugin {
  35. constructor (uppy, opts) {
  36. super(uppy, opts)
  37. this.type = 'uploader'
  38. this.id = 'AwsS3Multipart'
  39. this.title = 'AWS S3 Multipart'
  40. this.client = new RequestClient(uppy, opts)
  41. const defaultOptions = {
  42. timeout: 30 * 1000,
  43. limit: 0,
  44. createMultipartUpload: this.createMultipartUpload.bind(this),
  45. listParts: this.listParts.bind(this),
  46. prepareUploadPart: this.prepareUploadPart.bind(this),
  47. abortMultipartUpload: this.abortMultipartUpload.bind(this),
  48. completeMultipartUpload: this.completeMultipartUpload.bind(this)
  49. }
  50. this.opts = Object.assign({}, defaultOptions, opts)
  51. this.upload = this.upload.bind(this)
  52. if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
  53. this.limitRequests = limitPromises(this.opts.limit)
  54. } else {
  55. this.limitRequests = (fn) => fn
  56. }
  57. this.uploaders = Object.create(null)
  58. this.uploaderEvents = Object.create(null)
  59. this.uploaderSockets = Object.create(null)
  60. }
  61. /**
  62. * Clean up all references for a file's upload: the MultipartUploader instance,
  63. * any events related to the file, and the uppy-server WebSocket connection.
  64. */
  65. resetUploaderReferences (fileID) {
  66. if (this.uploaders[fileID]) {
  67. this.uploaders[fileID].abort()
  68. this.uploaders[fileID] = null
  69. }
  70. if (this.uploaderEvents[fileID]) {
  71. this.uploaderEvents[fileID].remove()
  72. this.uploaderEvents[fileID] = null
  73. }
  74. if (this.uploaderSockets[fileID]) {
  75. this.uploaderSockets[fileID].close()
  76. this.uploaderSockets[fileID] = null
  77. }
  78. }
  79. assertHost () {
  80. if (!this.opts.host) {
  81. throw new Error('Expected a `host` option containing an uppy-server address.')
  82. }
  83. }
  84. createMultipartUpload (file) {
  85. this.assertHost()
  86. return this.client.post('s3/multipart', {
  87. filename: file.name,
  88. type: file.type
  89. }).then(assertServerError)
  90. }
  91. listParts (file, { key, uploadId }) {
  92. this.assertHost()
  93. const filename = encodeURIComponent(key)
  94. return this.client.get(`s3/multipart/${uploadId}?key=${filename}`)
  95. .then(assertServerError)
  96. }
  97. prepareUploadPart (file, { key, uploadId, number }) {
  98. this.assertHost()
  99. const filename = encodeURIComponent(key)
  100. return this.client.get(`s3/multipart/${uploadId}/${number}?key=${filename}`)
  101. .then(assertServerError)
  102. }
  103. completeMultipartUpload (file, { key, uploadId, parts }) {
  104. this.assertHost()
  105. const filename = encodeURIComponent(key)
  106. const uploadIdEnc = encodeURIComponent(uploadId)
  107. return this.client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts })
  108. .then(assertServerError)
  109. }
  110. abortMultipartUpload (file, { key, uploadId }) {
  111. this.assertHost()
  112. const filename = encodeURIComponent(key)
  113. const uploadIdEnc = encodeURIComponent(uploadId)
  114. return this.client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`)
  115. .then(assertServerError)
  116. }
  117. uploadFile (file) {
  118. return new Promise((resolve, reject) => {
  119. const upload = new Uploader(file.data, Object.assign({
  120. // .bind to pass the file object to each handler.
  121. createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
  122. listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
  123. prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
  124. completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
  125. abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
  126. limit: this.opts.limit || 5,
  127. onStart: (data) => {
  128. const cFile = this.uppy.getFile(file.id)
  129. this.uppy.setFileState(file.id, {
  130. s3Multipart: Object.assign({}, cFile.s3Multipart, {
  131. key: data.key,
  132. uploadId: data.uploadId,
  133. parts: []
  134. })
  135. })
  136. },
  137. onProgress: (bytesUploaded, bytesTotal) => {
  138. this.uppy.emit('upload-progress', file, {
  139. uploader: this,
  140. bytesUploaded: bytesUploaded,
  141. bytesTotal: bytesTotal
  142. })
  143. },
  144. onError: (err) => {
  145. this.uppy.log(err)
  146. this.uppy.emit('upload-error', file, err)
  147. err.message = `Failed because: ${err.message}`
  148. this.resetUploaderReferences(file.id)
  149. reject(err)
  150. },
  151. onSuccess: (result) => {
  152. this.uppy.emit('upload-success', file, upload, result.location)
  153. if (result.location) {
  154. this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
  155. }
  156. this.resetUploaderReferences(file.id)
  157. resolve(upload)
  158. },
  159. onPartComplete: (part) => {
  160. // Store completed parts in state.
  161. const cFile = this.uppy.getFile(file.id)
  162. this.uppy.setFileState(file.id, {
  163. s3Multipart: Object.assign({}, cFile.s3Multipart, {
  164. parts: [
  165. ...cFile.s3Multipart.parts,
  166. part
  167. ]
  168. })
  169. })
  170. this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
  171. }
  172. }, file.s3Multipart))
  173. this.uploaders[file.id] = upload
  174. this.uploaderEvents[file.id] = createEventTracker(this.uppy)
  175. this.onFileRemove(file.id, (removed) => {
  176. this.resetUploaderReferences(file.id)
  177. resolve(`upload ${removed.id} was removed`)
  178. })
  179. this.onFilePause(file.id, (isPaused) => {
  180. if (isPaused) {
  181. upload.pause()
  182. } else {
  183. upload.start()
  184. }
  185. })
  186. this.onPauseAll(file.id, () => {
  187. upload.pause()
  188. })
  189. this.onCancelAll(file.id, () => {
  190. upload.abort({ really: true })
  191. })
  192. this.onResumeAll(file.id, () => {
  193. upload.start()
  194. })
  195. if (!file.isPaused) {
  196. upload.start()
  197. }
  198. if (!file.isRestored) {
  199. this.uppy.emit('upload-started', file, upload)
  200. }
  201. })
  202. }
  203. uploadRemote (file) {
  204. this.resetUploaderReferences(file.id)
  205. return new Promise((resolve, reject) => {
  206. if (file.serverToken) {
  207. return this.connectToServerSocket(file)
  208. .then(() => resolve())
  209. .catch(reject)
  210. }
  211. this.uppy.emit('upload-started', file)
  212. fetch(file.remote.url, {
  213. method: 'post',
  214. credentials: 'include',
  215. headers: {
  216. 'Accept': 'application/json',
  217. 'Content-Type': 'application/json'
  218. },
  219. body: JSON.stringify(Object.assign({}, file.remote.body, {
  220. protocol: 's3-multipart',
  221. size: file.data.size,
  222. metadata: file.meta
  223. }))
  224. })
  225. .then((res) => {
  226. if (res.status < 200 || res.status > 300) {
  227. return reject(res.statusText)
  228. }
  229. return res.json().then((data) => {
  230. this.uppy.setFileState(file.id, { serverToken: data.token })
  231. return this.uppy.getFile(file.id)
  232. })
  233. })
  234. .then((file) => {
  235. return this.connectToServerSocket(file)
  236. })
  237. .then(() => {
  238. resolve()
  239. })
  240. .catch((err) => {
  241. reject(new Error(err))
  242. })
  243. })
  244. }
  245. connectToServerSocket (file) {
  246. return new Promise((resolve, reject) => {
  247. const token = file.serverToken
  248. const host = getSocketHost(file.remote.host)
  249. const socket = new UppySocket({ target: `${host}/api/${token}` })
  250. this.uploaderSockets[socket] = socket
  251. this.uploaderEvents[file.id] = createEventTracker(this.uppy)
  252. this.onFileRemove(file.id, (removed) => {
  253. socket.send('pause', {})
  254. resolve(`upload ${file.id} was removed`)
  255. })
  256. this.onFilePause(file.id, (isPaused) => {
  257. socket.send(isPaused ? 'pause' : 'resume', {})
  258. })
  259. this.onPauseAll(file.id, () => socket.send('pause', {}))
  260. this.onCancelAll(file.id, () => socket.send('pause', {}))
  261. this.onResumeAll(file.id, () => {
  262. if (file.error) {
  263. socket.send('pause', {})
  264. }
  265. socket.send('resume', {})
  266. })
  267. this.onRetry(file.id, () => {
  268. socket.send('pause', {})
  269. socket.send('resume', {})
  270. })
  271. this.onRetryAll(file.id, () => {
  272. socket.send('pause', {})
  273. socket.send('resume', {})
  274. })
  275. if (file.isPaused) {
  276. socket.send('pause', {})
  277. }
  278. socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
  279. socket.on('error', (errData) => {
  280. this.uppy.emit('upload-error', file, new Error(errData.error))
  281. reject(new Error(errData.error))
  282. })
  283. socket.on('success', (data) => {
  284. this.uppy.emit('upload-success', file, data, data.url)
  285. resolve()
  286. })
  287. })
  288. }
  289. upload (fileIDs) {
  290. if (fileIDs.length === 0) return Promise.resolve()
  291. const promises = fileIDs.map((id) => {
  292. const file = this.uppy.getFile(id)
  293. if (file.isRemote) {
  294. return this.uploadRemote(file)
  295. }
  296. return this.uploadFile(file)
  297. })
  298. return Promise.all(promises)
  299. }
  300. onFileRemove (fileID, cb) {
  301. this.uploaderEvents[fileID].on('file-removed', (file) => {
  302. if (fileID === file.id) cb(file.id)
  303. })
  304. }
  305. onFilePause (fileID, cb) {
  306. this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
  307. if (fileID === targetFileID) {
  308. // const isPaused = this.uppy.pauseResume(fileID)
  309. cb(isPaused)
  310. }
  311. })
  312. }
  313. onRetry (fileID, cb) {
  314. this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
  315. if (fileID === targetFileID) {
  316. cb()
  317. }
  318. })
  319. }
  320. onRetryAll (fileID, cb) {
  321. this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
  322. if (!this.uppy.getFile(fileID)) return
  323. cb()
  324. })
  325. }
  326. onPauseAll (fileID, cb) {
  327. this.uploaderEvents[fileID].on('pause-all', () => {
  328. if (!this.uppy.getFile(fileID)) return
  329. cb()
  330. })
  331. }
  332. onCancelAll (fileID, cb) {
  333. this.uploaderEvents[fileID].on('cancel-all', () => {
  334. if (!this.uppy.getFile(fileID)) return
  335. cb()
  336. })
  337. }
  338. onResumeAll (fileID, cb) {
  339. this.uploaderEvents[fileID].on('resume-all', () => {
  340. if (!this.uppy.getFile(fileID)) return
  341. cb()
  342. })
  343. }
  344. install () {
  345. this.uppy.setState({
  346. capabilities: Object.assign({}, this.uppy.getState().capabilities, {
  347. resumableUploads: true
  348. })
  349. })
  350. this.uppy.addUploader(this.upload)
  351. }
  352. uninstall () {
  353. this.uppy.setState({
  354. capabilities: Object.assign({}, this.uppy.getState().capabilities, {
  355. resumableUploads: false
  356. })
  357. })
  358. this.uppy.removeUploader(this.upload)
  359. }
  360. }