socket.js 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. const SocketServer = require('ws').WebSocketServer
  2. const { jsonStringify } = require('./helpers/utils')
  3. const emitter = require('./emitter')
  4. const redis = require('./redis')
  5. const logger = require('./logger')
  6. const { STORAGE_PREFIX, shortenToken } = require('./Uploader')
  7. /**
  8. * the socket is used to send progress events during an upload
  9. *
  10. * @param {import('http').Server | import('https').Server} server
  11. */
  12. module.exports = (server) => {
  13. const wss = new SocketServer({ server })
  14. const redisClient = redis.client()
  15. // A new connection is usually created when an upload begins,
  16. // or when connection fails while an upload is on-going and,
  17. // client attempts to reconnect.
  18. wss.on('connection', (ws, req) => {
  19. const fullPath = req.url
  20. // the token identifies which ongoing upload's progress, the socket
  21. // connection wishes to listen to.
  22. const token = fullPath.replace(/^.*\/api\//, '')
  23. logger.info(`connection received from ${token}`, 'socket.connect')
  24. /**
  25. *
  26. * @param {{action: string, payload: object}} data
  27. */
  28. function sendProgress (data) {
  29. ws.send(jsonStringify(data), (err) => {
  30. if (err) logger.error(err, 'socket.progress.error', shortenToken(token))
  31. })
  32. }
  33. // if the redisClient is available, then we attempt to check the storage
  34. // if we have any already stored progress data on the upload.
  35. if (redisClient) {
  36. redisClient.get(`${STORAGE_PREFIX}:${token}`).then((data) => {
  37. if (data) {
  38. const dataObj = JSON.parse(data.toString())
  39. if (dataObj.action) sendProgress(dataObj)
  40. }
  41. }).catch((err) => logger.error(err, 'socket.redis.error', shortenToken(token)))
  42. }
  43. emitter().emit(`connection:${token}`)
  44. emitter().on(token, sendProgress)
  45. ws.on('message', (jsonData) => {
  46. const data = JSON.parse(jsonData.toString())
  47. // whitelist triggered actions
  48. if (['pause', 'resume', 'cancel'].includes(data.action)) {
  49. emitter().emit(`${data.action}:${token}`)
  50. }
  51. })
  52. ws.on('close', () => {
  53. emitter().removeListener(token, sendProgress)
  54. })
  55. })
  56. }