Bladeren bron

s3: Begin work on remote Multipart uploads

Renée Kooi 7 jaren geleden
bovenliggende
commit
1f251c5ee4
1 gewijzigde bestanden met toevoegingen van 215 en 10 verwijderingen
  1. 215 10
      src/plugins/AwsS3/Multipart.js

+ 215 - 10
src/plugins/AwsS3/Multipart.js

@@ -1,5 +1,10 @@
 const Plugin = require('../../core/Plugin')
 const Plugin = require('../../core/Plugin')
-const { limitPromises } = require('../../core/Utils')
+const UppySocket = require('../../core/UppySocket')
+const {
+  emitSocketProgress,
+  getSocketHost,
+  limitPromises
+} = require('../../core/Utils')
 const Uploader = require('./MultipartUploader')
 const Uploader = require('./MultipartUploader')
 
 
 function handleResponse (response) {
 function handleResponse (response) {
@@ -9,6 +14,25 @@ function handleResponse (response) {
   })
   })
 }
 }
 
 
+/**
+ * Create a wrapper around an event emitter with a `remove` method to remove
+ * all events that were added using the wrapped emitter.
+ */
+function createEventTracker (emitter) {
+  const events = []
+  return {
+    on (event, fn) {
+      events.push([ event, fn ])
+      return emitter.on(event, fn)
+    },
+    remove () {
+      events.forEach(([ event, fn ]) => {
+        emitter.off(event, fn)
+      })
+    }
+  }
+}
+
 module.exports = class AwsS3Multipart extends Plugin {
 module.exports = class AwsS3Multipart extends Plugin {
   constructor (uppy, opts) {
   constructor (uppy, opts) {
     super(uppy, opts)
     super(uppy, opts)
@@ -35,6 +59,29 @@ module.exports = class AwsS3Multipart extends Plugin {
     } else {
     } else {
       this.limitRequests = (fn) => fn
       this.limitRequests = (fn) => fn
     }
     }
+
+    this.uploaders = Object.create(null)
+    this.uploaderEvents = Object.create(null)
+    this.uploaderSockets = Object.create(null)
+  }
+
+  /**
+   * Clean up all references for a file's upload: the MultipartUploader instance,
+   * any events related to the file, and the uppy-server WebSocket connection.
+   */
+  resetUploaderReferences (fileID) {
+    if (this.uploaders[fileID]) {
+      this.uploaders[fileID].abort()
+      this.uploaders[fileID] = null
+    }
+    if (this.uploaderEvents[fileID]) {
+      this.uploaderEvents[fileID].remove()
+      this.uploaderEvents[fileID] = null
+    }
+    if (this.uploaderSockets[fileID]) {
+      this.uploaderSockets[fileID].close()
+      this.uploaderSockets[fileID] = null
+    }
   }
   }
 
 
   assertHost () {
   assertHost () {
@@ -132,6 +179,8 @@ module.exports = class AwsS3Multipart extends Plugin {
           this.uppy.log(err)
           this.uppy.log(err)
           this.uppy.emit('upload-error', file, err)
           this.uppy.emit('upload-error', file, err)
           err.message = `Failed because: ${err.message}`
           err.message = `Failed because: ${err.message}`
+
+          this.resetUploaderReferences(file.id)
           reject(err)
           reject(err)
         },
         },
         onSuccess: (result) => {
         onSuccess: (result) => {
@@ -141,6 +190,7 @@ module.exports = class AwsS3Multipart extends Plugin {
             this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
             this.uppy.log('Download ' + upload.file.name + ' from ' + result.location)
           }
           }
 
 
+          this.resetUploaderReferences(file.id)
           resolve(upload)
           resolve(upload)
         },
         },
         onPartComplete: (part) => {
         onPartComplete: (part) => {
@@ -159,14 +209,15 @@ module.exports = class AwsS3Multipart extends Plugin {
         }
         }
       }, file.s3Multipart))
       }, file.s3Multipart))
 
 
-      this.uppy.on('file-removed', (removed) => {
-        if (file.id !== removed.id) return
-        upload.abort({ really: true })
+      this.uploaders[file.id] = upload
+      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+
+      this.onFileRemove(file.id, (removed) => {
+        this.resetUploaderReferences(file.id)
         resolve(`upload ${removed.id} was removed`)
         resolve(`upload ${removed.id} was removed`)
       })
       })
 
 
-      this.uppy.on('upload-pause', (pausee, isPaused) => {
-        if (pausee.id !== file.id) return
+      this.onFilePause(file.id, (isPaused) => {
         if (isPaused) {
         if (isPaused) {
           upload.pause()
           upload.pause()
         } else {
         } else {
@@ -174,15 +225,15 @@ module.exports = class AwsS3Multipart extends Plugin {
         }
         }
       })
       })
 
 
-      this.uppy.on('pause-all', () => {
+      this.onPauseAll(file.id, () => {
         upload.pause()
         upload.pause()
       })
       })
 
 
-      this.uppy.on('cancel-all', () => {
+      this.onCancelAll(file.id, () => {
         upload.abort({ really: true })
         upload.abort({ really: true })
       })
       })
 
 
-      this.uppy.on('resume-all', () => {
+      this.onResumeAll(file.id, () => {
         upload.start()
         upload.start()
       })
       })
 
 
@@ -196,13 +247,116 @@ module.exports = class AwsS3Multipart extends Plugin {
     })
     })
   }
   }
 
 
