Commit a514086

Support LZ4 decompression
1 parent cc91ec5 commit a514086

1 file changed: +41 -32 lines changed

packages/hub/src/utils/XetBlob.ts

Lines changed: 41 additions & 32 deletions
@@ -3,6 +3,7 @@ import { createApiError } from "../error";
 import type { CredentialsParams, RepoDesignation, RepoId } from "../types/public";
 import { checkCredentials } from "./checkCredentials";
 import { toRepoId } from "./toRepoId";
+import lz4 from "lz4js";
 
 const JWT_SAFETY_PERIOD = 60_000;
 const JWT_CACHE_SIZE = 1_000;
@@ -69,6 +70,8 @@ interface ChunkHeader {
 	uncompressed_length: number; // 3 * u8, 3 bytes
 }
 
+const CHUNK_HEADER_BYTES = 8;
+
 /**
  * XetBlob is a blob implementation that fetches data directly from the Xet storage
  */
@@ -160,6 +163,7 @@ export class XetBlob extends Blob {
 
 async function* readData(reconstructionInfo: ReconstructionInfo, customFetch: typeof fetch, maxBytes: number) {
 	let totalBytesRead = 0;
+	let isFirstChunk = true;
 
 	for (const term of reconstructionInfo.terms) {
 		if (totalBytesRead >= maxBytes) {
@@ -190,40 +194,25 @@ export class XetBlob extends Blob {
 			throw new Error("Failed to get reader from response body");
 		}
 
-		// todo: handle chunk ranges
 		let done = false;
-		let isFirstChunk = true;
 		let chunksToSkip = term.range.start - fetchInfo.range.start;
 		let chunksToRead = term.range.end - term.range.start;
 		let bytesToSkip = 0;
-		let bytesToRead = 0;
 
 		let leftoverBytes: Uint8Array | undefined = undefined;
 
 		readChunks: while (!done && totalBytesRead < maxBytes) {
			const result = await reader.read();
			done = result.done;
			if (result.value) {
-				while (totalBytesRead < maxBytes) {
+				while (totalBytesRead < maxBytes && chunksToRead) {
 					if (bytesToSkip) {
 						if (bytesToSkip >= result.value.length) {
 							bytesToSkip -= result.value.length;
 							continue readChunks;
 						}
 						result.value = result.value.slice(bytesToSkip);
 					}
-					if (bytesToRead) {
-						if (bytesToRead >= result.value.length) {
-							yield result.value;
-							bytesToRead -= result.value.length;
-							totalBytesRead += result.value.length;
-							continue readChunks;
-						}
-						yield result.value.slice(0, bytesToRead);
-						result.value = result.value.slice(bytesToRead);
-						totalBytesRead += bytesToRead;
-						bytesToRead = 0;
-					}
 					if (leftoverBytes) {
 						result.value = new Uint8Array([...leftoverBytes, ...result.value]);
 						leftoverBytes = undefined;
@@ -235,7 +224,7 @@ export class XetBlob extends Blob {
 						continue readChunks;
 					}
 
-					const header = new DataView(result.value.buffer, result.value.byteOffset, 8);
+					const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
 					const chunkHeader: ChunkHeader = {
 						version: header.getUint8(0),
 						compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
@@ -247,35 +236,55 @@ export class XetBlob extends Blob {
 						throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
 					}
 
-					if (chunkHeader.compression_scheme !== CompressionScheme.None) {
+					if (
+						chunkHeader.compression_scheme !== CompressionScheme.None &&
+						chunkHeader.compression_scheme !== CompressionScheme.LZ4
+					) {
 						throw new Error(
 							`Unsupported compression scheme ${
 								compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
 							}`
 						);
 					}
 
-					result.value = result.value.slice(8);
-
 					if (chunksToSkip) {
 						chunksToSkip--;
+						leftoverBytes = result.value.slice(CHUNK_HEADER_BYTES);
 						bytesToSkip = chunkHeader.compressed_length;
 						continue;
 					}
-					if (chunksToRead) {
-						if (isFirstChunk) {
-							bytesToSkip = reconstructionInfo.offset_into_first_range;
-							bytesToRead = chunkHeader.uncompressed_length - reconstructionInfo.offset_into_first_range;
-							isFirstChunk = false;
-						} else {
-							bytesToRead = chunkHeader.uncompressed_length;
-						}
-						bytesToRead = Math.min(bytesToRead, maxBytes - totalBytesRead);
-						chunksToRead--;
-						continue;
+
+					if (result.value.length < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
+						// We need more data to read the full chunk
+						leftoverBytes = result.value;
+						continue readChunks;
 					}
 
-					break;
+					result.value = result.value.slice(CHUNK_HEADER_BYTES);
+
+					const uncompressed =
+						chunkHeader.compression_scheme === CompressionScheme.LZ4
+							? lz4.decompress(
+									result.value.slice(0, chunkHeader.compressed_length),
+									chunkHeader.uncompressed_length
+							  )
+							: result.value.slice(0, chunkHeader.compressed_length);
+
+					if (isFirstChunk) {
+						yield uncompressed.slice(
+							reconstructionInfo.offset_into_first_range,
+							Math.min(uncompressed.length, reconstructionInfo.offset_into_first_range + maxBytes - totalBytesRead)
+						);
+						totalBytesRead += Math.min(
+							uncompressed.length,
+							reconstructionInfo.offset_into_first_range + maxBytes - totalBytesRead
+						);
+						isFirstChunk = false;
+					} else {
+						yield uncompressed.slice(0, Math.min(uncompressed.length, maxBytes - totalBytesRead));
+						totalBytesRead += Math.min(uncompressed.length, maxBytes - totalBytesRead);
+					}
+					chunksToRead--;
 				}
			}
		}
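
For reference, the per-chunk decoding this commit adds inside readData can be summarized as a standalone helper. The sketch below is illustrative, not part of the commit: it assumes the 8-byte header layout implied by the diff (a u8 version, a 3-byte little-endian compressed_length, a u8 compression_scheme, a 3-byte little-endian uncompressed_length), and the CompressionScheme numbering and the decodeChunk name are hypothetical.

import lz4 from "lz4js";

const CHUNK_HEADER_BYTES = 8;

// Assumed numbering: the diff only shows that None and LZ4 exist.
enum CompressionScheme {
	None = 0,
	LZ4 = 1,
}

interface DecodedChunk {
	data: Uint8Array;
	bytesConsumed: number;
}

// Decode a single chunk from a buffer that starts at a chunk boundary.
// Returns null when the buffer does not yet contain the whole chunk,
// mirroring the leftoverBytes handling in readData above.
function decodeChunk(buffer: Uint8Array): DecodedChunk | null {
	if (buffer.length < CHUNK_HEADER_BYTES) {
		return null;
	}
	const header = new DataView(buffer.buffer, buffer.byteOffset, CHUNK_HEADER_BYTES);
	// Byte 0 carries the chunk version, which readData validates separately.
	const compressedLength = header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16);
	const compressionScheme = header.getUint8(4); // assumed offset, see lead-in
	const uncompressedLength = header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16);

	if (buffer.length < CHUNK_HEADER_BYTES + compressedLength) {
		return null; // incomplete chunk: wait for more network data
	}

	const payload = buffer.slice(CHUNK_HEADER_BYTES, CHUNK_HEADER_BYTES + compressedLength);
	const data =
		compressionScheme === CompressionScheme.LZ4
			? lz4.decompress(payload, uncompressedLength) // same lz4js call the diff uses
			: payload;

	return { data, bytesConsumed: CHUNK_HEADER_BYTES + compressedLength };
}

A caller would invoke decodeChunk repeatedly on its accumulated buffer and drop bytesConsumed bytes after each successful decode, which is effectively what the leftoverBytes bookkeeping in readData does when a chunk straddles two reads.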
