Commit 17d4b19: write xorbs from file

Parent: e3ff966

File tree: 10 files changed, +3122 −42 lines

packages/hub/src/utils/XetBlob.ts

Lines changed: 15 additions & 15 deletions
```diff
@@ -56,26 +56,26 @@ export interface ReconstructionInfo {
 	offset_into_first_range: number;
 }
 
-enum CompressionScheme {
+export enum XetChunkCompressionScheme {
 	None = 0,
 	LZ4 = 1,
 	ByteGroupingLZ4 = 2,
 }
 
-const compressionSchemeLabels: Record<CompressionScheme, string> = {
-	[CompressionScheme.None]: "None",
-	[CompressionScheme.LZ4]: "LZ4",
-	[CompressionScheme.ByteGroupingLZ4]: "ByteGroupingLZ4",
+const compressionSchemeLabels: Record<XetChunkCompressionScheme, string> = {
+	[XetChunkCompressionScheme.None]: "None",
+	[XetChunkCompressionScheme.LZ4]: "LZ4",
+	[XetChunkCompressionScheme.ByteGroupingLZ4]: "ByteGroupingLZ4",
 };
 
 interface ChunkHeader {
 	version: number; // u8, 1 byte
 	compressed_length: number; // 3 * u8, 3 bytes
-	compression_scheme: CompressionScheme; // u8, 1 byte
+	compression_scheme: XetChunkCompressionScheme; // u8, 1 byte
 	uncompressed_length: number; // 3 * u8, 3 bytes
 }
 
-const CHUNK_HEADER_BYTES = 8;
+export const XET_CHUNK_HEADER_BYTES = 8;
 
 /**
  * XetBlob is a blob implementation that fetches data directly from the Xet storage
@@ -338,7 +338,7 @@ export class XetBlob extends Blob {
 					continue fetchData;
 				}
 
-				const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
+				const header = new DataView(result.value.buffer, result.value.byteOffset, XET_CHUNK_HEADER_BYTES);
 				const chunkHeader: ChunkHeader = {
 					version: header.getUint8(0),
 					compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
@@ -353,9 +353,9 @@ export class XetBlob extends Blob {
 				}
 
 				if (
-					chunkHeader.compression_scheme !== CompressionScheme.None &&
-					chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
-					chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
+					chunkHeader.compression_scheme !== XetChunkCompressionScheme.None &&
+					chunkHeader.compression_scheme !== XetChunkCompressionScheme.LZ4 &&
+					chunkHeader.compression_scheme !== XetChunkCompressionScheme.ByteGroupingLZ4
 				) {
 					throw new Error(
 						`Unsupported compression scheme ${
@@ -364,18 +364,18 @@ export class XetBlob extends Blob {
 					);
 				}
 
-				if (result.value.byteLength < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
+				if (result.value.byteLength < chunkHeader.compressed_length + XET_CHUNK_HEADER_BYTES) {
 					// We need more data to read the full chunk
 					leftoverBytes = result.value;
 					continue fetchData;
 				}
 
-				result.value = result.value.slice(CHUNK_HEADER_BYTES);
+				result.value = result.value.slice(XET_CHUNK_HEADER_BYTES);
 
 				let uncompressed =
-					chunkHeader.compression_scheme === CompressionScheme.LZ4
+					chunkHeader.compression_scheme === XetChunkCompressionScheme.LZ4
 						? lz4_decompress(result.value.slice(0, chunkHeader.compressed_length), chunkHeader.uncompressed_length)
-						: chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
+						: chunkHeader.compression_scheme === XetChunkCompressionScheme.ByteGroupingLZ4
 							? bg4_regoup_bytes(
 									lz4_decompress(
 										result.value.slice(0, chunkHeader.compressed_length),
```
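
Since the header size and compression-scheme enum are now exported, other modules can decode the 8-byte chunk header themselves. A minimal sketch mirroring the `DataView` reads above (the `parseChunkHeader` helper is hypothetical, not part of this commit):

```ts
import { XET_CHUNK_HEADER_BYTES, XetChunkCompressionScheme } from "./XetBlob";

// Hypothetical helper: decode the 8-byte chunk header.
// Layout: version (u8) | compressed_length (3 × u8, LE) |
//         compression_scheme (u8) | uncompressed_length (3 × u8, LE)
function parseChunkHeader(bytes: Uint8Array): {
	version: number;
	compressed_length: number;
	compression_scheme: XetChunkCompressionScheme;
	uncompressed_length: number;
} {
	const view = new DataView(bytes.buffer, bytes.byteOffset, XET_CHUNK_HEADER_BYTES);
	return {
		version: view.getUint8(0),
		compressed_length: view.getUint8(1) | (view.getUint8(2) << 8) | (view.getUint8(3) << 16),
		compression_scheme: view.getUint8(4),
		uncompressed_length: view.getUint8(5) | (view.getUint8(6) << 8) | (view.getUint8(7) << 16),
	};
}
```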
Lines changed: 121 additions & 0 deletions (new file)

```ts
/**
 * Todo: add dedup: we actually need to remember chunks already written, and not add them to the xorb, and also
 * take that into account for file reconstruction
 * Todo: byte grouping?
 */

import { XET_CHUNK_HEADER_BYTES, XetChunkCompressionScheme } from "./XetBlob";
import { compress as lz4_compress } from "../vendor/lz4js";

const TARGET_CHUNK_SIZE = 64 * 1024;
const XORB_SIZE = 64 * 1024 * 1024;
const MAX_XORB_CHUNKS = 8 * 1024;

export async function* createXorbs(
	fileSource: Blob
): AsyncGenerator<{ xorb: Uint8Array; hash: string }, void, undefined> {
	const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
	await chunkModule.init();
	const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);

	let xorb = new Uint8Array(XORB_SIZE);
	const sourceChunks: Array<Uint8Array> = [];

	try {
		const reader = fileSource.stream().getReader();
		let xorbOffset = 0;
		let xorbChunks = Array<{ hash: string; length: number }>();

		const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
			for (const chunk of chunks) {
				// Reassemble the chunk's bytes from the buffered source data
				let chunkToCopy: Uint8Array;
				if (chunk.length === sourceChunks[0].length) {
					chunkToCopy = sourceChunks[0];
					sourceChunks.shift();
				} else if (chunk.length < sourceChunks[0].length) {
					chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
					sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
				} else {
					// The chunk spans several source buffers: copy them into one array
					chunkToCopy = new Uint8Array(chunk.length);
					let copyOffset = 0;
					let index = 0;
					while (copyOffset < chunk.length) {
						chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
						copyOffset += sourceChunks[index].length;
						index++;
					}
					sourceChunks.splice(0, index);
				}
				const newOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
				if (newOffset === 0) {
					// Failure to write chunk, because it went over the xorb size limit:
					// flush the current xorb up to the pre-write offset and start a new one
					yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
					xorb = new Uint8Array(XORB_SIZE);
					xorbChunks = [];
					xorbOffset = writeChunk(xorb, 0, chunkToCopy);

					if (xorbOffset === 0) {
						throw new Error("Failed to write chunk into xorb");
					}
				} else {
					xorbOffset = newOffset;
				}
				xorbChunks.push(chunk);
				if (xorbChunks.length >= MAX_XORB_CHUNKS) {
					yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
					xorbOffset = 0;
					xorbChunks = [];
					xorb = new Uint8Array(XORB_SIZE);
				}
			}
		};

		while (true) {
			const { done, value } = await reader.read();
			if (done) {
				yield* addChunks(chunker.finish());
				// Flush the last, partially-filled xorb
				if (xorbOffset > 0) {
					yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
				}
				break;
			}
			sourceChunks.push(value);
			yield* addChunks(chunker.add_data(value));
		}
	} finally {
		chunker.free();
		// ^ is this really needed ?
	}
}

/**
 * Write a chunk (8-byte header + possibly LZ4-compressed data) to the xorb
 * and return the offset of where to write the next chunk
 *
 * If it returns 0, it means there wasn't enough space in the xorb
 *
 * Todo: add bg4 compression maybe?
 */
function writeChunk(xorb: Uint8Array, offset: number, chunk: Uint8Array): number {
	const compressedChunk = lz4_compress(chunk);
	// Only keep the compressed version if it is actually smaller
	const chunkToWrite = compressedChunk.length < chunk.length ? compressedChunk : chunk;

	if (offset + XET_CHUNK_HEADER_BYTES + chunkToWrite.length > XORB_SIZE) {
		return 0;
	}

	// Header layout: version (u8) | compressed_length (3 × u8, LE) |
	// compression_scheme (u8) | uncompressed_length (3 × u8, LE)
	xorb[offset] = 0;
	xorb[offset + 1] = chunkToWrite.length & 0xff;
	xorb[offset + 2] = (chunkToWrite.length >> 8) & 0xff;
	xorb[offset + 3] = (chunkToWrite.length >> 16) & 0xff;
	xorb[offset + 4] =
		chunkToWrite.length < chunk.length ? XetChunkCompressionScheme.LZ4 : XetChunkCompressionScheme.None;
	xorb[offset + 5] = chunk.length & 0xff;
	xorb[offset + 6] = (chunk.length >> 8) & 0xff;
	xorb[offset + 7] = (chunk.length >> 16) & 0xff;

	xorb.set(chunkToWrite, offset + XET_CHUNK_HEADER_BYTES);
	return offset + XET_CHUNK_HEADER_BYTES + chunkToWrite.length;
}
```
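
A minimal sketch of how the generator could be consumed, for instance to upload each finished xorb. The import path and the `uploadXorb` callback are hypothetical (the diff view omits the new file's name):

```ts
import { createXorbs } from "./createXorbs"; // hypothetical path

// Hypothetical consumer: stream a file's xorbs and upload each one as it is produced
async function uploadFileAsXorbs(
	file: Blob,
	uploadXorb: (xorb: Uint8Array, hash: string) => Promise<void>
): Promise<void> {
	for await (const { xorb, hash } of createXorbs(file)) {
		await uploadXorb(xorb, hash);
	}
}
```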

packages/hub/src/vendor/lz4js/index.ts

Lines changed: 1 addition & 1 deletion

```diff
@@ -519,7 +519,7 @@ export function decompress(src: Uint8Array, maxSize: number) {
 // Compresses a buffer to an Lz4 frame. maxSize is optional; if not provided,
 // a buffer will be created based on the theoretical worst output size for a
 // given input size. The buffer returned will always be perfectly-sized.
-export function compress(src: Uint8Array, maxSize: number) {
+export function compress(src: Uint8Array, maxSize?: number) {
 	let dst, size;
 
 	if (maxSize === undefined) {
```
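
A quick round-trip sketch of why the optional parameter helps: callers like `writeChunk` above can now let `compress` size its own worst-case output buffer. This assumes the vendored module's frame format round-trips through its own `decompress`:

```ts
import { compress, decompress } from "../vendor/lz4js";

const input = new TextEncoder().encode("hello hello hello hello hello");
const frame = compress(input); // no maxSize: worst-case size computed internally
const roundTripped = decompress(frame, input.length); // caller supplies the expected size
console.log(roundTripped.length === input.length); // true if the frame round-trips
```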
Lines changed: 7 additions & 0 deletions (new file)

```markdown
# Xet-Chunk

WASM utilities to chunk & hash data.

An AssemblyScript implementation is available in `@huggingface/xetchunk-wasm`, but for performance reasons we're using WASM compiled directly from the Rust source (see https://github.com/huggingface/xet-core/tree/main/hf_xet_wasm), which on my local machine processes data at 480 MB/s.

We hope in the future to include the build step directly in this package, or to use the AssemblyScript WASM (but its blake3 hashing performance needs to be significantly improved first).
```
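
For reference, a rough sketch of how such a throughput figure can be measured with this package's API. The buffer size and contents here are arbitrary placeholders, not the actual benchmark:

```ts
import { init, Chunker } from "./chunker_wasm";

await init();
const chunker = new Chunker(64 * 1024); // target chunk size used elsewhere in this commit
const data = new Uint8Array(256 * 1024 * 1024); // arbitrary test buffer

const start = performance.now();
chunker.add_data(data);
chunker.finish();
const seconds = (performance.now() - start) / 1000;
console.log(`${(data.length / 1024 / 1024 / seconds).toFixed(0)} MB/s`);
chunker.free();
```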
Lines changed: 62 additions & 0 deletions (new file)

```ts
import * as __glue_imports from "./chunker_wasm_bg.js";
// @ts-expect-error no types
import { __wbg_set_wasm } from "./chunker_wasm_bg.js";
// @ts-expect-error no types
import { wasmBinary } from "./chunker_wasm_bg.wasm.base64.ts";

let initPromise: Promise<void> | null = null;

async function init(): Promise<void> {
	if (initPromise) {
		return initPromise;
	}
	let resolve: (value: void) => void;
	let reject: (reason?: unknown) => void;
	initPromise = new Promise((_resolve, _reject) => {
		resolve = _resolve;
		reject = _reject;
	});

	// Yield one microtask so initPromise is visible to concurrent callers
	// before the first await below
	await Promise.resolve();

	try {
		const wasmModule = await WebAssembly.compile(wasmBinary);
		// Group the module's imports by source module: [{ from, names }]
		const imports = Object.entries(
			WebAssembly.Module.imports(wasmModule).reduce(
				(result, item) => ({
					...result,
					// @ts-expect-error ok for any type
					[item.module]: [...(result[item.module] || []), item.name],
				}),
				{}
			)
		).map(([from, names]) => ({ from, names }));
		// Satisfy every import of the wasm-bindgen glue module from __glue_imports
		const wasm = await WebAssembly.instantiate(wasmModule, {
			"./chunker_wasm_bg.js": Object.fromEntries(
				// @ts-expect-error ok for any type
				(imports[0].names as string[]).map((name) => [name, __glue_imports[name]])
			),
		});
		__wbg_set_wasm(wasm.exports);
		// @ts-expect-error it's assigned
		wasm.exports.__wbindgen_start();

		// @ts-expect-error it's assigned
		resolve();
	} catch (error) {
		// @ts-expect-error it's assigned
		reject(error);
	}
	// Return the shared promise so the first caller also sees a rejection
	return initPromise;
}

init();

export { init };

export { compute_xorb_hash, Chunker } from "./chunker_wasm_bg.js";
```
Lines changed: 9 additions & 0 deletions (new file)

```ts
/* tslint:disable */
/* eslint-disable */
export function compute_xorb_hash(chunks_array: any): string;
export class Chunker {
	free(): void;
	constructor(target_chunk_size: number);
	add_data(data: Uint8Array): Array<{ hash: string; length: number }>;
	finish(): Array<{ hash: string; length: number }>;
}
```
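
Putting the declarations together, a minimal usage sketch (`init` must resolve before constructing a `Chunker`, per the wrapper above; the input bytes are placeholders):

```ts
import { init, Chunker, compute_xorb_hash } from "./chunker_wasm";

await init();
const chunker = new Chunker(64 * 1024);

// add_data may be called repeatedly as data streams in; finish flushes the tail
const chunks = [
	...chunker.add_data(new TextEncoder().encode("some file contents")),
	...chunker.finish(),
];
// Each record is { hash, length }; the same records feed compute_xorb_hash
console.log(compute_xorb_hash(chunks));
chunker.free();
```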
