Skip to content

Commit b4511b7

Browse files
authored
Automatically sharePendingBlobs (#25731)
1 parent a33a2e3 commit b4511b7

File tree

3 files changed

+89
-27
lines changed

3 files changed

+89
-27
lines changed

packages/runtime/container-runtime/src/blobManager/blobManager.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,6 @@ export class BlobManager {
924924
public readonly sharePendingBlobs = async (): Promise<void> => {
925925
const localIdsToUpload = [...this.pendingOnlyLocalIds];
926926
this.pendingOnlyLocalIds.clear();
927-
// TODO: Determine if Promise.all is ergonomic at the callsite. Would Promise.allSettled be better?
928927
await Promise.all<void>(
929928
localIdsToUpload.map(async (localId) => this.uploadAndAttach(localId)),
930929
);

packages/runtime/container-runtime/src/containerRuntime.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,8 @@ export class ContainerRuntime
12151215
recentBatchInfo,
12161216
);
12171217

1218+
runtime.sharePendingBlobs();
1219+
12181220
// Initialize the base state of the runtime before it's returned.
12191221
await runtime.initializeBaseState(context.loader);
12201222

@@ -4549,7 +4551,6 @@ export class ContainerRuntime
45494551
contents: any,
45504552
localOpMetadata: unknown = undefined,
45514553
): void {
4552-
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
45534554
this.submit({ type, contents }, localOpMetadata);
45544555
}
45554556

@@ -5134,6 +5135,43 @@ export class ContainerRuntime
51345135
);
51355136
}
51365137

5138+
/**
5139+
* ContainerRuntime knows about additional restrictions on when blob sharing can be resumed as compared
5140+
* to BlobManager. In particular, it wants to avoid sharing blobs while in readonly state, and it also
5141+
* wants to avoid sharing blobs before connection completes (otherwise it may cause the sharing to happen
5142+
* before processing shared ops).
5143+
*
5144+
* This method can be called safely before those conditions are met. In the background, it will wait until
5145+
* it is safe before initiating sharing. It will close the container on any error.
5146+
*/
5147+
public sharePendingBlobs = (): void => {
5148+
new Promise<void>((resolve) => {
5149+
// eslint-disable-next-line unicorn/consistent-function-scoping
5150+
const canStartSharing = (): boolean =>
5151+
this.connected && this.deltaManager.readOnlyInfo.readonly !== true;
5152+
5153+
if (canStartSharing()) {
5154+
resolve();
5155+
return;
5156+
}
5157+
5158+
const checkCanShare = (readonly: boolean): void => {
5159+
if (canStartSharing()) {
5160+
this.deltaManager.off("readonly", checkCanShare);
5161+
this.off("connected", checkCanShare);
5162+
resolve();
5163+
}
5164+
};
5165+
this.deltaManager.on("readonly", checkCanShare);
5166+
this.on("connected", checkCanShare);
5167+
})
5168+
.then(this.blobManager.sharePendingBlobs)
5169+
// It may not be necessary to close the container on failures - this should just mean there's
5170+
// a handle in the container that is stuck pending, which is a scenario that customers need to
5171+
// handle anyway. Starting with this more aggressive/restrictive behavior to be cautious.
5172+
.catch(this.closeFn);
5173+
};
5174+
51375175
public summarizeOnDemand(options: IOnDemandSummarizeOptions): ISummarizeResults {
51385176
if (this._summarizer !== undefined) {
51395177
return this._summarizer.summarizeOnDemand(options);

packages/test/test-end-to-end-tests/src/test/offline/stashedOps.spec.ts

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1660,14 +1660,16 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
16601660
"Expect the upload hasn't completed yet",
16611661
);
16621662

1663+
// Second container loads with the pending state
16631664
const container2 = await loader.resolve({ url }, pendingState);
16641665
const dataStore2 = (await container2.getEntryPoint()) as ITestFluidObject;
16651666
const map2 = await dataStore2.getSharedObject<ISharedMap>(mapId);
16661667

1667-
// TODO: We've not yet decided where to expose sharePendingBlobs(), so for now casting and reaching.
1668-
// Replace with calling the proper API when available.
1669-
// Share the pending blob so container1 will be able to find it.
1670-
await (dataStore2.context.containerRuntime as any).blobManager.sharePendingBlobs();
1668+
// Third container verifies that it can access the blob despite not being the original uploader or having the pending state
1669+
const container3 = await loader.resolve({ url });
1670+
const dataStore3 = (await container3.getEntryPoint()) as ITestFluidObject;
1671+
const map3 = await dataStore3.getSharedObject<ISharedMap>(mapId);
1672+
16711673
await provider.ensureSynchronized();
16721674
assert.strictEqual(
16731675
bufferToString(await map1.get("blob handle").get(), "utf8"),
@@ -1677,6 +1679,10 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
16771679
bufferToString(await map2.get("blob handle").get(), "utf8"),
16781680
"blob contents",
16791681
);
1682+
assert.strictEqual(
1683+
bufferToString(await map3.get("blob handle").get(), "utf8"),
1684+
"blob contents",
1685+
);
16801686
});
16811687

