diff --git a/docs/overview.md b/docs/overview.md
index 97529708e..7636adeb3 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -33,10 +33,10 @@ const todoCollection = createCollection({
const Todos = () => {
// Bind data using live queries
- const { data: todos } = useLiveQuery((query) =>
- query
- .from({ todoCollection })
- .where('@completed', '=', false)
+ const { data: todos } = useLiveQuery((q) =>
+ q
+ .from({ todo: todoCollection })
+ .where(({ todo }) => todo.completed)
)
const complete = (todo) => {
@@ -344,22 +344,21 @@ Live queries return collections. This allows you to derive collections from othe
For example:
```ts
-import { compileQuery, queryBuilder } from "@tanstack/db"
+import { createLiveQueryCollection, eq } from "@tanstack/db"
-// Imagine you have a collections of todos.
+// Imagine you have a collection of todos.
const todoCollection = createCollection({
// config
})
// You can derive a new collection that's a subset of it.
-const query = queryBuilder()
- .from({ todoCollection })
- .where('@completed', '=', true)
-
-const compiled = compileQuery(query)
-compiled.start()
-
-const completedTodoCollection = compiledQuery.results()
+const completedTodoCollection = createLiveQueryCollection({
+ startSync: true,
+ query: (q) =>
+ q
+ .from({ todo: todoCollection })
+ .where(({ todo }) => todo.completed)
+})
```
This also works with joins to derive collections from multiple source collections. And it works recursively -- you can derive collections from other derived collections. Changes propagate efficiently using differential dataflow and it's collections all the way down.
@@ -378,14 +377,18 @@ Use the `useLiveQuery` hook to assign live query results to a state variable in
```ts
import { useLiveQuery } from '@tanstack/react-db'
+import { eq } from '@tanstack/db'
const Todos = () => {
- const { data: todos } = useLiveQuery(query =>
- query
- .from({ todoCollection })
- .where('@completed', '=', false)
- .orderBy({'@created_at': 'asc'})
- .select('@id', '@text')
+ const { data: todos } = useLiveQuery((q) =>
+ q
+ .from({ todo: todoCollection })
+ .where(({ todo }) => eq(todo.completed, false))
+ .orderBy(({ todo }) => todo.created_at, 'asc')
+ .select(({ todo }) => ({
+ id: todo.id,
+ text: todo.text
+ }))
)
return
@@ -396,18 +399,23 @@ You can also query across collections with joins:
```ts
import { useLiveQuery } from '@tanstack/react-db'
+import { eq } from '@tanstack/db'
const Todos = () => {
- const { data: todos } = useLiveQuery(query =>
- query
+ const { data: todos } = useLiveQuery((q) =>
+ q
.from({ todos: todoCollection })
- .join({
- type: `inner`,
- from: { lists: listCollection },
- on: [`@lists.id`, `=`, `@todos.listId`],
- })
- .where('@lists.active', '=', true)
- .select(`@todos.id`, `@todos.title`, `@lists.name`)
+ .join(
+ { lists: listCollection },
+ ({ todos, lists }) => eq(lists.id, todos.listId),
+ 'inner'
+ )
+ .where(({ lists }) => eq(lists.active, true))
+ .select(({ todos, lists }) => ({
+ id: todos.id,
+ title: todos.title,
+ listName: lists.name
+ }))
)
return
@@ -419,16 +427,16 @@ const Todos = () => {
You can also build queries directly (outside of the component lifecycle) using the underlying `queryBuilder` API:
```ts
-import { compileQuery, queryBuilder } from "@tanstack/db"
+import { createLiveQueryCollection, eq } from "@tanstack/db"
-const query = queryBuilder()
- .from({ todoCollection })
- .where('@completed', '=', true)
-
-const compiled = compileQuery(query)
-compiled.start()
+const completedTodos = createLiveQueryCollection({
+ startSync: true,
+ query: (q) =>
+ q.from({ todo: todoCollection })
+ .where(({ todo }) => eq(todo.completed, true))
+})
-const results = compiledQuery.results()
+const results = completedTodos.toArray
```
Note also that:
@@ -661,16 +669,21 @@ const listCollection = createCollection(queryCollectionOptions({
const Todos = () => {
// Read the data using live queries. Here we show a live
// query that joins across two collections.
- const { data: todos } = useLiveQuery((query) =>
- query
- .from({ t: todoCollection })
- .join({
- type: 'inner',
- from: { l: listCollection },
- on: [`@l.id`, `=`, `@t.list_id`]
- })
- .where('@l.active', '=', true)
- .select('@t.id', '@t.text', '@t.status', '@l.name')
+ const { data: todos } = useLiveQuery((q) =>
+ q
+ .from({ todo: todoCollection })
+ .join(
+ { list: listCollection },
+ ({ todo, list }) => eq(list.id, todo.list_id),
+ 'inner'
+ )
+ .where(({ list }) => eq(list.active, true))
+ .select(({ todo, list }) => ({
+ id: todo.id,
+ text: todo.text,
+ status: todo.status,
+ listName: list.name
+ }))
)
// ...
diff --git a/examples/react/todo/src/App.tsx b/examples/react/todo/src/App.tsx
index f84527ae8..2021a1155 100644
--- a/examples/react/todo/src/App.tsx
+++ b/examples/react/todo/src/App.tsx
@@ -271,10 +271,7 @@ const createConfigCollection = (type: CollectionType) => {
const txids = await Promise.all(
transaction.mutations.map(async (mutation) => {
const { original, changes } = mutation
- const response = await api.config.update(
- original.id as number,
- changes
- )
+ const response = await api.config.update(original.id, changes)
return { txid: String(response.txid) }
})
)
@@ -311,10 +308,7 @@ const createConfigCollection = (type: CollectionType) => {
const txids = await Promise.all(
transaction.mutations.map(async (mutation) => {
const { original, changes } = mutation
- const response = await api.config.update(
- original.id as number,
- changes
- )
+ const response = await api.config.update(original.id, changes)
return { txid: String(response.txid) }
})
)
@@ -348,15 +342,12 @@ export default function App() {
// Always call useLiveQuery hooks
const { data: todos } = useLiveQuery((q) =>
q
- .from({ todoCollection: todoCollection })
- .orderBy(`@created_at`)
- .select(`@*`)
+ .from({ todo: todoCollection })
+ .orderBy(({ todo }) => todo.created_at, `asc`)
)
const { data: configData } = useLiveQuery((q) =>
- q
- .from({ configCollection: configCollection })
- .select(`@id`, `@key`, `@value`)
+ q.from({ config: configCollection })
)
// Handle collection type change directly
@@ -381,6 +372,8 @@ export default function App() {
// Define a helper function to update config values
const setConfigValue = (key: string, value: string): void => {
+ console.log(`setConfigValue`, key, value)
+ console.log(`configData`, configData)
for (const config of configData) {
if (config.key === key) {
configCollection.update(config.id, (draft) => {
@@ -393,7 +386,7 @@ export default function App() {
// If the config doesn't exist yet, create it
configCollection.insert({
- id: Math.random(),
+ id: Math.round(Math.random() * 1000000),
key,
value,
created_at: new Date(),
diff --git a/examples/react/todo/src/api/server.ts b/examples/react/todo/src/api/server.ts
index 0e94c3d76..536b2cdc2 100644
--- a/examples/react/todo/src/api/server.ts
+++ b/examples/react/todo/src/api/server.ts
@@ -7,9 +7,10 @@ import {
validateUpdateConfig,
validateUpdateTodo,
} from "../db/validation"
+import type { Express } from "express"
// Create Express app
-const app = express()
+const app: Express = express()
const PORT = process.env.PORT || 3001
// Middleware
@@ -22,9 +23,15 @@ app.get(`/api/health`, (req, res) => {
})
// Generate a transaction ID
-async function generateTxId(tx: any): Promise {
- const [{ txid }] = await tx`SELECT txid_current() as txid`
- return Number(txid)
+async function generateTxId(tx: any): Promise {
+ const result = await tx`SELECT txid_current() as txid`
+ const txid = result[0]?.txid
+
+ if (txid === undefined) {
+ throw new Error(`Failed to get transaction ID`)
+ }
+
+ return String(txid)
}
// ===== TODOS API =====
@@ -33,10 +40,10 @@ async function generateTxId(tx: any): Promise {
app.get(`/api/todos`, async (req, res) => {
try {
const todos = await sql`SELECT * FROM todos`
- res.status(200).json(todos)
+ return res.status(200).json(todos)
} catch (error) {
console.error(`Error fetching todos:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to fetch todos`,
details: error instanceof Error ? error.message : String(error),
})
@@ -53,10 +60,10 @@ app.get(`/api/todos/:id`, async (req, res) => {
return res.status(404).json({ error: `Todo not found` })
}
- res.status(200).json(todo)
+ return res.status(200).json(todo)
} catch (error) {
console.error(`Error fetching todo:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to fetch todo`,
details: error instanceof Error ? error.message : String(error),
})
@@ -68,7 +75,7 @@ app.post(`/api/todos`, async (req, res) => {
try {
const todoData = validateInsertTodo(req.body)
- let txid: number
+ let txid!: string
const newTodo = await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -79,10 +86,10 @@ app.post(`/api/todos`, async (req, res) => {
return result
})
- res.status(201).json({ todo: newTodo, txid })
+ return res.status(201).json({ todo: newTodo, txid })
} catch (error) {
console.error(`Error creating todo:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to create todo`,
details: error instanceof Error ? error.message : String(error),
})
@@ -95,7 +102,7 @@ app.put(`/api/todos/:id`, async (req, res) => {
const { id } = req.params
const todoData = validateUpdateTodo(req.body)
- let txid: number
+ let txid!: string
const updatedTodo = await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -113,14 +120,14 @@ app.put(`/api/todos/:id`, async (req, res) => {
return result
})
- res.status(200).json({ todo: updatedTodo, txid })
+ return res.status(200).json({ todo: updatedTodo, txid })
} catch (error) {
if (error instanceof Error && error.message === `Todo not found`) {
return res.status(404).json({ error: `Todo not found` })
}
console.error(`Error updating todo:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to update todo`,
details: error instanceof Error ? error.message : String(error),
})
@@ -132,7 +139,7 @@ app.delete(`/api/todos/:id`, async (req, res) => {
try {
const { id } = req.params
- let txid: number
+ let txid!: string
await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -147,14 +154,14 @@ app.delete(`/api/todos/:id`, async (req, res) => {
}
})
- res.status(200).json({ success: true, txid })
+ return res.status(200).json({ success: true, txid })
} catch (error) {
if (error instanceof Error && error.message === `Todo not found`) {
return res.status(404).json({ error: `Todo not found` })
}
console.error(`Error deleting todo:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to delete todo`,
details: error instanceof Error ? error.message : String(error),
})
@@ -167,10 +174,10 @@ app.delete(`/api/todos/:id`, async (req, res) => {
app.get(`/api/config`, async (req, res) => {
try {
const config = await sql`SELECT * FROM config`
- res.status(200).json(config)
+ return res.status(200).json(config)
} catch (error) {
console.error(`Error fetching config:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to fetch config`,
details: error instanceof Error ? error.message : String(error),
})
@@ -187,10 +194,10 @@ app.get(`/api/config/:id`, async (req, res) => {
return res.status(404).json({ error: `Config not found` })
}
- res.status(200).json(config)
+ return res.status(200).json(config)
} catch (error) {
console.error(`Error fetching config:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to fetch config`,
details: error instanceof Error ? error.message : String(error),
})
@@ -200,9 +207,10 @@ app.get(`/api/config/:id`, async (req, res) => {
// POST create a new config
app.post(`/api/config`, async (req, res) => {
try {
+ console.log(`POST /api/config`, req.body)
const configData = validateInsertConfig(req.body)
- let txid: number
+ let txid!: string
const newConfig = await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -213,10 +221,10 @@ app.post(`/api/config`, async (req, res) => {
return result
})
- res.status(201).json({ config: newConfig, txid })
+ return res.status(201).json({ config: newConfig, txid })
} catch (error) {
console.error(`Error creating config:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to create config`,
details: error instanceof Error ? error.message : String(error),
})
@@ -229,7 +237,7 @@ app.put(`/api/config/:id`, async (req, res) => {
const { id } = req.params
const configData = validateUpdateConfig(req.body)
- let txid: number
+ let txid!: string
const updatedConfig = await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -247,14 +255,14 @@ app.put(`/api/config/:id`, async (req, res) => {
return result
})
- res.status(200).json({ config: updatedConfig, txid })
+ return res.status(200).json({ config: updatedConfig, txid })
} catch (error) {
if (error instanceof Error && error.message === `Config not found`) {
return res.status(404).json({ error: `Config not found` })
}
console.error(`Error updating config:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to update config`,
details: error instanceof Error ? error.message : String(error),
})
@@ -266,7 +274,7 @@ app.delete(`/api/config/:id`, async (req, res) => {
try {
const { id } = req.params
- let txid: number
+ let txid!: string
await sql.begin(async (tx) => {
txid = await generateTxId(tx)
@@ -281,14 +289,14 @@ app.delete(`/api/config/:id`, async (req, res) => {
}
})
- res.status(200).json({ success: true, txid })
+ return res.status(200).json({ success: true, txid })
} catch (error) {
if (error instanceof Error && error.message === `Config not found`) {
return res.status(404).json({ error: `Config not found` })
}
console.error(`Error deleting config:`, error)
- res.status(500).json({
+ return res.status(500).json({
error: `Failed to delete config`,
details: error instanceof Error ? error.message : String(error),
})
diff --git a/examples/react/todo/src/api/write-to-pg.ts b/examples/react/todo/src/api/write-to-pg.ts
index 7c9bab119..aee885093 100644
--- a/examples/react/todo/src/api/write-to-pg.ts
+++ b/examples/react/todo/src/api/write-to-pg.ts
@@ -1,5 +1,5 @@
import type postgres from "postgres"
-import type { PendingMutation } from "../types"
+import type { PendingMutation } from "@tanstack/react-db"
/**
* Get the table name from the relation metadata
@@ -11,7 +11,7 @@ function getTableName(relation?: Array): string {
// The table name is typically the second element in the relation array
// e.g. ['public', 'todos'] -> 'todos'
- return relation[1]
+ return relation[1]!
}
/**
@@ -23,7 +23,12 @@ export async function processMutations(
): Promise {
return await sql.begin(async (tx) => {
// Get the transaction ID
- const [{ txid }] = await tx`SELECT txid_current() as txid`
+ const result = await tx`SELECT txid_current() as txid`
+ const txid = result[0]?.txid
+
+ if (txid === undefined) {
+ throw new Error(`Failed to get transaction ID`)
+ }
// Process each mutation in order
for (const mutation of pendingMutations) {
@@ -67,7 +72,9 @@ export async function processMutations(
// Combine all values
const allValues = [
...setValues,
- ...primaryKey.map((k) => mutation.original[k]),
+ ...primaryKey.map(
+ (k) => (mutation.original as Record)[k]
+ ),
]
await tx.unsafe(
@@ -86,7 +93,9 @@ export async function processMutations(
.join(` AND `)
// Extract primary key values in same order as columns
- const primaryKeyValues = primaryKey.map((k) => mutation.original[k])
+ const primaryKeyValues = primaryKey.map(
+ (k) => (mutation.original as Record)[k]
+ )
await tx.unsafe(
`DELETE FROM ${tableName}
diff --git a/examples/react/todo/src/db/validation.ts b/examples/react/todo/src/db/validation.ts
index 1fc244b5c..2aaab77b5 100644
--- a/examples/react/todo/src/db/validation.ts
+++ b/examples/react/todo/src/db/validation.ts
@@ -1,16 +1,34 @@
import { createInsertSchema, createSelectSchema } from "drizzle-zod"
+import { z } from "zod"
import { config, todos } from "./schema"
-import type { z } from "zod"
-// Auto-generated schemas from Drizzle schema
-export const insertTodoSchema = createInsertSchema(todos)
+// Date transformation schema - handles Date objects, ISO strings, and parseable date strings
+const dateStringToDate = z
+ .union([
+ z.date(), // Already a Date object
+ z
+ .string()
+ .datetime()
+ .transform((str) => new Date(str)), // ISO datetime string
+ z.string().transform((str) => new Date(str)), // Any parseable date string
+ ])
+ .optional()
+
+// Auto-generated schemas from Drizzle schema with date transformation
+export const insertTodoSchema = createInsertSchema(todos, {
+ created_at: dateStringToDate,
+ updated_at: dateStringToDate,
+})
export const selectTodoSchema = createSelectSchema(todos)
// Partial schema for updates
export const updateTodoSchema = insertTodoSchema.partial().strict()
-// Config schemas
-export const insertConfigSchema = createInsertSchema(config).strict()
+// Config schemas with date transformation
+export const insertConfigSchema = createInsertSchema(config, {
+ created_at: dateStringToDate,
+ updated_at: dateStringToDate,
+}).strict()
export const selectConfigSchema = createSelectSchema(config)
export const updateConfigSchema = insertConfigSchema.partial().strict()
@@ -25,10 +43,11 @@ export type UpdateConfig = z.infer
// Validation functions
export const validateInsertTodo = (data: unknown): InsertTodo => {
- if (data.text === `really hard todo`) {
+ const parsed = insertTodoSchema.parse(data)
+ if (parsed.text === `really hard todo`) {
throw new Error(`we don't want to do really hard todos`)
}
- return insertTodoSchema.parse(data)
+ return parsed
}
export const validateSelectTodo = (data: unknown): SelectTodo => {
diff --git a/examples/react/todo/src/main.tsx b/examples/react/todo/src/main.tsx
index bc35ab5d7..a2d74309c 100644
--- a/examples/react/todo/src/main.tsx
+++ b/examples/react/todo/src/main.tsx
@@ -1,7 +1,7 @@
import React from "react"
import { createRoot } from "react-dom/client"
import "./index.css"
-import App from "./App.tsx"
+import App from "./App"
createRoot(document.getElementById(`root`)!).render(
diff --git a/examples/react/todo/tsconfig.json b/examples/react/todo/tsconfig.json
new file mode 100644
index 000000000..ec2129448
--- /dev/null
+++ b/examples/react/todo/tsconfig.json
@@ -0,0 +1,19 @@
+{
+ "extends": "../../../tsconfig.json",
+ "compilerOptions": {
+ "baseUrl": ".",
+ "module": "ES2022",
+ "moduleResolution": "node",
+ "paths": {
+ "@/*": ["./src/*"]
+ }
+ },
+ "include": [
+ "src/**/*.ts",
+ "src/**/*.tsx",
+ "scripts/**/*.ts",
+ "vite.config.ts",
+ "drizzle.config.ts"
+ ],
+ "exclude": ["node_modules", "dist"]
+}
diff --git a/packages/db/package.json b/packages/db/package.json
index 1ac1d0316..05349256e 100644
--- a/packages/db/package.json
+++ b/packages/db/package.json
@@ -3,9 +3,8 @@
"description": "A reactive client store for building super fast apps on sync",
"version": "0.0.14",
"dependencies": {
- "@electric-sql/d2mini": "^0.1.2",
- "@standard-schema/spec": "^1.0.0",
- "@tanstack/store": "^0.7.0"
+ "@electric-sql/d2mini": "^0.1.4",
+ "@standard-schema/spec": "^1.0.0"
},
"devDependencies": {
"@vitest/coverage-istanbul": "^3.0.9"
diff --git a/packages/db/src/collection.ts b/packages/db/src/collection.ts
index fc449c3d9..6409ceaa4 100644
--- a/packages/db/src/collection.ts
+++ b/packages/db/src/collection.ts
@@ -1,4 +1,3 @@
-import { Store } from "@tanstack/store"
import { withArrayChangeTracking, withChangeTracking } from "./proxy"
import { createTransaction, getActiveTransaction } from "./transactions"
import { SortedMap } from "./SortedMap"
@@ -149,8 +148,8 @@ export class CollectionImpl<
public syncedMetadata = new Map()
// Optimistic state tracking - make public for testing
- public derivedUpserts = new Map()
- public derivedDeletes = new Set()
+ public optimisticUpserts = new Map()
+ public optimisticDeletes = new Set()
// Cached size for performance
private _size = 0
@@ -173,6 +172,10 @@ export class CollectionImpl<
// Array to store one-time commit listeners
private onFirstCommitCallbacks: Array<() => void> = []
+ // Event batching for preventing duplicate emissions during transaction flows
+ private batchedEvents: Array> = []
+ private shouldBatchEvents = false
+
// Lifecycle management
private _status: CollectionStatus = `idle`
private activeSubscribersCount = 0
@@ -378,12 +381,15 @@ export class CollectionImpl<
}
pendingTransaction.committed = true
- this.commitPendingTransactions()
- // Update status to ready after first commit
+ // Update status to ready
+ // We do this before committing as we want the events from the changes to
+ // be from a "ready" state.
if (this._status === `loading`) {
this.setStatus(`ready`)
}
+
+ this.commitPendingTransactions()
},
})
@@ -473,14 +479,16 @@ export class CollectionImpl<
// Clear data
this.syncedData.clear()
this.syncedMetadata.clear()
- this.derivedUpserts.clear()
- this.derivedDeletes.clear()
+ this.optimisticUpserts.clear()
+ this.optimisticDeletes.clear()
this._size = 0
this.pendingSyncedTransactions = []
this.syncedKeys.clear()
this.hasReceivedFirstCommit = false
this.onFirstCommitCallbacks = []
this.preloadPromise = null
+ this.batchedEvents = []
+ this.shouldBatchEvents = false
// Update status
this.setStatus(`cleaned-up`)
@@ -554,12 +562,12 @@ export class CollectionImpl<
return
}
- const previousState = new Map(this.derivedUpserts)
- const previousDeletes = new Set(this.derivedDeletes)
+ const previousState = new Map(this.optimisticUpserts)
+ const previousDeletes = new Set(this.optimisticDeletes)
// Clear current optimistic state
- this.derivedUpserts.clear()
- this.derivedDeletes.clear()
+ this.optimisticUpserts.clear()
+ this.optimisticDeletes.clear()
const activeTransactions: Array> = []
const completedTransactions: Array> = []
@@ -579,12 +587,12 @@ export class CollectionImpl<
switch (mutation.type) {
case `insert`:
case `update`:
- this.derivedUpserts.set(mutation.key, mutation.modified as T)
- this.derivedDeletes.delete(mutation.key)
+ this.optimisticUpserts.set(mutation.key, mutation.modified as T)
+ this.optimisticDeletes.delete(mutation.key)
break
case `delete`:
- this.derivedUpserts.delete(mutation.key)
- this.derivedDeletes.add(mutation.key)
+ this.optimisticUpserts.delete(mutation.key)
+ this.optimisticDeletes.add(mutation.key)
break
}
}
@@ -657,10 +665,10 @@ export class CollectionImpl<
*/
private calculateSize(): number {
const syncedSize = this.syncedData.size
- const deletesFromSynced = Array.from(this.derivedDeletes).filter(
- (key) => this.syncedData.has(key) && !this.derivedUpserts.has(key)
+ const deletesFromSynced = Array.from(this.optimisticDeletes).filter(
+ (key) => this.syncedData.has(key) && !this.optimisticUpserts.has(key)
).length
- const upsertsNotInSynced = Array.from(this.derivedUpserts.keys()).filter(
+ const upsertsNotInSynced = Array.from(this.optimisticUpserts.keys()).filter(
(key) => !this.syncedData.has(key)
).length
@@ -677,9 +685,9 @@ export class CollectionImpl<
): void {
const allKeys = new Set([
...previousUpserts.keys(),
- ...this.derivedUpserts.keys(),
+ ...this.optimisticUpserts.keys(),
...previousDeletes,
- ...this.derivedDeletes,
+ ...this.optimisticDeletes,
])
for (const key of allKeys) {
@@ -727,34 +735,55 @@ export class CollectionImpl<
}
/**
- * Emit multiple events at once to all listeners
+ * Emit events either immediately or batch them for later emission
*/
- private emitEvents(changes: Array>): void {
- if (changes.length > 0) {
- // Emit to general listeners
- for (const listener of this.changeListeners) {
- listener(changes)
+ private emitEvents(
+ changes: Array>,
+ endBatching = false
+ ): void {
+ if (this.shouldBatchEvents && !endBatching) {
+ // Add events to the batch
+ this.batchedEvents.push(...changes)
+ return
+ }
+
+ // Either we're not batching, or we're ending the batching cycle
+ let eventsToEmit = changes
+
+ if (endBatching) {
+ // End batching: combine any batched events with new events and clean up state
+ if (this.batchedEvents.length > 0) {
+ eventsToEmit = [...this.batchedEvents, ...changes]
}
+ this.batchedEvents = []
+ this.shouldBatchEvents = false
+ }
- // Emit to key-specific listeners
- if (this.changeKeyListeners.size > 0) {
- // Group changes by key, but only for keys that have listeners
- const changesByKey = new Map>>()
- for (const change of changes) {
- if (this.changeKeyListeners.has(change.key)) {
- if (!changesByKey.has(change.key)) {
- changesByKey.set(change.key, [])
- }
- changesByKey.get(change.key)!.push(change)
+ if (eventsToEmit.length === 0) return
+
+ // Emit to all listeners
+ for (const listener of this.changeListeners) {
+ listener(eventsToEmit)
+ }
+
+ // Emit to key-specific listeners
+ if (this.changeKeyListeners.size > 0) {
+ // Group changes by key, but only for keys that have listeners
+ const changesByKey = new Map>>()
+ for (const change of eventsToEmit) {
+ if (this.changeKeyListeners.has(change.key)) {
+ if (!changesByKey.has(change.key)) {
+ changesByKey.set(change.key, [])
}
+ changesByKey.get(change.key)!.push(change)
}
+ }
- // Emit batched changes to each key's listeners
- for (const [key, keyChanges] of changesByKey) {
- const keyListeners = this.changeKeyListeners.get(key)!
- for (const listener of keyListeners) {
- listener(keyChanges)
- }
+ // Emit batched changes to each key's listeners
+ for (const [key, keyChanges] of changesByKey) {
+ const keyListeners = this.changeKeyListeners.get(key)!
+ for (const listener of keyListeners) {
+ listener(keyChanges)
}
}
}
@@ -765,13 +794,13 @@ export class CollectionImpl<
*/
public get(key: TKey): T | undefined {
// Check if optimistically deleted
- if (this.derivedDeletes.has(key)) {
+ if (this.optimisticDeletes.has(key)) {
return undefined
}
// Check optimistic upserts first
- if (this.derivedUpserts.has(key)) {
- return this.derivedUpserts.get(key)
+ if (this.optimisticUpserts.has(key)) {
+ return this.optimisticUpserts.get(key)
}
// Fall back to synced data
@@ -783,12 +812,12 @@ export class CollectionImpl<
*/
public has(key: TKey): boolean {
// Check if optimistically deleted
- if (this.derivedDeletes.has(key)) {
+ if (this.optimisticDeletes.has(key)) {
return false
}
// Check optimistic upserts first
- if (this.derivedUpserts.has(key)) {
+ if (this.optimisticUpserts.has(key)) {
return true
}
@@ -809,14 +838,14 @@ export class CollectionImpl<
public *keys(): IterableIterator {
// Yield keys from synced data, skipping any that are deleted.
for (const key of this.syncedData.keys()) {
- if (!this.derivedDeletes.has(key)) {
+ if (!this.optimisticDeletes.has(key)) {
yield key
}
}
// Yield keys from upserts that were not already in synced data.
- for (const key of this.derivedUpserts.keys()) {
- if (!this.syncedData.has(key) && !this.derivedDeletes.has(key)) {
- // The derivedDeletes check is technically redundant if inserts/updates always remove from deletes,
+ for (const key of this.optimisticUpserts.keys()) {
+ if (!this.syncedData.has(key) && !this.optimisticDeletes.has(key)) {
+ // The optimisticDeletes check is technically redundant if inserts/updates always remove from deletes,
// but it's safer to keep it.
yield key
}
@@ -830,10 +859,7 @@ export class CollectionImpl<
for (const key of this.keys()) {
const value = this.get(key)
if (value !== undefined) {
- const { _orderByIndex, ...copy } = value as T & {
- _orderByIndex?: number | string
- }
- yield copy as T
+ yield value
}
}
}
@@ -845,14 +871,46 @@ export class CollectionImpl<
for (const key of this.keys()) {
const value = this.get(key)
if (value !== undefined) {
- const { _orderByIndex, ...copy } = value as T & {
- _orderByIndex?: number | string
- }
- yield [key, copy as T]
+ yield [key, value]
}
}
}
+ /**
+ * Get all entries (virtual derived state)
+ */
+ public *[Symbol.iterator](): IterableIterator<[TKey, T]> {
+ for (const [key, value] of this.entries()) {
+ yield [key, value]
+ }
+ }
+
+ /**
+ * Execute a callback for each entry in the collection
+ */
+ public forEach(
+ callbackfn: (value: T, key: TKey, index: number) => void
+ ): void {
+ let index = 0
+ for (const [key, value] of this.entries()) {
+ callbackfn(value, key, index++)
+ }
+ }
+
+ /**
+ * Create a new array with the results of calling a function for each entry in the collection
+ */
+ public map(
+ callbackfn: (value: T, key: TKey, index: number) => U
+ ): Array {
+ const result: Array = []
+ let index = 0
+ for (const [key, value] of this.entries()) {
+ result.push(callbackfn(value, key, index++))
+ }
+ return result
+ }
+
/**
* Attempts to commit pending synced transactions if there are no active transactions
* This method processes operations from pending transactions and applies them to the synced data
@@ -894,6 +952,7 @@ export class CollectionImpl<
}
const events: Array> = []
+ const rowUpdateMode = this.config.sync.rowUpdateMode || `partial`
for (const transaction of this.pendingSyncedTransactions) {
for (const operation of transaction.operations) {
@@ -926,12 +985,16 @@ export class CollectionImpl<
this.syncedData.set(key, operation.value)
break
case `update`: {
- const updatedValue = Object.assign(
- {},
- this.syncedData.get(key),
- operation.value
- )
- this.syncedData.set(key, updatedValue)
+ if (rowUpdateMode === `partial`) {
+ const updatedValue = Object.assign(
+ {},
+ this.syncedData.get(key),
+ operation.value
+ )
+ this.syncedData.set(key, updatedValue)
+ } else {
+ this.syncedData.set(key, operation.value)
+ }
break
}
case `delete`:
@@ -942,8 +1005,8 @@ export class CollectionImpl<
}
// Clear optimistic state since sync operations will now provide the authoritative data
- this.derivedUpserts.clear()
- this.derivedDeletes.clear()
+ this.optimisticUpserts.clear()
+ this.optimisticDeletes.clear()
// Reset flag and recompute optimistic state for any remaining active transactions
this.isCommittingSyncTransactions = false
@@ -954,12 +1017,15 @@ export class CollectionImpl<
switch (mutation.type) {
case `insert`:
case `update`:
- this.derivedUpserts.set(mutation.key, mutation.modified as T)
- this.derivedDeletes.delete(mutation.key)
+ this.optimisticUpserts.set(
+ mutation.key,
+ mutation.modified as T
+ )
+ this.optimisticDeletes.delete(mutation.key)
break
case `delete`:
- this.derivedUpserts.delete(mutation.key)
- this.derivedDeletes.add(mutation.key)
+ this.optimisticUpserts.delete(mutation.key)
+ this.optimisticDeletes.add(mutation.key)
break
}
}
@@ -1032,8 +1098,8 @@ export class CollectionImpl<
// Update cached size after synced data changes
this._size = this.calculateSize()
- // Emit all events at once
- this.emitEvents(events)
+ // End batching and emit all events (combines any batched events with sync events)
+ this.emitEvents(events, true)
this.pendingSyncedTransactions = []
@@ -1617,19 +1683,7 @@ export class CollectionImpl<
* @returns An Array containing all items in the collection
*/
get toArray() {
- const array = Array.from(this.values())
-
- // Currently a query with an orderBy will add a _orderByIndex to the items
- // so for now we need to sort the array by _orderByIndex if it exists
- // TODO: in the future it would be much better is the keys are sorted - this
- // should be done by the query engine.
- if (array[0] && (array[0] as { _orderByIndex?: number })._orderByIndex) {
- return (array as Array<{ _orderByIndex: number }>).sort(
- (a, b) => a._orderByIndex - b._orderByIndex
- ) as Array
- }
-
- return array
+ return Array.from(this.values())
}
/**
@@ -1768,45 +1822,13 @@ export class CollectionImpl<
* This method should be called by the Transaction class when state changes
*/
public onTransactionStateChange(): void {
+ // Check if commitPendingTransactions will be called after this
+ // by checking if there are pending sync transactions (same logic as in transactions.ts)
+ this.shouldBatchEvents = this.pendingSyncedTransactions.length > 0
+
// CRITICAL: Capture visible state BEFORE clearing optimistic state
this.capturePreSyncVisibleState()
this.recomputeOptimisticState()
}
-
- private _storeMap: Store