
Commit dba172c

parse chunks

1 parent a4502c2

File tree: 1 file changed (+101, −7 lines)

packages/hub/src/utils/XetBlob.ts

Lines changed: 101 additions & 7 deletions
@@ -50,6 +50,25 @@ interface ReconstructionInfo {
 	offset_into_first_range: number;
 }
 
+enum CompressionScheme {
+	None = 0,
+	LZ4 = 1,
+	ByteGroupingLZ4 = 2,
+}
+
+const compressionSchemeLabels: Record<CompressionScheme, string> = {
+	[CompressionScheme.None]: "None",
+	[CompressionScheme.LZ4]: "LZ4",
+	[CompressionScheme.ByteGroupingLZ4]: "ByteGroupingLZ4",
+};
+
+interface ChunkHeader {
+	version: number; // u8, 1 byte
+	compressed_length: number; // 3 * u8, 3 bytes
+	compression_scheme: CompressionScheme; // u8, 1 byte
+	uncompressed_length: number; // 3 * u8, 3 bytes
+}
+
 /**
  * XetBlob is a blob implementation that fetches data directly from the Xet storage
  */
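For context, the header introduced here is 8 bytes: a u8 version, a 24-bit little-endian compressed length, a u8 compression scheme, and a 24-bit little-endian uncompressed length. A minimal sketch of the inverse of the parsing added below (serializeChunkHeader is a hypothetical helper, not part of this commit) shows how those fields map onto bytes:

// Hypothetical helper, not in the commit: encodes a ChunkHeader into the
// 8-byte layout that the DataView reads below decode. Lengths are written
// as 24-bit little-endian values, matching `b1 | (b2 << 8) | (b3 << 16)`.
function serializeChunkHeader(header: ChunkHeader): Uint8Array {
	const bytes = new Uint8Array(8);
	bytes[0] = header.version; // u8 version
	bytes[1] = header.compressed_length & 0xff; // u24 LE compressed length
	bytes[2] = (header.compressed_length >> 8) & 0xff;
	bytes[3] = (header.compressed_length >> 16) & 0xff;
	bytes[4] = header.compression_scheme; // u8 compression scheme
	bytes[5] = header.uncompressed_length & 0xff; // u24 LE uncompressed length
	bytes[6] = (header.uncompressed_length >> 8) & 0xff;
	bytes[7] = (header.uncompressed_length >> 16) & 0xff;
	return bytes;
}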
@@ -169,12 +188,87 @@ export class XetBlob extends Blob {
 			// todo: handle chunk ranges
 			let done = false;
 			let isFirstChunk = true;
-			while (!done) {
-				const { value, done: doneValue } = await reader.read();
-				done = doneValue;
-				if (value) {
-					yield isFirstChunk ? value.slice(reconstructionInfo.offset_into_first_range) : value;
-					isFirstChunk = false;
+			let chunksToSkip = term.range.start - fetchInfo.range.start;
+			let chunksToRead = term.range.end - term.range.start;
+			let bytesToSkip = 0;
+			let bytesToRead = 0;
+
+			let leftoverBytes: Uint8Array | undefined = undefined;
+
+			readChunks: while (!done) {
+				const result = await reader.read();
+				done = result.done;
+				if (result.value) {
+					while (1) {
+						if (bytesToSkip) {
+							if (bytesToSkip >= result.value.length) {
+								bytesToSkip -= result.value.length;
+								continue readChunks;
+							}
+							result.value = result.value.slice(bytesToSkip);
+						}
+						if (bytesToRead) {
+							if (bytesToRead >= result.value.length) {
+								yield result.value;
+								bytesToRead -= result.value.length;
+								continue readChunks;
+							}
+							yield result.value.slice(0, bytesToRead);
+							result.value = result.value.slice(bytesToRead);
+							bytesToRead = 0;
+						}
+						if (leftoverBytes) {
+							result.value = new Uint8Array([...leftoverBytes, ...result.value]);
+							leftoverBytes = undefined;
+						}
+
+						if (result.value.length < 8) {
+							// We need 8 bytes to parse the chunk header
+							leftoverBytes = result.value;
+							continue readChunks;
+						}
+
+						const header = new DataView(result.value.buffer, result.value.byteOffset, 8);
+						const chunkHeader: ChunkHeader = {
+							version: header.getUint8(0),
+							compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
+							compression_scheme: header.getUint8(4),
+							uncompressed_length: header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16),
+						};
+
+						if (chunkHeader.version !== 0) {
+							throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
+						}
+
+						if (chunkHeader.compression_scheme !== CompressionScheme.None) {
+							throw new Error(
+								`Unsupported compression scheme ${
+									compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
+								}`
+							);
+						}
+
+						result.value = result.value.slice(8);
+
+						if (chunksToSkip) {
+							chunksToSkip--;
+							bytesToSkip = chunkHeader.compressed_length;
+							continue;
+						}
+						if (chunksToRead) {
+							if (isFirstChunk) {
+								bytesToSkip = reconstructionInfo.offset_into_first_range;
+								bytesToRead = chunkHeader.uncompressed_length - reconstructionInfo.offset_into_first_range;
+								isFirstChunk = false;
+							} else {
+								bytesToRead = chunkHeader.uncompressed_length;
+							}
+							chunksToRead--;
+							continue;
+						}
+
+						break;
+					}
 				}
 			}
 		}
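One detail worth noting in the buffering above: `new Uint8Array([...leftoverBytes, ...result.value])` copies byte by byte through the iterator protocol. An equivalent sketch using typed-array set() (not part of the commit) does the same concatenation with two bulk copies:

// Sketch of an alternative to the spread-based concatenation above;
// behaviorally identical, but copies via bulk set() calls.
function concatBytes(a: Uint8Array, b: Uint8Array): Uint8Array {
	const out = new Uint8Array(a.length + b.length);
	out.set(a, 0);
	out.set(b, a.length);
	return out;
}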
@@ -206,7 +300,7 @@ export class XetBlob extends Blob {
 			},
 			// todo : use ByteLengthQueuingStrategy when there's good support for it
 			{
-				highWaterMark: 1_000, // 1_000 chunks of 1_000 bytes, for 1MB of RAM
+				highWaterMark: 1_000, // 1_000 chunks for ~1MB of RAM
 			}
 		);
 	}
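Regarding the todo above: once ByteLengthQueuingStrategy has wide runtime support, the high-water mark can be expressed in bytes rather than in chunk count, so backpressure no longer depends on how large individual chunks happen to be. A sketch under that assumption (the source here is a stand-in, not the blob's real underlying source):

// Sketch only: the strategy measures queued bytes (chunk.byteLength)
// instead of counting chunks, so ~1MB stays buffered regardless of chunk size.
const source: UnderlyingDefaultSource<Uint8Array> = {
	pull(controller) {
		controller.enqueue(new Uint8Array(1_000)); // stand-in for real chunk data
	},
};
const stream = new ReadableStream<Uint8Array>(
	source,
	new ByteLengthQueuingStrategy({ highWaterMark: 1_000_000 }) // ~1MB of buffered bytes
);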
