Skip to content

Commit e1a5cc1

Browse files
authored
Emit more progress events for xet upload (#1727)
Fix #1726. cc @assafvayner for viz. Ideally there would be two progress bars: one for uploading and one for processing data. Currently, processed data counts as 10% of the upload, but deduped bytes count as being uploaded.
1 parent a943547 commit e1a5cc1

File tree

4 files changed

+198
-143
lines changed

4 files changed

+198
-143
lines changed

packages/hub/src/lib/commit.ts

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import { isFrontend } from "../utils/isFrontend";
2424
import { createBlobs } from "../utils/createBlobs";
2525
import { uploadShards } from "../utils/uploadShards";
2626
import { splitAsyncGenerator } from "../utils/splitAsyncGenerator";
27-
import { mergeAsyncGenerators } from "../utils/mergeAsyncGenerators";
2827
import { SplicedBlob } from "../utils/SplicedBlob";
2928

3029
const CONCURRENT_SHAS = 5;
@@ -401,33 +400,36 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
401400
}
402401
})();
403402
const sources = splitAsyncGenerator(source, 5);
404-
yield* mergeAsyncGenerators(
405-
sources.map(async function* (source) {
406-
for await (const event of uploadShards(source, {
407-
fetch: params.fetch,
408-
accessToken,
409-
hubUrl: params.hubUrl ?? HUB_URL,
410-
repo: repoId,
411-
// todo: maybe leave empty if PR?
412-
rev: params.branch ?? "main",
413-
})) {
414-
if (event.event === "file") {
415-
yield {
416-
event: "fileProgress" as const,
417-
path: event.path,
418-
progress: 1,
419-
state: "uploading" as const,
420-
};
421-
} else if (event.event === "fileProgress") {
422-
yield {
423-
event: "fileProgress" as const,
424-
path: event.path,
425-
progress: event.progress,
426-
state: "uploading" as const,
427-
};
403+
yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
404+
Promise.all(
405+
sources.map(async function (source) {
406+
for await (const event of uploadShards(source, {
407+
fetch: params.fetch,
408+
accessToken,
409+
hubUrl: params.hubUrl ?? HUB_URL,
410+
repo: repoId,
411+
// todo: maybe leave empty if PR?
412+
rev: params.branch ?? "main",
413+
yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
414+
})) {
415+
if (event.event === "file") {
416+
yieldCallback({
417+
event: "fileProgress" as const,
418+
path: event.path,
419+
progress: 1,
420+
state: "uploading" as const,
421+
});
422+
} else if (event.event === "fileProgress") {
423+
yieldCallback({
424+
event: "fileProgress" as const,
425+
path: event.path,
426+
progress: event.progress,
427+
state: "uploading" as const,
428+
});
429+
}
428430
}
429-
}
430-
})
431+
})
432+
).then(() => returnCallback(undefined), rejectCallback)
431433
);
432434
} else {
433435
yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {

packages/hub/src/lib/upload-files-with-progress.ts

Lines changed: 89 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -53,111 +53,104 @@ export async function* uploadFilesWithProgress(
5353
useWebWorkers: params.useWebWorkers,
5454
abortSignal: params.abortSignal,
5555
useXet: params.useXet,
56-
fetch:
57-
params.useXet === true
58-
? // no need for custom fetch function if we use Xet, as we already have progress events in the commit function for file uploads in that case
59-
undefined
60-
: async (input, init) => {
61-
if (!init) {
62-
return fetch(input);
63-
}
56+
fetch: async (input, init) => {
57+
if (!init) {
58+
return fetch(input);
59+
}
6460

65-
if (
66-
!typedInclude(["PUT", "POST"], init.method) ||
67-
!("progressHint" in init) ||
68-
!init.progressHint ||
69-
typeof XMLHttpRequest === "undefined" ||
70-
typeof input !== "string" ||
71-
(!(init.body instanceof ArrayBuffer) &&
72-
!(init.body instanceof Blob) &&
73-
!(init.body instanceof File) &&
74-
typeof init.body !== "string")
75-
) {
76-
return fetch(input, init);
77-
}
61+
if (
62+
!typedInclude(["PUT", "POST"], init.method) ||
63+
!("progressHint" in init) ||
64+
!init.progressHint ||
65+
typeof XMLHttpRequest === "undefined" ||
66+
typeof input !== "string" ||
67+
(!(init.body instanceof ArrayBuffer) &&
68+
!(init.body instanceof Blob) &&
69+
!(init.body instanceof File) &&
70+
typeof init.body !== "string")
71+
) {
72+
return fetch(input, init);
73+
}
7874

79-
const progressHint = init.progressHint as {
80-
progressCallback: (progress: number) => void;
81-
} & (Record<string, never> | { part: number; numParts: number });
82-
const progressCallback = progressHint.progressCallback;
75+
const progressHint = init.progressHint as {
76+
progressCallback: (progress: number) => void;
77+
} & (Record<string, never> | { part: number; numParts: number });
78+
const progressCallback = progressHint.progressCallback;
8379

84-
const xhr = new XMLHttpRequest();
80+
const xhr = new XMLHttpRequest();
8581

86-
xhr.upload.addEventListener("progress", (event) => {
87-
if (event.lengthComputable) {
88-
if (progressHint.part !== undefined) {
89-
let tracking = multipartUploadTracking.get(progressCallback);
90-
if (!tracking) {
91-
tracking = { numParts: progressHint.numParts, partsProgress: {} };
92-
multipartUploadTracking.set(progressCallback, tracking);
93-
}
94-
tracking.partsProgress[progressHint.part] = event.loaded / event.total;
95-
let totalProgress = 0;
96-
for (const partProgress of Object.values(tracking.partsProgress)) {
97-
totalProgress += partProgress;
98-
}
99-
if (totalProgress === tracking.numParts) {
100-
progressCallback(0.9999999999);
101-
} else {
102-
progressCallback(totalProgress / tracking.numParts);
103-
}
104-
} else {
105-
if (event.loaded === event.total) {
106-
progressCallback(0.9999999999);
107-
} else {
108-
progressCallback(event.loaded / event.total);
109-
}
110-
}
111-
}
112-
});
82+
xhr.upload.addEventListener("progress", (event) => {
83+
if (event.lengthComputable) {
84+
if (progressHint.part !== undefined) {
85+
let tracking = multipartUploadTracking.get(progressCallback);
86+
if (!tracking) {
87+
tracking = { numParts: progressHint.numParts, partsProgress: {} };
88+
multipartUploadTracking.set(progressCallback, tracking);
89+
}
90+
tracking.partsProgress[progressHint.part] = event.loaded / event.total;
91+
let totalProgress = 0;
92+
for (const partProgress of Object.values(tracking.partsProgress)) {
93+
totalProgress += partProgress;
94+
}
95+
if (totalProgress === tracking.numParts) {
96+
progressCallback(0.9999999999);
97+
} else {
98+
progressCallback(totalProgress / tracking.numParts);
99+
}
100+
} else {
101+
if (event.loaded === event.total) {
102+
progressCallback(0.9999999999);
103+
} else {
104+
progressCallback(event.loaded / event.total);
105+
}
106+
}
107+
}
108+
});
113109

114-
xhr.open(init.method, input, true);
110+
xhr.open(init.method, input, true);
115111

116-
if (init.headers) {
117-
const headers = new Headers(init.headers);
118-
headers.forEach((value, key) => {
119-
xhr.setRequestHeader(key, value);
120-
});
121-
}
112+
if (init.headers) {
113+
const headers = new Headers(init.headers);
114+
headers.forEach((value, key) => {
115+
xhr.setRequestHeader(key, value);
116+
});
117+
}
122118

123-
init.signal?.throwIfAborted();
124-
xhr.send(init.body);
119+
init.signal?.throwIfAborted();
120+
xhr.send(init.body);
125121

126-
return new Promise((resolve, reject) => {
127-
xhr.addEventListener("load", () => {
128-
resolve(
129-
new Response(xhr.responseText, {
130-
status: xhr.status,
131-
statusText: xhr.statusText,
132-
headers: Object.fromEntries(
133-
xhr
134-
.getAllResponseHeaders()
135-
.trim()
136-
.split("\n")
137-
.map((header) => [
138-
header.slice(0, header.indexOf(":")),
139-
header.slice(header.indexOf(":") + 1).trim(),
140-
])
141-
),
142-
})
143-
);
144-
});
145-
xhr.addEventListener("error", () => {
146-
reject(new Error(xhr.statusText));
147-
});
122+
return new Promise((resolve, reject) => {
123+
xhr.addEventListener("load", () => {
124+
resolve(
125+
new Response(xhr.responseText, {
126+
status: xhr.status,
127+
statusText: xhr.statusText,
128+
headers: Object.fromEntries(
129+
xhr
130+
.getAllResponseHeaders()
131+
.trim()
132+
.split("\n")
133+
.map((header) => [header.slice(0, header.indexOf(":")), header.slice(header.indexOf(":") + 1).trim()])
134+
),
135+
})
136+
);
137+
});
138+
xhr.addEventListener("error", () => {
139+
reject(new Error(xhr.statusText));
140+
});
148141

149-
if (init.signal) {
150-
init.signal.addEventListener("abort", () => {
151-
xhr.abort();
142+
if (init.signal) {
143+
init.signal.addEventListener("abort", () => {
144+
xhr.abort();
152145

153-
try {
154-
init.signal?.throwIfAborted();
155-
} catch (err) {
156-
reject(err);
157-
}
158-
});
159-
}
160-
});
161-
},
146+
try {
147+
init.signal?.throwIfAborted();
148+
} catch (err) {
149+
reject(err);
150+
}
151+
});
152+
}
153+
});
154+
},
162155
});
163156
}

0 commit comments

Comments
 (0)