|
@@ -1,34 +1,58 @@
|
|
|
-function createCancelError (cause) {
|
|
|
+function createCancelError(cause?: string) {
|
|
|
return new Error('Cancelled', { cause })
|
|
|
}
|
|
|
|
|
|
-function abortOn (signal) {
|
|
|
+function abortOn(
|
|
|
+ this: { abort: (cause: string) => void; then?: Promise<any>['then'] },
|
|
|
+ signal?: AbortSignal,
|
|
|
+) {
|
|
|
if (signal != null) {
|
|
|
const abortPromise = () => this.abort(signal.reason)
|
|
|
signal.addEventListener('abort', abortPromise, { once: true })
|
|
|
- const removeAbortListener = () => { signal.removeEventListener('abort', abortPromise) }
|
|
|
+ const removeAbortListener = () => {
|
|
|
+ signal.removeEventListener('abort', abortPromise)
|
|
|
+ }
|
|
|
this.then?.(removeAbortListener, removeAbortListener)
|
|
|
}
|
|
|
|
|
|
return this
|
|
|
}
|
|
|
|
|
|
+type Handler = {
|
|
|
+ shouldBeRequeued?: boolean
|
|
|
+ fn: () => (...args: any[]) => Promise<void> | void
|
|
|
+ priority: number
|
|
|
+ abort: (cause?: unknown) => void
|
|
|
+ done: () => void
|
|
|
+}
|
|
|
+
|
|
|
+type QueueOptions = {
|
|
|
+ priority?: number
|
|
|
+}
|
|
|
+
|
|
|
+interface AbortablePromise<T> extends Promise<T> {
|
|
|
+ abort(cause?: unknown): void
|
|
|
+ abortOn: typeof abortOn
|
|
|
+}
|
|
|
+
|
|
|
export class RateLimitedQueue {
|
|
|
#activeRequests = 0
|
|
|
|
|
|
- #queuedHandlers = []
|
|
|
+ #queuedHandlers: Handler[] = []
|
|
|
|
|
|
#paused = false
|
|
|
|
|
|
- #pauseTimer
|
|
|
+ #pauseTimer: ReturnType<typeof setTimeout>
|
|
|
|
|
|
#downLimit = 1
|
|
|
|
|
|
- #upperLimit
|
|
|
+ #upperLimit: number
|
|
|
+
|
|
|
+ #rateLimitingTimer: ReturnType<typeof setTimeout>
|
|
|
|
|
|
- #rateLimitingTimer
|
|
|
+ limit: number
|
|
|
|
|
|
- constructor (limit) {
|
|
|
+ constructor(limit?: number) {
|
|
|
if (typeof limit !== 'number' || limit === 0) {
|
|
|
this.limit = Infinity
|
|
|
} else {
|
|
@@ -36,12 +60,12 @@ export class RateLimitedQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- #call (fn) {
|
|
|
+ #call(fn: Handler['fn']) {
|
|
|
this.#activeRequests += 1
|
|
|
|
|
|
let done = false
|
|
|
|
|
|
- let cancelActive
|
|
|
+ let cancelActive: (cause?: unknown) => void
|
|
|
try {
|
|
|
cancelActive = fn()
|
|
|
} catch (err) {
|
|
@@ -50,7 +74,7 @@ export class RateLimitedQueue {
|
|
|
}
|
|
|
|
|
|
return {
|
|
|
- abort: (cause) => {
|
|
|
+ abort: (cause?: unknown) => {
|
|
|
if (done) return
|
|
|
done = true
|
|
|
this.#activeRequests -= 1
|
|
@@ -67,14 +91,14 @@ export class RateLimitedQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- #queueNext () {
|
|
|
+ #queueNext() {
|
|
|
// Do it soon but not immediately, this allows clearing out the entire queue synchronously
|
|
|
// one by one without continuously _advancing_ it (and starting new tasks before immediately
|
|
|
// aborting them)
|
|
|
queueMicrotask(() => this.#next())
|
|
|
}
|
|
|
|
|
|
- #next () {
|
|
|
+ #next() {
|
|
|
if (this.#paused || this.#activeRequests >= this.limit) {
|
|
|
return
|
|
|
}
|
|
@@ -86,20 +110,25 @@ export class RateLimitedQueue {
|
|
|
// so that cancelling it does the Right Thing (and doesn't just try
|
|
|
// to dequeue an already-running request).
|
|
|
const next = this.#queuedHandlers.shift()
|
|
|
+ if (next == null) {
|
|
|
+ throw new Error('Invariant violation: next is null')
|
|
|
+ }
|
|
|
const handler = this.#call(next.fn)
|
|
|
next.abort = handler.abort
|
|
|
next.done = handler.done
|
|
|
}
|
|
|
|
|
|
- #queue (fn, options = {}) {
|
|
|
- const handler = {
|
|
|
+ #queue(fn: Handler['fn'], options?: QueueOptions) {
|
|
|
+ const handler: Handler = {
|
|
|
fn,
|
|
|
- priority: options.priority || 0,
|
|
|
+ priority: options?.priority || 0,
|
|
|
abort: () => {
|
|
|
this.#dequeue(handler)
|
|
|
},
|
|
|
done: () => {
|
|
|
- throw new Error('Cannot mark a queued request as done: this indicates a bug')
|
|
|
+ throw new Error(
|
|
|
+ 'Cannot mark a queued request as done: this indicates a bug',
|
|
|
+ )
|
|
|
},
|
|
|
}
|
|
|
|
|
@@ -114,22 +143,27 @@ export class RateLimitedQueue {
|
|
|
return handler
|
|
|
}
|
|
|
|
|
|
- #dequeue (handler) {
|
|
|
+ #dequeue(handler: Handler) {
|
|
|
const index = this.#queuedHandlers.indexOf(handler)
|
|
|
if (index !== -1) {
|
|
|
this.#queuedHandlers.splice(index, 1)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- run (fn, queueOptions) {
|
|
|
+ run(
|
|
|
+ fn: Handler['fn'],
|
|
|
+ queueOptions?: QueueOptions,
|
|
|
+ ): Handler | Omit<Handler, 'fn' | 'priority'> {
|
|
|
if (!this.#paused && this.#activeRequests < this.limit) {
|
|
|
return this.#call(fn)
|
|
|
}
|
|
|
return this.#queue(fn, queueOptions)
|
|
|
}
|
|
|
|
|
|
- wrapSyncFunction (fn, queueOptions) {
|
|
|
- return (...args) => {
|
|
|
+ wrapSyncFunction(fn: () => void, queueOptions: QueueOptions) {
|
|
|
+ return (
|
|
|
+ ...args: Parameters<Handler['fn']>
|
|
|
+ ): { abortOn: typeof abortOn; abort: Handler['abort'] } => {
|
|
|
const queuedRequest = this.run(() => {
|
|
|
fn(...args)
|
|
|
queueMicrotask(() => queuedRequest.done())
|
|
@@ -138,19 +172,22 @@ export class RateLimitedQueue {
|
|
|
|
|
|
return {
|
|
|
abortOn,
|
|
|
- abort () {
|
|
|
+ abort() {
|
|
|
queuedRequest.abort()
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- wrapPromiseFunction (fn, queueOptions) {
|
|
|
- return (...args) => {
|
|
|
- let queuedRequest
|
|
|
+ wrapPromiseFunction<T extends (...args: any[]) => any>(
|
|
|
+ fn: T,
|
|
|
+ queueOptions?: QueueOptions,
|
|
|
+ ) {
|
|
|
+ return (...args: Parameters<T>): AbortablePromise<ReturnType<T>> => {
|
|
|
+ let queuedRequest: ReturnType<RateLimitedQueue['run']>
|
|
|
const outerPromise = new Promise((resolve, reject) => {
|
|
|
queuedRequest = this.run(() => {
|
|
|
- let cancelError
|
|
|
+ let cancelError: ReturnType<typeof createCancelError>
|
|
|
let innerPromise
|
|
|
try {
|
|
|
innerPromise = Promise.resolve(fn(...args))
|
|
@@ -158,27 +195,30 @@ export class RateLimitedQueue {
|
|
|
innerPromise = Promise.reject(err)
|
|
|
}
|
|
|
|
|
|
- innerPromise.then((result) => {
|
|
|
- if (cancelError) {
|
|
|
- reject(cancelError)
|
|
|
- } else {
|
|
|
- queuedRequest.done()
|
|
|
- resolve(result)
|
|
|
- }
|
|
|
- }, (err) => {
|
|
|
- if (cancelError) {
|
|
|
- reject(cancelError)
|
|
|
- } else {
|
|
|
- queuedRequest.done()
|
|
|
- reject(err)
|
|
|
- }
|
|
|
- })
|
|
|
+ innerPromise.then(
|
|
|
+ (result) => {
|
|
|
+ if (cancelError) {
|
|
|
+ reject(cancelError)
|
|
|
+ } else {
|
|
|
+ queuedRequest.done()
|
|
|
+ resolve(result)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ (err) => {
|
|
|
+ if (cancelError) {
|
|
|
+ reject(cancelError)
|
|
|
+ } else {
|
|
|
+ queuedRequest.done()
|
|
|
+ reject(err)
|
|
|
+ }
|
|
|
+ },
|
|
|
+ )
|
|
|
|
|
|
return (cause) => {
|
|
|
cancelError = createCancelError(cause)
|
|
|
}
|
|
|
}, queueOptions)
|
|
|
- })
|
|
|
+ }) as AbortablePromise<ReturnType<T>>
|
|
|
|
|
|
outerPromise.abort = (cause) => {
|
|
|
queuedRequest.abort(cause)
|
|
@@ -189,7 +229,7 @@ export class RateLimitedQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- resume () {
|
|
|
+ resume(): void {
|
|
|
this.#paused = false
|
|
|
clearTimeout(this.#pauseTimer)
|
|
|
for (let i = 0; i < this.limit; i++) {
|
|
@@ -205,7 +245,7 @@ export class RateLimitedQueue {
|
|
|
* @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
|
|
|
* If omitted, the queue won't resume automatically.
|
|
|
*/
|
|
|
- pause (duration = null) {
|
|
|
+ pause(duration: number | null = null): void {
|
|
|
this.#paused = true
|
|
|
clearTimeout(this.#pauseTimer)
|
|
|
if (duration != null) {
|
|
@@ -223,7 +263,7 @@ export class RateLimitedQueue {
|
|
|
*
|
|
|
* @param {number} duration in milliseconds.
|
|
|
*/
|
|
|
- rateLimit (duration) {
|
|
|
+ rateLimit(duration: number): void {
|
|
|
clearTimeout(this.#rateLimitingTimer)
|
|
|
this.pause(duration)
|
|
|
if (this.limit > 1 && Number.isFinite(this.limit)) {
|
|
@@ -250,7 +290,9 @@ export class RateLimitedQueue {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- get isPaused () { return this.#paused }
|
|
|
+ get isPaused(): boolean {
|
|
|
+ return this.#paused
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
export const internalRateLimitedQueue = Symbol('__queue')
|