MultipartUploader.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. const { AbortController, createAbortError } = require('@uppy/utils/lib/AbortController')
  2. const delay = require('@uppy/utils/lib/delay')
  3. const MB = 1024 * 1024
  4. const defaultOptions = {
  5. limit: 1,
  6. retryDelays: [0, 1000, 3000, 5000],
  7. getChunkSize (file) {
  8. return Math.ceil(file.size / 10000)
  9. },
  10. onStart () {},
  11. onProgress () {},
  12. onPartComplete () {},
  13. onSuccess () {},
  14. onError (err) {
  15. throw err
  16. },
  17. }
  18. function ensureInt (value) {
  19. if (typeof value === 'string') {
  20. return parseInt(value, 10)
  21. }
  22. if (typeof value === 'number') {
  23. return value
  24. }
  25. throw new TypeError('Expected a number')
  26. }
  27. class MultipartUploader {
  28. constructor (file, options) {
  29. this.options = {
  30. ...defaultOptions,
  31. ...options,
  32. }
  33. // Use default `getChunkSize` if it was null or something
  34. if (!this.options.getChunkSize) {
  35. this.options.getChunkSize = defaultOptions.getChunkSize
  36. }
  37. this.file = file
  38. this.abortController = new AbortController()
  39. this.key = this.options.key || null
  40. this.uploadId = this.options.uploadId || null
  41. this.parts = []
  42. // Do `this.createdPromise.then(OP)` to execute an operation `OP` _only_ if the
  43. // upload was created already. That also ensures that the sequencing is right
  44. // (so the `OP` definitely happens if the upload is created).
  45. //
  46. // This mostly exists to make `#abortUpload` work well: only sending the abort request if
  47. // the upload was already created, and if the createMultipartUpload request is still in flight,
  48. // aborting it immediately after it finishes.
  49. this.createdPromise = Promise.reject() // eslint-disable-line prefer-promise-reject-errors
  50. this.isPaused = false
  51. this.partsInProgress = 0
  52. this.chunks = null
  53. this.chunkState = null
  54. this.lockedCandidatesForBatch = []
  55. this.#initChunks()
  56. this.createdPromise.catch(() => {}) // silence uncaught rejection warning
  57. }
  58. /**
  59. * Was this upload aborted?
  60. *
  61. * If yes, we may need to throw an AbortError.
  62. *
  63. * @returns {boolean}
  64. */
  65. #aborted () {
  66. return this.abortController.signal.aborted
  67. }
  68. #initChunks () {
  69. const chunks = []
  70. const desiredChunkSize = this.options.getChunkSize(this.file)
  71. // at least 5MB per request, at most 10k requests
  72. const minChunkSize = Math.max(5 * MB, Math.ceil(this.file.size / 10000))
  73. const chunkSize = Math.max(desiredChunkSize, minChunkSize)
  74. // Upload zero-sized files in one zero-sized chunk
  75. if (this.file.size === 0) {
  76. chunks.push(this.file)
  77. } else {
  78. for (let i = 0; i < this.file.size; i += chunkSize) {
  79. const end = Math.min(this.file.size, i + chunkSize)
  80. chunks.push(this.file.slice(i, end))
  81. }
  82. }
  83. this.chunks = chunks
  84. this.chunkState = chunks.map(() => ({
  85. uploaded: 0,
  86. busy: false,
  87. done: false,
  88. }))
  89. }
  90. #createUpload () {
  91. this.createdPromise = Promise.resolve().then(() => this.options.createMultipartUpload())
  92. return this.createdPromise.then((result) => {
  93. if (this.#aborted()) throw createAbortError()
  94. const valid = typeof result === 'object' && result
  95. && typeof result.uploadId === 'string'
  96. && typeof result.key === 'string'
  97. if (!valid) {
  98. throw new TypeError('AwsS3/Multipart: Got incorrect result from `createMultipartUpload()`, expected an object `{ uploadId, key }`.')
  99. }
  100. this.key = result.key
  101. this.uploadId = result.uploadId
  102. this.options.onStart(result)
  103. this.#uploadParts()
  104. }).catch((err) => {
  105. this.#onError(err)
  106. })
  107. }
  108. async #resumeUpload () {
  109. try {
  110. const parts = await this.options.listParts({
  111. uploadId: this.uploadId,
  112. key: this.key,
  113. })
  114. if (this.#aborted()) throw createAbortError()
  115. parts.forEach((part) => {
  116. const i = part.PartNumber - 1
  117. this.chunkState[i] = {
  118. uploaded: ensureInt(part.Size),
  119. etag: part.ETag,
  120. done: true,
  121. }
  122. // Only add if we did not yet know about this part.
  123. if (!this.parts.some((p) => p.PartNumber === part.PartNumber)) {
  124. this.parts.push({
  125. PartNumber: part.PartNumber,
  126. ETag: part.ETag,
  127. })
  128. }
  129. })
  130. this.#uploadParts()
  131. } catch (err) {
  132. this.#onError(err)
  133. }
  134. }
  135. #uploadParts () {
  136. if (this.isPaused) return
  137. // All parts are uploaded.
  138. if (this.chunkState.every((state) => state.done)) {
  139. this.#completeUpload()
  140. return
  141. }
  142. // For a 100MB file, with the default min chunk size of 5MB and a limit of 10:
  143. //
  144. // Total 20 parts
  145. // ---------
  146. // Need 1 is 10
  147. // Need 2 is 5
  148. // Need 3 is 5
  149. const need = this.options.limit - this.partsInProgress
  150. const completeChunks = this.chunkState.filter((state) => state.done).length
  151. const remainingChunks = this.chunks.length - completeChunks
  152. let minNeeded = Math.ceil(this.options.limit / 2)
  153. if (minNeeded > remainingChunks) {
  154. minNeeded = remainingChunks
  155. }
  156. if (need < minNeeded) return
  157. const candidates = []
  158. for (let i = 0; i < this.chunkState.length; i++) {
  159. if (this.lockedCandidatesForBatch.includes(i)) continue
  160. const state = this.chunkState[i]
  161. if (state.done || state.busy) continue
  162. candidates.push(i)
  163. if (candidates.length >= need) {
  164. break
  165. }
  166. }
  167. if (candidates.length === 0) return
  168. this.#prepareUploadParts(candidates).then((result) => {
  169. candidates.forEach((index) => {
  170. const partNumber = index + 1
  171. const prePreparedPart = { url: result.presignedUrls[partNumber], headers: result.headers }
  172. this.#uploadPartRetryable(index, prePreparedPart).then(() => {
  173. this.#uploadParts()
  174. }, (err) => {
  175. this.#onError(err)
  176. })
  177. })
  178. })
  179. }
  180. #retryable ({ before, attempt, after }) {
  181. const { retryDelays } = this.options
  182. const { signal } = this.abortController
  183. if (before) before()
  184. function shouldRetry (err) {
  185. if (err.source && typeof err.source.status === 'number') {
  186. const { status } = err.source
  187. // 0 probably indicates network failure
  188. return status === 0 || status === 409 || status === 423 || (status >= 500 && status < 600)
  189. }
  190. return false
  191. }
  192. const doAttempt = (retryAttempt) => attempt().catch((err) => {
  193. if (this.#aborted()) throw createAbortError()
  194. if (shouldRetry(err) && retryAttempt < retryDelays.length) {
  195. return delay(retryDelays[retryAttempt], { signal })
  196. .then(() => doAttempt(retryAttempt + 1))
  197. }
  198. throw err
  199. })
  200. return doAttempt(0).then((result) => {
  201. if (after) after()
  202. return result
  203. }, (err) => {
  204. if (after) after()
  205. throw err
  206. })
  207. }
  208. async #prepareUploadParts (candidates) {
  209. this.lockedCandidatesForBatch.push(...candidates)
  210. const result = await this.options.prepareUploadParts({
  211. key: this.key,
  212. uploadId: this.uploadId,
  213. partNumbers: candidates.map((index) => index + 1),
  214. })
  215. const valid = typeof result?.presignedUrls === 'object'
  216. if (!valid) {
  217. throw new TypeError(
  218. 'AwsS3/Multipart: Got incorrect result from `prepareUploadParts()`, expected an object `{ presignedUrls }`.'
  219. )
  220. }
  221. return result
  222. }
  223. #uploadPartRetryable (index, prePreparedPart) {
  224. return this.#retryable({
  225. before: () => {
  226. this.partsInProgress += 1
  227. },
  228. attempt: () => this.#uploadPart(index, prePreparedPart),
  229. after: () => {
  230. this.partsInProgress -= 1
  231. },
  232. })
  233. }
  234. #uploadPart (index, prePreparedPart) {
  235. const body = this.chunks[index]
  236. this.chunkState[index].busy = true
  237. const valid = typeof prePreparedPart?.url === 'string'
  238. if (!valid) {
  239. throw new TypeError('AwsS3/Multipart: Got incorrect result for `prePreparedPart`, expected an object `{ url }`.')
  240. }
  241. const { url, headers } = prePreparedPart
  242. if (this.#aborted()) {
  243. this.chunkState[index].busy = false
  244. throw createAbortError()
  245. }
  246. return this.#uploadPartBytes(index, url, headers)
  247. }
  248. #onPartProgress (index, sent, total) {
  249. this.chunkState[index].uploaded = ensureInt(sent)
  250. const totalUploaded = this.chunkState.reduce((n, c) => n + c.uploaded, 0)
  251. this.options.onProgress(totalUploaded, this.file.size)
  252. }
  253. #onPartComplete (index, etag) {
  254. this.chunkState[index].etag = etag
  255. this.chunkState[index].done = true
  256. const part = {
  257. PartNumber: index + 1,
  258. ETag: etag,
  259. }
  260. this.parts.push(part)
  261. this.options.onPartComplete(part)
  262. }
  263. #uploadPartBytes (index, url, headers) {
  264. const body = this.chunks[index]
  265. const { signal } = this.abortController
  266. let defer
  267. const promise = new Promise((resolve, reject) => {
  268. defer = { resolve, reject }
  269. })
  270. const xhr = new XMLHttpRequest()
  271. xhr.open('PUT', url, true)
  272. if (headers) {
  273. Object.keys(headers).map((key) => {
  274. xhr.setRequestHeader(key, headers[key])
  275. })
  276. }
  277. xhr.responseType = 'text'
  278. function cleanup () {
  279. signal.removeEventListener('abort', onabort)
  280. }
  281. function onabort () {
  282. xhr.abort()
  283. }
  284. signal.addEventListener('abort', onabort)
  285. xhr.upload.addEventListener('progress', (ev) => {
  286. if (!ev.lengthComputable) return
  287. this.#onPartProgress(index, ev.loaded, ev.total)
  288. })
  289. xhr.addEventListener('abort', (ev) => {
  290. cleanup()
  291. this.chunkState[index].busy = false
  292. defer.reject(createAbortError())
  293. })
  294. xhr.addEventListener('load', (ev) => {
  295. cleanup()
  296. this.chunkState[index].busy = false
  297. if (ev.target.status < 200 || ev.target.status >= 300) {
  298. const error = new Error('Non 2xx')
  299. error.source = ev.target
  300. defer.reject(error)
  301. return
  302. }
  303. this.#onPartProgress(index, body.size, body.size)
  304. // NOTE This must be allowed by CORS.
  305. const etag = ev.target.getResponseHeader('ETag')
  306. if (etag === null) {
  307. defer.reject(new Error('AwsS3/Multipart: Could not read the ETag header. This likely means CORS is not configured correctly on the S3 Bucket. See https://uppy.io/docs/aws-s3-multipart#S3-Bucket-Configuration for instructions.'))
  308. return
  309. }
  310. this.#onPartComplete(index, etag)
  311. defer.resolve()
  312. })
  313. xhr.addEventListener('error', (ev) => {
  314. cleanup()
  315. this.chunkState[index].busy = false
  316. const error = new Error('Unknown error')
  317. error.source = ev.target
  318. defer.reject(error)
  319. })
  320. xhr.send(body)
  321. return promise
  322. }
  323. async #completeUpload () {
  324. // Parts may not have completed uploading in sorted order, if limit > 1.
  325. this.parts.sort((a, b) => a.PartNumber - b.PartNumber)
  326. try {
  327. const result = await this.options.completeMultipartUpload({
  328. key: this.key,
  329. uploadId: this.uploadId,
  330. parts: this.parts,
  331. })
  332. this.options.onSuccess(result)
  333. } catch (err) {
  334. this.#onError(err)
  335. }
  336. }
  337. #abortUpload () {
  338. this.abortController.abort()
  339. this.createdPromise.then(() => {
  340. this.options.abortMultipartUpload({
  341. key: this.key,
  342. uploadId: this.uploadId,
  343. })
  344. }, () => {
  345. // if the creation failed we do not need to abort
  346. })
  347. }
  348. #onError (err) {
  349. if (err && err.name === 'AbortError') {
  350. return
  351. }
  352. this.options.onError(err)
  353. }
  354. start () {
  355. this.isPaused = false
  356. if (this.uploadId) {
  357. this.#resumeUpload()
  358. } else {
  359. this.#createUpload()
  360. }
  361. }
  362. pause () {
  363. this.abortController.abort()
  364. // Swap it out for a new controller, because this instance may be resumed later.
  365. this.abortController = new AbortController()
  366. this.isPaused = true
  367. }
  368. abort (opts = {}) {
  369. const really = opts.really || false
  370. if (!really) return this.pause()
  371. this.#abortUpload()
  372. }
  373. }
  374. module.exports = MultipartUploader