Skip to content

Commit a4502c2

Browse files
committed
Progress in reconstruction algorithm
1 parent 4f316c6 commit a4502c2

File tree

3 files changed

+127
-11
lines changed

3 files changed

+127
-11
lines changed

CONTRIBUTING.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ It's not a hard requirement, but please consider using an icon from [Gitmoji](ht
1818

1919
## Tests
2020

21-
If you want to run only specific tests, you can do `pnpm test -- -t "test name"`
21+
If you want to run only specific tests, you can do `pnpm test -- -t "test name"`.
22+
23+
You can also do `npx vitest ./packages/hub/src/utils/XetBlob.spec.ts` to run a specific test file.
2224

2325
## Adding a package
2426

packages/hub/src/utils/XetBlob.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ describe("XetBlob", () => {
88
type: "model",
99
name: "celinah/xet-experiments",
1010
},
11-
hash: " 7b3b6d07673a88cf467e67c1f7edef1a8c268cbf66e9dd9b0366322d4ab56d9b",
11+
hash: "7b3b6d07673a88cf467e67c1f7edef1a8c268cbf66e9dd9b0366322d4ab56d9b",
1212
size: 5_234_139_343,
1313
});
1414

1515
expect(await blob.slice(10, 22).text()).toBe("__metadata__");
16-
});
16+
}, 30_000);
1717
});

packages/hub/src/utils/XetBlob.ts

Lines changed: 122 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,38 @@ type XetBlobCreateOptions = {
1818
size: number;
1919
} & Partial<CredentialsParams>;
2020

