redis-emitter.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. const redis = require('redis')
  2. const { EventEmitter } = require('node:events')
  3. const logger = require('../logger')
  4. /**
  5. * This module mimics the API of the built-in events.EventEmitter, but delivers events through Redis pub/sub.
  6. * This is useful when companion is running as multiple instances and events need
  7. * to be distributed across all of them.
  8. */
  9. module.exports = (redisUrl, redisPubSubScope) => {
  10. const prefix = redisPubSubScope ? `${redisPubSubScope}:` : ''
  11. const getPrefixedEventName = (eventName) => `${prefix}${eventName}`
  12. const publisher = redis.createClient({ url: redisUrl })
  13. publisher.on('error', err => logger.error('publisher redis error', err))
  14. let subscriber
  15. const connectedPromise = publisher.connect().then(() => {
  16. subscriber = publisher.duplicate()
  17. subscriber.on('error', err => logger.error('subscriber redis error', err))
  18. return subscriber.connect()
  19. })
  20. const handlersByEvent = new Map()
  21. const errorEmitter = new EventEmitter()
  22. const handleError = (err) => errorEmitter.emit('error', err)
  23. connectedPromise.catch((err) => handleError(err))
  24. async function runWhenConnected (fn) {
  25. try {
  26. await connectedPromise
  27. await fn()
  28. } catch (err) {
  29. handleError(err)
  30. }
  31. }
  32. function addListener (eventName, handler, _once = false) {
  33. function actualHandler (message) {
  34. if (_once) removeListener(eventName, handler)
  35. let args
  36. try {
  37. args = JSON.parse(message)
  38. } catch (ex) {
  39. return handleError(new Error(`Invalid JSON received! Channel: ${eventName} Message: ${message}`))
  40. }
  41. return handler(...args)
  42. }
  43. let handlersByThisEventName = handlersByEvent.get(eventName)
  44. if (handlersByThisEventName == null) {
  45. handlersByThisEventName = new WeakMap()
  46. handlersByEvent.set(eventName, handlersByThisEventName)
  47. }
  48. handlersByThisEventName.set(handler, actualHandler)
  49. runWhenConnected(() => subscriber.pSubscribe(getPrefixedEventName(eventName), actualHandler))
  50. }
  51. /**
  52. * Add an event listener
  53. *
  54. * @param {string} eventName name of the event
  55. * @param {any} handler the handler of the event
  56. */
  57. function on (eventName, handler) {
  58. if (eventName === 'error') return errorEmitter.on('error', handler)
  59. return addListener(eventName, handler)
  60. }
  61. /**
  62. * Add an event listener (will be triggered at most once)
  63. *
  64. * @param {string} eventName name of the event
  65. * @param {any} handler the handler of the event
  66. */
  67. function once (eventName, handler) {
  68. if (eventName === 'error') return errorEmitter.once('error', handler)
  69. return addListener(eventName, handler, true)
  70. }
  71. /**
  72. * Announce the occurrence of an event
  73. *
  74. * @param {string} eventName name of the event
  75. */
  76. function emit (eventName, ...args) {
  77. runWhenConnected(() => publisher.publish(getPrefixedEventName(eventName), JSON.stringify(args)))
  78. }
  79. /**
  80. * Remove an event listener
  81. *
  82. * @param {string} eventName name of the event
  83. * @param {any} handler the handler of the event to remove
  84. */
  85. function removeListener (eventName, handler) {
  86. if (eventName === 'error') return errorEmitter.removeListener('error', handler)
  87. return runWhenConnected(() => {
  88. const handlersByThisEventName = handlersByEvent.get(eventName)
  89. if (handlersByThisEventName == null) return undefined
  90. const actualHandler = handlersByThisEventName.get(handler)
  91. if (actualHandler == null) return undefined
  92. handlersByThisEventName.delete(handler)
  93. if (handlersByThisEventName.size === 0) handlersByEvent.delete(eventName)
  94. return subscriber.pUnsubscribe(getPrefixedEventName(eventName), actualHandler)
  95. })
  96. }
  97. /**
  98. * Remove all listeners of an event
  99. *
  100. * @param {string} eventName name of the event
  101. */
  102. function removeAllListeners (eventName) {
  103. if (eventName === 'error') return errorEmitter.removeAllListeners(eventName)
  104. return runWhenConnected(() => {
  105. handlersByEvent.delete(eventName)
  106. return subscriber.pUnsubscribe(getPrefixedEventName(eventName))
  107. })
  108. }
  109. return {
  110. on,
  111. once,
  112. emit,
  113. removeListener,
  114. removeAllListeners,
  115. }
  116. }