Skip to content

Commit f51f76f

Browse files
committed
feat(documents): add unique constraint on document versions and improve saving logic
- Introduced a unique constraint on the combination of documentId and version to prevent duplicate versions.
- Updated the migration script to handle existing duplicates and ensure data integrity.
- Refactored document saving logic to use transactions for atomic version increments, enhancing reliability during concurrent operations.
- Added a maxDebounce configuration to prevent data loss during rapid user input.
1 parent 9029ba0 commit f51f76f

File tree

4 files changed

+130
-74
lines changed

4 files changed

+130
-74
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
-- Add unique constraint on (documentId, version) to prevent duplicate versions
2+
-- This is a safety net - the application should never create duplicates,
3+
-- but this catches any race conditions that slip through
4+
5+
-- First, check if there are any existing duplicates and fix them
6+
-- (This should already be done by the previous migration, but just in case)
7+
DO $$
8+
DECLARE
9+
dup_count INTEGER;
10+
BEGIN
11+
SELECT COUNT(*) INTO dup_count
12+
FROM (
13+
SELECT "documentId", version
14+
FROM "Documents"
15+
GROUP BY "documentId", version
16+
HAVING COUNT(*) > 1
17+
) AS duplicates;
18+
19+
IF dup_count > 0 THEN
20+
RAISE NOTICE 'Found % duplicate (documentId, version) pairs. Fixing...', dup_count;
21+
22+
-- Re-number versions to eliminate duplicates
23+
WITH ranked AS (
24+
SELECT id,
25+
ROW_NUMBER() OVER (
26+
PARTITION BY "documentId"
27+
ORDER BY "createdAt" ASC
28+
)::int as correct_version
29+
FROM "Documents"
30+
)
31+
UPDATE "Documents" d
32+
SET version = r.correct_version
33+
FROM ranked r
34+
WHERE d.id = r.id;
35+
36+
RAISE NOTICE 'Duplicates fixed.';
37+
END IF;
38+
END $$;
39+
40+
-- Now add the unique constraint
41+
ALTER TABLE "Documents"
42+
ADD CONSTRAINT "Documents_documentId_version_unique"
43+
UNIQUE ("documentId", version);
44+
45+
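-- Add a descending index for latest-version lookups. NOTE: IF NOT EXISTS only checks the index name, so this index is created in addition to the one that backs the unique constraint.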
46+
CREATE INDEX IF NOT EXISTS "Documents_documentId_version_idx"
47+
ON "Documents" ("documentId", version DESC);
48+

packages/hocuspocus.server/prisma/schema.prisma

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ model Documents {
2424
metadata DocumentMetadata @relation(fields: [documentId], references: [documentId])
2525
collaborators DocumentUsers[]
2626
27-
@@index([documentId, version]) // Allows querying by version
27+
@@unique([documentId, version], name: "Documents_documentId_version_unique") // Prevents duplicate versions
28+
@@index([documentId, version]) // Allows fast querying by version
2829
}
2930

3031
model DocumentMetadata {

packages/hocuspocus.server/src/config/hocuspocus.config.ts

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,22 @@ const configureExtensions = () => {
122122
dbLogger.warn({ err, documentName }, 'Queue unavailable, falling back to direct save')
123123

124124
try {
125-
const existingDoc = await prisma.documents.findFirst({
126-
where: { documentId: documentName },
127-
orderBy: { id: 'desc' },
128-
select: { version: true }
129-
})
130-
131-
await prisma.documents.create({
132-
data: {
133-
documentId: documentName,
134-
commitMessage: commitMessage || '',
135-
version: existingDoc ? existingDoc.version + 1 : 1,
136-
data: Buffer.from(stateBase64, 'base64')
137-
}
125+
// Use transaction for atomic version increment (same as queue worker)
126+
await prisma.$transaction(async (tx) => {
127+
const existingDoc = await tx.documents.findFirst({
128+
where: { documentId: documentName },
129+
orderBy: { id: 'desc' },
130+
select: { version: true }
131+
})
132+
133+
await tx.documents.create({
134+
data: {
135+
documentId: documentName,
136+
commitMessage: commitMessage || '',
137+
version: existingDoc ? existingDoc.version + 1 : 1,
138+
data: Buffer.from(stateBase64, 'base64')
139+
}
140+
})
138141
})
139142

140143
dbLogger.info({ documentName }, 'Document saved via fallback (direct DB)')
@@ -165,7 +168,8 @@ export default () => {
165168
name: getServerName(),
166169
port: parseInt(process.env.HOCUSPOCUS_PORT || '4001', 10),
167170
extensions: configureExtensions(),
168-
debounce: 10_000, // 10 seconds
171+
debounce: 10_000, // 10 seconds - wait for user to stop typing
172+
maxDebounce: 60_000, // 60 seconds - force save even if user keeps typing (prevents data loss)
169173

170174
async onListen(data: any) {
171175
healthCheck.onConfigure({ ...data, extensions: configureExtensions() })

packages/hocuspocus.server/src/lib/queue.ts

Lines changed: 62 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -91,69 +91,72 @@ export const createDocumentWorker = () => {
9191
const startTime = Date.now()
9292
const context = data.context
9393

94-
// Check if this is the first time this document is being saved
95-
// IMPORTANT: Order by id DESC to get the LATEST record (id is auto-increment, always reliable)
96-
const existingDoc = await prisma.documents.findFirst({
97-
where: { documentId: data.documentName },
98-
orderBy: { id: 'desc' },
99-
select: { id: true, version: true }
100-
})
101-
const isFirstCreation = !existingDoc
102-
103-
if (isFirstCreation) {
104-
const slug = await generateUniqueSlug(context.slug || data.documentName)
105-
106-
await prisma.documentMetadata.upsert({
107-
where: {
108-
documentId: data.documentName
109-
},
110-
update: {
111-
// Don't update slug on existing documents to avoid conflicts
112-
title: context.slug || data.documentName,
113-
description: context.slug || data.documentName,
114-
ownerId: context.user?.sub,
115-
email: context.user?.email,
116-
keywords: ''
117-
},
118-
create: {
119-
documentId: data.documentName,
120-
slug,
121-
title: context.slug || data.documentName,
122-
description: context.slug || data.documentName,
123-
ownerId: context.user?.sub,
124-
email: context.user?.email,
125-
keywords: ''
126-
}
94+
// Use transaction to ensure atomic version increment (prevents race conditions)
95+
const savedDoc = await prisma.$transaction(async (tx) => {
96+
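// Get the latest version inside the transaction. NOTE: findFirst takes no row-level lock; concurrent duplicates are rejected by the unique (documentId, version) constraint.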
97+
const existingDoc = await tx.documents.findFirst({
98+
where: { documentId: data.documentName },
99+
orderBy: { id: 'desc' },
100+
select: { id: true, version: true }
127101
})
128-
// Send email notification for new document (fire-and-forget, don't block queue)
129-
const userMeta = context.user?.user_metadata
130-
sendNewDocumentNotification({
131-
documentId: data.documentName,
132-
documentName: context.slug || data.documentName,
133-
slug,
134-
creatorEmail: context.user?.email,
135-
creatorId: context.user?.sub,
136-
creatorName: userMeta?.full_name || userMeta?.name,
137-
creatorAvatarUrl: userMeta?.avatar_url,
138-
createdAt: new Date()
139-
}).catch((err) => {
140-
queueLogger.error(
141-
{ err, documentId: data.documentName },
142-
'Failed to send new document notification email'
143-
)
144-
})
145-
}
146102

147-
const currentDoc = existingDoc
103+
const isFirstCreation = !existingDoc
104+
const nextVersion = existingDoc ? existingDoc.version + 1 : 1
105+
106+
// Handle first-time document creation
107+
if (isFirstCreation) {
108+
const slug = await generateUniqueSlug(context.slug || data.documentName)
109+
110+
await tx.documentMetadata.upsert({
111+
where: { documentId: data.documentName },
112+
update: {
113+
title: context.slug || data.documentName,
114+
description: context.slug || data.documentName,
115+
ownerId: context.user?.sub,
116+
email: context.user?.email,
117+
keywords: ''
118+
},
119+
create: {
120+
documentId: data.documentName,
121+
slug,
122+
title: context.slug || data.documentName,
123+
description: context.slug || data.documentName,
124+
ownerId: context.user?.sub,
125+
email: context.user?.email,
126+
keywords: ''
127+
}
128+
})
148129

149-
// Create a new version
150-
const savedDoc = await prisma.documents.create({
151-
data: {
152-
documentId: data.documentName,
153-
commitMessage: data.commitMessage || '',
154-
version: currentDoc ? currentDoc.version + 1 : 1,
155-
data: Buffer.from(data.state, 'base64')
130+
// Schedule email notification (fire-and-forget). NOTE: setImmediate only defers to a later event-loop turn; it does not wait for the transaction to commit.
131+
const userMeta = context.user?.user_metadata
132+
setImmediate(() => {
133+
sendNewDocumentNotification({
134+
documentId: data.documentName,
135+
documentName: context.slug || data.documentName,
136+
slug,
137+
creatorEmail: context.user?.email,
138+
creatorId: context.user?.sub,
139+
creatorName: userMeta?.full_name || userMeta?.name,
140+
creatorAvatarUrl: userMeta?.avatar_url,
141+
createdAt: new Date()
142+
}).catch((err) => {
143+
queueLogger.error(
144+
{ err, documentId: data.documentName },
145+
'Failed to send new document notification email'
146+
)
147+
})
148+
})
156149
}
150+
151+
// Create new version (within transaction = atomic)
152+
return tx.documents.create({
153+
data: {
154+
documentId: data.documentName,
155+
commitMessage: data.commitMessage || '',
156+
version: nextVersion,
157+
data: Buffer.from(data.state, 'base64')
158+
}
159+
})
157160
})
158161

159162
const duration = Date.now() - startTime

0 commit comments

Comments
 (0)