16821688
it("close while uploading multiple blob", async function () {
@@ -1710,13 +1716,16 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
17101716
"Expect none of the uploads have completed yet",
17111717
);
17121718

1719+
// Second container loads with the pending state
17131720
const container2 = await loader.resolve({ url }, pendingState);
17141721
const dataStore2 = (await container2.getEntryPoint()) as ITestFluidObject;
17151722
const map2 = await dataStore2.getSharedObject<ISharedMap>(mapId);
1716-
// TODO: We've not yet decided where to expose sharePendingBlobs(), so for now casting and reaching.
1717-
// Replace with calling the proper API when available.
1718-
// Share the pending blob so container1 will be able to find it.
1719-
await (dataStore2.context.containerRuntime as any).blobManager.sharePendingBlobs();
1723+
1724+
// Third container verifies that it can access the blobs despite not being the original uploader or having the pending state
1725+
const container3 = await loader.resolve({ url });
1726+
const dataStore3 = (await container3.getEntryPoint()) as ITestFluidObject;
1727+
const map3 = await dataStore3.getSharedObject<ISharedMap>(mapId);
1728+
17201729
await provider.ensureSynchronized();
17211730
for (let i = 1; i <= 3; i++) {
17221731
assert.strictEqual(
@@ -1727,6 +1736,10 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
17271736
bufferToString(await map2.get(`blob handle ${i}`).get(), "utf8"),
17281737
`blob contents ${i}`,
17291738
);
1739+
assert.strictEqual(
1740+
bufferToString(await map3.get(`blob handle ${i}`).get(), "utf8"),
1741+
`blob contents ${i}`,
1742+
);
17301743
}
17311744
});
17321745

@@ -1743,34 +1756,40 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
17431756
const pendingState = await container.container.getPendingLocalState();
17441757
container.container.close();
17451758

1746-
const container3 = await loadContainerOffline(
1759+
// Second container loads offline with the pending state
1760+
const container2 = await loadContainerOffline(
17471761
testContainerConfig,
17481762
provider,
17491763
{ url },
17501764
pendingState,
17511765
);
1752-
const dataStore3 = (await container3.container.getEntryPoint()) as ITestFluidObject;
1753-
const map3 = await dataStore3.getSharedObject<ISharedMap>(mapId);
1766+
const dataStore2 = (await container2.container.getEntryPoint()) as ITestFluidObject;
1767+
const map2 = await dataStore2.getSharedObject<ISharedMap>(mapId);
17541768

17551769
// blob is accessible offline
17561770
assert.strictEqual(
1757-
bufferToString(await map3.get("blob handle 1").get(), "utf8"),
1771+
bufferToString(await map2.get("blob handle 1").get(), "utf8"),
17581772
"blob contents 1",
17591773
);
1760-
container3.connect();
1761-
await waitForContainerConnection(container3.container);
1762-
// TODO: We've not yet decided where to expose sharePendingBlobs(), so for now casting and reaching.
1763-
// Replace with calling the proper API when available.
1764-
// Share the pending blob so container1 will be able to find it.
1765-
await (dataStore3.context.containerRuntime as any).blobManager.sharePendingBlobs();
1774+
container2.connect();
1775+
await waitForContainerConnection(container2.container);
1776+
1777+
// Third container verifies that it can access the blob despite not being the original uploader or having the pending state
1778+
const container3 = await loader.resolve({ url });
1779+
const dataStore3 = (await container3.getEntryPoint()) as ITestFluidObject;
1780+
const map3 = await dataStore3.getSharedObject<ISharedMap>(mapId);
17661781
await provider.ensureSynchronized();
17671782

17681783
assert.strictEqual(
1769-
bufferToString(await map3.get("blob handle 1").get(), "utf8"),
1784+
bufferToString(await map1.get("blob handle 1").get(), "utf8"),
17701785
"blob contents 1",
17711786
);
17721787
assert.strictEqual(
1773-
bufferToString(await map1.get("blob handle 1").get(), "utf8"),
1788+
bufferToString(await map2.get("blob handle 1").get(), "utf8"),
1789+
"blob contents 1",
1790+
);
1791+
assert.strictEqual(
1792+
bufferToString(await map3.get("blob handle 1").get(), "utf8"),
17741793
"blob contents 1",
17751794
);
17761795
});
@@ -1788,21 +1807,27 @@ describeCompat("stashed ops", "NoCompat", (getTestObjectProvider, apis) => {
17881807
const pendingState = await container.container.getPendingLocalState();
17891808
container.container.close();
17901809

1791-
const container3 = await loader.resolve({ url }, pendingState);
1810+
// Second container loads with the pending state
1811+
const container2 = await loader.resolve({ url }, pendingState);
1812+
const dataStore2 = (await container2.getEntryPoint()) as ITestFluidObject;
1813+
const map2 = await dataStore2.getSharedObject<ISharedMap>(mapId);
1814+
1815+
// Third container verifies that it can access the blob despite not being the original uploader or having the pending state
1816+
const container3 = await loader.resolve({ url });
17921817
const dataStore3 = (await container3.getEntryPoint()) as ITestFluidObject;
17931818
const map3 = await dataStore3.getSharedObject<ISharedMap>(mapId);
17941819

1795-
// TODO: We've not yet decided where to expose sharePendingBlobs(), so for now casting and reaching.
1796-
// Replace with calling the proper API when available.
1797-
// Share the pending blob so container1 will be able to find it.
1798-
await (dataStore3.context.containerRuntime as any).blobManager.sharePendingBlobs();
17991820
await provider.ensureSynchronized();
18001821

18011822
// Blob is uploaded and accessible by all clients
18021823
assert.strictEqual(
18031824
bufferToString(await map1.get("blob handle 1").get(), "utf8"),
18041825
"blob contents 1",
18051826
);
1827+
assert.strictEqual(
1828+
bufferToString(await map2.get("blob handle 1").get(), "utf8"),
1829+
"blob contents 1",
1830+
);
18061831
assert.strictEqual(
18071832
bufferToString(await map3.get("blob handle 1").get(), "utf8"),
18081833
"blob contents 1",

0 commit comments

Comments
 (0)