RateLimitedQueue.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. function createCancelError (cause) {
  2. return new Error('Cancelled', { cause })
  3. }
  4. function abortOn (signal) {
  5. if (signal != null) {
  6. const abortPromise = () => this.abort(signal.reason)
  7. signal.addEventListener('abort', abortPromise, { once: true })
  8. const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
  9. this.then?.(removeAbortListener, removeAbortListener)
  10. }
  11. return this
  12. }
  13. export class RateLimitedQueue {
  14. #activeRequests = 0
  15. #queuedHandlers = []
  16. #paused = false
  17. #pauseTimer
  18. #downLimit = 1
  19. #upperLimit
  20. #rateLimitingTimer
  21. constructor (limit) {
  22. if (typeof limit !== 'number' || limit === 0) {
  23. this.limit = Infinity
  24. } else {
  25. this.limit = limit
  26. }
  27. }
  28. #call (fn) {
  29. this.#activeRequests += 1
  30. let done = false
  31. let cancelActive
  32. try {
  33. cancelActive = fn()
  34. } catch (err) {
  35. this.#activeRequests -= 1
  36. throw err
  37. }
  38. return {
  39. abort: (cause) => {
  40. if (done) return
  41. done = true
  42. this.#activeRequests -= 1
  43. cancelActive?.(cause)
  44. this.#queueNext()
  45. },
  46. done: () => {
  47. if (done) return
  48. done = true
  49. this.#activeRequests -= 1
  50. this.#queueNext()
  51. },
  52. }
  53. }
  54. #queueNext () {
  55. // Do it soon but not immediately, this allows clearing out the entire queue synchronously
  56. // one by one without continuously _advancing_ it (and starting new tasks before immediately
  57. // aborting them)
  58. queueMicrotask(() => this.#next())
  59. }
  60. #next () {
  61. if (this.#paused || this.#activeRequests >= this.limit) {
  62. return
  63. }
  64. if (this.#queuedHandlers.length === 0) {
  65. return
  66. }
  67. // Dispatch the next request, and update the abort/done handlers
  68. // so that cancelling it does the Right Thing (and doesn't just try
  69. // to dequeue an already-running request).
  70. const next = this.#queuedHandlers.shift()
  71. const handler = this.#call(next.fn)
  72. next.abort = handler.abort
  73. next.done = handler.done
  74. }
  75. #queue (fn, options = {}) {
  76. const handler = {
  77. fn,
  78. priority: options.priority || 0,
  79. abort: () => {
  80. this.#dequeue(handler)
  81. },
  82. done: () => {
  83. throw new Error('Cannot mark a queued request as done: this indicates a bug')
  84. },
  85. }
  86. const index = this.#queuedHandlers.findIndex((other) => {
  87. return handler.priority > other.priority
  88. })
  89. if (index === -1) {
  90. this.#queuedHandlers.push(handler)
  91. } else {
  92. this.#queuedHandlers.splice(index, 0, handler)
  93. }
  94. return handler
  95. }
  96. #dequeue (handler) {
  97. const index = this.#queuedHandlers.indexOf(handler)
  98. if (index !== -1) {
  99. this.#queuedHandlers.splice(index, 1)
  100. }
  101. }
  102. run (fn, queueOptions) {
  103. if (!this.#paused && this.#activeRequests < this.limit) {
  104. return this.#call(fn)
  105. }
  106. return this.#queue(fn, queueOptions)
  107. }
  108. wrapSyncFunction (fn, queueOptions) {
  109. return (...args) => {
  110. const queuedRequest = this.run(() => {
  111. fn(...args)
  112. queueMicrotask(() => queuedRequest.done())
  113. return () => {}
  114. }, queueOptions)
  115. return {
  116. abortOn,
  117. abort () {
  118. queuedRequest.abort()
  119. },
  120. }
  121. }
  122. }
  123. wrapPromiseFunction (fn, queueOptions) {
  124. return (...args) => {
  125. let queuedRequest
  126. const outerPromise = new Promise((resolve, reject) => {
  127. queuedRequest = this.run(() => {
  128. let cancelError
  129. let innerPromise
  130. try {
  131. innerPromise = Promise.resolve(fn(...args))
  132. } catch (err) {
  133. innerPromise = Promise.reject(err)
  134. }
  135. innerPromise.then((result) => {
  136. if (cancelError) {
  137. reject(cancelError)
  138. } else {
  139. queuedRequest.done()
  140. resolve(result)
  141. }
  142. }, (err) => {
  143. if (cancelError) {
  144. reject(cancelError)
  145. } else {
  146. queuedRequest.done()
  147. reject(err)
  148. }
  149. })
  150. return (cause) => {
  151. cancelError = createCancelError(cause)
  152. }
  153. }, queueOptions)
  154. })
  155. outerPromise.abort = (cause) => {
  156. queuedRequest.abort(cause)
  157. }
  158. outerPromise.abortOn = abortOn
  159. return outerPromise
  160. }
  161. }
  162. resume () {
  163. this.#paused = false
  164. clearTimeout(this.#pauseTimer)
  165. for (let i = 0; i < this.limit; i++) {
  166. this.#queueNext()
  167. }
  168. }
  169. #resume = () => this.resume()
  170. /**
  171. * Freezes the queue for a while or indefinitely.
  172. *
  173. * @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
  174. * If omitted, the queue won't resume automatically.
  175. */
  176. pause (duration = null) {
  177. this.#paused = true
  178. clearTimeout(this.#pauseTimer)
  179. if (duration != null) {
  180. this.#pauseTimer = setTimeout(this.#resume, duration)
  181. }
  182. }
  183. /**
  184. * Pauses the queue for a duration, and lower the limit of concurrent requests
  185. * when the queue resumes. When the queue resumes, it tries to progressively
  186. * increase the limit in `this.#increaseLimit` until another call is made to
  187. * `this.rateLimit`.
  188. * Call this function when using the RateLimitedQueue for network requests and
  189. * the remote server responds with 429 HTTP code.
  190. *
  191. * @param {number} duration in milliseconds.
  192. */
  193. rateLimit (duration) {
  194. clearTimeout(this.#rateLimitingTimer)
  195. this.pause(duration)
  196. if (this.limit > 1 && Number.isFinite(this.limit)) {
  197. this.#upperLimit = this.limit - 1
  198. this.limit = this.#downLimit
  199. this.#rateLimitingTimer = setTimeout(this.#increaseLimit, duration)
  200. }
  201. }
  202. #increaseLimit = () => {
  203. if (this.#paused) {
  204. this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 0)
  205. return
  206. }
  207. this.#downLimit = this.limit
  208. this.limit = Math.ceil((this.#upperLimit + this.#downLimit) / 2)
  209. for (let i = this.#downLimit; i <= this.limit; i++) {
  210. this.#queueNext()
  211. }
  212. if (this.#upperLimit - this.#downLimit > 3) {
  213. this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 2000)
  214. } else {
  215. this.#downLimit = Math.floor(this.#downLimit / 2)
  216. }
  217. }
  218. get isPaused () { return this.#paused }
  219. }
  220. export const internalRateLimitedQueue = Symbol('__queue')