30 changes: 15 additions & 15 deletions packages/hub/src/utils/XetBlob.ts
@@ -56,26 +56,26 @@ export interface ReconstructionInfo {
offset_into_first_range: number;
}

-enum CompressionScheme {
+export enum XetChunkCompressionScheme {
None = 0,
LZ4 = 1,
ByteGroupingLZ4 = 2,
}

-const compressionSchemeLabels: Record<CompressionScheme, string> = {
-[CompressionScheme.None]: "None",
-[CompressionScheme.LZ4]: "LZ4",
-[CompressionScheme.ByteGroupingLZ4]: "ByteGroupingLZ4",
+const compressionSchemeLabels: Record<XetChunkCompressionScheme, string> = {
+[XetChunkCompressionScheme.None]: "None",
+[XetChunkCompressionScheme.LZ4]: "LZ4",
+[XetChunkCompressionScheme.ByteGroupingLZ4]: "ByteGroupingLZ4",
};

interface ChunkHeader {
version: number; // u8, 1 byte
compressed_length: number; // 3 * u8, 3 bytes
-compression_scheme: CompressionScheme; // u8, 1 byte
+compression_scheme: XetChunkCompressionScheme; // u8, 1 byte
uncompressed_length: number; // 3 * u8, 3 bytes
}

-const CHUNK_HEADER_BYTES = 8;
+export const XET_CHUNK_HEADER_BYTES = 8;

/**
* XetBlob is a blob implementation that fetches data directly from the Xet storage
@@ -338,7 +338,7 @@ export class XetBlob extends Blob {
continue fetchData;
}

-const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
+const header = new DataView(result.value.buffer, result.value.byteOffset, XET_CHUNK_HEADER_BYTES);
const chunkHeader: ChunkHeader = {
version: header.getUint8(0),
compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
@@ -353,9 +353,9 @@
}

if (
-chunkHeader.compression_scheme !== CompressionScheme.None &&
-chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
-chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
+chunkHeader.compression_scheme !== XetChunkCompressionScheme.None &&
+chunkHeader.compression_scheme !== XetChunkCompressionScheme.LZ4 &&
+chunkHeader.compression_scheme !== XetChunkCompressionScheme.ByteGroupingLZ4
) {
throw new Error(
`Unsupported compression scheme ${
@@ -364,18 +364,18 @@
);
}

-if (result.value.byteLength < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
+if (result.value.byteLength < chunkHeader.compressed_length + XET_CHUNK_HEADER_BYTES) {
// We need more data to read the full chunk
leftoverBytes = result.value;
continue fetchData;
}

-result.value = result.value.slice(CHUNK_HEADER_BYTES);
+result.value = result.value.slice(XET_CHUNK_HEADER_BYTES);

let uncompressed =
-chunkHeader.compression_scheme === CompressionScheme.LZ4
+chunkHeader.compression_scheme === XetChunkCompressionScheme.LZ4
? lz4_decompress(result.value.slice(0, chunkHeader.compressed_length), chunkHeader.uncompressed_length)
-: chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
+: chunkHeader.compression_scheme === XetChunkCompressionScheme.ByteGroupingLZ4
? bg4_regoup_bytes(
lz4_decompress(
result.value.slice(0, chunkHeader.compressed_length),
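For reference, a minimal sketch (not part of this PR) of parsing the 8-byte chunk header shown above. It assumes the layout from the diff: version (u8), compressed_length (3 bytes little-endian), compression_scheme (u8), uncompressed_length (3 bytes little-endian); the helper name is hypothetical, the imports use the names this PR exports.

import { XET_CHUNK_HEADER_BYTES, XetChunkCompressionScheme } from "./XetBlob";

// Hypothetical helper mirroring the header layout used by XetBlob and createXorbs.
function parseChunkHeader(data: Uint8Array): {
	version: number;
	compressed_length: number;
	compression_scheme: XetChunkCompressionScheme;
	uncompressed_length: number;
} {
	if (data.byteLength < XET_CHUNK_HEADER_BYTES) {
		throw new Error("Not enough bytes for a chunk header");
	}
	const view = new DataView(data.buffer, data.byteOffset, XET_CHUNK_HEADER_BYTES);
	return {
		version: view.getUint8(0),
		compressed_length: view.getUint8(1) | (view.getUint8(2) << 8) | (view.getUint8(3) << 16),
		compression_scheme: view.getUint8(4) as XetChunkCompressionScheme,
		uncompressed_length: view.getUint8(5) | (view.getUint8(6) << 8) | (view.getUint8(7) << 16),
	};
}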
121 changes: 121 additions & 0 deletions packages/hub/src/utils/createXorbs.ts
@@ -0,0 +1,121 @@
/**
* Todo: add dedup: we actually need to remember chunks already written, not add them to the xorb again, and also
* take that into account for file reconstruction
* Todo: byte grouping?
*/

import { XET_CHUNK_HEADER_BYTES, XetChunkCompressionScheme } from "./XetBlob";
import { compress as lz4_compress } from "../vendor/lz4js";

const TARGET_CHUNK_SIZE = 64 * 1024;
const XORB_SIZE = 64 * 1024 * 1024;
const MAX_XORB_CHUNKS = 8 * 1024;

export async function* createXorbs(
fileSource: Blob
): AsyncGenerator<{ xorb: Uint8Array; hash: string }, void, undefined> {
const chunkModule = await import("../vendor/xet-chunk/chunker_wasm");
await chunkModule.init();
const chunker = new chunkModule.Chunker(TARGET_CHUNK_SIZE);

let xorb = new Uint8Array(XORB_SIZE);
const sourceChunks: Array<Uint8Array> = [];

try {
const reader = fileSource.stream().getReader();
let xorbOffset = 0;
let xorbChunks = Array<{ hash: string; length: number }>();

const addChunks = function* (chunks: Array<{ hash: string; length: number }>) {
for (const chunk of chunks) {
let chunkToCopy: Uint8Array;
if (chunk.length === sourceChunks[0].length) {
chunkToCopy = sourceChunks[0];
sourceChunks.shift();
Member: This looks more inefficient than using an index approach, no?

Member Author: This is just the leftover data from the chunking process; there should be at most 128kB of it, and even with 16kB chunks that's only 8 chunks, i.e. sourceChunks.length <= 8. With those params I don't think we care too much.
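If the index approach ever became worthwhile, a hypothetical helper (not in this PR; the name and exact behavior are made up) could consume bytes with a cursor and a single splice instead of repeated shift() calls. Unlike the current code, this version always copies, even when the chunk fits entirely in the first source buffer.

// Hypothetical: consume `length` bytes from the front of `buffers` using an index cursor.
// Assumes the buffers hold at least `length` bytes in total.
function takeBytes(buffers: Uint8Array[], length: number): Uint8Array {
	const out = new Uint8Array(length);
	let copied = 0;
	let index = 0;
	while (copied < length) {
		const piece = buffers[index].subarray(0, length - copied);
		out.set(piece, copied);
		copied += piece.length;
		if (piece.length === buffers[index].length) {
			index++;
		} else {
			buffers[index] = buffers[index].subarray(piece.length);
		}
	}
	buffers.splice(0, index);
	return out;
}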

} else if (chunk.length < sourceChunks[0].length) {
chunkToCopy = sourceChunks[0].subarray(0, chunk.length);
sourceChunks[0] = sourceChunks[0].subarray(chunk.length);
} else {
chunkToCopy = new Uint8Array(chunk.length);
Member: Could be optimized for less memory allocation, btw.

Member Author (@coyotte508, Jul 15, 2025): One thing we could do is have a permanent MAX_CHUNK_SIZE Uint8Array that we reuse (since we only have one chunkToCopy at a time).
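A possible sketch of that idea (hypothetical, not in this PR; MAX_CHUNK_SIZE and the helper name are made up, and it assumes chunks never exceed that bound):

// Hypothetical module-level scratch buffer, reused for every multi-piece chunk.
const MAX_CHUNK_SIZE = 128 * 1024; // assumed upper bound on a single chunk
const scratch = new Uint8Array(MAX_CHUNK_SIZE);

// Returns a view over `scratch` containing the first `length` bytes of `buffers`.
// Only valid until the next call, which is fine since there is one chunkToCopy at a time.
function gatherIntoScratch(buffers: Uint8Array[], length: number): Uint8Array {
	let copied = 0;
	for (const buffer of buffers) {
		if (copied >= length) {
			break;
		}
		const piece = buffer.subarray(0, length - copied);
		scratch.set(piece, copied);
		copied += piece.length;
	}
	return scratch.subarray(0, length);
}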

let copyOffset = 0;
let index = 0;
while (copyOffset < chunk.length) {
chunkToCopy.set(sourceChunks[index].subarray(0, chunk.length - copyOffset), copyOffset);
copyOffset += sourceChunks[index].length;
index++;
}
sourceChunks.splice(0, index);
}
const newOffset = writeChunk(xorb, xorbOffset, chunkToCopy);
if (newOffset === 0) {
// Not enough room left in the current xorb: yield it (with its hash) and start a new one
yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
xorb = new Uint8Array(XORB_SIZE);
xorbChunks = [];
xorbOffset = writeChunk(xorb, 0, chunkToCopy);

if (xorbOffset === 0) {
throw new Error("Failed to write chunk into xorb");
}
} else {
xorbOffset = newOffset;
}
xorbChunks.push(chunk);
if (xorbChunks.length >= MAX_XORB_CHUNKS) {
yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
xorbOffset = 0;
xorbChunks = [];
xorb = new Uint8Array(XORB_SIZE);
}
}
};

while (true) {
const { done, value } = await reader.read();
if (done) {
yield* addChunks(chunker.finish());
break;
}
sourceChunks.push(value);
yield* addChunks(chunker.add_data(value));
}

if (xorbOffset > 0) {
// Yield the last, partially filled xorb
yield { xorb: xorb.subarray(0, xorbOffset), hash: chunkModule.compute_xorb_hash(xorbChunks) };
}
} finally {
chunker.free();
// ^ is this really needed ?
Member: A WASM module can allocate memory that lives outside the JS garbage collector, so it really depends on the wasm Chunker code.

Member Author: There's some code in chunker_wasm_bg.js but I can't tell for sure, so being prudent for now.

}
}

// interface ChunkHeader {
// version: number; // u8, 1 byte
// compressed_length: number; // 3 * u8, 3 bytes
// compression_scheme: CompressionScheme; // u8, 1 byte
// uncompressed_length: number; // 3 * u8, 3 bytes
// }

// const CHUNK_HEADER_BYTES = 8;

/**
* Write a chunk (header + data) to the xorb and return the offset at which to write the next chunk
*
* If it returns 0, it means there wasn't enough space in the xorb
*
* Todo: add bg4 compression maybe?
*/
function writeChunk(xorb: Uint8Array, offset: number, chunk: Uint8Array): number {
const compressedChunk = lz4_compress(chunk);
const chunkToWrite = compressedChunk.length < chunk.length ? compressedChunk : chunk;

if (offset + XET_CHUNK_HEADER_BYTES + chunkToWrite.length > XORB_SIZE) {
return 0;
}

xorb[offset] = 0;
xorb[offset + 1] = chunkToWrite.length & 0xff;
xorb[offset + 2] = (chunkToWrite.length >> 8) & 0xff;
xorb[offset + 3] = (chunkToWrite.length >> 16) & 0xff;
xorb[offset + 4] =
chunkToWrite.length < chunk.length ? XetChunkCompressionScheme.LZ4 : XetChunkCompressionScheme.None;
xorb[offset + 5] = chunk.length & 0xff;
xorb[offset + 6] = (chunk.length >> 8) & 0xff;
xorb[offset + 7] = (chunk.length >> 16) & 0xff;

xorb.set(chunkToWrite, offset + XET_CHUNK_HEADER_BYTES);
return offset + XET_CHUNK_HEADER_BYTES + chunkToWrite.length;
}
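A minimal consumption sketch for the generator above (not part of the PR; the function name and import path are illustrative, only the createXorbs signature is taken from the diff):

import { createXorbs } from "./utils/createXorbs";

// Hypothetical consumer: chunk a Blob into xorbs and do something with each one.
async function processBlob(file: Blob): Promise<void> {
	for await (const { xorb, hash } of createXorbs(file)) {
		// `xorb` is the serialized chunk data, `hash` the xorb hash computed by the wasm chunker
		console.log(`xorb ${hash}: ${xorb.byteLength} bytes`);
	}
}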
2 changes: 1 addition & 1 deletion packages/hub/src/vendor/lz4js/index.ts
@@ -519,7 +519,7 @@ export function decompress(src: Uint8Array, maxSize: number) {
// Compresses a buffer to an Lz4 frame. maxSize is optional; if not provided,
// a buffer will be created based on the theoretical worst output size for a
// given input size. The buffer returned will always be perfectly-sized.
-export function compress(src: Uint8Array, maxSize: number) {
+export function compress(src: Uint8Array, maxSize?: number) {
let dst, size;

if (maxSize === undefined) {
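A small usage sketch of the vendored API after this change (assumption: the import path mirrors how createXorbs imports the module, and decompress is given the known uncompressed length as in XetBlob):

import { compress, decompress } from "../vendor/lz4js";

const input = new TextEncoder().encode("hello hello hello hello");
// maxSize is now optional: a worst-case sized buffer is allocated internally
const compressed = compress(input);
const restored = decompress(compressed, input.length);
console.log(restored.length === input.length); // true if the round trip worked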
7 changes: 7 additions & 0 deletions packages/hub/src/vendor/xet-chunk/README.md
@@ -0,0 +1,7 @@
# Xet-Chunk

WASM utilities to chunk & hash data.

An AssemblyScript implementation is available in `@huggingface/xetchunk-wasm`, but for performance reasons we're using WASM compiled directly from the Rust source (see https://github.com/huggingface/xet-core/tree/main/hf_xet_wasm), which on my local machine processes data at 480MB/s.

We hope in the future to include the build step directly in this package, or to use the AssemblyScript WASM (but BLAKE3 hashing performance needs to improve significantly first).
62 changes: 62 additions & 0 deletions packages/hub/src/vendor/xet-chunk/chunker_wasm.ts
Member Author: Rewrote the file generated by wasm-bindgen to TS, made it work in Node & web, and added an init() function.

@@ -0,0 +1,62 @@
// export * from "./chunker_wasm_bg.js";
import * as __glue_imports from "./chunker_wasm_bg.js";
// @ts-expect-error no types
import { __wbg_set_wasm } from "./chunker_wasm_bg.js";
// @ts-expect-error no types
import { wasmBinary } from "./chunker_wasm_bg.wasm.base64.ts";

let initPromise: Promise<void> | null = null;

async function init(): Promise<void> {
if (initPromise) {
return initPromise;
}
let resolve: (value: void) => void;
let reject: (reason?: unknown) => void;
initPromise = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

await Promise.resolve();
Comment on lines +16 to +21
Member: A bit suspicious as a method to create a singleton that you can't recreate if the wasm import fails.

Member Author (@coyotte508, Jul 15, 2025): This await in particular is to ensure resolve and reject are assigned before the rest of the code runs; not sure it's necessary.


try {
const wasmModule = await WebAssembly.compile(wasmBinary);
const imports = Object.entries(
WebAssembly.Module.imports(wasmModule).reduce(
(result, item) => ({
...result,
// @ts-expect-error ok for any type
[item.module]: [...(result[item.module] || []), item.name],
}),
{}
)
).map(([from, names]) => ({ from, names }));
const wasm = await WebAssembly.instantiate(wasmModule, {
"./chunker_wasm_bg.js": Object.fromEntries(
// @ts-expect-error ok for any type
(imports[0].names as string[]).map((name) => [name, __glue_imports[name]])
),
});
__wbg_set_wasm(wasm.exports);
// console.log("exports", exports);
// @ts-expect-error it's assigned
wasm.exports.__wbindgen_start();

// @ts-expect-error it's assigned
resolve();
} catch (error) {
// @ts-expect-error it's assigned
reject(error);
}
}

init();

export { init };

export { compute_xorb_hash, Chunker } from "./chunker_wasm_bg.js";

// const exports = WebAssembly.Module.exports(wasmModule).map((item) => item.name);

// console.log("imports", imports);
9 changes: 9 additions & 0 deletions packages/hub/src/vendor/xet-chunk/chunker_wasm_bg.d.ts
@@ -0,0 +1,9 @@
/* tslint:disable */
/* eslint-disable */
export function compute_xorb_hash(chunks_array: any): string;
export class Chunker {
free(): void;
constructor(target_chunk_size: number);
add_data(data: Uint8Array): Array<{ hash: string; length: number }>;
finish(): Array<{ hash: string; length: number }>;
}
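A short usage sketch of this API (hypothetical wiring; the import path and target chunk size are assumptions, the calls come from the declarations above and the init() export in chunker_wasm.ts):

import { init, Chunker, compute_xorb_hash } from "./vendor/xet-chunk/chunker_wasm";

// Chunk a buffer and compute the xorb hash over the resulting chunk descriptors.
async function chunkBuffer(data: Uint8Array): Promise<string> {
	await init();
	const chunker = new Chunker(64 * 1024); // target chunk size, as in createXorbs
	try {
		const chunks = [...chunker.add_data(data), ...chunker.finish()];
		return compute_xorb_hash(chunks); // hash over { hash, length } entries
	} finally {
		chunker.free(); // the wasm side may hold memory outside the JS GC
	}
}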