Skip to content

Commit 01c3454

Browse files
KyleAMathewsclaude
andauthored
feat(query-db-collection): add manual write methods for direct state updates (#303)
Co-authored-by: Claude <[email protected]>
1 parent c208f20 commit 01c3454

File tree

7 files changed

+724
-4
lines changed

7 files changed

+724
-4
lines changed

.changeset/mean-needles-add.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/query-db-collection": patch
3+
---
4+
5+
Add manual write methods to QueryCollectionUtils interface to enable direct state updates from external sources. Introduces writeInsert, writeUpdate, writeDelete, writeUpsert, and writeBatch methods that bypass the normal optimistic update flow for WebSocket/real-time scenarios. All methods include proper transaction handling, data validation, and automatic query cache synchronization.

packages/db/src/collection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1489,7 +1489,7 @@ export class CollectionImpl<
14891489
return false
14901490
}
14911491

1492-
private validateData(
1492+
public validateData(
14931493
data: unknown,
14941494
type: `insert` | `update`,
14951495
key?: TKey

packages/query-db-collection/src/errors.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,68 @@ export class GetKeyRequiredError extends QueryCollectionError {
3535
this.name = `GetKeyRequiredError`
3636
}
3737
}
38+
39+
export class SyncNotInitializedError extends QueryCollectionError {
40+
constructor() {
41+
super(
42+
`Collection must be in 'ready' state for manual sync operations. Sync not initialized yet.`
43+
)
44+
this.name = `SyncNotInitializedError`
45+
}
46+
}
47+
48+
export class InvalidItemStructureError extends QueryCollectionError {
49+
constructor(message: string) {
50+
super(`Invalid item structure: ${message}`)
51+
this.name = `InvalidItemStructureError`
52+
}
53+
}
54+
55+
export class ItemNotFoundError extends QueryCollectionError {
56+
constructor(key: string | number) {
57+
super(`Item with key '${key}' does not exist.`)
58+
this.name = `ItemNotFoundError`
59+
}
60+
}
61+
62+
export class DuplicateKeyInBatchError extends QueryCollectionError {
63+
constructor(key: string | number) {
64+
super(`Duplicate key '${key}' found within batch operations`)
65+
this.name = `DuplicateKeyInBatchError`
66+
}
67+
}
68+
69+
export class UpdateOperationItemNotFoundError extends QueryCollectionError {
70+
constructor(key: string | number) {
71+
super(`Update operation: Item with key '${key}' does not exist`)
72+
this.name = `UpdateOperationItemNotFoundError`
73+
}
74+
}
75+
76+
export class DeleteOperationItemNotFoundError extends QueryCollectionError {
77+
constructor(key: string | number) {
78+
super(`Delete operation: Item with key '${key}' does not exist`)
79+
this.name = `DeleteOperationItemNotFoundError`
80+
}
81+
}
82+
83+
export class InvalidSyncOperationError extends QueryCollectionError {
84+
constructor(message: string) {
85+
super(`Invalid sync operation: ${message}`)
86+
this.name = `InvalidSyncOperationError`
87+
}
88+
}
89+
90+
export class UnknownOperationTypeError extends QueryCollectionError {
91+
constructor(type: string) {
92+
super(`Unknown operation type: ${type}`)
93+
this.name = `UnknownOperationTypeError`
94+
}
95+
}
96+
97+
export class MissingKeyFieldError extends QueryCollectionError {
98+
constructor(operation: string, message: string) {
99+
super(`${operation} item must contain the key field: ${message}`)
100+
this.name = `MissingKeyFieldError`
101+
}
102+
}

packages/query-db-collection/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export {
22
queryCollectionOptions,
33
type QueryCollectionConfig,
44
type QueryCollectionUtils,
5+
type SyncOperation,
56
} from "./query"
67

78
export * from "./errors"
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
import {
2+
DeleteOperationItemNotFoundError,
3+
DuplicateKeyInBatchError,
4+
SyncNotInitializedError,
5+
UpdateOperationItemNotFoundError,
6+
} from "./errors"
7+
import type { QueryClient } from "@tanstack/query-core"
8+
import type { ChangeMessage, Collection } from "@tanstack/db"
9+
10+
// Types for sync operations
11+
export type SyncOperation<
12+
TRow extends object,
13+
TKey extends string | number = string | number,
14+
TInsertInput extends object = TRow,
15+
> =
16+
| { type: `insert`; data: TInsertInput | Array<TInsertInput> }
17+
| { type: `update`; data: Partial<TRow> | Array<Partial<TRow>> }
18+
| { type: `delete`; key: TKey | Array<TKey> }
19+
| { type: `upsert`; data: Partial<TRow> | Array<Partial<TRow>> }
20+
21+
export interface SyncContext<
22+
TRow extends object,
23+
TKey extends string | number = string | number,
24+
> {
25+
collection: Collection<TRow>
26+
queryClient: QueryClient
27+
queryKey: Array<unknown>
28+
getKey: (item: TRow) => TKey
29+
begin: () => void
30+
write: (message: Omit<ChangeMessage<TRow>, `key`>) => void
31+
commit: () => void
32+
}
33+
34+
interface NormalizedOperation<
35+
TRow extends object,
36+
TKey extends string | number = string | number,
37+
> {
38+
type: `insert` | `update` | `delete` | `upsert`
39+
key: TKey
40+
data?: TRow | Partial<TRow>
41+
}
42+
43+
// Normalize operations into a consistent format
44+
function normalizeOperations<
45+
TRow extends object,
46+
TKey extends string | number = string | number,
47+
TInsertInput extends object = TRow,
48+
>(
49+
ops:
50+
| SyncOperation<TRow, TKey, TInsertInput>
51+
| Array<SyncOperation<TRow, TKey, TInsertInput>>,
52+
ctx: SyncContext<TRow, TKey>
53+
): Array<NormalizedOperation<TRow, TKey>> {
54+
const operations = Array.isArray(ops) ? ops : [ops]
55+
const normalized: Array<NormalizedOperation<TRow, TKey>> = []
56+
57+
for (const op of operations) {
58+
if (op.type === `delete`) {
59+
const keys = Array.isArray(op.key) ? op.key : [op.key]
60+
for (const key of keys) {
61+
normalized.push({ type: `delete`, key })
62+
}
63+
} else {
64+
const items = Array.isArray(op.data) ? op.data : [op.data]
65+
for (const item of items) {
66+
let key: TKey
67+
if (op.type === `update`) {
68+
// For updates, we need to get the key from the partial data
69+
key = ctx.getKey(item as TRow)
70+
} else {
71+
// For insert/upsert, validate and resolve the full item first
72+
const resolved = ctx.collection.validateData(
73+
item,
74+
op.type === `upsert` ? `insert` : op.type
75+
)
76+
key = ctx.getKey(resolved)
77+
}
78+
normalized.push({ type: op.type, key, data: item })
79+
}
80+
}
81+
}
82+
83+
return normalized
84+
}
85+
86+
// Validate operations before executing
87+
function validateOperations<
88+
TRow extends object,
89+
TKey extends string | number = string | number,
90+
>(
91+
operations: Array<NormalizedOperation<TRow, TKey>>,
92+
ctx: SyncContext<TRow, TKey>
93+
): void {
94+
const seenKeys = new Set<TKey>()
95+
96+
for (const op of operations) {
97+
// Check for duplicate keys within the batch
98+
if (seenKeys.has(op.key)) {
99+
throw new DuplicateKeyInBatchError(op.key)
100+
}
101+
seenKeys.add(op.key)
102+
103+
// Validate operation-specific requirements
104+
if (op.type === `update`) {
105+
if (!ctx.collection.has(op.key)) {
106+
throw new UpdateOperationItemNotFoundError(op.key)
107+
}
108+
} else if (op.type === `delete`) {
109+
if (!ctx.collection.has(op.key)) {
110+
throw new DeleteOperationItemNotFoundError(op.key)
111+
}
112+
}
113+
}
114+
}
115+
116+
// Execute a batch of operations
117+
export function performWriteOperations<
118+
TRow extends object,
119+
TKey extends string | number = string | number,
120+
TInsertInput extends object = TRow,
121+
>(
122+
operations:
123+
| SyncOperation<TRow, TKey, TInsertInput>
124+
| Array<SyncOperation<TRow, TKey, TInsertInput>>,
125+
ctx: SyncContext<TRow, TKey>
126+
): void {
127+
const normalized = normalizeOperations(operations, ctx)
128+
validateOperations(normalized, ctx)
129+
130+
ctx.begin()
131+
132+
for (const op of normalized) {
133+
switch (op.type) {
134+
case `insert`: {
135+
const resolved = ctx.collection.validateData(op.data, `insert`)
136+
ctx.write({
137+
type: `insert`,
138+
value: resolved,
139+
})
140+
break
141+
}
142+
case `update`: {
143+
const currentItem = ctx.collection.get(op.key)!
144+
const updatedItem = {
145+
...currentItem,
146+
...op.data,
147+
}
148+
const resolved = ctx.collection.validateData(
149+
updatedItem,
150+
`update`,
151+
op.key
152+
)
153+
ctx.write({
154+
type: `update`,
155+
value: resolved,
156+
})
157+
break
158+
}
159+
case `delete`: {
160+
const currentItem = ctx.collection.get(op.key)!
161+
ctx.write({
162+
type: `delete`,
163+
value: currentItem,
164+
})
165+
break
166+
}
167+
case `upsert`: {
168+
const resolved = ctx.collection.validateData(
169+
op.data,
170+
ctx.collection.has(op.key) ? `update` : `insert`,
171+
op.key
172+
)
173+
if (ctx.collection.has(op.key)) {
174+
ctx.write({
175+
type: `update`,
176+
value: resolved,
177+
})
178+
} else {
179+
ctx.write({
180+
type: `insert`,
181+
value: resolved,
182+
})
183+
}
184+
break
185+
}
186+
}
187+
}
188+
189+
ctx.commit()
190+
191+
// Update query cache after successful commit
192+
const updatedData = ctx.collection.toArray
193+
ctx.queryClient.setQueryData(ctx.queryKey, updatedData)
194+
}
195+
196+
// Factory function to create write utils
197+
export function createWriteUtils<
198+
TRow extends object,
199+
TKey extends string | number = string | number,
200+
TInsertInput extends object = TRow,
201+
>(getContext: () => SyncContext<TRow, TKey> | null) {
202+
function ensureContext(): SyncContext<TRow, TKey> {
203+
const context = getContext()
204+
if (!context) {
205+
throw new SyncNotInitializedError()
206+
}
207+
return context
208+
}
209+
210+
return {
211+
writeInsert(data: TInsertInput | Array<TInsertInput>) {
212+
const ctx = ensureContext()
213+
performWriteOperations({ type: `insert`, data }, ctx)
214+
},
215+
216+
writeUpdate(data: Partial<TRow> | Array<Partial<TRow>>) {
217+
const ctx = ensureContext()
218+
performWriteOperations({ type: `update`, data }, ctx)
219+
},
220+
221+
writeDelete(key: TKey | Array<TKey>) {
222+
const ctx = ensureContext()
223+
performWriteOperations({ type: `delete`, key }, ctx)
224+
},
225+
226+
writeUpsert(data: Partial<TRow> | Array<Partial<TRow>>) {
227+
const ctx = ensureContext()
228+
performWriteOperations({ type: `upsert`, data }, ctx)
229+
},
230+
231+
writeBatch(operations: Array<SyncOperation<TRow, TKey, TInsertInput>>) {
232+
const ctx = ensureContext()
233+
performWriteOperations(operations, ctx)
234+
},
235+
}
236+
}

0 commit comments

Comments
 (0)