HTTPCommunicationQueue.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. import type { Meta, UppyFile } from '@uppy/utils/lib/UppyFile'
  2. import type {
  3. RateLimitedQueue,
  4. WrapPromiseFunctionType,
  5. } from '@uppy/utils/lib/RateLimitedQueue'
  6. import { pausingUploadReason, type Chunk } from './MultipartUploader.ts'
  7. import type AwsS3Multipart from './index.ts'
  8. import { throwIfAborted } from './utils.ts'
  9. import type { Body, UploadPartBytesResult, UploadResult } from './utils.ts'
  10. import type { AwsS3MultipartOptions, uploadPartBytes } from './index.ts'
  11. function removeMetadataFromURL(urlString: string) {
  12. const urlObject = new URL(urlString)
  13. urlObject.search = ''
  14. urlObject.hash = ''
  15. return urlObject.href
  16. }
  17. export class HTTPCommunicationQueue<M extends Meta, B extends Body> {
  18. #abortMultipartUpload: WrapPromiseFunctionType<
  19. AwsS3Multipart<M, B>['abortMultipartUpload']
  20. >
  21. #cache = new WeakMap()
  22. #createMultipartUpload: WrapPromiseFunctionType<
  23. AwsS3Multipart<M, B>['createMultipartUpload']
  24. >
  25. #fetchSignature: WrapPromiseFunctionType<AwsS3Multipart<M, B>['signPart']>
  26. #getUploadParameters: WrapPromiseFunctionType<
  27. AwsS3Multipart<M, B>['getUploadParameters']
  28. >
  29. #listParts: WrapPromiseFunctionType<AwsS3Multipart<M, B>['listParts']>
  30. #previousRetryDelay: number
  31. #requests
  32. #retryDelays: { values: () => Iterator<number> }
  33. #sendCompletionRequest: WrapPromiseFunctionType<
  34. AwsS3Multipart<M, B>['completeMultipartUpload']
  35. >
  36. #setS3MultipartState
  37. #uploadPartBytes: WrapPromiseFunctionType<uploadPartBytes>
  38. #getFile
  39. constructor(
  40. requests: RateLimitedQueue,
  41. options: AwsS3MultipartOptions<M, B>,
  42. setS3MultipartState: (file: UppyFile<M, B>, result: UploadResult) => void,
  43. getFile: (file: UppyFile<M, B>) => UppyFile<M, B>,
  44. ) {
  45. this.#requests = requests
  46. this.#setS3MultipartState = setS3MultipartState
  47. this.#getFile = getFile
  48. this.setOptions(options)
  49. }
  50. setOptions(options: Partial<AwsS3MultipartOptions<M, B>>): void {
  51. const requests = this.#requests
  52. if ('abortMultipartUpload' in options) {
  53. this.#abortMultipartUpload = requests.wrapPromiseFunction(
  54. options.abortMultipartUpload as any,
  55. { priority: 1 },
  56. )
  57. }
  58. if ('createMultipartUpload' in options) {
  59. this.#createMultipartUpload = requests.wrapPromiseFunction(
  60. options.createMultipartUpload as any,
  61. { priority: -1 },
  62. )
  63. }
  64. if ('signPart' in options) {
  65. this.#fetchSignature = requests.wrapPromiseFunction(
  66. options.signPart as any,
  67. )
  68. }
  69. if ('listParts' in options) {
  70. this.#listParts = requests.wrapPromiseFunction(options.listParts as any)
  71. }
  72. if ('completeMultipartUpload' in options) {
  73. this.#sendCompletionRequest = requests.wrapPromiseFunction(
  74. options.completeMultipartUpload as any,
  75. { priority: 1 },
  76. )
  77. }
  78. if ('retryDelays' in options) {
  79. this.#retryDelays = options.retryDelays ?? []
  80. }
  81. if ('uploadPartBytes' in options) {
  82. this.#uploadPartBytes = requests.wrapPromiseFunction(
  83. options.uploadPartBytes as any,
  84. { priority: Infinity },
  85. )
  86. }
  87. if ('getUploadParameters' in options) {
  88. this.#getUploadParameters = requests.wrapPromiseFunction(
  89. options.getUploadParameters as any,
  90. )
  91. }
  92. }
  93. async #shouldRetry(err: any, retryDelayIterator: Iterator<number>) {
  94. const requests = this.#requests
  95. const status = err?.source?.status
  96. // TODO: this retry logic is taken out of Tus. We should have a centralized place for retrying,
  97. // perhaps the rate limited queue, and dedupe all plugins with that.
  98. if (status == null) {
  99. return false
  100. }
  101. if (status === 403 && err.message === 'Request has expired') {
  102. if (!requests.isPaused) {
  103. // We don't want to exhaust the retryDelayIterator as long as there are
  104. // more than one request in parallel, to give slower connection a chance
  105. // to catch up with the expiry set in Companion.
  106. if (requests.limit === 1 || this.#previousRetryDelay == null) {
  107. const next = retryDelayIterator.next()
  108. if (next == null || next.done) {
  109. return false
  110. }
  111. // If there are more than 1 request done in parallel, the RLQ limit is
  112. // decreased and the failed request is requeued after waiting for a bit.
  113. // If there is only one request in parallel, the limit can't be
  114. // decreased, so we iterate over `retryDelayIterator` as we do for
  115. // other failures.
  116. // `#previousRetryDelay` caches the value so we can re-use it next time.
  117. this.#previousRetryDelay = next.value
  118. }
  119. // No need to stop the other requests, we just want to lower the limit.
  120. requests.rateLimit(0)
  121. await new Promise((resolve) =>
  122. setTimeout(resolve, this.#previousRetryDelay),
  123. )
  124. }
  125. } else if (status === 429) {
  126. // HTTP 429 Too Many Requests => to avoid the whole download to fail, pause all requests.
  127. if (!requests.isPaused) {
  128. const next = retryDelayIterator.next()
  129. if (next == null || next.done) {
  130. return false
  131. }
  132. requests.rateLimit(next.value)
  133. }
  134. } else if (status > 400 && status < 500 && status !== 409) {
  135. // HTTP 4xx, the server won't send anything, it's doesn't make sense to retry
  136. return false
  137. } else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
  138. // The navigator is offline, let's wait for it to come back online.
  139. if (!requests.isPaused) {
  140. requests.pause()
  141. window.addEventListener(
  142. 'online',
  143. () => {
  144. requests.resume()
  145. },
  146. { once: true },
  147. )
  148. }
  149. } else {
  150. // Other error code means the request can be retried later.
  151. const next = retryDelayIterator.next()
  152. if (next == null || next.done) {
  153. return false
  154. }
  155. await new Promise((resolve) => setTimeout(resolve, next.value))
  156. }
  157. return true
  158. }
  159. async getUploadId(
  160. file: UppyFile<M, B>,
  161. signal: AbortSignal,
  162. ): Promise<UploadResult> {
  163. let cachedResult
  164. // As the cache is updated asynchronously, there could be a race condition
  165. // where we just miss a new result so we loop here until we get nothing back,
  166. // at which point it's out turn to create a new cache entry.
  167. // eslint-disable-next-line no-cond-assign
  168. while ((cachedResult = this.#cache.get(file.data)) != null) {
  169. try {
  170. return await cachedResult
  171. } catch {
  172. // In case of failure, we want to ignore the cached error.
  173. // At this point, either there's a new cached value, or we'll exit the loop a create a new one.
  174. }
  175. }
  176. const promise = this.#createMultipartUpload(this.#getFile(file), signal)
  177. const abortPromise = () => {
  178. promise.abort(signal.reason)
  179. this.#cache.delete(file.data)
  180. }
  181. signal.addEventListener('abort', abortPromise, { once: true })
  182. this.#cache.set(file.data, promise)
  183. promise.then(
  184. async (result) => {
  185. signal.removeEventListener('abort', abortPromise)
  186. this.#setS3MultipartState(file, result)
  187. this.#cache.set(file.data, result)
  188. },
  189. () => {
  190. signal.removeEventListener('abort', abortPromise)
  191. this.#cache.delete(file.data)
  192. },
  193. )
  194. return promise
  195. }
  196. async abortFileUpload(file: UppyFile<M, B>): Promise<void> {
  197. const result = this.#cache.get(file.data)
  198. if (result == null) {
  199. // If the createMultipartUpload request never was made, we don't
  200. // need to send the abortMultipartUpload request.
  201. return
  202. }
  203. // Remove the cache entry right away for follow-up requests do not try to
  204. // use the soon-to-be aborted cached values.
  205. this.#cache.delete(file.data)
  206. this.#setS3MultipartState(file, Object.create(null))
  207. let awaitedResult
  208. try {
  209. awaitedResult = await result
  210. } catch {
  211. // If the cached result rejects, there's nothing to abort.
  212. return
  213. }
  214. await this.#abortMultipartUpload(this.#getFile(file), awaitedResult)
  215. }
  216. async #nonMultipartUpload(
  217. file: UppyFile<M, B>,
  218. chunk: Chunk,
  219. signal?: AbortSignal,
  220. ): Promise<UploadPartBytesResult & B> {
  221. const {
  222. method = 'POST',
  223. url,
  224. fields,
  225. headers,
  226. } = await this.#getUploadParameters(this.#getFile(file), {
  227. signal,
  228. }).abortOn(signal)
  229. let body
  230. const data = chunk.getData()
  231. if (method.toUpperCase() === 'POST') {
  232. const formData = new FormData()
  233. Object.entries(fields!).forEach(([key, value]) =>
  234. formData.set(key, value),
  235. )
  236. formData.set('file', data)
  237. body = formData
  238. } else {
  239. body = data
  240. }
  241. const { onProgress, onComplete } = chunk
  242. const result = await this.#uploadPartBytes({
  243. signature: { url, headers, method } as any,
  244. body,
  245. size: data.size,
  246. onProgress,
  247. onComplete,
  248. signal,
  249. }).abortOn(signal)
  250. return 'location' in result ?
  251. (result as UploadPartBytesResult & B)
  252. : ({
  253. location: removeMetadataFromURL(url),
  254. ...result,
  255. } as any)
  256. }
  257. async uploadFile(
  258. file: UppyFile<M, B>,
  259. chunks: Chunk[],
  260. signal: AbortSignal,
  261. ): Promise<B & Partial<UploadPartBytesResult>> {
  262. throwIfAborted(signal)
  263. if (chunks.length === 1 && !chunks[0].shouldUseMultipart) {
  264. return this.#nonMultipartUpload(file, chunks[0], signal)
  265. }
  266. const { uploadId, key } = await this.getUploadId(file, signal)
  267. throwIfAborted(signal)
  268. try {
  269. const parts = await Promise.all(
  270. chunks.map((chunk, i) => this.uploadChunk(file, i + 1, chunk, signal)),
  271. )
  272. throwIfAborted(signal)
  273. return await this.#sendCompletionRequest(
  274. this.#getFile(file),
  275. { key, uploadId, parts, signal },
  276. signal,
  277. ).abortOn(signal)
  278. } catch (err) {
  279. if (err?.cause !== pausingUploadReason && err?.name !== 'AbortError') {
  280. // We purposefully don't wait for the promise and ignore its status,
  281. // because we want the error `err` to bubble up ASAP to report it to the
  282. // user. A failure to abort is not that big of a deal anyway.
  283. this.abortFileUpload(file)
  284. }
  285. throw err
  286. }
  287. }
  288. restoreUploadFile(file: UppyFile<M, B>, uploadIdAndKey: UploadResult): void {
  289. this.#cache.set(file.data, uploadIdAndKey)
  290. }
  291. async resumeUploadFile(
  292. file: UppyFile<M, B>,
  293. chunks: Array<Chunk | null>,
  294. signal: AbortSignal,
  295. ): Promise<B> {
  296. throwIfAborted(signal)
  297. if (
  298. chunks.length === 1 &&
  299. chunks[0] != null &&
  300. !chunks[0].shouldUseMultipart
  301. ) {
  302. return this.#nonMultipartUpload(file, chunks[0], signal)
  303. }
  304. const { uploadId, key } = await this.getUploadId(file, signal)
  305. throwIfAborted(signal)
  306. const alreadyUploadedParts = await this.#listParts(
  307. this.#getFile(file),
  308. { uploadId, key, signal },
  309. signal,
  310. ).abortOn(signal)
  311. throwIfAborted(signal)
  312. const parts = await Promise.all(
  313. chunks.map((chunk, i) => {
  314. const partNumber = i + 1
  315. const alreadyUploadedInfo = alreadyUploadedParts.find(
  316. ({ PartNumber }) => PartNumber === partNumber,
  317. )
  318. if (alreadyUploadedInfo == null) {
  319. return this.uploadChunk(file, partNumber, chunk!, signal)
  320. }
  321. // Already uploaded chunks are set to null. If we are restoring the upload, we need to mark it as already uploaded.
  322. chunk?.setAsUploaded?.()
  323. return { PartNumber: partNumber, ETag: alreadyUploadedInfo.ETag }
  324. }),
  325. )
  326. throwIfAborted(signal)
  327. return this.#sendCompletionRequest(
  328. this.#getFile(file),
  329. { key, uploadId, parts, signal },
  330. signal,
  331. ).abortOn(signal)
  332. }
  333. async uploadChunk(
  334. file: UppyFile<M, B>,
  335. partNumber: number,
  336. chunk: Chunk,
  337. signal: AbortSignal,
  338. ): Promise<UploadPartBytesResult & { PartNumber: number }> {
  339. throwIfAborted(signal)
  340. const { uploadId, key } = await this.getUploadId(file, signal)
  341. const signatureRetryIterator = this.#retryDelays.values()
  342. const chunkRetryIterator = this.#retryDelays.values()
  343. const shouldRetrySignature = () => {
  344. const next = signatureRetryIterator.next()
  345. if (next == null || next.done) {
  346. return null
  347. }
  348. return next.value
  349. }
  350. for (;;) {
  351. throwIfAborted(signal)
  352. const chunkData = chunk.getData()
  353. const { onProgress, onComplete } = chunk
  354. let signature
  355. try {
  356. signature = await this.#fetchSignature(this.#getFile(file), {
  357. uploadId,
  358. key,
  359. partNumber,
  360. body: chunkData,
  361. signal,
  362. }).abortOn(signal)
  363. } catch (err) {
  364. const timeout = shouldRetrySignature()
  365. if (timeout == null || signal.aborted) {
  366. throw err
  367. }
  368. await new Promise((resolve) => setTimeout(resolve, timeout))
  369. // eslint-disable-next-line no-continue
  370. continue
  371. }
  372. throwIfAborted(signal)
  373. try {
  374. return {
  375. PartNumber: partNumber,
  376. ...(await this.#uploadPartBytes({
  377. signature,
  378. body: chunkData,
  379. size: chunkData.size,
  380. onProgress,
  381. onComplete,
  382. signal,
  383. }).abortOn(signal)),
  384. }
  385. } catch (err) {
  386. if (!(await this.#shouldRetry(err, chunkRetryIterator))) throw err
  387. }
  388. }
  389. }
  390. }