ソースを参照

transloadit: Emit missed events on restore.

Diffs assembly status to find new `uploads` and `results`, and emits
`transloadit:upload` and `transloadit:result` events accordingly.
Also emits `transloadit:complete` for newly finished assemblies
and `transloadit:assembly-error` for assemblies that failed.

This feature needs to store some custom data with the RestoreFiles
plugin. It does this by adding a `restore:get-data` event that gets a
callback as a parameter. This callback stores custom plugin data. It's
not great but idk.

    core.on('restore:get-data', (setData) => {
      setData({ whatever: 'beepboop' })
    })
Renée Kooi 7 年 前
コミット
d5a6b0fe3c
2 ファイル変更106 行追加9 行削除
  1. 14 3
      src/plugins/GoldenRetriever/index.js
  2. 92 6
      src/plugins/Transloadit/index.js

+ 14 - 3
src/plugins/GoldenRetriever/index.js

@@ -49,7 +49,12 @@ module.exports = class GoldenRetriever extends Plugin {
 
 
     if (savedState) {
     if (savedState) {
       this.uppy.log('Recovered some state from Local Storage')
       this.uppy.log('Recovered some state from Local Storage')
-      this.uppy.setState(savedState)
+      this.uppy.setState({
+        currentUploads: savedState.currentUploads || {},
+        files: savedState.files || {}
+      })
+
+      this.savedPluginData = savedState.pluginData
     }
     }
   }
   }
 
 
@@ -99,9 +104,15 @@ module.exports = class GoldenRetriever extends Plugin {
       this.getUploadingFiles()
       this.getUploadingFiles()
     )
     )
 
 
+    const pluginData = {}
+    this.uppy.emit('restore:get-data', (data) => {
+      Object.assign(pluginData, data)
+    })
+
     this.MetaDataStore.save({
     this.MetaDataStore.save({
       currentUploads: this.uppy.state.currentUploads,
       currentUploads: this.uppy.state.currentUploads,
-      files: filesToSave
+      files: filesToSave,
+      pluginData: pluginData
     })
     })
   }
   }
 
 
