diff --git a/packages/db/src/errors.ts b/packages/db/src/errors.ts index b6d8b385a..a5125cd92 100644 --- a/packages/db/src/errors.ts +++ b/packages/db/src/errors.ts @@ -378,6 +378,23 @@ export class InvalidSourceTypeError extends QueryBuilderError { } } +export class UnionKeyConflictError extends TanStackDBError { + constructor( + unionId: string, + key: string | number, + existingCollectionId: string, + incomingCollectionId: string, + ) { + super( + `Union "${unionId}" cannot merge key "${String( + key, + )}" from collection "${incomingCollectionId}" because it already exists in "${existingCollectionId}". ` + + `Ensure union sources have unique keys or handle duplicates before unioning.`, + ) + this.name = `UnionKeyConflictError` + } +} + export class JoinConditionMustBeEqualityError extends QueryBuilderError { constructor() { super(`Join condition must be an equality expression`) diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 524b4dcf1..be5ef1780 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -60,6 +60,8 @@ export { export { type LiveQueryCollectionConfig } from './live/types.js' export { type LiveQueryCollectionUtils } from './live/collection-config-builder.js' +export { union, unionFromLiveQuery } from './union.js' + // Predicate utilities for predicate push-down export { isWhereSubset, diff --git a/packages/db/src/query/union.ts b/packages/db/src/query/union.ts new file mode 100644 index 000000000..2299e03e4 --- /dev/null +++ b/packages/db/src/query/union.ts @@ -0,0 +1,519 @@ +import { CollectionImpl, createCollection } from '../collection/index.js' +import { UnionKeyConflictError } from '../errors.js' +import { createLiveQueryCollection } from './live-query-collection.js' +import type { CollectionSubscription } from '../collection/subscription.js' +import type { + ChangeMessage, + ChangeMessageOrDeleteKeyMessage, + SyncConfig, +} from '../types.js' +import type { InitialQueryBuilder, QueryBuilder } from './builder/index.js' +import type { Context, GetResult } from './builder/types.js' + +type SourceCollection< + TOutput extends object, + TKey extends string | number, +> = CollectionImpl + +export type UnionCollection< + TOutput extends object, + TKey extends string | number, +> = CollectionImpl & { + add: (collection: SourceCollection) => void + remove: (collection: SourceCollection) => void + hasSource: (collection: SourceCollection) => boolean + sources: () => Array> +} + +type SyncState = { + collection: CollectionImpl + begin: () => void + write: (message: ChangeMessageOrDeleteKeyMessage) => void + commit: () => void + markReady: () => void +} + +type SourceRecord = { + subscription?: CollectionSubscription + statusUnsubscribe?: () => void + subscriptionReady?: boolean + keys: Set +} + +type SourceDelta = { + added?: TSource + removed?: TSource +} + +class SourceRefTracker { + private sourceByResultKey = new Map() + private sourceRefCounts = new Map() + + addReference(resultKey: TKey, source: TSource): SourceDelta { + const previousSource = this.sourceByResultKey.get(resultKey) + const delta: SourceDelta = {} + + if (previousSource && previousSource !== source) { + delta.removed = this.decrementSource(previousSource) + } + + if (!previousSource || previousSource !== source) { + delta.added = this.incrementSource(source) + this.sourceByResultKey.set(resultKey, source) + } + + return delta + } + + removeReference(resultKey: TKey): SourceDelta { + const previousSource = this.sourceByResultKey.get(resultKey) + if (!previousSource) { + return {} + } + + this.sourceByResultKey.delete(resultKey) + return { removed: this.decrementSource(previousSource) } + } + + private incrementSource(source: TSource): TSource | undefined { + const current = this.sourceRefCounts.get(source) ?? 0 + const next = current + 1 + this.sourceRefCounts.set(source, next) + return next === 1 ? source : undefined + } + + private decrementSource(source: TSource): TSource | undefined { + const current = this.sourceRefCounts.get(source) ?? 0 + const next = current - 1 + if (next <= 0) { + this.sourceRefCounts.delete(source) + return source + } + this.sourceRefCounts.set(source, next) + return undefined + } +} + +let unionCollectionCounter = 0 + +class UnionCollectionManager< + TOutput extends object, + TKey extends string | number, +> { + private sources = new Set>() + private sourceRecords = new Map< + SourceCollection, + SourceRecord + >() + private keyOwners = new Map>() + private resultKeys = new WeakMap() + private syncState?: SyncState + private primarySource?: SourceCollection + private isInError = false + + constructor( + private readonly id: string, + initialSources: Array>, + ) { + initialSources.forEach((source) => this.addSource(source)) + } + + getKeyFromItem(item: TOutput): TKey { + const storedKey = this.resultKeys.get(item) + if (storedKey !== undefined) { + return storedKey + } + if (!this.primarySource) { + throw new Error( + `Union collection "${this.id}" has no sources to derive a key from.`, + ) + } + return this.primarySource.config.getKey(item) + } + + addSource(source: SourceCollection): void { + if (!(source instanceof CollectionImpl)) { + throw new Error( + `Union collection "${this.id}" only accepts Collection instances.`, + ) + } + if (this.sources.has(source)) { + return + } + this.sources.add(source) + this.sourceRecords.set(source, { keys: new Set() }) + this.primarySource ??= source + if (this.syncState) { + this.subscribeToSource(source) + this.updateReady() + } + } + + removeSource(source: SourceCollection): void { + if (!this.sources.has(source)) { + return + } + + if (this.syncState) { + this.unsubscribeFromSource(source) + this.deleteKeysForSource(source) + } + + this.sources.delete(source) + this.sourceRecords.delete(source) + + if (this.primarySource === source) { + this.primarySource = this.sources.values().next().value + } + + if (this.syncState) { + this.updateReady() + } + } + + hasSource(source: SourceCollection): boolean { + return this.sources.has(source) + } + + listSources(): Array> { + return Array.from(this.sources) + } + + getSyncConfig(): SyncConfig { + return { + sync: ({ collection, begin, write, commit, markReady }) => { + this.syncState = { + collection: collection as CollectionImpl< + TOutput, + TKey, + any, + any, + any + >, + begin, + write: write as SyncState[`write`], + commit, + markReady, + } + + this.seedKeyOwners() + this.sources.forEach((source) => this.subscribeToSource(source)) + this.updateReady() + + return () => { + this.sources.forEach((source) => this.unsubscribeFromSource(source)) + this.syncState = undefined + } + }, + rowUpdateMode: `full`, + } + } + + private subscribeToSource(source: SourceCollection) { + if (!this.syncState) { + return + } + + const record = this.sourceRecords.get(source)! + if (record.subscription) { + return + } + + this.assertNoKeyConflicts(source) + + const subscription = source.subscribeChanges( + (changes) => this.applyChanges(source, changes), + { + includeInitialState: true, + onStatusChange: (event) => { + record.subscriptionReady = event.status === `ready` + this.updateReady() + }, + }, + ) + record.subscription = subscription + record.subscriptionReady = subscription.status === `ready` + + record.statusUnsubscribe = source.on(`status:change`, (event) => { + if (event.status === `error`) { + this.transitionToError( + `Source collection "${source.id}" entered error state`, + ) + return + } + if (event.status === `cleaned-up`) { + this.transitionToError( + `Source collection "${source.id}" was cleaned up while union "${this.id}" depends on it.`, + ) + return + } + this.updateReady() + }) + + if (subscription.status === `loadingSubset`) { + record.subscriptionReady = false + } + } + + private unsubscribeFromSource(source: SourceCollection) { + const record = this.sourceRecords.get(source) + if (!record) { + return + } + record.subscription?.unsubscribe() + record.statusUnsubscribe?.() + record.subscription = undefined + record.statusUnsubscribe = undefined + record.subscriptionReady = undefined + } + + private applyChanges( + source: SourceCollection, + changes: Array>, + ) { + if (!this.syncState || this.isInError) { + return + } + + const record = this.sourceRecords.get(source) + if (!record) { + return + } + + const { begin, commit } = this.syncState + begin() + + try { + for (const change of changes) { + this.applyChange(source, record, change) + } + } catch (error) { + commit() + throw error + } + + commit() + } + + private applyChange( + source: SourceCollection, + record: SourceRecord, + change: ChangeMessage, + ) { + const key = change.key as TKey + const existingOwner = this.keyOwners.get(key) + + if (change.type === `delete`) { + if (!existingOwner) { + return + } + if (existingOwner !== source) { + this.throwKeyConflict(key, existingOwner, source) + } + this.keyOwners.delete(key) + record.keys.delete(key) + this.syncState!.write({ type: `delete`, key }) + return + } + + if (existingOwner && existingOwner !== source) { + this.throwKeyConflict(key, existingOwner, source) + } + + if (!existingOwner) { + this.keyOwners.set(key, source) + record.keys.add(key) + } + + this.resultKeys.set(change.value, key) + + const exists = this.syncState!.collection.has(key) + this.syncState!.write({ + type: exists ? `update` : `insert`, + value: change.value, + }) + } + + private deleteKeysForSource(source: SourceCollection) { + if (!this.syncState) { + return + } + + const record = this.sourceRecords.get(source) + if (!record || record.keys.size === 0) { + return + } + + const { begin, commit, write } = this.syncState + begin() + for (const key of record.keys) { + this.keyOwners.delete(key) + write({ type: `delete`, key }) + } + commit() + record.keys.clear() + } + + private updateReady() { + if (!this.syncState || this.isInError) { + return + } + + if (this.sources.size === 0) { + this.syncState.markReady() + return + } + + const allSourcesReady = Array.from(this.sources).every((source) => + source.isReady(), + ) + const allSubscriptionsReady = Array.from(this.sources).every((source) => { + const record = this.sourceRecords.get(source) + return record?.subscriptionReady !== false + }) + + if (allSourcesReady && allSubscriptionsReady) { + this.syncState.markReady() + } + } + + private transitionToError(message: string) { + if (this.isInError) { + return + } + this.isInError = true + console.error(`[Union Collection Error] ${message}`) + this.syncState?.collection._lifecycle.setStatus(`error`) + } + + private seedKeyOwners() { + for (const source of this.sources) { + const record = this.sourceRecords.get(source) + if (!record) { + continue + } + + const initialChanges = source.currentStateAsChanges() + if (!initialChanges) { + continue + } + + for (const change of initialChanges) { + if (change.type === `delete`) { + continue + } + const key = change.key as TKey + const existingOwner = this.keyOwners.get(key) + if (existingOwner && existingOwner !== source) { + this.throwKeyConflict(key, existingOwner, source) + } + if (!existingOwner) { + this.keyOwners.set(key, source) + record.keys.add(key) + } + } + } + } + + private assertNoKeyConflicts(source: SourceCollection) { + const initialChanges = source.currentStateAsChanges() + if (!initialChanges) { + return + } + + for (const change of initialChanges) { + if (change.type === `delete`) { + continue + } + const existingOwner = this.keyOwners.get(change.key as TKey) + if (existingOwner && existingOwner !== source) { + this.throwKeyConflict(change.key as TKey, existingOwner, source) + } + } + } + + private throwKeyConflict( + key: TKey, + existing: SourceCollection, + incoming: SourceCollection, + ): never { + this.transitionToError( + `Key "${String(key)}" already exists in collection "${existing.id}"`, + ) + throw new UnionKeyConflictError(this.id, key, existing.id, incoming.id) + } +} + +export function union( + ...collections: Array> +): UnionCollection { + const id = `union-${++unionCollectionCounter}` + const manager = new UnionCollectionManager(id, collections) + + const collection = createCollection({ + id, + getKey: manager.getKeyFromItem.bind(manager), + sync: manager.getSyncConfig(), + }) as unknown as UnionCollection + + collection.add = manager.addSource.bind(manager) + collection.remove = manager.removeSource.bind(manager) + collection.hasSource = manager.hasSource.bind(manager) + collection.sources = manager.listSources.bind(manager) + + return collection +} + +export function unionFromLiveQuery< + TContext extends Context, + TResult extends object = GetResult & object, + TOutput extends object = TResult, + TKey extends string | number = string | number, +>( + query: + | ((q: InitialQueryBuilder) => QueryBuilder) + | QueryBuilder, + mapToCollection: (result: TResult) => SourceCollection, +): UnionCollection { + const unionCollection = union() + const liveQueryCollection = createLiveQueryCollection({ + query, + startSync: true, + }) + + const tracker = new SourceRefTracker>() + + const subscription = liveQueryCollection.subscribeChanges( + (changes) => { + for (const change of changes) { + const resultKey = change.key as TKey + if (change.type === `delete`) { + const delta = tracker.removeReference(resultKey) + if (delta.removed) { + unionCollection.remove(delta.removed) + } + continue + } + + const nextSource = mapToCollection(change.value) + const delta = tracker.addReference(resultKey, nextSource) + if (delta.removed) { + unionCollection.remove(delta.removed) + } + if (delta.added) { + unionCollection.add(delta.added) + } + } + }, + { includeInitialState: true }, + ) + + const statusUnsubscribe = unionCollection.on(`status:change`, (event) => { + if (event.status === `cleaned-up`) { + subscription.unsubscribe() + statusUnsubscribe() + } + }) + + return unionCollection +} diff --git a/packages/db/tests/query/union.test.ts b/packages/db/tests/query/union.test.ts new file mode 100644 index 000000000..2c6cdb23f --- /dev/null +++ b/packages/db/tests/query/union.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, test } from 'vitest' +import { createCollection } from '../../src/collection/index.js' +import { UnionKeyConflictError } from '../../src/errors.js' +import { union, unionFromLiveQuery } from '../../src/query/index.js' +import { mockSyncCollectionOptions } from '../utils.js' + +type Item = { + id: number + value: string +} + +function createItemsCollection(id: string, items: Array) { + return createCollection( + mockSyncCollectionOptions({ + id, + getKey: (item) => item.id, + initialData: items, + }), + ) +} + +describe(`union`, () => { + test(`combines multiple collections into a single source`, async () => { + const a = createItemsCollection(`items-a`, [ + { id: 1, value: `a1` }, + { id: 2, value: `a2` }, + ]) + const b = createItemsCollection(`items-b`, [{ id: 3, value: `b1` }]) + + const unified = union(a, b) + const items = await unified.toArrayWhenReady() + + const sorted = items.sort((left, right) => left.id - right.id) + expect(sorted).toEqual([ + { id: 1, value: `a1` }, + { id: 2, value: `a2` }, + { id: 3, value: `b1` }, + ]) + }) + + test(`throws when union sources share the same key`, async () => { + const a = createItemsCollection(`items-a`, [{ id: 1, value: `a1` }]) + const b = createItemsCollection(`items-b`, [{ id: 1, value: `b1` }]) + + const unified = union(a, b) + + await expect(unified.preload()).rejects.toThrow(UnionKeyConflictError) + expect(unified.status).toBe(`error`) + }) +}) + +describe(`unionFromLiveQuery`, () => { + test(`adds and removes sources based on live query results`, async () => { + const sourceIndex = createItemsCollection(`sources`, [ + { id: 1, value: `first` }, + { id: 2, value: `second` }, + ]) + + const collectionA = createItemsCollection(`items-a`, [ + { id: 1, value: `a` }, + ]) + const collectionB = createItemsCollection(`items-b`, [ + { id: 2, value: `b` }, + ]) + + const unified = unionFromLiveQuery( + (q) => q.from({ source: sourceIndex }), + (result) => (result.id === 1 ? collectionA : collectionB), + ) + + await unified.preload() + await new Promise((resolve) => setTimeout(resolve, 0)) + expect(unified.toArray.map((row) => row.id)).toEqual([1, 2]) + + sourceIndex.delete(2) + await new Promise((resolve) => setTimeout(resolve, 0)) + + expect(unified.toArray.map((row) => row.id)).toEqual([1]) + }) + + test(`keeps a source while multiple live query rows reference it`, async () => { + const sourceIndex = createItemsCollection(`sources`, [ + { id: 1, value: `first` }, + { id: 2, value: `second` }, + ]) + + const sharedCollection = createItemsCollection(`items-shared`, [ + { id: 1, value: `shared` }, + ]) + + const unified = unionFromLiveQuery( + (q) => q.from({ source: sourceIndex }), + () => sharedCollection, + ) + + await unified.preload() + await new Promise((resolve) => setTimeout(resolve, 0)) + expect(unified.toArray.map((row) => row.id)).toEqual([1]) + + sourceIndex.delete(1) + await new Promise((resolve) => setTimeout(resolve, 0)) + expect(unified.toArray.map((row) => row.id)).toEqual([1]) + + sourceIndex.delete(2) + await new Promise((resolve) => setTimeout(resolve, 0)) + expect(unified.toArray).toEqual([]) + }) +})