index.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. const { Plugin } = require('@uppy/core')
  2. const { Socket, RequestClient } = require('@uppy/server-utils')
  3. const emitSocketProgress = require('@uppy/utils/lib/emitSocketProgress')
  4. const getSocketHost = require('@uppy/utils/lib/getSocketHost')
  5. const limitPromises = require('@uppy/utils/lib/limitPromises')
  6. const Uploader = require('./MultipartUploader')
  7. /**
  8. * Create a wrapper around an event emitter with a `remove` method to remove
  9. * all events that were added using the wrapped emitter.
  10. */
  11. function createEventTracker (emitter) {
  12. const events = []
  13. return {
  14. on (event, fn) {
  15. events.push([ event, fn ])
  16. return emitter.on(event, fn)
  17. },
  18. remove () {
  19. events.forEach(([ event, fn ]) => {
  20. emitter.off(event, fn)
  21. })
  22. }
  23. }
  24. }
  25. function assertServerError (res) {
  26. if (res && res.error) {
  27. const error = new Error(res.message)
  28. Object.assign(error, res.error)
  29. throw error
  30. }
  31. return res
  32. }
  33. module.exports = class AwsS3Multipart extends Plugin {
  34. constructor (uppy, opts) {
  35. super(uppy, opts)
  36. this.type = 'uploader'
  37. this.id = 'AwsS3Multipart'
  38. this.title = 'AWS S3 Multipart'
  39. this.client = new RequestClient(uppy, opts)
  40. const defaultOptions = {
  41. timeout: 30 * 1000,
  42. limit: 0,
  43. createMultipartUpload: this.createMultipartUpload.bind(this),
  44. listParts: this.listParts.bind(this),
  45. prepareUploadPart: this.prepareUploadPart.bind(this),
  46. abortMultipartUpload: this.abortMultipartUpload.bind(this),
  47. completeMultipartUpload: this.completeMultipartUpload.bind(this)
  48. }
  49. this.opts = Object.assign({}, defaultOptions, opts)
  50. this.upload = this.upload.bind(this)
  51. if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
  52. this.limitRequests = limitPromises(this.opts.limit)
  53. } else {
  54. this.limitRequests = (fn) => fn
  55. }
  56. this.uploaders = Object.create(null)
  57. this.uploaderEvents = Object.create(null)
  58. this.uploaderSockets = Object.create(null)
  59. }
  60. /**
  61. * Clean up all references for a file's upload: the MultipartUploader instance,
  62. * any events related to the file, and the uppy-server WebSocket connection.
  63. */
  64. resetUploaderReferences (fileID) {
  65. if (this.uploaders[fileID]) {
  66. this.uploaders[fileID].abort()
  67. this.uploaders[fileID] = null
  68. }
  69. if (this.uploaderEvents[fileID]) {
  70. this.uploaderEvents[fileID].remove()
  71. this.uploaderEvents[fileID] = null
  72. }
  73. if (this.uploaderSockets[fileID]) {
  74. this.uploaderSockets[fileID].close()
  75. this.uploaderSockets[fileID] = null
  76. }
  77. }
  78. assertHost () {
  79. if (!this.opts.serverUrl) {
  80. throw new Error('Expected a `serverUrl` option containing an uppy-server address.')
  81. }
  82. }
  83. createMultipartUpload (file) {
  84. this.assertHost()
  85. return this.client.post('s3/multipart', {
  86. filename: file.name,
  87. type: file.type
  88. }).then(assertServerError)
  89. }
  90. listParts (file, { key, uploadId }) {
  91. this.assertHost()
  92. const filename = encodeURIComponent(key)
  93. return this.client.get(`s3/multipart/${uploadId}?key=${filename}`)
  94. .then(assertServerError)
  95. }
  96. prepareUploadPart (file, { key, uploadId, number }) {
  97. this.assertHost()
  98. const filename = encodeURIComponent(key)
  99. return this.client.get(`s3/multipart/${uploadId}/${number}?key=${filename}`)
  100. .then(assertServerError)
  101. }
  102. completeMultipartUpload (file, { key, uploadId, parts }) {
  103. this.assertHost()
  104. const filename = encodeURIComponent(key)
  105. const uploadIdEnc = encodeURIComponent(uploadId)
  106. return this.client.post(`s3/multipart/${uploadIdEnc}/complete?key=${filename}`, { parts })
  107. .then(assertServerError)
  108. }
  109. abortMultipartUpload (file, { key, uploadId }) {
  110. this.assertHost()
  111. const filename = encodeURIComponent(key)
  112. const uploadIdEnc = encodeURIComponent(uploadId)
  113. return this.client.delete(`s3/multipart/${uploadIdEnc}?key=${filename}`)
  114. .then(assertServerError)
  115. }
  116. uploadFile (file) {
  117. return new Promise((resolve, reject) => {
  118. const upload = new Uploader(file.data, Object.assign({
  119. // .bind to pass the file object to each handler.
  120. createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
  121. listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
  122. prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
  123. completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
  124. abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
  125. limit: this.opts.limit || 5,
  126. onStart: (data) => {
  127. const cFile = this.uppy.getFile(file.id)
  128. this.uppy.setFileState(file.id, {
  129. s3Multipart: Object.assign({}, cFile.s3Multipart, {
  130. key: data.key,
  131. uploadId: data.uploadId,
  132. parts: []
  133. })
  134. })
  135. },
  136. onProgress: (bytesUploaded, bytesTotal) => {
  137. this.uppy.emit('upload-progress', file, {
  138. uploader: this,
  139. bytesUploaded: bytesUploaded,
  140. bytesTotal: bytesTotal
  141. })
  142. },
  143. onError: (err) => {
  144. this.uppy.log(err)
  145. this.uppy.emit('upload-error', file, err)
  146. err.message = `Failed because: ${err.message}`
  147. this.resetUploaderReferences(file.id)
  148. reject(err)
  149. },
  150. onSuccess: (result) => {
  151. this.uppy.emit('upload-success', file, upload, result.location)
  152. if (result.location) {
  153. this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
  154. }
  155. this.resetUploaderReferences(file.id)
  156. resolve(upload)
  157. },
  158. onPartComplete: (part) => {
  159. // Store completed parts in state.
  160. const cFile = this.uppy.getFile(file.id)
  161. this.uppy.setFileState(file.id, {
  162. s3Multipart: Object.assign({}, cFile.s3Multipart, {
  163. parts: [
  164. ...cFile.s3Multipart.parts,
  165. part
  166. ]
  167. })
  168. })
  169. this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
  170. }
  171. }, file.s3Multipart))
  172. this.uploaders[file.id] = upload
  173. this.uploaderEvents[file.id] = createEventTracker(this.uppy)
  174. this.onFileRemove(file.id, (removed) => {
  175. this.resetUploaderReferences(file.id)
  176. resolve(`upload ${removed.id} was removed`)
  177. })
  178. this.onFilePause(file.id, (isPaused) => {
  179. if (isPaused) {
  180. upload.pause()
  181. } else {
  182. upload.start()
  183. }
  184. })
  185. this.onPauseAll(file.id, () => {
  186. upload.pause()
  187. })
  188. this.onCancelAll(file.id, () => {
  189. upload.abort({ really: true })
  190. })
  191. this.onResumeAll(file.id, () => {
  192. upload.start()
  193. })
  194. if (!file.isPaused) {
  195. upload.start()
  196. }
  197. if (!file.isRestored) {
  198. this.uppy.emit('upload-started', file, upload)
  199. }
  200. })
  201. }
  202. uploadRemote (file) {
  203. this.resetUploaderReferences(file.id)
  204. return new Promise((resolve, reject) => {
  205. if (file.serverToken) {
  206. return this.connectToServerSocket(file)
  207. .then(() => resolve())
  208. .catch(reject)
  209. }
  210. this.uppy.emit('upload-started', file)
  211. fetch(file.remote.url, {
  212. method: 'post',
  213. credentials: 'include',
  214. headers: {
  215. 'Accept': 'application/json',
  216. 'Content-Type': 'application/json'
  217. },
  218. body: JSON.stringify(Object.assign({}, file.remote.body, {
  219. protocol: 's3-multipart',
  220. size: file.data.size,
  221. metadata: file.meta
  222. }))
  223. })
  224. .then((res) => {
  225. if (res.status < 200 || res.status > 300) {
  226. return reject(res.statusText)
  227. }
  228. return res.json().then((data) => {
  229. this.uppy.setFileState(file.id, { serverToken: data.token })
  230. return this.uppy.getFile(file.id)
  231. })
  232. })
  233. .then((file) => {
  234. return this.connectToServerSocket(file)
  235. })
  236. .then(() => {
  237. resolve()
  238. })
  239. .catch((err) => {
  240. reject(new Error(err))
  241. })
  242. })
  243. }
  244. connectToServerSocket (file) {
  245. return new Promise((resolve, reject) => {
  246. const token = file.serverToken
  247. const host = getSocketHost(file.remote.serverUrl)
  248. const socket = new Socket({ target: `${host}/api/${token}` })
  249. this.uploaderSockets[socket] = socket
  250. this.uploaderEvents[file.id] = createEventTracker(this.uppy)
  251. this.onFileRemove(file.id, (removed) => {
  252. socket.send('pause', {})
  253. resolve(`upload ${file.id} was removed`)
  254. })
  255. this.onFilePause(file.id, (isPaused) => {
  256. socket.send(isPaused ? 'pause' : 'resume', {})
  257. })
  258. this.onPauseAll(file.id, () => socket.send('pause', {}))
  259. this.onCancelAll(file.id, () => socket.send('pause', {}))
  260. this.onResumeAll(file.id, () => {
  261. if (file.error) {
  262. socket.send('pause', {})
  263. }
  264. socket.send('resume', {})
  265. })
  266. this.onRetry(file.id, () => {
  267. socket.send('pause', {})
  268. socket.send('resume', {})
  269. })
  270. this.onRetryAll(file.id, () => {
  271. socket.send('pause', {})
  272. socket.send('resume', {})
  273. })
  274. if (file.isPaused) {
  275. socket.send('pause', {})
  276. }
  277. socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
  278. socket.on('error', (errData) => {
  279. this.uppy.emit('upload-error', file, new Error(errData.error))
  280. reject(new Error(errData.error))
  281. })
  282. socket.on('success', (data) => {
  283. this.uppy.emit('upload-success', file, data, data.url)
  284. resolve()
  285. })
  286. })
  287. }
  288. upload (fileIDs) {
  289. if (fileIDs.length === 0) return Promise.resolve()
  290. const promises = fileIDs.map((id) => {
  291. const file = this.uppy.getFile(id)
  292. if (file.isRemote) {
  293. return this.uploadRemote(file)
  294. }
  295. return this.uploadFile(file)
  296. })
  297. return Promise.all(promises)
  298. }
  299. onFileRemove (fileID, cb) {
  300. this.uploaderEvents[fileID].on('file-removed', (file) => {
  301. if (fileID === file.id) cb(file.id)
  302. })
  303. }
  304. onFilePause (fileID, cb) {
  305. this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
  306. if (fileID === targetFileID) {
  307. // const isPaused = this.uppy.pauseResume(fileID)
  308. cb(isPaused)
  309. }
  310. })
  311. }
  312. onRetry (fileID, cb) {
  313. this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
  314. if (fileID === targetFileID) {
  315. cb()
  316. }
  317. })
  318. }
  319. onRetryAll (fileID, cb) {
  320. this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
  321. if (!this.uppy.getFile(fileID)) return
  322. cb()
  323. })
  324. }
  325. onPauseAll (fileID, cb) {
  326. this.uploaderEvents[fileID].on('pause-all', () => {
  327. if (!this.uppy.getFile(fileID)) return
  328. cb()
  329. })
  330. }
  331. onCancelAll (fileID, cb) {
  332. this.uploaderEvents[fileID].on('cancel-all', () => {
  333. if (!this.uppy.getFile(fileID)) return
  334. cb()
  335. })
  336. }
  337. onResumeAll (fileID, cb) {
  338. this.uploaderEvents[fileID].on('resume-all', () => {
  339. if (!this.uppy.getFile(fileID)) return
  340. cb()
  341. })
  342. }
  343. install () {
  344. this.uppy.setState({
  345. capabilities: Object.assign({}, this.uppy.getState().capabilities, {
  346. resumableUploads: true
  347. })
  348. })
  349. this.uppy.addUploader(this.upload)
  350. }
  351. uninstall () {
  352. this.uppy.setState({
  353. capabilities: Object.assign({}, this.uppy.getState().capabilities, {
  354. resumableUploads: false
  355. })
  356. })
  357. this.uppy.removeUploader(this.upload)
  358. }
  359. }