
Commit 566d22f

Xet upload: Fixup file progress (#1728)
Follow-up to #1727: the earlier change was tested against the wrong file, one that was already deduplicated.
1 parent 5305f54 commit 566d22f

3 files changed: +62 −12 lines changed

packages/hub/scripts/bench.ts

Lines changed: 43 additions & 5 deletions
@@ -2,14 +2,16 @@ import { uploadShards } from "../src/utils/uploadShards.js";
 import { sha256 } from "../src/utils/sha256.js";
 import { parseArgs } from "node:util";
 import { tmpdir } from "node:os";
-import { join } from "node:path";
+import path, { join } from "node:path";
 import { writeFile, readFile, stat, mkdir } from "node:fs/promises";
 import type { RepoId } from "../src/types/public.js";
 import { toRepoId } from "../src/utils/toRepoId.js";
 import type { CommitOperation } from "../src/index.js";
 import { commitIter, downloadFile } from "../src/index.js";
 import { SplicedBlob } from "../src/utils/SplicedBlob.js";
 import { pathToFileURL } from "node:url";
+import { existsSync } from "node:fs";
+import { FileBlob } from "../src/utils/FileBlob.js";

 /**
  * This script downloads the files from openai-community/gpt2 and simulates an upload to a xet repo.
@@ -37,6 +39,11 @@ const FILES_TO_DOWNLOAD = [
 		filename: "64.tflite",
 		sha256: "cfcd510b239d90b71ee87d4e57a5a8c2d55b2a941e5d9fe5852298268ddbe61b",
 	},
+	{
+		url: "https://huggingface.co/openai-community/gpt2/resolve/main/model.safetensors?download=true",
+		filename: "model.safetensors",
+		sha256: "248dfc3911869ec493c76e65bf2fcf7f615828b0254c12b473182f0f81d3a707",
+	},
 ];

 const FILES_TO_EDIT = [
@@ -84,8 +91,7 @@ async function* createFileSource(files: Array<{ filepath: string; filename: stri
 }> {
 	for (const file of files) {
 		console.log(`Processing ${file.filename}...`);
-		const buffer = await readFile(file.filepath);
-		let blob = new Blob([buffer]);
+		let blob: Blob = await FileBlob.create(file.filepath);

 		if (file.filename.endsWith(".edited")) {
 			const edits = FILES_TO_EDIT.find((f) => f.filename === file.filename)?.edits;
@@ -110,7 +116,7 @@ async function* createFileSource(files: Array<{ filepath: string; filename: stri
 		const sha256ToCheck =
 			FILES_TO_DOWNLOAD.find((f) => f.filename === file.filename)?.sha256 ||
 			FILES_TO_EDIT.find((f) => f.filename === file.filename)?.sha256;
-		if (sha256Hash !== sha256ToCheck) {
+		if (sha256ToCheck !== undefined && sha256Hash !== sha256ToCheck) {
 			throw new Error(`SHA256 mismatch for ${file.filename}: ${sha256Hash} !== ${sha256ToCheck}`);
 		}

@@ -214,6 +220,10 @@ async function main() {
 			short: "c",
 			default: false,
 		},
+		localFilePath: {
+			type: "string",
+			short: "f",
+		},
 		write: {
 			type: "boolean",
 			short: "w",
@@ -250,6 +260,13 @@ async function main() {
 		files.push({ filepath, filename: fileInfo.filename });
 	}

+	if (args.localFilePath) {
+		if (!existsSync(args.localFilePath)) {
+			throw new Error(`Local file ${args.localFilePath} does not exist`);
+		}
+		files.push({ filepath: args.localFilePath, filename: path.basename(args.localFilePath) });
+	}
+
 	// Parse repo
 	const repoName = args.repo;

@@ -279,7 +296,20 @@ async function main() {
 	// Process files through uploadShards
 	const fileSource = createFileSource(files);

-	for await (const event of uploadShards(fileSource, uploadParams)) {
+	const fileProgress: Record<string, number> = {};
+
+	for await (const event of uploadShards(fileSource, {
+		...uploadParams,
+		yieldCallback: (event) => {
+			if (!fileProgress[event.path]) {
+				fileProgress[event.path] = event.progress;
+			}
+			if (event.progress < fileProgress[event.path]) {
+				throw new Error(`Progress for ${event.path} went down from ${fileProgress[event.path]} to ${event.progress}`);
+			}
+			fileProgress[event.path] = event.progress;
+		},
+	})) {
 		switch (event.event) {
 			case "file": {
 				console.log(`\n📁 Processed file: ${event.path}`);
@@ -303,6 +333,14 @@ async function main() {
 			case "fileProgress": {
 				const progress = (event.progress * 100).toFixed(1);
 				console.log(`   📈 Progress for ${event.path}: ${progress}%`);
+
+				if (!fileProgress[event.path]) {
+					fileProgress[event.path] = event.progress;
+				}
+				if (event.progress < fileProgress[event.path]) {
+					throw new Error(`Progress for ${event.path} went down from ${fileProgress[event.path]} to ${event.progress}`);
+				}
+				fileProgress[event.path] = event.progress;
 				break;
 			}
 		}
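The two checks added above enforce the same invariant: for any given path, reported progress must never decrease. A minimal sketch of that invariant as a reusable helper (the helper name and shape are ours, not part of the diff):

// Hypothetical helper distilling the monotonicity check used in bench.ts.
function makeProgressMonitor(): (path: string, progress: number) => void {
	const last: Record<string, number> = {};
	return (path, progress) => {
		const prev = last[path] ?? progress;
		if (progress < prev) {
			throw new Error(`Progress for ${path} went down from ${prev} to ${progress}`);
		}
		last[path] = progress;
	};
}

// Usage: const check = makeProgressMonitor(); check(event.path, event.progress);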

packages/hub/src/utils/FileBlob.ts

Lines changed: 4 additions & 0 deletions
@@ -96,6 +96,10 @@ export class FileBlob extends Blob {
 	 * Returns a stream around the part of the file delimited by the FileBlob.
 	 */
 	override stream(): ReturnType<Blob["stream"]> {
+		if (this.start === this.end) {
+			return new Blob([]).stream();
+		}
+
 		return Readable.toWeb(createReadStream(this.path, { start: this.start, end: this.end - 1 })) as ReturnType<
 			Blob["stream"]
 		>;
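A plausible reading of why this guard is needed: FileBlob's end is exclusive while createReadStream's end option is inclusive, so a zero-length slice (start === end) would produce { start, end: start - 1 }, an inverted range that Node's createReadStream rejects. A standalone sketch of the fixed behavior, under that assumption (openSlice is an illustrative name, not the library API):

import { createReadStream } from "node:fs";
import { Readable } from "node:stream";

// Illustrative version of the fixed stream() logic.
function openSlice(path: string, start: number, end: number): ReadableStream<Uint8Array> {
	if (start === end) {
		// Zero bytes to read: avoid createReadStream({ start, end: start - 1 }).
		return new Blob([]).stream();
	}
	// FileBlob's `end` is exclusive; createReadStream's `end` is inclusive.
	return Readable.toWeb(createReadStream(path, { start, end: end - 1 })) as ReadableStream<Uint8Array>;
}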

packages/hub/src/utils/createXorbs.ts

Lines changed: 15 additions & 7 deletions
@@ -40,6 +40,7 @@ class CurrentXorbInfo {

 	fileProcessedBytes: Record<string, number>;
 	fileUploadedBytes: Record<string, number>;
+	fileSize: Record<string, number>;
 	data: Uint8Array;
 	immutableData: {
 		chunkIndex: number;
@@ -52,6 +53,7 @@
 		this.chunks = [];
 		this.fileProcessedBytes = {};
 		this.fileUploadedBytes = {};
+		this.fileSize = {};
 		this.data = new Uint8Array(XORB_SIZE);
 		this.immutableData = null;
 	}
@@ -68,10 +70,13 @@
 			hash: computeXorbHash(xorbChunksCleaned),
 			chunks: xorbChunksCleaned,
 			id: this.id,
-			files: Object.entries(this.fileProcessedBytes).map(([path, progress]) => ({
+			files: Object.entries(this.fileProcessedBytes).map(([path, processedBytes]) => ({
 				path,
-				progress,
-				lastSentProgress: this.fileUploadedBytes[path] ?? 0,
+				progress: processedBytes / this.fileSize[path],
+				lastSentProgress:
+					((this.fileUploadedBytes[path] ?? 0) +
+						(processedBytes - (this.fileUploadedBytes[path] ?? 0)) * PROCESSING_PROGRESS_RATIO) /
+					this.fileSize[path],
 			})),
 		};
 	}
@@ -111,7 +116,7 @@ export async function* createXorbs(
 	const chunkCache = new ChunkCache();
 	let xorb = new CurrentXorbInfo();

-	const nextXorb = (currentFile: { path: string; uploadedBytes: number }): XorbEvent => {
+	const nextXorb = (currentFile: { path: string; uploadedBytes: number; size: number }): XorbEvent => {
 		const event = xorb.event(chunkModule.compute_xorb_hash.bind(chunkModule));

 		xorbId++;
@@ -120,6 +125,7 @@
 		xorb.fileUploadedBytes = {
 			[currentFile.path]: currentFile.uploadedBytes,
 		};
+		xorb.fileSize[currentFile.path] = currentFile.size;

 		return event;
 	};
@@ -143,10 +149,12 @@

 	try {
 		for await (const fileSource of fileSources) {
+			xorb.fileSize[fileSource.path] = fileSource.content.size;
+
 			// Load dedup info for the first chunk of the file, if it's potentially modified by the splice
 			if (fileSource.content instanceof SplicedBlob && fileSource.content.firstSpliceIndex < MAX_CHUNK_SIZE) {
 				await loadDedupInfoToCache(
-					fileSource.content.originalBlob.slice(0, fileSource.content.firstSpliceIndex),
+					fileSource.content.originalBlob.slice(0, MAX_CHUNK_SIZE),
 					remoteXorbHashes,
 					params,
 					chunkCache,
@@ -235,7 +243,7 @@
 				if (cacheData === undefined) {
 					if (!writeChunk(xorb, chunkToCopy, chunk.hash)) {
 						// Failure to write chunk, maybe because it went over xorb size limit
-						yield nextXorb({ path: fileSource.path, uploadedBytes: processedBytes / fileSource.content.size });
+						yield nextXorb({ path: fileSource.path, uploadedBytes: processedBytes, size: fileSource.content.size });

 						chunkIndex = 0;
 						chunkXorbId = xorbId;
@@ -290,7 +298,7 @@
 			}

 			if (xorb.chunks.length >= MAX_XORB_CHUNKS) {
-				yield nextXorb({ path: fileSource.path, uploadedBytes: processedBytes });
+				yield nextXorb({ path: fileSource.path, uploadedBytes: processedBytes, size: fileSource.content.size });

 				for (const event of pendingFileEvents) {
 					event.representation = event.representation.map((rep) => ({
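The net effect of the createXorbs.ts change: per-file progress is now emitted as a fraction of the file size rather than a raw byte count, and lastSentProgress interpolates between already-uploaded bytes and in-flight processed bytes via PROCESSING_PROGRESS_RATIO (a constant defined elsewhere in createXorbs.ts; its value of 0.5 below is an assumption for illustration). A worked sketch of the arithmetic:

// Illustrative numbers only; PROCESSING_PROGRESS_RATIO = 0.5 is assumed.
const PROCESSING_PROGRESS_RATIO = 0.5;
const fileSize = 1_000;
const uploadedBytes = 200;  // bytes confirmed uploaded
const processedBytes = 600; // bytes chunked/processed so far

const progress = processedBytes / fileSize; // 0.6
const lastSentProgress =
	(uploadedBytes + (processedBytes - uploadedBytes) * PROCESSING_PROGRESS_RATIO) / fileSize; // 0.4

// With 0 <= PROCESSING_PROGRESS_RATIO <= 1 and uploadedBytes <= processedBytes,
// lastSentProgress never exceeds progress, which is what keeps the per-file
// progress reported to bench.ts monotonically non-decreasing.
console.log({ progress, lastSentProgress });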
