Browse Source

s3: Move multipart uploader to a class

Renée Kooi 7 years ago
parent
commit
17c2224e67
2 changed files with 342 additions and 188 deletions
  1. 89 188
      src/plugins/AwsS3/Multipart.js
  2. 253 0
      src/plugins/AwsS3/MultipartUploader.js

+ 89 - 188
src/plugins/AwsS3/Multipart.js

@@ -1,29 +1,7 @@
 const Plugin = require('../../core/Plugin')
 const Translator = require('../../core/Translator')
 const { limitPromises } = require('../../core/Utils')
-
-const MB = 1024 * 1024
-const INTENTIONAL_CANCEL = new Error('Intentional cancel')
-const INTENTIONAL_PAUSE = new Error('Intentional pause')
-
-/**
- * Create a wrapper around an event emitter with a `remove` method to remove
- * all events that were added using the wrapped emitter.
- */
-function createEventTracker (emitter) {
-  const events = []
-  return {
-    on (event, fn) {
-      events.push([ event, fn ])
-      return emitter.on(event, fn)
-    },
-    remove () {
-      events.forEach(([ event, fn ]) => {
-        emitter.off(event, fn)
-      })
-    }
-  }
-}
+const Uploader = require('./MultipartUploader')
 
 module.exports = class AwsS3Multipart extends Plugin {
   constructor (uppy, opts) {
@@ -42,6 +20,7 @@ module.exports = class AwsS3Multipart extends Plugin {
       timeout: 30 * 1000,
       limit: 0,
       createMultipartUpload: this.createMultipartUpload.bind(this),
+      listParts: this.listParts.bind(this),
       prepareUploadPart: this.prepareUploadPart.bind(this),
       abortMultipartUpload: this.abortMultipartUpload.bind(this),
       completeMultipartUpload: this.completeMultipartUpload.bind(this),
@@ -65,18 +44,6 @@ module.exports = class AwsS3Multipart extends Plugin {
     }
   }
 
-  splitFile (file) {
-    const chunks = []
-    const chunkSize = Math.max(Math.ceil(file.size / 10000), 5 * MB)
-
-    for (let i = 0; i < file.size; i += chunkSize) {
-      const end = Math.min(file.size, i + chunkSize)
-      chunks.push(file.slice(i, end))
-    }
-
-    return chunks
-  }
-
   assertHost () {
     if (!this.opts.host) {
       throw new Error('Expected a `host` option containing an uppy-server address.')
@@ -94,84 +61,24 @@ module.exports = class AwsS3Multipart extends Plugin {
     }).then((response) => response.json())
   }
 
-  prepareUploadPart (file, { key, uploadId, number }) {
+  listParts (file, { key, uploadId }) {
     this.assertHost()
 
     const filename = encodeURIComponent(key)
-    return fetch(`${this.opts.host}/s3/multipart/${uploadId}/${number}?key=${filename}`, {
+    return fetch(`${this.opts.host}/s3/multipart/${uploadId}?key=${filename}`, {
       method: 'get',
       headers: { accept: 'application/json' }
     }).then((response) => response.json())
   }
 
-  uploadPart (file, url, partNumber, body, onProgress) {
-    this.uppy.log(`Uploading chunk #${partNumber} of ${body.size} bytes to ${url}`)
-    return new Promise((resolve, reject) => {
-      var xhr = new XMLHttpRequest()
-      xhr.open('PUT', url, true)
-
-      const events = createEventTracker(this.uppy)
-
-      xhr.upload.addEventListener('progress', (ev) => {
-        if (!ev.lengthComputable) return
-
-        onProgress(ev.loaded, ev.total)
-      })
-
-      xhr.addEventListener('load', (ev) => {
-        events.remove()
-        if (ev.target.status < 200 || ev.target.status >= 300) {
-          reject(new Error('Non 2xx'))
-          return
-        }
-
-        onProgress(body.size)
-
-        // NOTE This must be allowed by CORS.
-        const etag = ev.target.getResponseHeader('ETag')
-
-        // Store completed parts in state.
-        const cFile = this.uppy.getFile(file.id)
-        this.uppy.setFileState(file.id, {
-          s3Multipart: Object.assign({}, cFile.s3Multipart, {
-            parts: [
-              ...cFile.s3Multipart.parts,
-              { partNumber, etag }
-            ]
-          })
-        })
-
-        this.uppy.emit('s3-multipart:part-uploaded', cFile, {
-          partNumber,
-          etag
-        })
-
-        resolve({ etag })
-      })
-
-      xhr.addEventListener('error', (ev) => {
-        events.remove()
-        const error = new Error('Unknown error')
-        error.source = ev.target
-        reject(error)
-      })
-
-      events.on('upload-pause', (fileID, isPaused) => {
-        if (fileID === file.id && isPaused) {
-          abort(INTENTIONAL_PAUSE)
-        }
-      })
-      events.on('pause-all', () => abort(INTENTIONAL_PAUSE))
-      events.on('cancel-all', () => abort(INTENTIONAL_CANCEL))
-
-      function abort (reason) {
-        xhr.abort()
-        events.remove()
-        reject(reason)
-      }
+  prepareUploadPart (file, { key, uploadId, number }) {
+    this.assertHost()
 
-      xhr.send(body)
-    })
+    const filename = encodeURIComponent(key)
+    return fetch(`${this.opts.host}/s3/multipart/${uploadId}/${number}?key=${filename}`, {
+      method: 'get',
+      headers: { accept: 'application/json' }
+    }).then((response) => response.json())
   }
 
   completeMultipartUpload (file, { key, uploadId, parts }) {
@@ -253,102 +160,98 @@ module.exports = class AwsS3Multipart extends Plugin {
   }
 
   uploadFile (file) {
-    const chunks = this.splitFile(file.data)
+    return new Promise((resolve, reject) => {
+      const upload = new Uploader(file.data, Object.assign({
+        // .bind to pass the file object to each handler.
+        createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
+        listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
+        prepareUploadPart: this.limitRequests(this.opts.prepareUploadPart.bind(this, file)),
+        completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
+        abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
+        onStart: (data) => {
+          const cFile = this.uppy.getFile(file.id)
+          this.uppy.setFileState(file.id, {
+            s3Multipart: Object.assign({}, cFile.s3Multipart, {
+              key: data.key,
+              uploadId: data.uploadId,
+              parts: []
+            })
+          })
+        },
+        onProgress: (bytesUploaded, bytesTotal) => {
+          this.uppy.emit('upload-progress', file, {
+            uploader: this,
+            bytesUploaded: bytesUploaded,
+            bytesTotal: bytesTotal
+          })
+        },
+        onError: (err) => {
+          this.uppy.log(err)
+          this.uppy.emit('upload-error', file, err)
+          err.message = `Failed because: ${err.message}`
+          reject(err)
+        },
+        onSuccess: () => {
+          this.uppy.emit('upload-success', file, upload, upload.url)
+
+          if (upload.url) {
+            this.uppy.log('Download ' + upload.file.name + ' from ' + upload.url)
+          }
 
-    const completeMultipartUpload = this.limitRequests(this.opts.completeMultipartUpload)
+          resolve(upload)
+        },
+        onPartComplete: (part) => {
+          // Store completed parts in state.
+          const cFile = this.uppy.getFile(file.id)
+          this.uppy.setFileState(file.id, {
+            s3Multipart: Object.assign({}, cFile.s3Multipart, {
+              parts: [
+                ...cFile.s3Multipart.parts,
+                part
+              ]
+            })
+          })
 
-    this.uppy.emit('upload-started', file)
+          this.uppy.emit('s3-multipart:part-uploaded', cFile, part)
+        }
+      }, file.s3Multipart))
 
-    const total = file.size
-    // Keep track of progress for chunks individually, so it's easy to reset progress if one of them fails.
-    const currentProgress = chunks.map(() => 0)
+      console.log('uploader', upload)
 
-    const doUploadPart = (chunk, index) => {
-      const cFile = this.uppy.getFile(file.id)
-      if (cFile.progress.isPaused) return Promise.reject(INTENTIONAL_PAUSE)
+      this.uppy.on('file-removed', (removed) => {
+        if (file.id !== removed.id) return
+        upload.abort()
+        resolve(`upload ${removed.id} was removed`)
+      })
 
-      return Promise.resolve(
-        this.opts.prepareUploadPart(file, {
-          key: file.s3Multipart.key,
-          uploadId: file.s3Multipart.uploadId,
-          body: chunk,
-          number: index + 1
-        })
-      ).then((result) => {
-        const valid = typeof result === 'object' && result &&
-          typeof result.url === 'string'
-        if (!valid) {
-          throw new TypeError(`AwsS3/Multipart: Got incorrect result from 'prepareUploadPart()' for file '${file.name}', expected an object '{ url }'.`)
+      this.uppy.on('upload-pause', (pausee, isPaused) => {
+        if (pausee.id !== file.id) return
+        if (isPaused) {
+          upload.pause()
+        } else {
+          upload.start()
         }
-        return result
-      }).then(({ url }) => {
-        return this.uploadPart(file, url, index + 1, chunk, (current) => {
-          currentProgress[index] = current
-
-          this.uppy.emit('upload-progress', this.uppy.getFile(file.id), {
-            uploader: this,
-            bytesUploaded: currentProgress.reduce((a, b) => a + b),
-            bytesTotal: total
-          })
-        }).then(({ etag }) => ({
-          ETag: etag,
-          PartNumber: index + 1
-        }))
       })
-    }
-
-    // Limit this bundle of Prepare + Upload request instead of the individual requests;
-    // otherwise there might be too much time between Prepare and Upload (> 5 minutes).
-    const uploadPart = this.limitRequests(doUploadPart)
 
-    const partUploads = chunks.map((chunk, index) => {
-      return uploadPart(chunk, index)
-    })
+      this.uppy.on('pause-all', () => {
+        upload.pause()
+      })
 
-    return Promise.all(partUploads).catch((err) => {
-      if (err === INTENTIONAL_CANCEL) {
-        console.log('cancelled, what to do?')
-      }
-      if (err === INTENTIONAL_PAUSE) {
-        return this.handlePaused(file.id)
-      }
-      throw err
-    }).then(() => {
-      return completeMultipartUpload(file, {
-        key: file.s3Multipart.key,
-        uploadId: file.s3Multipart.uploadId,
-        parts: file.s3Multipart.parts
+      this.uppy.on('cancel-all', () => {
+        upload.abort()
       })
-    }).then(() => {
-      this.uppy.emit('upload-success', this.uppy.getFile(file.id))
-    })
-  }
 
-  // When an upload is paused, wait for it to start again.
-  // (This would be easier with a separate internal S3 Multipart uploader class)
-  handlePaused (fileID) {
-    return new Promise((resolve, reject) => {
-      const events = createEventTracker(this.uppy)
+      this.uppy.on('resume-all', () => {
+        upload.start()
+      })
 
-      const resume = (resumeIDs) => {
-        events.remove()
-        const file = this.uppy.getFile(fileID)
-        return this.uploadFile(file)
-          .then(resolve, reject)
+      if (!file.isPaused) {
+        upload.start()
       }
 
-      events.on('resume-all', () => {
-        resume()
-      })
-      events.on('cancel-all', () => {
-        events.remove()
-        reject(INTENTIONAL_CANCEL)
-      })
-      events.on('upload-pause', (unpauseID, isPaused) => {
-        if (fileID === unpauseID && !isPaused) {
-          resume()
-        }
-      })
+      if (!file.isRestored) {
+        this.uppy.emit('upload-started', file, upload)
+      }
     })
   }
 
@@ -376,12 +279,10 @@ module.exports = class AwsS3Multipart extends Plugin {
 
   install () {
     this.addResumableUploadsCapabilityFlag()
-    this.uppy.addPreProcessor(this.prepareUpload)
     this.uppy.addUploader(this.upload)
   }
 
   uninstall () {
     this.uppy.removeUploader(this.upload)
-    this.uppy.removePreProcessor(this.prepareUpload)
   }
 }

+ 253 - 0
src/plugins/AwsS3/MultipartUploader.js

@@ -0,0 +1,253 @@
+const MB = 1024 * 1024
+
+const defaultOptions = {
+  limit: 1,
+  onStart () {},
+  onProgress () {},
+  onPartComplete () {},
+  onSuccess () {},
+  onError (err) { console.error(err) }
+}
+
+function remove (arr, el) {
+  const i = arr.indexOf(el)
+  if (i !== -1) arr.splice(i, 1)
+}
+
+class MultipartUploader {
+  constructor (file, options) {
+    this.options = Object.assign({}, defaultOptions, options)
+    this.file = file
+
+    this.key = this.options.key || null
+    this.uploadId = this.options.uploadId || null
+    this.parts = this.options.parts || []
+
+    this.isPaused = false
+    this.chunks = null
+    this.chunkState = null
+    this.uploading = []
+
+    this._initChunks()
+  }
+
+  _initChunks () {
+    const chunks = []
+    const chunkSize = Math.max(Math.ceil(this.file.size / 10000), 5 * MB)
+
+    for (let i = 0; i < this.file.size; i += chunkSize) {
+      const end = Math.min(this.file.size, i + chunkSize)
+      chunks.push(this.file.slice(i, end))
+    }
+
+    this.chunks = chunks
+    this.chunkState = chunks.map(() => ({
+      uploaded: 0,
+      done: false
+    }))
+  }
+
+  _createUpload () {
+    return this.options.createMultipartUpload().then((result) => {
+      const valid = typeof result === 'object' && result &&
+        typeof result.uploadId === 'string' &&
+        typeof result.key === 'string'
+      if (!valid) {
+        throw new TypeError(`AwsS3/Multipart: Got incorrect result from 'createMultipartUpload()', expected an object '{ uploadId, key }'.`)
+      }
+      return result
+    }).then((result) => {
+      this.key = result.key
+      this.uploadId = result.uploadId
+
+      this.options.onStart(result)
+    }).then(() => {
+      if (!this.isPaused) this._uploadParts()
+    })
+  }
+
+  _resumeUpload () {
+    this.options.listParts({
+      uploadId: this.uploadId,
+      key: this.key
+    }).then((parts) => {
+      parts.forEach((part) => {
+        const i = part.PartNumber - 1
+        this.chunkState[i] = {
+          uploaded: part.Size,
+          etag: part.ETag,
+          done: true
+        }
+
+        // Only add if we did not yet know about this part.
+        if (!this.parts.some((p) => p.PartNumber === part.PartNumber)) {
+          this.parts.push({
+            PartNumber: part.PartNumber,
+            ETag: part.ETag
+          })
+        }
+      })
+      this._uploadParts()
+    }).catch((err) => {
+      this._onError(err)
+    })
+  }
+
+  _uploadParts () {
+    const need = this.options.limit - this.uploading.length
+    if (need === 0) return
+
+    // All parts are uploaded.
+    if (this.chunkState.every((state) => state.done)) {
+      this._completeUpload()
+      return
+    }
+
+    const candidates = []
+    for (let i = 0; i < this.chunkState.length; i++) {
+      const state = this.chunkState[i]
+      if (state.done) continue
+
+      candidates.push(i)
+      if (candidates.length >= need) {
+        break
+      }
+    }
+
+    candidates.forEach((index) => {
+      this._uploadPart(index)
+    })
+  }
+
+  _uploadPart (index) {
+    const body = this.chunks[index]
+    return Promise.resolve(
+      this.options.prepareUploadPart({
+        key: this.key,
+        uploadId: this.uploadId,
+        body,
+        number: index + 1
+      })
+    ).then((result) => {
+      const valid = typeof result === 'object' && result &&
+        typeof result.url === 'string'
+      if (!valid) {
+        throw new TypeError(`AwsS3/Multipart: Got incorrect result from 'prepareUploadPart()', expected an object '{ url }'.`)
+      }
+      return result
+    }).then(({ url }) => {
+      this._uploadPartBytes(index, url)
+    })
+  }
+
+  _onPartProgress (index, sent, total) {
+    this.chunkState[index].uploaded = sent
+
+    const totalUploaded = this.chunkState.reduce((n, c) => n + c.uploaded, 0)
+    this.options.onProgress(totalUploaded, this.file.size)
+  }
+
+  _onPartComplete (index, etag) {
+    this.chunkState[index].etag = etag
+    this.chunkState[index].done = true
+
+    const part = {
+      PartNumber: index + 1,
+      ETag: etag
+    }
+    this.parts.push(part)
+
+    this.options.onPartComplete(part)
+
+    this._uploadParts()
+  }
+
+  _uploadPartBytes (index, url) {
+    const body = this.chunks[index]
+    const xhr = new XMLHttpRequest()
+    xhr.open('PUT', url, true)
+    xhr.responseType = 'text'
+
+    xhr._id = Math.random()
+
+    this.uploading.push(xhr)
+
+    xhr.upload.addEventListener('progress', (ev) => {
+      if (!ev.lengthComputable) return
+
+      this._onPartProgress(index, ev.loaded, ev.total)
+    })
+
+    xhr.addEventListener('abort', (ev) => {
+      console.log('abort', ev)
+      remove(this.uploading, ev.target)
+    })
+
+    xhr.addEventListener('load', (ev) => {
+      console.log('load', ev)
+      remove(this.uploading, ev.target)
+      if (ev.target.status < 200 || ev.target.status >= 300) {
+        this._onError(new Error('Non 2xx'))
+        return
+      }
+
+      this._onPartProgress(index, body.size, body.size)
+
+      // NOTE This must be allowed by CORS.
+      const etag = ev.target.getResponseHeader('ETag')
+
+      this._onPartComplete(index, etag)
+    })
+
+    xhr.addEventListener('error', (ev) => {
+      remove(this.uploading, ev.target)
+
+      const error = new Error('Unknown error')
+      error.source = ev.target
+      this._onError(error)
+    })
+
+    xhr.send(body)
+  }
+
+  _completeUpload () {
+    return Promise.resolve(
+      this.options.completeMultipartUpload({
+        key: this.key,
+        uploadId: this.uploadId,
+        parts: this.parts
+      })
+    ).then((result) => {
+      this.options.onSuccess()
+    }, (err) => {
+      this._onError(err)
+    })
+  }
+
+  _abortUpload () {
+    this.options.abortMultipartUpload({
+      uploadId: this.uploadId
+    })
+  }
+
+  start () {
+    if (this.uploadId) {
+      this._resumeUpload()
+    } else {
+      this._createUpload()
+    }
+  }
+
+  pause () {
+    this.uploading.forEach((xhr) => xhr.abort())
+    this.isPaused = true
+  }
+
+  abort (really = false) {
+    if (!really) return this.pause()
+
+    this._abortUpload()
+  }
+}
+
+module.exports = MultipartUploader