+  uploadRemote (file) {
+    this.resetUploaderReferences(file.id)
+
+    return new Promise((resolve, reject) => {
+      if (file.serverToken) {
+        return this.connectToServerSocket(file)
+          .then(() => resolve())
+          .catch(reject)
+      }
+
+      this.uppy.emit('upload-started', file)
+
+      fetch(file.remote.url, {
+        method: 'post',
+        credentials: 'include',
+        headers: {
+          'Accept': 'application/json',
+          'Content-Type': 'application/json'
+        },
+        body: JSON.stringify(Object.assign({}, file.remote.body, {
+          protocol: 's3-multipart',
+          size: file.data.size,
+          metadata: file.meta
+        }))
+      })
+      .then((res) => {
+        if (res.status < 200 || res.status > 300) {
+          return reject(res.statusText)
+        }
+
+        return res.json().then((data) => {
+          this.uppy.setFileState(file.id, { serverToken: data.token })
+          return this.uppy.getFile(file.id)
+        })
+      })
+      .then((file) => {
+        return this.connectToServerSocket(file)
+      })
+      .then(() => {
+        resolve()
+      })
+      .catch((err) => {
+        reject(new Error(err))
+      })
+    })
+  }
+
+  connectToServerSocket (file) {
+    return new Promise((resolve, reject) => {
+      const token = file.serverToken
+      const host = getSocketHost(file.remote.host)
+      const socket = new UppySocket({ target: `${host}/api/${token}` })
+      this.uploaderSockets[socket] = socket
+      this.uploaderEvents[file.id] = createEventTracker(this.uppy)
+
+      this.onFileRemove(file.id, (removed) => {
+        socket.send('pause', {})
+        resolve(`upload ${file.id} was removed`)
+      })
+
+      this.onFilePause(file.id, (isPaused) => {
+        socket.send(isPaused ? 'pause' : 'resume', {})
+      })
+
+      this.onPauseAll(file.id, () => socket.send('pause', {}))
+
+      this.onCancelAll(file.id, () => socket.send('pause', {}))
+
+      this.onResumeAll(file.id, () => {
+        if (file.error) {
+          socket.send('pause', {})
+        }
+        socket.send('resume', {})
+      })
+
+      this.onRetry(file.id, () => {
+        socket.send('pause', {})
+        socket.send('resume', {})
+      })
+
+      this.onRetryAll(file.id, () => {
+        socket.send('pause', {})
+        socket.send('resume', {})
+      })
+
+      if (file.isPaused) {
+        socket.send('pause', {})
+      }
+
+      socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
+
+      socket.on('error', (errData) => {
+        this.uppy.emit('upload-error', file, new Error(errData.error))
+        reject(new Error(errData.error))
+      })
+
+      socket.on('success', (data) => {
+        this.uppy.emit('upload-success', file, data, data.url)
+        resolve()
+      })
+    })
+  }
+
   upload (fileIDs) {
   upload (fileIDs) {
     if (fileIDs.length === 0) return Promise.resolve()
     if (fileIDs.length === 0) return Promise.resolve()
 
 
     const promises = fileIDs.map((id) => {
     const promises = fileIDs.map((id) => {
       const file = this.uppy.getFile(id)
       const file = this.uppy.getFile(id)
       if (file.isRemote) {
       if (file.isRemote) {
-        return Promise.reject(new Error('AwsS3/Multipart: Remote file uploads are not currently supported.'))
+        return this.uploadRemote(file)
       }
       }
       return this.uploadFile(file)
       return this.uploadFile(file)
     })
     })
@@ -218,6 +372,57 @@ module.exports = class AwsS3Multipart extends Plugin {
     })
     })
   }
   }
 
 
+  onFileRemove (fileID, cb) {
+    this.uploaderEvents[fileID].on('file-removed', (file) => {
+      if (fileID === file.id) cb(file.id)
+    })
+  }
+
+  onFilePause (fileID, cb) {
+    this.uploaderEvents[fileID].on('upload-pause', (targetFileID, isPaused) => {
+      if (fileID === targetFileID) {
+        // const isPaused = this.uppy.pauseResume(fileID)
+        cb(isPaused)
+      }
+    })
+  }
+
+  onRetry (fileID, cb) {
+    this.uploaderEvents[fileID].on('upload-retry', (targetFileID) => {
+      if (fileID === targetFileID) {
+        cb()
+      }
+    })
+  }
+
+  onRetryAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('retry-all', (filesToRetry) => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onPauseAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('pause-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onCancelAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('cancel-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
+  onResumeAll (fileID, cb) {
+    this.uploaderEvents[fileID].on('resume-all', () => {
+      if (!this.uppy.getFile(fileID)) return
+      cb()
+    })
+  }
+
   install () {
   install () {
     this.addResumableUploadsCapabilityFlag()
     this.addResumableUploadsCapabilityFlag()
     this.uppy.addUploader(this.upload)
     this.uppy.addUploader(this.upload)