Browse Source

s3: Multipart pause/resume WIP

Renée Kooi 7 years ago
parent
commit
cb71a38a25
1 changed files with 116 additions and 6 deletions
  1. 116 6
      src/plugins/AwsS3/Multipart.js

+ 116 - 6
src/plugins/AwsS3/Multipart.js

@@ -3,6 +3,27 @@ const Translator = require('../../core/Translator')
 const { limitPromises } = require('../../core/Utils')
 
 const MB = 1024 * 1024
+const INTENTIONAL_CANCEL = new Error('Intentional cancel')
+const INTENTIONAL_PAUSE = new Error('Intentional pause')
+
+/**
+ * Create a wrapper around an event emitter with a `remove` method to remove
+ * all events that were added using the wrapped emitter.
+ */
+function createEventTracker (emitter) {
+  const events = []
+  return {
+    on (event, fn) {
+      events.push([ event, fn ])
+      return emitter.on(event, fn)
+    },
+    remove () {
+      events.forEach(([ event, fn ]) => {
+        emitter.off(event, fn)
+      })
+    }
+  }
+}
 
 module.exports = class AwsS3Multipart extends Plugin {
   constructor (uppy, opts) {
@@ -83,12 +104,14 @@ module.exports = class AwsS3Multipart extends Plugin {
     }).then((response) => response.json())
   }
 
-  uploadPart (url, body, onProgress) {
-    this.uppy.log(`Uploading a chunk of ${body.size} bytes to ${url}`)
+  uploadPart (file, url, partNumber, body, onProgress) {
+    this.uppy.log(`Uploading chunk #${partNumber} of ${body.size} bytes to ${url}`)
     return new Promise((resolve, reject) => {
       var xhr = new XMLHttpRequest()
       xhr.open('PUT', url, true)
 
+      const events = createEventTracker(this.uppy)
+
       xhr.upload.addEventListener('progress', (ev) => {
         if (!ev.lengthComputable) return
 
@@ -96,6 +119,7 @@ module.exports = class AwsS3Multipart extends Plugin {
       })
 
       xhr.addEventListener('load', (ev) => {
+        events.remove()
         if (ev.target.status < 200 || ev.target.status >= 300) {
           reject(new Error('Non 2xx'))
           return
@@ -105,15 +129,47 @@ module.exports = class AwsS3Multipart extends Plugin {
 
         // NOTE This must be allowed by CORS.
         const etag = ev.target.getResponseHeader('ETag')
+
+        // Store completed parts in state.
+        const cFile = this.uppy.getFile(file.id)
+        this.uppy.setFileState(file.id, {
+          s3Multipart: Object.assign({}, cFile.s3Multipart, {
+            parts: [
+              ...cFile.s3Multipart.parts,
+              { partNumber, etag }
+            ]
+          })
+        })
+
+        this.uppy.emit('s3-multipart:part-uploaded', cFile, {
+          partNumber,
+          etag
+        })
+
         resolve({ etag })
       })
 
       xhr.addEventListener('error', (ev) => {
+        events.remove()
         const error = new Error('Unknown error')
         error.source = ev.target
         reject(error)
       })
 
+      events.on('upload-pause', (fileID, isPaused) => {
+        if (fileID === file.id && isPaused) {
+          abort(INTENTIONAL_PAUSE)
+        }
+      })
+      events.on('pause-all', () => abort(INTENTIONAL_PAUSE))
+      events.on('cancel-all', () => abort(INTENTIONAL_CANCEL))
+
+      function abort (reason) {
+        xhr.abort()
+        events.remove()
+        reject(reason)
+      }
+
       xhr.send(body)
     })
   }
@@ -159,6 +215,10 @@ module.exports = class AwsS3Multipart extends Plugin {
     return Promise.all(
       fileIDs.map((id) => {
         const file = this.uppy.getFile(id)
+        if (file.s3Multipart && file.s3Multipart.uploadId) {
+          return Promise.resolve(file.s3Multipart)
+        }
+
         const createPromise = Promise.resolve()
           .then(() => createMultipartUpload(file))
         return createPromise.then((result) => {
@@ -177,7 +237,9 @@ module.exports = class AwsS3Multipart extends Plugin {
           })
 
           this.uppy.setFileState(file.id, {
-            s3Multipart: result
+            s3Multipart: Object.assign({
+              parts: []
+            }, result)
           })
           this.uppy.emit('preprocess-complete', file)
 
@@ -202,6 +264,9 @@ module.exports = class AwsS3Multipart extends Plugin {
     const currentProgress = chunks.map(() => 0)
 
     const doUploadPart = (chunk, index) => {
+      const cFile = this.uppy.getFile(file.id)
+      if (cFile.progress.isPaused) return Promise.reject(INTENTIONAL_PAUSE)
+
       return Promise.resolve(
         this.opts.prepareUploadPart(file, {
           key: file.s3Multipart.key,
@@ -217,7 +282,7 @@ module.exports = class AwsS3Multipart extends Plugin {
         }
         return result
       }).then(({ url }) => {
-        return this.uploadPart(url, chunk, (current) => {
+        return this.uploadPart(file, url, index + 1, chunk, (current) => {
           currentProgress[index] = current
 
           this.uppy.emit('upload-progress', this.uppy.getFile(file.id), {
@@ -240,17 +305,53 @@ module.exports = class AwsS3Multipart extends Plugin {
       return uploadPart(chunk, index)
     })
 
-    return Promise.all(partUploads).then((parts) => {
+    return Promise.all(partUploads).catch((err) => {
+      if (err === INTENTIONAL_CANCEL) {
+        console.log('cancelled, what to do?')
+      }
+      if (err === INTENTIONAL_PAUSE) {
+        return this.handlePaused(file.id)
+      }
+      throw err
+    }).then(() => {
       return completeMultipartUpload(file, {
         key: file.s3Multipart.key,
         uploadId: file.s3Multipart.uploadId,
-        parts
+        parts: file.s3Multipart.parts
       })
     }).then(() => {
       this.uppy.emit('upload-success', this.uppy.getFile(file.id))
     })
   }
 
+  // When an upload is paused, wait for it to start again.
+  // (This would be easier with a separate internal S3 Multipart uploader class)
+  handlePaused (fileID) {
+    return new Promise((resolve, reject) => {
+      const events = createEventTracker(this.uppy)
+
+      const resume = (resumeIDs) => {
+        events.remove()
+        const file = this.uppy.getFile(fileID)
+        return this.uploadFile(file)
+          .then(resolve, reject)
+      }
+
+      events.on('resume-all', () => {
+        resume()
+      })
+      events.on('cancel-all', () => {
+        events.remove()
+        reject(INTENTIONAL_CANCEL)
+      })
+      events.on('upload-pause', (unpauseID, isPaused) => {
+        if (fileID === unpauseID && !isPaused) {
+          resume()
+        }
+      })
+    })
+  }
+
   upload (fileIDs) {
     if (fileIDs.length === 0) return Promise.resolve()
 
@@ -265,7 +366,16 @@ module.exports = class AwsS3Multipart extends Plugin {
     return Promise.all(promises)
   }
 
+  addResumableUploadsCapabilityFlag () {
+    this.uppy.setState({
+      capabilities: Object.assign({}, this.uppy.getState().capabilities, {
+        resumableUploads: true
+      })
+    })
+  }
+
   install () {
+    this.addResumableUploadsCapabilityFlag()
     this.uppy.addPreProcessor(this.prepareUpload)
     this.uppy.addUploader(this.upload)
   }