Skip to content

Commit 302eb08

Browse files
committed
feat: optimize retry logic
1 parent e0453f0 commit 302eb08

File tree

8 files changed

+52
-31
lines changed

8 files changed

+52
-31
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export class SkipFurtherHooksError extends Error {
2+
constructor(message?: string) {
3+
super(message ?? "Further hooks skipped");
4+
this.name = "SkipFurtherHooksError";
5+
}
6+
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ export * from "./auth.ts";
22
export * from "./CloseEvents.ts";
33
export * from "./awarenessStatesToArray.ts";
44
export * from "./routingKey.ts";
5+
export * from "./SkipFurtherHooksError.ts";
56
export * from "./types.ts";

packages/extension-redis/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"@types/lodash.debounce": "^4.0.6"
3232
},
3333
"dependencies": {
34+
"@hocuspocus/common": "workspace:^",
3435
"@hocuspocus/server": "workspace:^",
3536
"@sesamecare-oss/redlock": "^1.4.0",
3637
"ioredis": "~5.6.1",

packages/extension-redis/src/Redis.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
import crypto from "node:crypto";
22
import type {
3-
Document,
4-
Extension,
5-
Hocuspocus,
6-
RedisTransactionOrigin,
73
afterLoadDocumentPayload,
84
afterStoreDocumentPayload,
95
afterUnloadDocumentPayload,
106
beforeBroadcastStatelessPayload,
117
beforeUnloadDocumentPayload,
8+
Document,
9+
Extension,
10+
Hocuspocus,
1211
onAwarenessUpdatePayload,
1312
onChangePayload,
1413
onConfigurePayload,
1514
onStoreDocumentPayload,
15+
RedisTransactionOrigin,
1616
} from "@hocuspocus/server";
17+
import { SkipFurtherHooksError } from "@hocuspocus/common";
1718
import {
1819
IncomingMessage,
20+
isTransactionOrigin,
1921
MessageReceiver,
2022
OutgoingMessage,
21-
isTransactionOrigin,
2223
} from "@hocuspocus/server";
2324
import {
2425
ExecutionError,
@@ -266,10 +267,8 @@ export class Redis implements Extension {
266267
"The operation was unable to achieve a quorum during its retry window."
267268
) {
268269
// Expected behavior: Could not acquire lock, another instance locked it already.
269-
// No further `onStoreDocument` hooks will be executed; should throw a silent error with no message.
270-
throw new Error("", {
271-
cause: "Could not acquire lock, another instance locked it already.",
272-
});
270+
// Skip further hooks and retry — the data is safe on the other instance.
271+
throw new SkipFurtherHooksError("Another instance is already storing this document");
273272
}
274273
//unexpected error
275274
console.error("unexpected error:", error);

packages/server/src/Hocuspocus.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import crypto from "node:crypto";
2-
import { awarenessStatesToArray, ResetConnection } from "@hocuspocus/common";
2+
import { awarenessStatesToArray, ResetConnection, SkipFurtherHooksError } from "@hocuspocus/common";
33
import { applyUpdate, Doc, encodeStateAsUpdate } from "yjs";
44
import meta from "../package.json" with { type: "json" };
55

@@ -321,7 +321,7 @@ export class Hocuspocus<Context = any> {
321321
context?: Context,
322322
): Promise<Document> {
323323
if (!documentName.trim()) {
324-
throw new Error('Document name must not be empty')
324+
throw new Error("Document name must not be empty");
325325
}
326326

327327
const existingLoadingDoc = this.loadingDocuments.get(documentName);
@@ -473,9 +473,20 @@ export class Hocuspocus<Context = any> {
473473
await this.hooks("afterStoreDocument", hookPayload);
474474
});
475475
} catch (error: any) {
476-
console.error("Caught error during storeDocumentHooks, retrying", error);
477-
// Retry to avoid data loss — the document stays in memory until the store succeeds
478-
this.storeDocumentHooks(document, hookPayload);
476+
if (error instanceof SkipFurtherHooksError) {
477+
// Another extension handled this — proceed to unload
478+
setTimeout(() => {
479+
if (this.shouldUnloadDocument(document)) {
480+
this.unloadDocument(document);
481+
}
482+
}, 0);
483+
return;
484+
}
485+
486+
console.error(
487+
"Caught error during storeDocumentHooks. Document stays in memory to avoid data loss",
488+
error,
489+
);
479490
return;
480491
}
481492

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/extension-redis/onStoreDocument.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ test('document gets unloaded on both servers after disconnection', async t => {
146146
t.is(server.documents.size, 0)
147147

148148
resolve('')
149-
}, 5000) // must be higher than RedisExtension.disconnectDelay
149+
}, 10000) // must be higher than CustomStorageExtension delay (3s) + RedisExtension.disconnectDelay (1s)
150150
}, 1500)
151151

152152
},

tests/server/onStoreDocument.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -233,29 +233,29 @@ test("stops when one of the onStoreDocument hooks throws a non-Error value", asy
233233
});
234234
});
235235

236-
test("retries onStoreDocument on transient failure and succeeds", async (t) => {
236+
test("keeps document in memory when onStoreDocument fails", async (t) => {
237237
await new Promise(async (resolve) => {
238-
let callCount = 0;
239-
240238
const server = await newHocuspocus(t, {
241239
debounce: 100,
242-
async onStoreDocument({ document }) {
243-
callCount++;
244-
if (callCount < 3) {
245-
throw new Error("transient failure");
246-
}
247-
// Third attempt succeeds
248-
const value = document.getArray("foo").get(0);
249-
t.is(value, "bar");
250-
t.is(callCount, 3, "should succeed on third attempt");
251-
resolve("done");
240+
async onStoreDocument() {
241+
throw new Error("storage unavailable");
252242
},
253243
});
254244

255-
const provider = newHocuspocusProvider(t, server);
245+
const socket = newHocuspocusProviderWebsocket(t, server);
256246

257-
provider.on("synced", () => {
258-
provider.document.getArray("foo").insert(0, ["bar"]);
247+
const provider = newHocuspocusProvider(t, server, {
248+
websocketProvider: socket,
249+
onSynced() {
250+
provider.document.getArray("foo").push(["bar"]);
251+
socket.destroy();
252+
253+
setTimeout(() => {
254+
// Document must remain in memory since the store failed
255+
t.is(server.getDocumentsCount(), 1);
256+
resolve("done");
257+
}, 500);
258+
},
259259
});
260260
});
261261
});

0 commit comments

Comments
 (0)