21+
interface ReconstructionInfo {
22+
/**
23+
* List of CAS blocks
24+
*/
25+
terms: Array<{
26+
/** Hash of the CAS block */
27+
hash: string;
28+
/** Total uncompressed length of the CAS block */
29+
unpacked_length: number;
30+
/** Chunk range, end-exclusive. E.g. start: 10, end: 100 = chunks 10-99 */
31+
range: { start: number; end: number };
32+
}>;
33+
34+
/**
35+
* Dictionary of CAS block hash => list of ranges in the block + URL to fetch it
36+
*/
37+
fetch_info: Record<
38+
string,
39+
Array<{
40+
url: string;
41+
/** Chunk range */
42+
range: { start: number; end: number };
43+
/** Byte range, when making the call to the URL */
44+
url_range: { start: number; end: number };
45+
}>
46+
>;
47+
/**
48+
* When doing a range request, the offset into the first range
49+
*/
50+
offset_into_first_range: number;
51+
}
52+
2153
/**
2254
* XetBlob is a blob implementation that fetches data directly from the Xet storage
2355
*/
@@ -29,7 +61,7 @@ export class XetBlob extends Blob {
2961
hash: string;
3062
start = 0;
3163
end = 0;
32-
reconstructionInfo: { terms: unknown[]; fetch_info: unknown } | undefined;
64+
reconstructionInfo: ReconstructionInfo | undefined;
3365

3466
constructor(params: XetBlobCreateOptions) {
3567
super([]);
@@ -70,34 +102,113 @@ export class XetBlob extends Blob {
70102
}
71103

72104
const slice = this.#clone();
105+
73106
slice.start = this.start + start;
74107
slice.end = Math.min(this.start + end, this.end);
75108

109+
if (slice.start !== this.start || slice.end !== this.end) {
110+
slice.reconstructionInfo = undefined;
111+
}
112+
76113
return slice;
77114
}
78115

79116
async #fetch(): Promise<ReadableStream<Uint8Array>> {
80117
let connParams = await getAccessToken(this.repoId, this.accessToken, this.fetch, this.hubUrl);
81118

82-
if (!this.reconstructionInfo) {
119+
let reconstructionInfo = this.reconstructionInfo;
120+
if (!reconstructionInfo) {
121+
// console.log(
122+
// `curl '${connParams.casUrl}/reconstruction/${this.hash}' -H 'Authorization: Bearer ${connParams.accessToken}'`
123+
// );
83124
const resp = await this.fetch(`${connParams.casUrl}/reconstruction/${this.hash}`, {
84125
headers: {
85126
Authorization: `Bearer ${connParams.accessToken}`,
127+
Range: `bytes=${this.start}-${this.end - 1}`,
86128
},
87129
});
88130

89131
if (!resp.ok) {
90132
throw await createApiError(resp);
91133
}
92134

93-
this.reconstructionInfo = await resp.json();
94-
console.log("reconstruction info", this.reconstructionInfo);
135+
this.reconstructionInfo = reconstructionInfo = (await resp.json()) as ReconstructionInfo;
95136
}
137+
// todo: also refresh reconstruction info if it's expired (and avoid concurrent requests when doing so)
96138

97139
// Refetch the token if it's expired
98140
connParams = await getAccessToken(this.repoId, this.accessToken, this.fetch, this.hubUrl);
99141

100-
throw new Error("Reconstruction not implemented: " + JSON.stringify(this.reconstructionInfo));
142+
async function* readData(reconstructionInfo: ReconstructionInfo, customFetch: typeof fetch) {
143+
for (const term of reconstructionInfo.terms) {
144+
const fetchInfo = reconstructionInfo.fetch_info[term.hash].find(
145+
(info) => info.range.start <= term.range.start && info.range.end >= term.range.end
146+
);
147+
148+
if (!fetchInfo) {
149+
throw new Error(
150+
`Failed to find fetch info for term ${term.hash} and range ${term.range.start}-${term.range.end}`
151+
);
152+
}
153+
154+
const resp = await customFetch(fetchInfo.url, {
155+
headers: {
156+
Range: `bytes=${fetchInfo.url_range.start}-${fetchInfo.url_range.end}`,
157+
},
158+
});
159+
160+
if (!resp.ok) {
161+
throw await createApiError(resp);
162+
}
163+
164+
const reader = resp.body?.getReader();
165+
if (!reader) {
166+
throw new Error("Failed to get reader from response body");
167+
}
168+
169+
// todo: handle chunk ranges
170+
let done = false;
171+
let isFirstChunk = true;
172+
while (!done) {
173+
const { value, done: doneValue } = await reader.read();
174+
done = doneValue;
175+
if (value) {
176+
yield isFirstChunk ? value.slice(reconstructionInfo.offset_into_first_range) : value;
177+
isFirstChunk = false;
178+
}
179+
}
180+
}
181+
}
182+
183+
const iterator = readData(reconstructionInfo, this.fetch);
184+
185+
// todo: when Chrome/Safari support it, use ReadableStream.from(readData)
186+
return new ReadableStream<Uint8Array>(
187+
{
188+
// todo: when Safari supports it, type controller as ReadableByteStreamController
189+
async pull(controller) {
190+
const result = await iterator.next();
191+
192+
if (result.value) {
193+
// Split into chunks of 1000 bytes since `ByteLengthQueuingStrategy` fails in Node.js due to size being a function
194+
const chunkSize = 1_000;
195+
for (let i = 0; i < result.value.length; i += chunkSize) {
196+
controller.enqueue(result.value.slice(i, i + chunkSize));
197+
}
198+
}
199+
200+
if (result.done) {
201+
controller.close();
202+
}
203+
},
204+
type: "bytes",
205+
// todo: when Safari supports it, add autoAllocateChunkSize param
206+
},
207+
// todo: use ByteLengthQueuingStrategy when there's good support for it
208+
{
209+
highWaterMark: 1_000, // 1_000 chunks of 1_000 bytes, for 1MB of RAM
210+
}
211+
);
101212
}
102213

103214
override async arrayBuffer(): Promise<ArrayBuffer> {
@@ -182,10 +293,10 @@ async function getAccessToken(
182293
throw new Error(`Failed to get JWT token: ${resp.status} ${await resp.text()}`);
183294
}
184295

185-
const json = await resp.json();
296+
const json: { accessToken: string; casUrl: string; exp: number } = await resp.json();
186297
const jwt = {
187298
repoId,
188-
accessToken: json.token,
299+
accessToken: json.accessToken,
189300
expiresAt: new Date(json.exp * 1000),
190301
initialAccessToken,
191302
hubUrl,
@@ -209,7 +320,10 @@ async function getAccessToken(
209320
}
210321
jwts.set(key, jwt);
211322

212-
return jwt.accessToken;
323+
return {
324+
accessToken: json.accessToken,
325+
casUrl: json.casUrl,
326+
};
213327
})();
214328

215329
jwtPromises.set(repoId.name, promise);

0 commit comments

Comments
 (0)