@@ -157,7 +168,7 @@ module.exports = class GoldenRetriever extends Plugin {
     this.uppy.setState({
     this.uppy.setState({
       files: updatedFiles
       files: updatedFiles
     })
     })
-    this.uppy.emit('restored')
+    this.uppy.emit('restored', this.savedPluginData)
 
 
     if (obsoleteBlobs.length) {
     if (obsoleteBlobs.length) {
       this.deleteBlobs(obsoleteBlobs).then(() => {
       this.deleteBlobs(obsoleteBlobs).then(() => {

+ 92 - 6
src/plugins/Transloadit/index.js

@@ -51,6 +51,7 @@ module.exports = class Transloadit extends Plugin {
     this.afterUpload = this.afterUpload.bind(this)
     this.afterUpload = this.afterUpload.bind(this)
     this.onFileUploadURLAvailable = this.onFileUploadURLAvailable.bind(this)
     this.onFileUploadURLAvailable = this.onFileUploadURLAvailable.bind(this)
     this.onRestored = this.onRestored.bind(this)
     this.onRestored = this.onRestored.bind(this)
+    this.getPersistentData = this.getPersistentData.bind(this)
 
 
     if (this.opts.params) {
     if (this.opts.params) {
       this.validateParams(this.opts.params)
       this.validateParams(this.opts.params)
@@ -282,7 +283,29 @@ module.exports = class Transloadit extends Plugin {
     })
     })
   }
   }
 
 
-  onRestored () {
+  getPersistentData (setData) {
+    const state = this.getPluginState()
+    const assemblies = state.assemblies
+    const uploads = Object.keys(state.files)
+    const results = state.results.map((result) => result.id)
+
+    setData({
+      [this.id]: {
+        assemblies,
+        uploads,
+        results
+      }
+    })
+  }
+
+  onRestored (pluginData) {
+    const knownUploads = pluginData[this.id].files || []
+    const knownResults = pluginData[this.id].results || []
+    const previousAssemblies = pluginData[this.id].assemblies || {}
+
+    const allUploads = []
+    const allResults = []
+
     // Fetch up-to-date assembly statuses.
     // Fetch up-to-date assembly statuses.
     const loadAssemblies = () => {
     const loadAssemblies = () => {
       const fileIDs = Object.keys(this.core.state.files)
       const fileIDs = Object.keys(this.core.state.files)
@@ -305,6 +328,10 @@ module.exports = class Transloadit extends Plugin {
 
 
     const reconnectSockets = (assemblies) => {
     const reconnectSockets = (assemblies) => {
       return Promise.all(assemblies.map((assembly) => {
       return Promise.all(assemblies.map((assembly) => {
+        // No need to connect to the socket if the assembly has completed by now.
+        if (assembly.ok === 'ASSEMBLY_COMPLETE') {
+          return null
+        }
         return this.connectSocket(assembly)
         return this.connectSocket(assembly)
       }))
       }))
     }
     }
@@ -342,6 +369,10 @@ module.exports = class Transloadit extends Plugin {
 
 
         assembly.uploads.forEach((uploadedFile) => {
         assembly.uploads.forEach((uploadedFile) => {
           const file = this.findFile(uploadedFile)
           const file = this.findFile(uploadedFile)
+          allUploads.push({
+            assembly: assembly.assembly_id,
+            uploadedFile
+          })
           files[uploadedFile.id] = {
           files[uploadedFile.id] = {
             id: file.id,
             id: file.id,
             uploadedFile
             uploadedFile
@@ -353,6 +384,11 @@ module.exports = class Transloadit extends Plugin {
           assembly.results[stepName].forEach((result) => {
           assembly.results[stepName].forEach((result) => {
             const file = state.files[result.original_id]
             const file = state.files[result.original_id]
             result.localId = file ? file.id : null
             result.localId = file ? file.id : null
+            allResults.push({
+              assembly: assembly.assembly_id,
+              stepName,
+              result
+            })
             results.push(result)
             results.push(result)
           })
           })
         })
         })
@@ -377,8 +413,46 @@ module.exports = class Transloadit extends Plugin {
     }
     }
 
 
     const emitMissedEvents = () => {
     const emitMissedEvents = () => {
-      // TODO: Emit events for completed uploads, completed results,
-      // completed assemblies, that we've missed while we were away.
+      // Emit events for completed uploads and completed results
+      // that we've missed while we were away.
+      const newUploads = allUploads.filter((up) => {
+        return knownUploads.indexOf(up.uploadedFile.id) === -1
+      })
+      const newResults = allResults.filter((result) => {
+        return knownResults.indexOf(result.result.id) === -1
+      })
+
+      console.log('[Transloadit] New fully uploaded files since restore:', newUploads)
+      newUploads.forEach(({ assembly, uploadedFile }) => {
+        console.log('  emitting transloadit:upload', uploadedFile.id)
+        this.core.emit('transloadit:upload', uploadedFile, this.getAssembly(assembly))
+      })
+      console.log('[Transloadit] New results since restore:', newResults)
+      newResults.forEach(({ assembly, stepName, result }) => {
+        console.log('  emitting transloadit:result', stepName, result.id)
+        this.core.emit('transloadit:result', stepName, result, this.getAssembly(assembly))
+      })
+
+      const newAssemblies = this.getPluginState().assemblies
+      console.log('[Transloadit] Current assembly status after restore', newAssemblies)
+      console.log('[Transloadit] Assembly status before restore', previousAssemblies)
+      Object.keys(newAssemblies).forEach((assemblyId) => {
+        const oldAssembly = previousAssemblies[assemblyId]
+        diffAssemblyStatus(oldAssembly, newAssemblies[assemblyId])
+      })
+    }
+
+    // Emit events for assemblies that have completed or errored while we were away.
+    const diffAssemblyStatus = (prev, next) => {
+      console.log('[Transloadit] Diff assemblies', prev, next)
+      if (next.ok === 'ASSEMBLY_COMPLETED' && prev.ok !== 'ASSEMBLY_COMPLETED') {
+        console.log('  Emitting transloadit:complete for', next.assembly_id, next)
+        this.core.emit('transloadit:complete', next)
+      }
+      if (next.error && !prev.error) {
+        console.log('  !!! Emitting transloadit:assembly-error for', next.assembly_id, next)
+        this.core.emit('transloadit:assembly-error', next, new Error(next.message))
+      }
     }
     }
 
 
     // Restore all assembly state.
     // Restore all assembly state.
@@ -388,7 +462,14 @@ module.exports = class Transloadit extends Plugin {
         restoreState(assemblies)
         restoreState(assemblies)
         return reconnectSockets(assemblies)
         return reconnectSockets(assemblies)
       })
       })
-      .then(emitMissedEvents)
+      .then(() => {
+        // Wait for a bit, so the Promise resolves and `afterUpload` can
+        // add event handlers.
+        // Then we emit the events.
+        // This is reliable, because Promises use the microtask queue, and timeouts
+        // use the macrotask queue—microtasks are executed first.
+        setTimeout(emitMissedEvents, 10)
+      })
 
 
     this.restored.then(() => {
     this.restored.then(() => {
       this.restored = null
       this.restored = null
@@ -492,7 +573,7 @@ module.exports = class Transloadit extends Plugin {
     // If we're still restoring state, wait for that to be done.
     // If we're still restoring state, wait for that to be done.
     if (this.restored) {
     if (this.restored) {
       return this.restored.then(() => {
       return this.restored.then(() => {
-        this.afterUpload(fileIDs, uploadID)
+        return this.afterUpload(fileIDs, uploadID)
       })
       })
     }
     }
 
 
@@ -527,8 +608,10 @@ module.exports = class Transloadit extends Plugin {
       const onAssemblyFinished = (assembly) => {
       const onAssemblyFinished = (assembly) => {
         // An assembly for a different upload just finished. We can ignore it.
         // An assembly for a different upload just finished. We can ignore it.
         if (assemblyIDs.indexOf(assembly.assembly_id) === -1) {
         if (assemblyIDs.indexOf(assembly.assembly_id) === -1) {
+          console.log('[Transloadit] afterUpload(): Ignoring finished assembly', assembly.assembly_id)
           return
           return
         }
         }
+        console.log('[Transloadit] afterUpload(): Got assembly finish', assembly.assembly_id)
 
 
         // TODO set the `file.uploadURL` to a result?
         // TODO set the `file.uploadURL` to a result?
         // We will probably need an option here so the plugin user can tell us
         // We will probably need an option here so the plugin user can tell us
@@ -543,10 +626,12 @@ module.exports = class Transloadit extends Plugin {
       }
       }
 
 
       const onAssemblyError = (assembly, error) => {
       const onAssemblyError = (assembly, error) => {
-        // An assembly for a different upload just finished. We can ignore it.
+        // An assembly for a different upload just errored. We can ignore it.
         if (assemblyIDs.indexOf(assembly.assembly_id) === -1) {
         if (assemblyIDs.indexOf(assembly.assembly_id) === -1) {
+          console.log('[Transloadit] afterUpload(): Ignoring errored assembly', assembly.assembly_id)
           return
           return
         }
         }
+        console.log('[Transloadit] afterUpload(): Got assembly error', assembly.assembly_id, error)
 
 
         // Clear postprocessing state for all our files.
         // Clear postprocessing state for all our files.
         const files = this.getAssemblyFiles(assembly.assembly_id)
         const files = this.getAssemblyFiles(assembly.assembly_id)
@@ -608,6 +693,7 @@ module.exports = class Transloadit extends Plugin {
       this.uppy.on('upload-success', this.onFileUploadURLAvailable)
       this.uppy.on('upload-success', this.onFileUploadURLAvailable)
     }
     }
 
 
+    this.core.on('restore:get-data', this.getPersistentData)
     this.core.on('core:restored', this.onRestored)
     this.core.on('core:restored', this.onRestored)
 
 
     this.setPluginState({
     this.setPluginState({