Переглянути джерело

s3: Upload multiple chunks simultaneously

Renée Kooi 7 роки тому
батько
коміт
b9cec0e4d2
2 змінених файлів з 33 додано та 81 видалено
  1. 18 75
      src/plugins/AwsS3/Multipart.js
  2. 15 6
      src/plugins/AwsS3/MultipartUploader.js

+ 18 - 75
src/plugins/AwsS3/Multipart.js

@@ -1,20 +1,20 @@
 const Plugin = require('../../core/Plugin')
 const Plugin = require('../../core/Plugin')
-const Translator = require('../../core/Translator')
 const { limitPromises } = require('../../core/Utils')
 const { limitPromises } = require('../../core/Utils')
 const Uploader = require('./MultipartUploader')
 const Uploader = require('./MultipartUploader')
 
 
+function handleResponse (response) {
+  if (response.ok) return response.json()
+  return response.json().then((err) => {
+    throw err
+  })
+}
+
 module.exports = class AwsS3Multipart extends Plugin {
 module.exports = class AwsS3Multipart extends Plugin {
   constructor (uppy, opts) {
   constructor (uppy, opts) {
     super(uppy, opts)
     super(uppy, opts)
     this.type = 'uploader'
     this.type = 'uploader'
-    this.id = 'AwsS3'
-    this.title = 'AWS S3'
-
-    const defaultLocale = {
-      strings: {
-        preparingUpload: 'Preparing upload...'
-      }
-    }
+    this.id = 'AwsS3Multipart'
+    this.title = 'AWS S3 Multipart'
 
 
     const defaultOptions = {
     const defaultOptions = {
       timeout: 30 * 1000,
       timeout: 30 * 1000,
@@ -23,18 +23,11 @@ module.exports = class AwsS3Multipart extends Plugin {
       listParts: this.listParts.bind(this),
       listParts: this.listParts.bind(this),
       prepareUploadPart: this.prepareUploadPart.bind(this),
       prepareUploadPart: this.prepareUploadPart.bind(this),
       abortMultipartUpload: this.abortMultipartUpload.bind(this),
       abortMultipartUpload: this.abortMultipartUpload.bind(this),
-      completeMultipartUpload: this.completeMultipartUpload.bind(this),
-      locale: defaultLocale
+      completeMultipartUpload: this.completeMultipartUpload.bind(this)
     }
     }
 
 
     this.opts = Object.assign({}, defaultOptions, opts)
     this.opts = Object.assign({}, defaultOptions, opts)
-    this.locale = Object.assign({}, defaultLocale, this.opts.locale)
-    this.locale.strings = Object.assign({}, defaultLocale.strings, this.opts.locale.strings)
-
-    this.translator = new Translator({ locale: this.locale })
-    this.i18n = this.translator.translate.bind(this.translator)
 
 
-    this.prepareUpload = this.prepareUpload.bind(this)
     this.upload = this.upload.bind(this)
     this.upload = this.upload.bind(this)
 
 
     if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
     if (typeof this.opts.limit === 'number' && this.opts.limit !== 0) {
@@ -58,7 +51,7 @@ module.exports = class AwsS3Multipart extends Plugin {
     return fetch(`${this.opts.host}/s3/multipart?filename=${filename}&type=${type}`, {
     return fetch(`${this.opts.host}/s3/multipart?filename=${filename}&type=${type}`, {
       method: 'post',
       method: 'post',
       headers: { accept: 'application/json' }
       headers: { accept: 'application/json' }
-    }).then((response) => response.json())
+    }).then(handleResponse)
   }
   }
 
 
   listParts (file, { key, uploadId }) {
   listParts (file, { key, uploadId }) {
@@ -68,7 +61,7 @@ module.exports = class AwsS3Multipart extends Plugin {
     return fetch(`${this.opts.host}/s3/multipart/${uploadId}?key=${filename}`, {
     return fetch(`${this.opts.host}/s3/multipart/${uploadId}?key=${filename}`, {
       method: 'get',
       method: 'get',
       headers: { accept: 'application/json' }
       headers: { accept: 'application/json' }
-    }).then((response) => response.json())
+    }).then(handleResponse)
   }
   }
 
 
   prepareUploadPart (file, { key, uploadId, number }) {
   prepareUploadPart (file, { key, uploadId, number }) {
@@ -78,7 +71,7 @@ module.exports = class AwsS3Multipart extends Plugin {
     return fetch(`${this.opts.host}/s3/multipart/${uploadId}/${number}?key=${filename}`, {
     return fetch(`${this.opts.host}/s3/multipart/${uploadId}/${number}?key=${filename}`, {
       method: 'get',
       method: 'get',
       headers: { accept: 'application/json' }
       headers: { accept: 'application/json' }
-    }).then((response) => response.json())
+    }).then(handleResponse)
   }
   }
 
 
   completeMultipartUpload (file, { key, uploadId, parts }) {
   completeMultipartUpload (file, { key, uploadId, parts }) {
@@ -93,7 +86,7 @@ module.exports = class AwsS3Multipart extends Plugin {
         'content-type': 'application/json'
         'content-type': 'application/json'
       },
       },
       body: JSON.stringify({ parts })
       body: JSON.stringify({ parts })
-    }).then((response) => response.json())
+    }).then(handleResponse)
   }
   }
 
 
   abortMultipartUpload (file, { key, uploadId }) {
   abortMultipartUpload (file, { key, uploadId }) {
@@ -104,59 +97,7 @@ module.exports = class AwsS3Multipart extends Plugin {
     return fetch(`${this.opts.host}/s3/multipart/${uploadIdEnc}?key=${filename}`, {
     return fetch(`${this.opts.host}/s3/multipart/${uploadIdEnc}?key=${filename}`, {
       method: 'delete',
       method: 'delete',
       headers: { accept: 'application/json' }
       headers: { accept: 'application/json' }
-    }).then((response) => response.json())
-  }
-
-  prepareUpload (fileIDs) {
-    fileIDs.forEach((id) => {
-      const file = this.uppy.getFile(id)
-      this.uppy.emit('preprocess-progress', file, {
-        mode: 'determinate',
-        message: this.i18n('preparingUpload'),
-        value: 0
-      })
-    })
-
-    const createMultipartUpload = this.limitRequests(this.opts.createMultipartUpload)
-
-    return Promise.all(
-      fileIDs.map((id) => {
-        const file = this.uppy.getFile(id)
-        if (file.s3Multipart && file.s3Multipart.uploadId) {
-          return Promise.resolve(file.s3Multipart)
-        }
-
-        const createPromise = Promise.resolve()
-          .then(() => createMultipartUpload(file))
-        return createPromise.then((result) => {
-          const valid = typeof result === 'object' && result &&
-            typeof result.uploadId === 'string' &&
-            typeof result.key === 'string'
-          if (!valid) {
-            throw new TypeError(`AwsS3/Multipart: Got incorrect result from 'createMultipartUpload()' for file '${file.name}', expected an object '{ uploadId, key }'.`)
-          }
-          return result
-        }).then((result) => {
-          this.uppy.emit('preprocess-progress', file, {
-            mode: 'determinate',
-            message: this.i18n('preparingUpload'),
-            value: 1
-          })
-
-          this.uppy.setFileState(file.id, {
-            s3Multipart: Object.assign({
-              parts: []
-            }, result)
-          })
-          this.uppy.emit('preprocess-complete', file)
-
-          return result
-        }).catch((error) => {
-          this.uppy.emit('upload-error', file, error)
-          throw error
-        })
-      })
-    )
+    }).then(handleResponse)
   }
   }
 
 
   uploadFile (file) {
   uploadFile (file) {
@@ -165,9 +106,11 @@ module.exports = class AwsS3Multipart extends Plugin {
         // .bind to pass the file object to each handler.
         // .bind to pass the file object to each handler.
         createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
         createMultipartUpload: this.limitRequests(this.opts.createMultipartUpload.bind(this, file)),
         listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
         listParts: this.limitRequests(this.opts.listParts.bind(this, file)),
-        prepareUploadPart: this.limitRequests(this.opts.prepareUploadPart.bind(this, file)),
+        prepareUploadPart: this.opts.prepareUploadPart.bind(this, file),
         completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
         completeMultipartUpload: this.limitRequests(this.opts.completeMultipartUpload.bind(this, file)),
         abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
         abortMultipartUpload: this.limitRequests(this.opts.abortMultipartUpload.bind(this, file)),
+
+        limit: this.opts.limit || 5,
         onStart: (data) => {
         onStart: (data) => {
           const cFile = this.uppy.getFile(file.id)
           const cFile = this.uppy.getFile(file.id)
           this.uppy.setFileState(file.id, {
           this.uppy.setFileState(file.id, {

+ 15 - 6
src/plugins/AwsS3/MultipartUploader.js

@@ -43,6 +43,7 @@ class MultipartUploader {
     this.chunks = chunks
     this.chunks = chunks
     this.chunkState = chunks.map(() => ({
     this.chunkState = chunks.map(() => ({
       uploaded: 0,
       uploaded: 0,
+      busy: false,
       done: false
       done: false
     }))
     }))
   }
   }
@@ -62,7 +63,7 @@ class MultipartUploader {
 
 
       this.options.onStart(result)
       this.options.onStart(result)
     }).then(() => {
     }).then(() => {
-      if (!this.isPaused) this._uploadParts()
+      this._uploadParts()
     })
     })
   }
   }
 
 
@@ -94,6 +95,8 @@ class MultipartUploader {
   }
   }
 
 
   _uploadParts () {
   _uploadParts () {
+    if (this.isPaused) return
+
     const need = this.options.limit - this.uploading.length
     const need = this.options.limit - this.uploading.length
     if (need === 0) return
     if (need === 0) return
 
 
@@ -106,7 +109,7 @@ class MultipartUploader {
     const candidates = []
     const candidates = []
     for (let i = 0; i < this.chunkState.length; i++) {
     for (let i = 0; i < this.chunkState.length; i++) {
       const state = this.chunkState[i]
       const state = this.chunkState[i]
-      if (state.done) continue
+      if (state.done || state.busy) continue
 
 
       candidates.push(i)
       candidates.push(i)
       if (candidates.length >= need) {
       if (candidates.length >= need) {
@@ -121,6 +124,8 @@ class MultipartUploader {
 
 
   _uploadPart (index) {
   _uploadPart (index) {
     const body = this.chunks[index]
     const body = this.chunks[index]
+    this.chunkState[index].busy = true
+
     return Promise.resolve(
     return Promise.resolve(
       this.options.prepareUploadPart({
       this.options.prepareUploadPart({
         key: this.key,
         key: this.key,
@@ -168,8 +173,6 @@ class MultipartUploader {
     xhr.open('PUT', url, true)
     xhr.open('PUT', url, true)
     xhr.responseType = 'text'
     xhr.responseType = 'text'
 
 
-    xhr._id = Math.random()
-
     this.uploading.push(xhr)
     this.uploading.push(xhr)
 
 
     xhr.upload.addEventListener('progress', (ev) => {
     xhr.upload.addEventListener('progress', (ev) => {
@@ -179,13 +182,14 @@ class MultipartUploader {
     })
     })
 
 
     xhr.addEventListener('abort', (ev) => {
     xhr.addEventListener('abort', (ev) => {
-      console.log('abort', ev)
       remove(this.uploading, ev.target)
       remove(this.uploading, ev.target)
+      this.chunkState[index].busy = false
     })
     })
 
 
     xhr.addEventListener('load', (ev) => {
     xhr.addEventListener('load', (ev) => {
-      console.log('load', ev)
       remove(this.uploading, ev.target)
       remove(this.uploading, ev.target)
+      this.chunkState[index].busy = false
+
       if (ev.target.status < 200 || ev.target.status >= 300) {
       if (ev.target.status < 200 || ev.target.status >= 300) {
         this._onError(new Error('Non 2xx'))
         this._onError(new Error('Non 2xx'))
         return
         return
@@ -201,6 +205,7 @@ class MultipartUploader {
 
 
     xhr.addEventListener('error', (ev) => {
     xhr.addEventListener('error', (ev) => {
       remove(this.uploading, ev.target)
       remove(this.uploading, ev.target)
+      this.chunkState[index].busy = false
 
 
       const error = new Error('Unknown error')
       const error = new Error('Unknown error')
       error.source = ev.target
       error.source = ev.target
@@ -211,6 +216,9 @@ class MultipartUploader {
   }
   }
 
 
   _completeUpload () {
   _completeUpload () {
+    // Parts may not have completed uploading in sorted order, if limit > 1.
+    this.parts.sort((a, b) => a.PartNumber - b.PartNumber)
+
     return Promise.resolve(
     return Promise.resolve(
       this.options.completeMultipartUpload({
       this.options.completeMultipartUpload({
         key: this.key,
         key: this.key,
@@ -232,6 +240,7 @@ class MultipartUploader {
   }
   }
 
 
   start () {
   start () {
+    this.isPaused = false
     if (this.uploadId) {
     if (this.uploadId) {
       this._resumeUpload()
       this._resumeUpload()
     } else {
     } else {