diff --git a/packages/hub/README.md b/packages/hub/README.md
index 08f57f08be..6754634b26 100644
--- a/packages/hub/README.md
+++ b/packages/hub/README.md
@@ -82,6 +82,25 @@ for await (const progressEvent of await hub.uploadFilesWithProgress({
   console.log(progressEvent);
 }
 
+// Edit a file by adding a prefix & suffix
+await commit({
+	repo,
+	accessToken: "hf_...",
+	operations: [{
+		operation: "edit",
+		path: "myfile.bin",
+		originalContent: originalFile,
+		edits: [{
+			start: 0,
+			end: 0,
+			content: new Blob(["prefix"])
+		}, {
+			start: originalFile.size,
+			end: originalFile.size,
+			content: new Blob(["suffix"])
+		}]
+	}]
+});
+
 await hub.deleteFile({repo, accessToken: "hf_...", path: "myfile.bin"});
 
 await (await hub.downloadFile({ repo, path: "README.md" })).text();
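In this API, an edit with start === end inserts content at that byte offset, while start < end replaces the byte range [start, end) of the original. A minimal local sketch of the same semantics (the applyEditsLocally helper is hypothetical, for illustration only — the library applies edits lazily via SplicedBlob instead):

// Hypothetical helper illustrating the [start, end) replace-or-insert semantics; not part of @huggingface/hub.
function applyEditsLocally(original: Blob, edits: Array<{ start: number; end: number; content: Blob }>): Blob {
	const parts: Blob[] = [];
	let cursor = 0;
	for (const edit of [...edits].sort((a, b) => a.start - b.start)) {
		parts.push(original.slice(cursor, edit.start), edit.content);
		cursor = edit.end; // bytes in [start, end) are dropped; when start === end nothing is dropped
	}
	parts.push(original.slice(cursor));
	return new Blob(parts);
}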
diff --git a/packages/hub/scripts/bench.ts b/packages/hub/scripts/bench.ts
index 170ee9b23c..69efb03d40 100644
--- a/packages/hub/scripts/bench.ts
+++ b/packages/hub/scripts/bench.ts
@@ -6,9 +6,10 @@ import { join } from "node:path";
 import { writeFile, readFile, stat, mkdir } from "node:fs/promises";
 import type { RepoId } from "../src/types/public.js";
 import { toRepoId } from "../src/utils/toRepoId.js";
-import { commitIter } from "../src/index.js";
+import type { CommitOperation } from "../src/index.js";
+import { commitIter, downloadFile } from "../src/index.js";
+import { SplicedBlob } from "../src/utils/SplicedBlob.js";
 import { pathToFileURL } from "node:url";
-import { WebBlob } from "../src/utils/WebBlob.js";
 
 /**
  * This script downloads the files from openai-community/gpt2 and simulates an upload to a xet repo.
@@ -38,6 +39,23 @@ const FILES_TO_DOWNLOAD = [
 	},
 ];
 
+const FILES_TO_EDIT = [
+	{
+		url: "https://huggingface.co/openai-community/gpt2/resolve/main/64-8bits.tflite?download=true",
+		filename: "64-8bits.tflite.edited",
+		sha256: "c2b116ccc9e5362d55dd60b344a4b93156594feeef312b5b8833151f0732aa0a",
+		edits: [
+			{
+				start: 0,
+				end: 1000,
+				content: new Blob([
+					"Adding a new prefix to this TFLite file. Will xet still be efficient in deduplicating the file?",
+				]),
+			},
+		],
+	},
+];
+
 async function downloadFileIfNotExists(url: string, filepath: string): Promise<void> {
 	try {
 		await stat(filepath);
@@ -58,13 +76,25 @@ async function downloadFileIfNotExists(url: string, filepath: string): Promise<void> {
 	}
 }
 
-async function* createFileSource(
-	files: Array<{ filepath: string; filename: string }>
-): AsyncGenerator<{ content: Blob; path: string; sha256: string }> {
+async function* createFileSource(files: Array<{ filepath: string; filename: string }>): AsyncGenerator<{
+	content: Blob;
+	path: string;
+	sha256: string;
+	edits?: Array<{ start: number; end: number; content: Blob }>;
+}> {
 	for (const file of files) {
 		console.log(`Processing ${file.filename}...`);
 		const buffer = await readFile(file.filepath);
-		const blob = new Blob([buffer]);
+		let blob = new Blob([buffer]);
+
+		if (file.filename.endsWith(".edited")) {
+			const edits = FILES_TO_EDIT.find((f) => f.filename === file.filename)?.edits;
+			if (edits !== undefined) {
+				for (const edit of edits) {
+					blob = SplicedBlob.create(blob, [{ insert: edit.content, start: edit.start, end: edit.end }]);
+				}
+			}
+		}
 
 		// Calculate sha256
 		console.log(`Calculating SHA256 for ${file.filename}...`);
@@ -77,12 +107,11 @@ async function* createFileSource(
 
 		console.log(`SHA256 for ${file.filename}: ${sha256Hash}`);
 
-		if (sha256Hash !== FILES_TO_DOWNLOAD.find((f) => f.filename === file.filename)?.sha256) {
-			throw new Error(
-				`SHA256 mismatch for ${file.filename}: ${sha256Hash} !== ${FILES_TO_DOWNLOAD.find(
-					(f) => f.filename === file.filename
-				)?.sha256}`
-			);
+		const sha256ToCheck =
+			FILES_TO_DOWNLOAD.find((f) => f.filename === file.filename)?.sha256 ||
+			FILES_TO_EDIT.find((f) => f.filename === file.filename)?.sha256;
+		if (sha256Hash !== sha256ToCheck) {
+			throw new Error(`SHA256 mismatch for ${file.filename}: ${sha256Hash} !== ${sha256ToCheck}`);
 		}
 
 		yield {
@@ -215,6 +244,12 @@ async function main() {
 		files.push({ filepath, filename: fileInfo.filename });
 	}
 
+	for (const fileInfo of FILES_TO_EDIT) {
+		const filepath = join(downloadDir, fileInfo.filename);
+		await downloadFileIfNotExists(fileInfo.url, filepath);
+		files.push({ filepath, filename: fileInfo.filename });
+	}
+
 	// Parse repo
 	const repoName = args.repo;
 
@@ -302,13 +337,25 @@ async function main() {
 	if (args.commit) {
 		console.log("\n=== Committing files ===");
+		const operations: CommitOperation[] = [];
+		for (const fileInfo of FILES_TO_DOWNLOAD) {
+			operations.push({
+				operation: "addOrUpdate",
+				content: pathToFileURL(join(downloadDir, fileInfo.filename)),
+				path: fileInfo.filename,
+			});
+		}
+		for (const fileInfo of FILES_TO_EDIT) {
+			operations.push({
+				operation: "edit",
+				originalContent: new Blob([await readFile(join(downloadDir, fileInfo.filename))]),
+				edits: fileInfo.edits,
+				path: fileInfo.filename,
+			});
+		}
 		const iterator = commitIter({
 			repo,
-			operations: files.map((file) => ({
-				operation: "addOrUpdate",
-				content: pathToFileURL(file.filepath),
-				path: file.filename,
-			})),
+			operations,
 			accessToken: args.token,
 			title: "Upload xet files with JS lib",
 			useXet: true,
@@ -325,7 +372,16 @@ async function main() {
 		console.log("Redownloading files and verifying SHA256 integrity");
 
 		for (const file of FILES_TO_DOWNLOAD) {
-			const fileBlob = await WebBlob.create(new URL(file.url));
+			const fileBlob = await downloadFile({
+				repo,
+				path: file.filename,
+				accessToken: args.token,
+			});
+
+			if (!fileBlob) {
+				throw new Error(`Failed to download ${file.filename}`);
+			}
+
 			const sha256Hash = sha256(fileBlob, { useWebWorker: false });
 			let res: IteratorResult<number, string>;
 			do {
@@ -335,6 +391,26 @@ async function main() {
 			const finalHash = res.value;
 			console.log(`${file.filename}: ${finalHash} === ${file.sha256} ${finalHash === file.sha256 ? "✅" : "❌"}`);
 		}
+
+		for (const file of FILES_TO_EDIT) {
+			const fileBlob = await downloadFile({
+				repo,
+				path: file.filename,
+				accessToken: args.token,
+			});
+
+			if (!fileBlob) {
+				throw new Error(`Failed to download ${file.filename}`);
+			}
+
+			const sha256Hash = sha256(fileBlob, { useWebWorker: false });
+			let res: IteratorResult<number, string>;
+			do {
+				res = await sha256Hash.next();
+			} while (!res.done);
+			const finalHash = res.value;
+			console.log(`${file.filename}: ${finalHash} === ${file.sha256} ${finalHash === file.sha256 ? "✅" : "❌"}`);
+		}
 	}
 }
diff --git a/packages/hub/src/lib/commit.ts b/packages/hub/src/lib/commit.ts
index 30f83e6ebc..4bea82654f 100644
--- a/packages/hub/src/lib/commit.ts
+++ b/packages/hub/src/lib/commit.ts
@@ -25,6 +25,7 @@ import { createBlobs } from "../utils/createBlobs";
 import { uploadShards } from "../utils/uploadShards";
 import { splitAsyncGenerator } from "../utils/splitAsyncGenerator";
 import { mergeAsyncGenerators } from "../utils/mergeAsyncGenerators";
+import { SplicedBlob } from "../utils/SplicedBlob";
 
 const CONCURRENT_SHAS = 5;
 const CONCURRENT_LFS_UPLOADS = 5;
@@ -44,6 +45,36 @@ export interface CommitFile {
 	// forceLfs?: boolean
 }
 
+/**
+ * Optimized for the case where only the beginning or the end of the file is replaced
+ *
+ * todo: handle other cases
+ */
+export interface CommitEditFile {
+	operation: "edit";
+	path: string;
+	/** Later, will be ContentSource. For now simpler to just handle blobs */
+	originalContent: Blob;
+	edits: Array<{
+		/**
+		 * Later, will be ContentSource. For now simpler to just handle blobs
+		 *
+		 * originalContent from [start, end) will be replaced by this
+		 */
+		content: Blob;
+		/**
+		 * The start position of the edit in the original content
+		 */
+		start: number;
+		/**
+		 * The end position of the edit in the original content
+		 *
+		 * originalContent from [start, end) will be replaced by the edit
+		 */
+		end: number;
+	}>;
+}
+
 type CommitBlob = Omit<CommitFile, "content"> & { content: Blob };
 
 // TODO: find a nice way to handle LFS & non-LFS files in a uniform manner, see https://github.com/huggingface/moon-landing/issues/4370
@@ -54,7 +85,7 @@ type CommitBlob = Omit<CommitFile, "content"> & { content: Blob };
 //	content?: ContentSource;
 // };
 
-export type CommitOperation = CommitDeletedEntry | CommitFile /* | CommitRenameFile */;
+export type CommitOperation = CommitDeletedEntry | CommitFile | CommitEditFile /* | CommitRenameFile */;
 type CommitBlobOperation = Exclude<CommitOperation, CommitFile> | CommitBlob;
 
 export type CommitParams = {
@@ -91,9 +122,6 @@ export type CommitParams = {
 	fetch?: typeof fetch;
 	abortSignal?: AbortSignal;
 	// Credentials are optional due to custom fetch functions or cookie auth
-	/**
-	 * @deprecated Not yet ready for production use
-	 */
 	useXet?: boolean;
 } & Partial<CredentialsParams>;
 
@@ -138,6 +166,25 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitProgressEvent, CommitOutput> {
 	const lfsShas = new Map<string, string | null>();
 
 	const abortController = new AbortController();
@@ -160,6 +207,23 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitProgressEvent, CommitOutput> {
 		params.operations.map(async (operation) => {
+			if (operation.operation === "edit" && !useXet) {
+				throw new Error("Edit operation is not supported when Xet is disabled");
+			}
+
+			if (operation.operation === "edit") {
+				// Convert the edit operation to a file operation backed by a SplicedBlob
+				const splicedBlob = SplicedBlob.create(
+					operation.originalContent,
+					operation.edits.map((splice) => ({ insert: splice.content, start: splice.start, end: splice.end }))
+				);
+				return {
+					operation: "addOrUpdate" as const,
+					path: operation.path,
+					content: splicedBlob,
+				};
+			}
 			if (operation.operation !== "addOrUpdate") {
 				return operation;
 			}
@@ -678,6 +742,13 @@ async function convertOperationToNdJson(operation: CommitBlobOperation): Promise<ApiCommitOperation> {
 			},
 		};
 	}
+		case "edit": {
+			// Note: by the time we get here, edit operations should have been converted to
+			// addOrUpdate operations backed by a SplicedBlob; we handle this case for completeness
+			throw new Error(
+				"Edit operations should be converted to addOrUpdate operations before reaching convertOperationToNdJson"
+			);
+		}
 		default:
 			throw new TypeError("Unknown operation: " + (operation as { operation: string }).operation);
 	}
diff --git a/packages/hub/src/lib/dataset-info.ts b/packages/hub/src/lib/dataset-info.ts
index 542b5aa0f4..9253546845 100644
--- a/packages/hub/src/lib/dataset-info.ts
+++ b/packages/hub/src/lib/dataset-info.ts
@@ -37,7 +37,6 @@ export async function datasetInfo<
 		{
 			headers: {
 				...(accessToken ? { Authorization: `Bearer ${accessToken}` } : {}),
-				Accepts: "application/json",
 			},
 		}
 	);
diff --git a/packages/hub/src/lib/model-info.ts b/packages/hub/src/lib/model-info.ts
index ee7ff571f2..247f7e7c32 100644
--- a/packages/hub/src/lib/model-info.ts
+++ b/packages/hub/src/lib/model-info.ts
@@ -38,7 +38,6 @@ export async function modelInfo<
 		{
 			headers: {
 				...(accessToken ? { Authorization: `Bearer ${accessToken}` } : {}),
-				Accepts: "application/json",
 			},
 		}
 	);
diff --git a/packages/hub/src/lib/space-info.ts b/packages/hub/src/lib/space-info.ts
index 9422353825..198c91d992 100644
--- a/packages/hub/src/lib/space-info.ts
+++ b/packages/hub/src/lib/space-info.ts
@@ -38,7 +38,6 @@ export async function spaceInfo<
 		{
 			headers: {
 				...(accessToken ? { Authorization: `Bearer ${accessToken}` } : {}),
-				Accepts: "application/json",
 			},
 		}
 	);
diff --git a/packages/hub/src/lib/upload-file.ts b/packages/hub/src/lib/upload-file.ts
index a3d4368cdb..da051cb0d5 100644
--- a/packages/hub/src/lib/upload-file.ts
+++ b/packages/hub/src/lib/upload-file.ts
@@ -15,9 +15,6 @@ export function uploadFile(
 		fetch?: CommitParams["fetch"];
 		useWebWorkers?: CommitParams["useWebWorkers"];
 		abortSignal?: CommitParams["abortSignal"];
-		/**
-		 * @deprecated Not yet ready for production use
-		 */
 		useXet?: CommitParams["useXet"];
 	} & Partial<CredentialsParams>
 ): Promise<CommitOutput> {
diff --git a/packages/hub/src/lib/upload-files-with-progress.ts b/packages/hub/src/lib/upload-files-with-progress.ts
index e490306b92..ed026465fb 100644
--- a/packages/hub/src/lib/upload-files-with-progress.ts
+++ b/packages/hub/src/lib/upload-files-with-progress.ts
@@ -29,9 +29,6 @@ export async function* uploadFilesWithProgress(
 		parentCommit?: CommitParams["parentCommit"];
 		abortSignal?: CommitParams["abortSignal"];
 		maxFolderDepth?: CommitParams["maxFolderDepth"];
-		/**
-		 * @deprecated Not yet ready for production use
-		 */
 		useXet?: CommitParams["useXet"];
 		/**
 		 * Set this to true in order to have progress events for hashing
diff --git a/packages/hub/src/lib/upload-files.ts b/packages/hub/src/lib/upload-files.ts
index 2226810652..585273fa98 100644
--- a/packages/hub/src/lib/upload-files.ts
+++ b/packages/hub/src/lib/upload-files.ts
@@ -16,9 +16,6 @@ export function uploadFiles(
 		useWebWorkers?: CommitParams["useWebWorkers"];
 		maxFolderDepth?: CommitParams["maxFolderDepth"];
 		abortSignal?: CommitParams["abortSignal"];
-		/**
-		 * @deprecated Not yet ready for production use
-		 */
 		useXet?: CommitParams["useXet"];
 	} & Partial<CredentialsParams>
 ): Promise<CommitOutput> {
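The spec file below exercises SplicedBlob, the virtual blob type that backs the edit operation. As a quick orientation (illustrative snippet mirroring the tests that follow):

import { SplicedBlob } from "./SplicedBlob";

const original = new Blob(["0123456789"]);
// start === end → pure insertion: "01234ABC56789"
const inserted = SplicedBlob.create(original, [{ insert: new Blob(["ABC"]), start: 5, end: 5 }]);
// start < end → replacement of [start, end): "012XY6789"
const replaced = SplicedBlob.create(original, [{ insert: new Blob(["XY"]), start: 3, end: 6 }]);
console.log(await inserted.text(), await replaced.text());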
diff --git a/packages/hub/src/utils/SplicedBlob.spec.ts b/packages/hub/src/utils/SplicedBlob.spec.ts
new file mode 100644
index 0000000000..0e46598220
--- /dev/null
+++ b/packages/hub/src/utils/SplicedBlob.spec.ts
@@ -0,0 +1,454 @@
+import { describe, expect, it, beforeEach } from "vitest";
+import { SplicedBlob } from "./SplicedBlob";
+
+describe("SplicedBlob", () => {
+	let originalBlob: Blob;
+	let insertBlob: Blob;
+	let replaceBlob: Blob;
+
+	beforeEach(() => {
+		// originalBlob: "0123456789" (10 chars)
+		originalBlob = new Blob(["0123456789"]);
+		// insertBlob: "ABC" (3 chars) - used in tests where we insert something into the blob
+		insertBlob = new Blob(["ABC"]);
+		// replaceBlob: "XY" (2 chars) - used in tests where part of the blob is replaced
+		replaceBlob = new Blob(["XY"]);
+	});
+
+	describe("create", () => {
+		it("should create a SplicedBlob with valid parameters", () => {
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			expect(splicedBlob).toBeInstanceOf(SplicedBlob);
+		});
+
+		it("should throw error for negative start", () => {
+			expect(() => SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: -1, end: 5 }])).toThrow(
+				"Invalid start/end positions for SplicedBlob"
+			);
+		});
+
+		it("should throw error for negative end", () => {
+			expect(() => SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: -1 }])).toThrow(
+				"Invalid start/end positions for SplicedBlob"
+			);
+		});
+
+		it("should throw error for start > original.size", () => {
+			expect(() => SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 15, end: 5 }])).toThrow(
+				"Invalid start/end positions for SplicedBlob"
+			);
+		});
+
+		it("should throw error for end > original.size", () => {
+			expect(() => SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 15 }])).toThrow(
+				"Invalid start/end positions for SplicedBlob"
+			);
+		});
+
+		it("should throw error for start > end", () => {
+			expect(() => SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 7, end: 5 }])).toThrow(
+				"Invalid start/end positions for SplicedBlob"
+			);
+		});
+	});
+
+	describe("size and type", () => {
+		it("should calculate size correctly for insertion", () => {
+			// Insert "ABC" at position 5: "01234ABC56789" = 13 chars
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			expect(splicedBlob.size).toBe(13);
+		});
+
+		it("should calculate size correctly for replacement", () => {
+			// Replace "345" with "XY": "012XY6789" = 9 chars
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]);
+			expect(splicedBlob.size).toBe(9);
+		});
+
+		it("should return original blob type", () => {
+			const typedBlob = new Blob(["test"], { type: "text/plain" });
+			const splicedBlob = SplicedBlob.create(typedBlob, [{ insert: insertBlob, start: 2, end: 2 }]);
+			expect(splicedBlob.type).toBe("text/plain");
+		});
+	});
+
+	describe("text method", () => {
+		it("should insert at beginning", async () => {
+			// Insert "ABC" at start: "ABC0123456789"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 0, end: 0 }]);
+			const text = await splicedBlob.text();
+			expect(text).toBe("ABC0123456789");
+		});
+
+		it("should insert at end", async () => {
+			// Insert "ABC" at end: "0123456789ABC"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 10, end: 10 }]);
+			const text = await splicedBlob.text();
+			expect(text).toBe("0123456789ABC");
+		});
+		it("should insert in middle", async () => {
+			// Insert "ABC" at position 5: "01234ABC56789"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const text = await splicedBlob.text();
+			expect(text).toBe("01234ABC56789");
+		});
+
+		it("should replace content", async () => {
+			// Replace "345" with "XY": "012XY6789"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]);
+			const text = await splicedBlob.text();
+			expect(text).toBe("012XY6789");
+		});
+
+		it("should replace everything", async () => {
+			// Replace entire content with "ABC": "ABC"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 0, end: 10 }]);
+			const text = await splicedBlob.text();
+			expect(text).toBe("ABC");
+		});
+	});
+
+	describe("slice method - basic cases", () => {
+		it("should return empty blob for start >= end", async () => {
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(5, 5);
+			expect(slice.size).toBe(0);
+			expect(await slice.text()).toBe("");
+		});
+
+		it("should handle slice beyond size", async () => {
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(10, 20);
+			const text = await slice.text();
+			expect(text).toBe("789"); // Only gets what's available
+		});
+
+		it("should throw error for negative start/end", () => {
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			expect(() => splicedBlob.slice(-1, 5)).toThrow("Unsupported negative start/end on SplicedBlob.slice");
+			expect(() => splicedBlob.slice(0, -1)).toThrow("Unsupported negative start/end on SplicedBlob.slice");
+		});
+	});
+
+	describe("slice method - before segment only", () => {
+		it("should slice entirely in before segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(1, 4) = "123"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(1, 4);
+			const text = await slice.text();
+			expect(text).toBe("123");
+		});
+
+		it("should slice from start of before segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(0, 3) = "012"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(0, 3);
+			const text = await slice.text();
+			expect(text).toBe("012");
+		});
+
+		it("should slice to end of before segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(2, 5) = "234"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(2, 5);
+			const text = await slice.text();
+			expect(text).toBe("234");
+		});
+	});
+
+	describe("slice method - insert segment only", () => {
+		it("should slice entirely in insert segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(6, 7) = "B"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(6, 7);
+			const text = await slice.text();
+			expect(text).toBe("B");
+		});
+
+		it("should slice entire insert segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(5, 8) = "ABC"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(5, 8);
+			const text = await slice.text();
+			expect(text).toBe("ABC");
+		});
+		it("should slice from start of insert segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(5, 7) = "AB"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(5, 7);
+			const text = await slice.text();
+			expect(text).toBe("AB");
+		});
+
+		it("should slice to end of insert segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(6, 8) = "BC"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(6, 8);
+			const text = await slice.text();
+			expect(text).toBe("BC");
+		});
+	});
+
+	describe("slice method - after segment only", () => {
+		it("should slice entirely in after segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(9, 12) = "678"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(9, 12);
+			const text = await slice.text();
+			expect(text).toBe("678");
+		});
+
+		it("should slice from start of after segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(8, 11) = "567"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(8, 11);
+			const text = await slice.text();
+			expect(text).toBe("567");
+		});
+
+		it("should slice to end of after segment", async () => {
+			// SplicedBlob: "01234ABC56789", slice(10, 13) = "789"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(10, 13);
+			const text = await slice.text();
+			expect(text).toBe("789");
+		});
+	});
+
+	describe("slice method - spanning before and insert", () => {
+		it("should slice spanning before and insert segments", async () => {
+			// SplicedBlob: "01234ABC56789", slice(3, 7) = "34AB"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(3, 7);
+			const text = await slice.text();
+			expect(text).toBe("34AB");
+		});
+
+		it("should slice from start spanning before and insert", async () => {
+			// SplicedBlob: "01234ABC56789", slice(0, 6) = "01234A"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(0, 6);
+			const text = await slice.text();
+			expect(text).toBe("01234A");
+		});
+
+		it("should slice to end of insert spanning before and insert", async () => {
+			// SplicedBlob: "01234ABC56789", slice(4, 8) = "4ABC"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(4, 8);
+			const text = await slice.text();
+			expect(text).toBe("4ABC");
+		});
+	});
+
+	describe("slice method - spanning insert and after", () => {
+		it("should slice spanning insert and after segments", async () => {
+			// SplicedBlob: "01234ABC56789", slice(6, 10) = "BC56"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(6, 10);
+			const text = await slice.text();
+			expect(text).toBe("BC56");
+		});
+
+		it("should slice from start of insert to end", async () => {
+			// SplicedBlob: "01234ABC56789", slice(5, 13) = "ABC56789"
+			const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]);
+			const slice = splicedBlob.slice(5, 13);
+			const text = await slice.text();
+			expect(text).toBe("ABC56789");
+		});
spanning to after", async () => { + // SplicedBlob: "01234ABC56789", slice(7, 11) = "C567" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const slice = splicedBlob.slice(7, 11); + const text = await slice.text(); + expect(text).toBe("C567"); + }); + }); + + describe("slice method - spanning all three segments", () => { + it("should slice spanning all three segments", async () => { + // SplicedBlob: "01234ABC56789", slice(3, 10) = "34ABC56" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const slice = splicedBlob.slice(3, 10); + const text = await slice.text(); + expect(text).toBe("34ABC56"); + }); + + it("should slice entire spliced blob", async () => { + // SplicedBlob: "01234ABC56789", slice(0, 13) = "01234ABC56789" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const slice = splicedBlob.slice(0, 13); + const text = await slice.text(); + expect(text).toBe("01234ABC56789"); + }); + + it("should slice most of spliced blob", async () => { + // SplicedBlob: "01234ABC56789", slice(1, 12) = "1234ABC5678" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const slice = splicedBlob.slice(1, 12); + const text = await slice.text(); + expect(text).toBe("1234ABC5678"); + }); + }); + + describe("slice method - with replacement", () => { + it("should slice before replacement", async () => { + // Replace "345" with "XY": "012XY6789", slice(0, 3) = "012" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(0, 3); + const text = await slice.text(); + expect(text).toBe("012"); + }); + + it("should slice replacement only", async () => { + // Replace "345" with "XY": "012XY6789", slice(3, 5) = "XY" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(3, 5); + const text = await slice.text(); + expect(text).toBe("XY"); + }); + + it("should slice after replacement", async () => { + // Replace "345" with "XY": "012XY6789", slice(5, 9) = "6789" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(5, 9); + const text = await slice.text(); + expect(text).toBe("6789"); + }); + + it("should slice spanning before and replacement", async () => { + // Replace "345" with "XY": "012XY6789", slice(1, 4) = "12X" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(1, 4); + const text = await slice.text(); + expect(text).toBe("12X"); + }); + + it("should slice spanning replacement and after", async () => { + // Replace "345" with "XY": "012XY6789", slice(4, 7) = "Y67" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(4, 7); + const text = await slice.text(); + expect(text).toBe("Y67"); + }); + + it("should slice spanning all segments with replacement", async () => { + // Replace "345" with "XY": "012XY6789", slice(1, 8) = "12XY678" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const slice = splicedBlob.slice(1, 8); + const text = await slice.text(); + expect(text).toBe("12XY678"); + }); + }); + + describe("slice method - edge cases", () => { + 
it("should handle empty insert blob", async () => { + const emptyBlob = new Blob([""]); + // Replace "345" with "": "0126789" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: emptyBlob, start: 3, end: 6 }]); + const text = await splicedBlob.text(); + expect(text).toBe("0126789"); + + const slice = splicedBlob.slice(2, 5); + expect(await slice.text()).toBe("267"); + }); + + it("should handle slice at segment boundaries", async () => { + // SplicedBlob: "01234ABC56789", exact boundary slices + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + + // End of before segment + expect(await splicedBlob.slice(4, 5).text()).toBe("4"); + // Start of insert segment + expect(await splicedBlob.slice(5, 6).text()).toBe("A"); + // End of insert segment + expect(await splicedBlob.slice(7, 8).text()).toBe("C"); + // Start of after segment + expect(await splicedBlob.slice(8, 9).text()).toBe("5"); + }); + + it("should handle insert at beginning with slice", async () => { + // Insert "ABC" at start: "ABC0123456789" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 0, end: 0 }]); + const slice = splicedBlob.slice(1, 5); + expect(await slice.text()).toBe("BC01"); + }); + + it("should handle insert at end with slice", async () => { + // Insert "ABC" at end: "0123456789ABC" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 10, end: 10 }]); + const slice = splicedBlob.slice(8, 12); + expect(await slice.text()).toBe("89AB"); + }); + }); + + describe("arrayBuffer method", () => { + it("should return correct ArrayBuffer for insertion", async () => { + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const buffer = await splicedBlob.arrayBuffer(); + const text = new TextDecoder().decode(buffer); + expect(text).toBe("01234ABC56789"); + expect(buffer.byteLength).toBe(13); + }); + + it("should return correct ArrayBuffer for replacement", async () => { + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const buffer = await splicedBlob.arrayBuffer(); + const text = new TextDecoder().decode(buffer); + expect(text).toBe("012XY6789"); + expect(buffer.byteLength).toBe(9); + }); + }); + + describe("stream method", () => { + it("should return correct stream for insertion", async () => { + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const stream = splicedBlob.stream(); + const response = new Response(stream); + const text = await response.text(); + expect(text).toBe("01234ABC56789"); + }); + + it("should return correct stream for replacement", async () => { + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: replaceBlob, start: 3, end: 6 }]); + const stream = splicedBlob.stream(); + const response = new Response(stream); + const text = await response.text(); + expect(text).toBe("012XY6789"); + }); + + it("should handle empty segments in stream", async () => { + const emptyBlob = new Blob([""]); + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: emptyBlob, start: 5, end: 5 }]); + const stream = splicedBlob.stream(); + const response = new Response(stream); + const text = await response.text(); + expect(text).toBe("0123456789"); + }); + }); + + describe("nested slicing", () => { + it("should allow slicing of sliced blob", async () => { + // SplicedBlob: "01234ABC56789", slice(3, 10) = "34ABC56", then slice(2, 5) = 
"ABC" + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const firstSlice = splicedBlob.slice(3, 10); + const secondSlice = firstSlice.slice(2, 5); + const text = await secondSlice.text(); + expect(text).toBe("ABC"); + }); + + it("should handle multiple levels of slicing", async () => { + // Complex slicing chain + const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 5, end: 5 }]); + const slice1 = splicedBlob.slice(2, 11); // "234ABC567" + const slice2 = slice1.slice(1, 7); // "34ABC5" + const slice3 = slice2.slice(2, 5); // "ABC" + const text = await slice3.text(); + expect(text).toBe("ABC"); + }); + }); +}); diff --git a/packages/hub/src/utils/SplicedBlob.ts b/packages/hub/src/utils/SplicedBlob.ts new file mode 100644 index 0000000000..28877bcc47 --- /dev/null +++ b/packages/hub/src/utils/SplicedBlob.ts @@ -0,0 +1,246 @@ +import { sum } from "./sum"; + +/** + * Represents a single splice operation + */ +interface SpliceOperation { + insert: Blob; + start: number; + end: number; +} + +/** + * @internal + * + * A SplicedBlob is a Blob that represents the result of splicing one or more insert blobs + * into an original blob at specified positions, replacing content between start and end. + * + * It is a drop-in replacement for the Blob class, so you can use it as a Blob. + * The splicing is done virtually without copying data until accessed. + * + * @example + * const originalBlob = new Blob(["Hello, World!"]); + * const insertBlob = new Blob(["Beautiful "]); + * const splicedBlob = SplicedBlob.create(originalBlob, insertBlob, 7, 7); + * // Result represents: "Hello, Beautiful World!" + */ +export class SplicedBlob extends Blob { + public originalBlob: Blob; + public spliceOperations: SpliceOperation[]; + + private constructor(originalBlob: Blob, spliceOperations: SpliceOperation[]) { + super(); + + this.originalBlob = originalBlob; + this.spliceOperations = spliceOperations; // Create a copy to prevent external mutation + } + + static create(originalBlob: Blob, operations: SpliceOperation[]): SplicedBlob { + // Validate all operations + for (const op of operations) { + if (op.start < 0 || op.end < 0) { + throw new Error("Invalid start/end positions for SplicedBlob"); + } + if (op.start > originalBlob.size || op.end > originalBlob.size) { + throw new Error("Invalid start/end positions for SplicedBlob"); + } + if (op.start > op.end) { + throw new Error("Invalid start/end positions for SplicedBlob"); + } + } + + // Sort operations by start position and validate no overlaps + const sortedOps = [...operations].sort((a, b) => a.start - b.start); + for (let i = 0; i < sortedOps.length - 1; i++) { + if (sortedOps[i].end > sortedOps[i + 1].start) { + throw new Error("Overlapping splice operations are not supported"); + } + } + + return new SplicedBlob(originalBlob, sortedOps); + } + + /** + * Returns the size of the spliced blob. + * Size = original size - total replaced size + total insert size + */ + override get size(): number { + let totalReplacedSize = 0; + let totalInsertSize = 0; + + for (const op of this.spliceOperations) { + totalReplacedSize += op.end - op.start; + totalInsertSize += op.insert.size; + } + + return this.originalBlob.size - totalReplacedSize + totalInsertSize; + } + + /** + * Returns the MIME type of the original blob. + */ + override get type(): string { + return this.originalBlob.type; + } + + /** + * Returns a new instance of SplicedBlob that is a slice of the current one. 
diff --git a/packages/hub/src/utils/SplicedBlob.ts b/packages/hub/src/utils/SplicedBlob.ts
new file mode 100644
index 0000000000..28877bcc47
--- /dev/null
+++ b/packages/hub/src/utils/SplicedBlob.ts
@@ -0,0 +1,246 @@
+import { sum } from "./sum";
+
+/**
+ * Represents a single splice operation
+ */
+interface SpliceOperation {
+	insert: Blob;
+	start: number;
+	end: number;
+}
+
+/**
+ * @internal
+ *
+ * A SplicedBlob is a Blob that represents the result of splicing one or more insert blobs
+ * into an original blob at specified positions, replacing content between start and end.
+ *
+ * It is a drop-in replacement for the Blob class, so you can use it as a Blob.
+ * The splicing is done virtually, without copying data until accessed.
+ *
+ * @example
+ * const originalBlob = new Blob(["Hello, World!"]);
+ * const insertBlob = new Blob(["Beautiful "]);
+ * const splicedBlob = SplicedBlob.create(originalBlob, [{ insert: insertBlob, start: 7, end: 7 }]);
+ * // Result represents: "Hello, Beautiful World!"
+ */
+export class SplicedBlob extends Blob {
+	public originalBlob: Blob;
+	public spliceOperations: SpliceOperation[];
+
+	private constructor(originalBlob: Blob, spliceOperations: SpliceOperation[]) {
+		super();
+
+		this.originalBlob = originalBlob;
+		this.spliceOperations = spliceOperations; // `create` passes in a sorted copy, preventing external mutation
+	}
+
+	static create(originalBlob: Blob, operations: SpliceOperation[]): SplicedBlob {
+		// Validate all operations
+		for (const op of operations) {
+			if (op.start < 0 || op.end < 0) {
+				throw new Error("Invalid start/end positions for SplicedBlob");
+			}
+			if (op.start > originalBlob.size || op.end > originalBlob.size) {
+				throw new Error("Invalid start/end positions for SplicedBlob");
+			}
+			if (op.start > op.end) {
+				throw new Error("Invalid start/end positions for SplicedBlob");
+			}
+		}
+
+		// Sort operations by start position and validate no overlaps
+		const sortedOps = [...operations].sort((a, b) => a.start - b.start);
+		for (let i = 0; i < sortedOps.length - 1; i++) {
+			if (sortedOps[i].end > sortedOps[i + 1].start) {
+				throw new Error("Overlapping splice operations are not supported");
+			}
+		}
+
+		return new SplicedBlob(originalBlob, sortedOps);
+	}
+
+	/**
+	 * Returns the size of the spliced blob.
+	 * Size = original size - total replaced size + total insert size
+	 */
+	override get size(): number {
+		let totalReplacedSize = 0;
+		let totalInsertSize = 0;
+
+		for (const op of this.spliceOperations) {
+			totalReplacedSize += op.end - op.start;
+			totalInsertSize += op.insert.size;
+		}
+
+		return this.originalBlob.size - totalReplacedSize + totalInsertSize;
+	}
+
+	/**
+	 * Returns the MIME type of the original blob.
+	 */
+	override get type(): string {
+		return this.originalBlob.type;
+	}
+
+	/**
+	 * Returns a new Blob that is a slice of the current one.
+	 *
+	 * The slice is inclusive of the start and exclusive of the end.
+	 * The slice method does not support negative start/end.
+	 *
+	 * @param start beginning of the slice
+	 * @param end end of the slice
+	 */
+	override slice(start = 0, end = this.size): Blob {
+		if (start < 0 || end < 0) {
+			throw new TypeError("Unsupported negative start/end on SplicedBlob.slice");
+		}
+
+		start = Math.min(start, this.size);
+		end = Math.min(end, this.size);
+
+		if (start >= end) {
+			return new Blob([]);
+		}
+
+		// Get all segments and calculate their cumulative positions
+		const segments = this.segments;
+		const segmentBoundaries: number[] = [0];
+		let cumulativeSize = 0;
+
+		for (const segment of segments) {
+			cumulativeSize += segment.size;
+			segmentBoundaries.push(cumulativeSize);
+		}
+
+		// Find which segments the slice spans
+		const resultSegments: Blob[] = [];
+
+		for (let i = 0; i < segments.length; i++) {
+			const segmentStart = segmentBoundaries[i];
+			const segmentEnd = segmentBoundaries[i + 1];
+
+			// Skip segments that are entirely before the slice
+			if (segmentEnd <= start) {
+				continue;
+			}
+
+			// Skip segments that are entirely after the slice
+			if (segmentStart >= end) {
+				break;
+			}
+
+			// Calculate slice bounds within this segment
+			const sliceStart = Math.max(0, start - segmentStart);
+			const sliceEnd = Math.min(segments[i].size, end - segmentStart);
+
+			if (sliceStart < sliceEnd) {
+				resultSegments.push(segments[i].slice(sliceStart, sliceEnd));
+			}
+		}
+
+		return new Blob(resultSegments);
+	}
+
+	get firstSpliceIndex(): number {
+		return this.spliceOperations[0]?.start ?? Infinity;
+	}
+
+	/**
+	 * Reads the spliced blob content and returns it as an ArrayBuffer.
+	 */
+	override async arrayBuffer(): Promise<ArrayBuffer> {
+		const segments = this.segments;
+		const buffers = await Promise.all(segments.map((segment) => segment.arrayBuffer()));
+
+		// Concatenate all buffers
+		const totalSize = sum(buffers.map((buffer) => buffer.byteLength));
+		const result = new Uint8Array(totalSize);
+
+		let offset = 0;
+		for (const buffer of buffers) {
+			result.set(new Uint8Array(buffer), offset);
+			offset += buffer.byteLength;
+		}
+
+		return result.buffer;
+	}
+
+	/**
+	 * Reads the spliced blob content and returns it as a string.
+	 */
+	override async text(): Promise<string> {
+		const buffer = await this.arrayBuffer();
+		return new TextDecoder().decode(buffer);
+	}
+
+	/**
+	 * Returns a stream around the spliced blob content.
+	 */
+	override stream(): ReturnType<Blob["stream"]> {
+		const readable = new ReadableStream({
+			start: async (controller) => {
+				try {
+					const segments = this.segments;
+
+					for (const segment of segments) {
+						const reader = segment.stream().getReader();
+
+						try {
+							while (true) {
+								const { done, value } = await reader.read();
+								if (done) {
+									break;
+								}
+								controller.enqueue(value);
+							}
+						} finally {
+							reader.releaseLock();
+						}
+					}
+
+					controller.close();
+				} catch (error) {
+					controller.error(error);
+				}
+			},
+		});
+
+		return readable;
+	}
+
+	/**
+	 * Get all segments that make up the spliced blob.
+	 * This includes original blob segments between splice operations and insert blobs.
+	 */
+	private get segments(): Blob[] {
+		const segments: Blob[] = [];
+		let currentPosition = 0;
+
+		// Sort operations by start position to ensure correct order
+		const sortedOps = [...this.spliceOperations].sort((a, b) => a.start - b.start);
+
+		for (const op of sortedOps) {
+			// Add segment from current position to start of this operation
+			if (currentPosition < op.start) {
+				segments.push(this.originalBlob.slice(currentPosition, op.start));
+			}
+
+			// Add the insert blob (if it has content)
+			if (op.insert.size > 0) {
+				segments.push(op.insert);
+			}
+
+			// Move current position to end of this operation
+			currentPosition = op.end;
+		}
+
+		// Add remaining segment after last operation
+		if (currentPosition < this.originalBlob.size) {
+			segments.push(this.originalBlob.slice(currentPosition));
+		}
+
+		return segments;
+	}
+}
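The createXorbs.ts changes below integrate SplicedBlob into xet chunking. As I read the diff, the strategy for a file edited near its start is twofold: pre-chunk the original bytes before the first splice point to warm the dedup cache, and force a dedup lookup on the file's first (modified) chunk. A simplified sketch of that control flow (names taken from the diff; the argument list is abbreviated):

if (fileSource.content instanceof SplicedBlob && fileSource.content.firstSpliceIndex < MAX_CHUNK_SIZE) {
	// Chunk the unmodified prefix of the original blob and record its chunk
	// hashes in the dedup cache, without emitting any xorb data.
	await loadDedupInfoToCache(
		fileSource.content.originalBlob.slice(0, fileSource.content.firstSpliceIndex),
		/* ...remoteXorbHashes, params, chunkCache, chunkModule... */
		{ maxChunks: 1, isAtBeginning: true }
	);
}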
diff --git a/packages/hub/src/utils/createXorbs.ts b/packages/hub/src/utils/createXorbs.ts
index 78e52dbc99..62d388eb08 100644
--- a/packages/hub/src/utils/createXorbs.ts
+++ b/packages/hub/src/utils/createXorbs.ts
@@ -1,15 +1,10 @@
-/**
- * Todo: add dedup: we actually need to remember chunks already written, and not add them to the xorb, and also
- * take that into account for file reconstruction
- * Todo: byte grouping?
- */
-
 import { bg4_split_bytes, XET_CHUNK_HEADER_BYTES, XetChunkCompressionScheme } from "./XetBlob";
 import { compress as lz4_compress } from "../vendor/lz4js";
 import { ChunkCache } from "./ChunkCache";
 import { xetWriteToken, type XetWriteTokenParams } from "./xetWriteToken";
 import type { ShardData } from "./shardParser";
 import { parseShardData } from "./shardParser";
+import { SplicedBlob } from "./SplicedBlob";
 
 const TARGET_CHUNK_SIZE = 64 * 1024;
 /* eslint-disable @typescript-eslint/no-unused-vars */
@@ -140,7 +135,22 @@ export async function* createXorbs(
 	try {
 		for await (const fileSource of fileSources) {
+			// Load dedup info for the first chunk of the file, if it's potentially modified by the splice
+			if (fileSource.content instanceof SplicedBlob && fileSource.content.firstSpliceIndex < MAX_CHUNK_SIZE) {
+				await loadDedupInfoToCache(
+					fileSource.content.originalBlob.slice(0, fileSource.content.firstSpliceIndex),
+					remoteXorbHashes,
+					params,
+					chunkCache,
+					chunkModule,
+					{
+						maxChunks: 1,
+						isAtBeginning: true,
+					}
+				);
+			}
 			let bytesSinceRemoteDedup = Infinity;
+			let isFirstFileChunk = true;
 			const sourceChunks: Array<Uint8Array> = [];
 
 			const reader = fileSource.content.stream().getReader();
@@ -159,39 +169,16 @@ export async function* createXorbs(
 			const addChunks = async function* (chunks: Array<{ hash: string; length: number; dedup: boolean }>) {
 				for (const chunk of chunks) {
+					if (isFirstFileChunk) {
+						chunk.dedup = true;
+						isFirstFileChunk = false;
+					}
 					let chunkIndex = xorb.chunks.length;
 					let chunkXorbId = xorbId;
 					fileChunks.push({ hash: chunk.hash, length: chunk.length });
 
 					// Remove chunks from source data
-					let chunkToCopy: Uint8Array;
-					if (chunk.length === sourceChunks[0].length) {
-						chunkToCopy = sourceChunks[0];
-						sourceChunks.shift();
-					} else if (chunk.length < sourceChunks[0].length) {
-						chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
-						sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
-					} else {
-						chunkToCopy = new Uint8Array(chunk.length);
-						let copyOffset = 0;
-						let index = 0;
-						let toSlice = -1;
-						while (copyOffset < chunk.length) {
-							const nToCopy = Math.min(sourceChunks[index].length, chunk.length - copyOffset);
-							chunkToCopy.set(sourceChunks[index].subarray(0, nToCopy), copyOffset);
-							copyOffset += nToCopy;
-
-							if (nToCopy === sourceChunks[index].length) {
-								index++;
-							} else {
-								toSlice = nToCopy;
-							}
-						}
-						sourceChunks.splice(0, index);
-						if (toSlice !== -1) {
-							sourceChunks[0] = sourceChunks[0].subarray(toSlice);
-						}
-					}
+					const chunkToCopy = removeChunkFromSourceData(sourceChunks, chunk.length);
 
 					let cacheData = chunkCache.getChunk(chunk.hash, chunkModule.compute_hmac);
 					if (cacheData === undefined && chunk.dedup && bytesSinceRemoteDedup >= INTERVAL_BETWEEN_REMOTE_DEDUP) {
@@ -434,14 +421,41 @@ function backtrackDedup(
 	return dedupedBytes;
 }
 
-// interface ChunkHeader {
-//	version: number; // u8, 1 byte
-//	compressed_length: number; // 3 * u8, 3 bytes
-//	compression_scheme: CompressionScheme; // u8, 1 byte
-//	uncompressed_length: number; // 3 * u8, 3 bytes
-// }
-
-// const CHUNK_HEADER_BYTES = 8;
+/**
+ * Removes and returns a chunk of the specified length from the sourceChunks array.
+ */
+function removeChunkFromSourceData(sourceChunks: Array<Uint8Array>, chunkLength: number): Uint8Array {
+	if (chunkLength === sourceChunks[0].length) {
+		const chunkToCopy = sourceChunks[0];
+		sourceChunks.shift();
+		return chunkToCopy;
+	} else if (chunkLength < sourceChunks[0].length) {
+		const chunkToCopy = sourceChunks[0].subarray(0, chunkLength);
+		sourceChunks[0] = sourceChunks[0].subarray(chunkLength);
+		return chunkToCopy;
+	} else {
+		const chunkToCopy = new Uint8Array(chunkLength);
+		let copyOffset = 0;
+		let index = 0;
+		let toSlice = -1;
+		while (copyOffset < chunkLength) {
+			const nToCopy = Math.min(sourceChunks[index].length, chunkLength - copyOffset);
+			chunkToCopy.set(sourceChunks[index].subarray(0, nToCopy), copyOffset);
+			copyOffset += nToCopy;
+
+			if (nToCopy === sourceChunks[index].length) {
+				index++;
+			} else {
+				toSlice = nToCopy;
+			}
+		}
+		sourceChunks.splice(0, index);
+		if (toSlice !== -1) {
+			sourceChunks[0] = sourceChunks[0].subarray(toSlice);
+		}
+		return chunkToCopy;
+	}
+}
 
 /**
  * Write a chunk header to the xorb and return the offset of where to write the next chunk
@@ -553,3 +567,128 @@ const buildFileRepresentation = (
 
 	return representation;
 };
+
+/**
+ * Helper to load dedup info for blob contents into the cache.
+ * Processes the blob's contents, chunks it, and loads dedup info into the cache without writing to a xorb.
+ *
+ * For now this is optimized for the case where the replacement data is at the very beginning of the file
+ *
+ * todo: handle replacements that are not at the beginning of the file, by backtracking xorb contents
+ * todo: handle replacements that are not at the beginning of the file, by using previous content to chunk at the same boundaries as in the original file
+ */
+async function loadDedupInfoToCache(
+	content: Blob,
+	/** Will be mutated */
+	remoteXorbHashes: string[],
+	params: XetWriteTokenParams,
+	chunkCache: ChunkCache,
+	// eslint-disable-next-line @typescript-eslint/consistent-type-imports
+	chunkModule: typeof import("../vendor/xet-chunk/chunker_wasm"),
+	opts?: {
+		isAtBeginning?: boolean;
+		/**
+		 * The end position of the content to process
+		 *
+		 * Will process content up to the end of the chunk that contains this position
+		 */
+		end?: number;
+		/**
+		 * The maximum number of chunks to process
+		 *
+		 * Processing stops once this many chunks have been produced
+		 */
+		maxChunks?: number;
+	}
+): Promise<void> {
+	const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);
+	const cache = chunkCache;
+
+	let dedupedBytes = 0;
+	let chunksProcessed = 0;
+	let totalBytes = 0;
+	let bytesSinceRemoteDedup = Infinity;
+	const sourceChunks: Array<Uint8Array> = [];
+
+	try {
+		const reader = content.stream().getReader();
+
+		const processChunks = async (chunkData: Array<{ hash: string; length: number; dedup: boolean }>) => {
+			for (const chunk of chunkData) {
+				chunksProcessed++;
+				if (opts?.isAtBeginning && chunksProcessed === 1) {
+					chunk.dedup = true;
+				}
+				totalBytes += chunk.length;
+
+				// Remove chunks from source data
+				removeChunkFromSourceData(sourceChunks, chunk.length);
+
+				// Check if chunk is already in cache
+				let cacheData = cache.getChunk(chunk.hash, chunkModule.compute_hmac);
+
+				// Skip to the next chunk if already cached - no need for remote lookup
+				if (cacheData !== undefined) {
+					dedupedBytes += chunk.length;
+					bytesSinceRemoteDedup += chunk.length;
+					continue;
+				}
+
+				// Try remote dedup lookup if conditions are met
+				if (chunk.dedup && bytesSinceRemoteDedup >= INTERVAL_BETWEEN_REMOTE_DEDUP) {
+					const token = await xetWriteToken(params);
+					bytesSinceRemoteDedup = 0;
+
+					const shardResp = await (params.fetch ?? fetch)(token.casUrl + "/v1/chunk/default/" + chunk.hash, {
+						headers: {
+							Authorization: `Bearer ${token.accessToken}`,
+						},
+					});
+
+					if (shardResp.ok) {
+						const shard = await shardResp.blob();
+						const shardData = await parseShardData(shard);
+
+						// Load remote dedup info into cache
+						for (const xorb of shardData.xorbs) {
+							const remoteXorbId = -remoteXorbHashes.length;
+							remoteXorbHashes.push(xorb.hash);
+							let i = 0;
+							for (const xorbChunk of xorb.chunks) {
+								cache.addChunkToCache(xorbChunk.hash, remoteXorbId, i++, shardData.hmacKey);
+							}
+						}
+						cacheData = cache.getChunk(chunk.hash, chunkModule.compute_hmac);
+					}
+				}
+
+				if (cacheData !== undefined) {
+					// Chunk found in cache after remote lookup - it's deduplicated
+					dedupedBytes += chunk.length;
+				}
+
+				bytesSinceRemoteDedup += chunk.length;
+			}
+		};
+
+		// Read and process blob content
+		while (true) {
+			if (opts?.end !== undefined && totalBytes >= opts.end) {
+				break;
+			}
+			if (opts?.maxChunks !== undefined && chunksProcessed >= opts.maxChunks) {
+				break;
+			}
+			const { done, value } = await reader.read();
+			if (done) {
+				await processChunks(chunker.finish());
+				break;
+			}
+			sourceChunks.push(value);
+			await processChunks(chunker.add_data(value));
+		}
+	} finally {
+		chunker.free();
+	}
+}
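Taken together, a minimal end-to-end use of the new operation could look like this (sketch only; the repo name and file path are placeholders, and originalFile must match the file currently on the Hub):

import { commit } from "@huggingface/hub";

const originalFile: Blob = /* the current contents of model.bin on the Hub */ new Blob([]);

await commit({
	repo: { type: "model", name: "user/my-model" }, // placeholder repo
	accessToken: "hf_...",
	title: "Prepend a header without re-uploading the whole file",
	useXet: true, // edit operations require Xet
	operations: [
		{
			operation: "edit",
			path: "model.bin",
			originalContent: originalFile,
			edits: [{ start: 0, end: 0, content: new Blob(["HEADER"]) }],
		},
	],
});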