diff --git a/eslint.base.mjs b/eslint.base.mjs index 60425fa..d25409a 100644 --- a/eslint.base.mjs +++ b/eslint.base.mjs @@ -51,7 +51,10 @@ export default [ console: true, window: true, document: true, - globalThis: true + globalThis: true, + EventTarget: true, + CustomEvent: true, + EventListener: true }, }, ignores: ['dist/', 'node_modules/'], diff --git a/packages/d2ts/examples/fruit-processed.ts b/packages/d2ts/examples/fruit-processed.ts index ce36e69..d6bb8aa 100644 --- a/packages/d2ts/examples/fruit-processed.ts +++ b/packages/d2ts/examples/fruit-processed.ts @@ -1,22 +1,62 @@ -import { MultiSet } from '../src/multiset' -import { D2 } from '../src/index.js' -import { map, reduce, consolidate, output } from '../src/operators/index.js' -import { v } from '../src/order.js' -import { MessageType } from '../src/types.js' +import { map, reduce, consolidate } from '../src/operators/index.js' +import { Store } from '../src/store.js' type FruitOrder = { - name: string, - quantity: number, - shipping_id: string, + name: string + quantity: number + shipping_id: string status: 'packed' | 'shipped' | 'delivered' } -const graph = new D2({ initialFrontier: v(0) }) -const input = graph.newInput() - -// Track quantities by status -const materializedStatus = new Map(); -const materializedProcessed = new Map(); +const fruitOrders = new Store() + +const { materializedStatus, materializedProcessed } = Store.queryAll( + [fruitOrders], + ([fruitStream]) => { + const statusStream = fruitStream.pipe( + // debug('Raw Input'), + map( + ([orderId, order]) => + [`${order.name}-${order.status}`, order.quantity] as [string, number], + ), + // debug('After Map'), + reduce((values) => { + // The reduce function receives an array of [quantity, diff] for each key + // `diff` being the change in number of occurrences of the specific quantity + // It is not aware of the key, just that everything it is receiving is for the same key + // Here we want to sum the quantity for each key, so a sum of num * diff + let count = 0 + for (const [num, diff] of values) { + count += num * diff + } + return [[count, 1]] + }), + // debug('Status Totals'), + consolidate(), + ) + const processedStream = fruitStream.pipe( + // debug('Raw Input'), + map( + ([orderId, order]) => [order.name, order.quantity] as [string, number], + ), + // debug('After Map'), + reduce((values) => { + // Count the total number of each fruit processed + let count = 0 + for (const [num, diff] of values) { + count += num * diff + } + return [[count, 1]] + }), + // debug('Total Processed'), + consolidate(), + ) + + const materializedStatus = Store.materialize(statusStream) + const materializedProcessed = Store.materialize(processedStream) + return { materializedStatus, materializedProcessed } + }, +) function showStatus() { const obj = Object.fromEntries(materializedStatus.entries()) @@ -30,93 +70,25 @@ function showProcessed() { console.log(JSON.stringify(obj, null, 2)) } -input.pipe( - // debug('Raw Input'), - map((order) => [`${order.name}-${order.status}`, order.quantity] as [string, number]), - // debug('After Map'), - reduce((values) => { - // The reduce function receives an array of [quantity, diff] for each key - // `diff` being the change in number of occurrences of the specific quantity - // It is not aware of the key, just that everything it is receiving is for the same key - // Here we want to sum the quantity for each key, so a sum of num * diff - let count = 0 - for (const [num, diff] of values) { - count += num * diff - } - return [[count, 1]] - }), - // debug('Status Totals'), - consolidate(), - output((msg) => { - if (msg.type === MessageType.DATA) { - const entries = msg.data.collection.getInner(); - // The entreis are: - // key: {fruit-name}-{status} - // count: number of items in that status - // diff: 1 if adding a row, -1 if removing a row - for (const [[key, count], diff] of entries) { - if (diff > 0) { - materializedStatus.set(key, count) - } else if (diff < 0) { - materializedStatus.delete(key) - } - } - } - }) -) - -// Track total processed quantities regardless of status -input.pipe( - // debug('Raw Input'), - map((order) => [order.name, order.quantity] as [string, number]), - // debug('After Map'), - reduce((values) => { - // Count the total number of each fruit processed - let count = 0 - for (const [num, diff] of values) { - count += num * diff - } - return [[count, 1]] - }), - // debug('Total Processed'), - consolidate(), - output((msg) => { - if (msg.type === MessageType.DATA) { - const entries = msg.data.collection.getInner(); - for (const [[key, count], diff] of entries) { - if (diff > 0) { - materializedProcessed.set(key, count) - } else if (diff < 0) { - materializedProcessed.delete(key) - } - } - } - }) -) - -graph.finalize() - console.log('--------------------------------') // Initial packing of orders console.log('Sending initial orders') -input.sendData(v(0), new MultiSet([ - [{ +fruitOrders.transaction((tx) => { + tx.set('A001', { name: 'apple', quantity: 100, shipping_id: 'A001', - status: 'packed' - }, 1], - [{ + status: 'packed', + }) + tx.set('B001', { name: 'banana', quantity: 150, shipping_id: 'B001', - status: 'packed' - }, 1] -])) + status: 'packed', + }) +}) -input.sendFrontier(v(1)) // Send a frontier to set the new minimum version -graph.step() // Step the graph to process the data // Show the materialized status and processed totals: showStatus() showProcessed() @@ -125,38 +97,21 @@ console.log('--------------------------------') // Ship 2 orders console.log('Shipping 2 orders') -input.sendData(v(1), new MultiSet([ - // Remove from packed status - [{ - name: 'apple', - quantity: 100, - shipping_id: 'A001', - status: 'packed' - }, -1], - // Add to shipped status - [{ +fruitOrders.transaction((tx) => { + tx.set('A001', { name: 'apple', quantity: 100, shipping_id: 'A001', - status: 'shipped' - }, 1], - - [{ - name: 'banana', - quantity: 150, - shipping_id: 'B001', - status: 'packed' - }, -1], - [{ + status: 'shipped', + }) + tx.set('B001', { name: 'banana', quantity: 150, shipping_id: 'B001', - status: 'shipped' - }, 1] -])) + status: 'shipped', + }) +}) -input.sendFrontier(v(2)) -graph.step() showStatus() showProcessed() @@ -164,25 +119,15 @@ console.log('--------------------------------') // One order arrives console.log('One order arrives') -input.sendData(v(2), new MultiSet([ - // Remove from shipped status - [{ - name: 'apple', - quantity: 100, - shipping_id: 'A001', - status: 'shipped' - }, -1], - // Add to delivered status - [{ +fruitOrders.transaction((tx) => { + tx.set('A001', { name: 'apple', quantity: 100, shipping_id: 'A001', - status: 'delivered' - }, 1] -])) + status: 'delivered', + }) +}) -input.sendFrontier(v(3)) -graph.step() showStatus() showProcessed() diff --git a/packages/d2ts/src/store.ts b/packages/d2ts/src/store.ts new file mode 100644 index 0000000..98307de --- /dev/null +++ b/packages/d2ts/src/store.ts @@ -0,0 +1,288 @@ +import { MessageType } from './types' + +import { output } from './operators/output' +import { IStreamBuilder } from './types' +import { D2 } from './d2' +import { MultiSet, MultiSetArray } from './multiset' + +export type ChangeInsert = { + type: 'insert' + key: K + value: V +} + +export type ChangeDelete = { + type: 'delete' + key: K + previousValue: V | undefined +} + +export type ChangeUpdate = { + type: 'update' + key: K + value: V + previousValue: V | undefined +} + +export type Change = + | ChangeInsert + | ChangeDelete + | ChangeUpdate + +export type ChangeSet = Change[] + +export class Store { + #inner: Map + #inTransaction: boolean = false + #pendingChanges: ChangeSet = [] + #subscribers: Set<(changes: ChangeSet) => void> = new Set() + + constructor(initial?: Map) { + this.#inner = new Map() + if (initial) { + this.#inTransaction = true + for (const [key, value] of initial) { + this.set(key, value) + } + this.#inTransaction = false + this.#emitChanges() + } + } + + #emitChanges() { + if (this.#pendingChanges.length > 0) { + const changes = this.#pendingChanges + this.#subscribers.forEach((subscriber) => subscriber(changes)) + this.#pendingChanges = [] + } + } + + subscribe(callback: (changes: ChangeSet) => void): () => void { + this.#subscribers.add(callback) + return () => { + this.#subscribers.delete(callback) + } + } + + clear(): void { + for (const key of this.#inner.keys()) { + this.delete(key) + } + } + + delete(key: K): void { + const previousValue = this.#inner.get(key) + this.#inner.delete(key) + this.#pendingChanges.push({ + type: 'delete', + key, + previousValue, + }) + if (!this.#inTransaction) { + this.#emitChanges() + } + } + + entries(): IterableIterator<[K, V]> { + return this.#inner.entries() + } + + forEach( + callbackfn: (value: V, key: K, map: Map) => void, + thisArg?: unknown, + ): void { + this.#inner.forEach(callbackfn, thisArg) + } + + get(key: K): V | undefined { + return this.#inner.get(key) + } + + entriesAsChanges(): ChangeSet { + return Array.from(this.#inner.entries()).map(([key, value]) => ({ + type: 'insert', + key, + value, + })) + } + + has(key: K): boolean { + return this.#inner.has(key) + } + + keys(): IterableIterator { + return this.#inner.keys() + } + + set(key: K, value: V): void { + const previousValue = this.#inner.get(key) + this.#inner.set(key, value) + if (previousValue) { + this.#pendingChanges.push({ + type: 'update', + key, + value, + previousValue, + }) + } else { + this.#pendingChanges.push({ + type: 'insert', + key, + value, + }) + } + if (!this.#inTransaction) { + this.#emitChanges() + } + } + + transaction(fn: (store: Store) => void): void { + this.#inTransaction = true + fn(this) + this.#inTransaction = false + this.#emitChanges() + } + + query(fn: (stream: IStreamBuilder<[K, V]>) => R): R { + return Store.queryAll([this], ([stream]) => fn(stream)) + } + + update(key: K, fn: (value: V | undefined) => V): void { + const previousValue = this.#inner.get(key) + const value = fn(previousValue) + this.set(key, value) + } + + values(): IterableIterator { + return this.#inner.values() + } + + [Symbol.iterator](): IterableIterator<[K, V]> { + return this.#inner[Symbol.iterator]() + } + + get size(): number { + return this.#inner.size + } + + static materialize(stream: IStreamBuilder<[K, V]>): Store { + const store = new Store() + stream.pipe( + output((msg) => { + if (msg.type === MessageType.DATA) { + const collection = msg.data.collection + store.transaction((tx) => { + const changesByKey = new Map< + K, + { deletes: number; inserts: number; value: V } + >() + + for (const [[key, value], multiplicity] of collection.getInner()) { + let changes = changesByKey.get(key) + if (!changes) { + changes = { deletes: 0, inserts: 0, value: value } + changesByKey.set(key, changes) + } + + if (multiplicity < 0) { + changes.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + changes.inserts += multiplicity + changes.value = value + } + } + + for (const [key, changes] of changesByKey) { + const { deletes, inserts, value } = changes + if (inserts >= deletes) { + tx.set(key, value) + } else if (deletes > 0) { + tx.delete(key) + } + } + }) + } + }), + ) + return store + } + + static queryAll( + stores: Store[], + fn: (streams: IStreamBuilder<[K, V]>[]) => R, + ): R { + let time = 0 + const graph = new D2({ initialFrontier: time }) + const inputs = stores.map(() => graph.newInput<[K, V]>()) + const ret = fn(inputs) + graph.finalize() + + for (let i = 0; i < stores.length; i++) { + const store = stores[i] + const input = inputs[i] + store.subscribe((rawChanges) => { + const changes: MultiSetArray<[K, V]> = [] + for (const change of rawChanges) { + switch (change.type) { + case 'insert': + changes.push([[change.key, change.value], 1]) + break + case 'delete': + changes.push([[change.key, change.previousValue!], -1]) + break + case 'update': + changes.push([[change.key, change.value], 1]) + changes.push([[change.key, change.previousValue!], -1]) + break + } + } + input.sendData(time, new MultiSet(changes)) + input.sendFrontier(++time) + graph.step() + time++ + }) + } + + // Send the initial data + for (let i = 0; i < stores.length; i++) { + const store = stores[i] + const input = inputs[i] + const rawChanges = store.entriesAsChanges() + const changes: MultiSetArray<[K, V]> = [] + for (const change of rawChanges) { + switch (change.type) { + case 'insert': + changes.push([[change.key, change.value], 1]) + break + case 'delete': + changes.push([[change.key, change.previousValue!], -1]) + break + case 'update': + changes.push([[change.key, change.value], 1]) + changes.push([[change.key, change.previousValue!], -1]) + break + } + } + input.sendData(time, new MultiSet(changes)) + input.sendFrontier(++time) + graph.step() + } + + return ret + } + + static transactionAll( + stores: Store[], + fn: (stores: Store[]) => void, + ): void { + stores.forEach((store) => (store.#inTransaction = true)) + try { + fn(stores) + } finally { + stores.forEach((store) => { + store.#inTransaction = false + store.#emitChanges() + }) + } + } +} diff --git a/packages/d2ts/tests/materialize.test.ts b/packages/d2ts/tests/materialize.test.ts new file mode 100644 index 0000000..631fb4a --- /dev/null +++ b/packages/d2ts/tests/materialize.test.ts @@ -0,0 +1,177 @@ +import { describe, it, expect } from 'vitest' +import { D2 } from '../src/d2' +import { MultiSet } from '../src/multiset' +import { map } from '../src/operators' +import { v } from '../src/order' +import { Store } from '../src/store' + +describe('StreamBuilder.materialize', () => { + it('should create a store with initial data', () => { + const graph = new D2({ initialFrontier: v(0) }) + const input = graph.newInput<{ key: string; value: number }>() + + const materialized = Store.materialize( + input.pipe(map((data) => [data.key, data.value] as [string, number])), + ) + + graph.finalize() + + input.sendData( + v(0), + new MultiSet([ + [{ key: 'a', value: 1 }, 1], + [{ key: 'b', value: 2 }, 1], + ]), + ) + + input.sendFrontier(v(1)) + graph.step() + + expect(materialized.get('a')).toBe(1) + expect(materialized.get('b')).toBe(2) + expect(materialized.size).toBe(2) + }) + + it('should update store when data changes', () => { + const graph = new D2({ initialFrontier: v(0) }) + const input = graph.newInput<{ key: string; value: number }>() + + const materialized = Store.materialize( + input.pipe(map((data) => [data.key, data.value] as [string, number])), + ) + + graph.finalize() + + // Initial data + input.sendData(v(0), new MultiSet([[{ key: 'a', value: 1 }, 1]])) + input.sendFrontier(v(1)) + graph.step() + + // Update data + input.sendData( + v(1), + new MultiSet([ + [{ key: 'a', value: 1 }, -1], // Remove old value + [{ key: 'a', value: 10 }, 1], // Add new value + ]), + ) + input.sendFrontier(v(2)) + graph.step() + + expect(materialized.get('a')).toBe(10) + expect(materialized.size).toBe(1) + }) + + it('should remove entries when they are deleted', () => { + const graph = new D2({ initialFrontier: v(0) }) + const input = graph.newInput<{ key: string; value: number }>() + + const materialized = Store.materialize( + input.pipe(map((data) => [data.key, data.value] as [string, number])), + ) + + graph.finalize() + + // Initial data + input.sendData( + v(0), + new MultiSet([ + [{ key: 'a', value: 1 }, 1], + [{ key: 'b', value: 2 }, 1], + ]), + ) + input.sendFrontier(v(1)) + graph.step() + + // Delete entry + input.sendData(v(1), new MultiSet([[{ key: 'a', value: 1 }, -1]])) + input.sendFrontier(v(2)) + graph.step() + + expect(materialized.has('a')).toBe(false) + expect(materialized.get('b')).toBe(2) + expect(materialized.size).toBe(1) + }) + + it('should emit change events when store is updated', () => { + const graph = new D2({ initialFrontier: v(0) }) + const input = graph.newInput<{ key: string; value: number }>() + + const materialized = Store.materialize( + input.pipe(map((data) => [data.key, data.value] as [string, number])), + ) + + graph.finalize() + + const changes: any[] = [] + const unsubscribe = materialized.subscribe((change) => { + changes.push(change) + }) + + // Initial data + input.sendData(v(0), new MultiSet([[{ key: 'a', value: 1 }, 1]])) + input.sendFrontier(v(1)) + graph.step() + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'insert', + key: 'a', + value: 1, + }, + ]) + + // Update data + input.sendData( + v(1), + new MultiSet([ + [{ key: 'a', value: 1 }, -1], + [{ key: 'a', value: 10 }, 1], + ]), + ) + input.sendFrontier(v(2)) + graph.step() + + expect(changes).toHaveLength(2) + expect(changes[1]).toEqual([ + { + type: 'update', + key: 'a', + value: 10, + previousValue: 1, + }, + ]) + + unsubscribe() + }) + + it('should handle multiple updates in a single step', () => { + const graph = new D2({ initialFrontier: v(0) }) + const input = graph.newInput<{ key: string; value: number }>() + + const materialized = Store.materialize( + input.pipe(map((data) => [data.key, data.value] as [string, number])), + ) + + graph.finalize() + + input.sendData( + v(0), + new MultiSet([ + [{ key: 'a', value: 1 }, 1], + [{ key: 'b', value: 2 }, 1], + [{ key: 'c', value: 3 }, 1], + [{ key: 'a', value: 1 }, -1], + [{ key: 'a', value: 10 }, 1], + ]), + ) + input.sendFrontier(v(1)) + graph.step() + + expect(materialized.get('a')).toBe(10) + expect(materialized.get('b')).toBe(2) + expect(materialized.get('c')).toBe(3) + expect(materialized.size).toBe(3) + }) +}) diff --git a/packages/d2ts/tests/store.test.ts b/packages/d2ts/tests/store.test.ts new file mode 100644 index 0000000..3309d69 --- /dev/null +++ b/packages/d2ts/tests/store.test.ts @@ -0,0 +1,443 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { Store } from '../src/store' +import type { ChangeSet } from '../src/store' +import { map, reduce, concat } from '../src/operators' + +describe('Store', () => { + let store: Store + + beforeEach(() => { + store = new Store( + new Map([ + ['a', 1], + ['b', 2], + ]), + ) + }) + + it('should initialize with initial values', () => { + expect(store.get('a')).toBe(1) + expect(store.get('b')).toBe(2) + expect(store.size).toBe(2) + }) + + describe('basic operations', () => { + it('should set and get values', () => { + store.set('c', 3) + expect(store.get('c')).toBe(3) + }) + + it('should delete values', () => { + store.delete('a') + expect(store.get('a')).toBeUndefined() + expect(store.size).toBe(1) + }) + + it('should check if key exists', () => { + expect(store.has('a')).toBe(true) + expect(store.has('z')).toBe(false) + }) + + it('should clear all values', () => { + store.clear() + expect(store.size).toBe(0) + }) + }) + + describe('iteration methods', () => { + it('should iterate over entries', () => { + const entries = Array.from(store.entries()) + expect(entries).toEqual([ + ['a', 1], + ['b', 2], + ]) + }) + + it('should iterate over keys', () => { + const keys = Array.from(store.keys()) + expect(keys).toEqual(['a', 'b']) + }) + + it('should iterate over values', () => { + const values = Array.from(store.values()) + expect(values).toEqual([1, 2]) + }) + + it('should support forEach', () => { + const result: Array<[string, number]> = [] + store.forEach((value, key) => { + result.push([key, value]) + }) + expect(result).toEqual([ + ['a', 1], + ['b', 2], + ]) + }) + }) + + describe('change events', () => { + it('should emit insert events', () => { + const changes: ChangeSet[] = [] + const unsubscribe = store.subscribe((change) => { + changes.push(change) + }) + + store.set('c', 3) + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'insert', + key: 'c', + value: 3, + }, + ]) + + unsubscribe() + }) + + it('should emit update events', () => { + const changes: ChangeSet[] = [] + const unsubscribe = store.subscribe((change) => { + changes.push(change) + }) + + store.set('a', 10) + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'update', + key: 'a', + value: 10, + previousValue: 1, + }, + ]) + + unsubscribe() + }) + + it('should emit delete events', () => { + const changes: ChangeSet[] = [] + const unsubscribe = store.subscribe((change) => { + changes.push(change) + }) + + store.delete('a') + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'delete', + key: 'a', + previousValue: 1, + }, + ]) + + unsubscribe() + }) + + it('should stop receiving events after unsubscribe', () => { + const changes: ChangeSet[] = [] + const unsubscribe = store.subscribe((change) => { + changes.push(change) + }) + + store.set('c', 3) + unsubscribe() + store.set('d', 4) + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'insert', + key: 'c', + value: 3, + }, + ]) + }) + }) + + describe('transactions', () => { + it('should batch changes in transactions', () => { + const changes: ChangeSet[] = [] + const unsubscribe = store.subscribe((change) => { + changes.push(change) + }) + + store.transaction((store) => { + store.set('c', 3) + store.set('d', 4) + store.delete('a') + }) + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([ + { + type: 'insert', + key: 'c', + value: 3, + }, + { + type: 'insert', + key: 'd', + value: 4, + }, + { + type: 'delete', + key: 'a', + previousValue: 1, + }, + ]) + + unsubscribe() + }) + }) + + describe('update method', () => { + it('should update existing values', () => { + store.update('a', (value) => (value || 0) + 10) + expect(store.get('a')).toBe(11) + }) + + it('should handle updates on non-existing keys', () => { + store.update('z', (value) => (value || 0) + 5) + expect(store.get('z')).toBe(5) + }) + }) + + describe('entriesAsChanges', () => { + it('should return all entries as insert changes', () => { + const changes = store.entriesAsChanges() + expect(changes).toEqual([ + { type: 'insert', key: 'a', value: 1 }, + { type: 'insert', key: 'b', value: 2 }, + ]) + }) + }) + + describe('Symbol.iterator', () => { + it('should support for...of iteration', () => { + const entries: Array<[string, number]> = [] + for (const entry of store) { + entries.push(entry) + } + expect(entries).toEqual([ + ['a', 1], + ['b', 2], + ]) + }) + }) + + describe('queryAll', () => { + it('should allow querying multiple stores', () => { + const store1 = new Store( + new Map([ + ['a', 1], + ['b', 2], + ]), + ) + const store2 = new Store( + new Map([ + ['c', 3], + ['d', 4], + ]), + ) + + const materialized = Store.queryAll([store1, store2], ([s1, s2]) => { + return Store.materialize(s1.pipe(concat(s2))) + }) + + expect(Array.from(materialized.entries())).toEqual([ + ['a', 1], + ['b', 2], + ['c', 3], + ['d', 4], + ]) + }) + + it('should react to changes in source stores', () => { + const store1 = new Store(new Map([['a', 1]])) + const store2 = new Store(new Map([['b', 2]])) + + const materialized = Store.queryAll([store1, store2], ([s1, s2]) => { + // Combine both streams into one store + return Store.materialize(s1.pipe(concat(s2))) + }) + + expect(Array.from(materialized.entries())).toEqual([ + ['a', 1], + ['b', 2], + ]) + + // Make changes to source stores + store1.set('a', 10) + store2.set('c', 3) + + expect(Array.from(materialized.entries())).toEqual([ + ['a', 10], + ['b', 2], + ['c', 3], + ]) + }) + + it('should handle complex transformations', () => { + type FruitOrder = { + name: string + quantity: number + status: 'packed' | 'shipped' + } + + const orders = new Store() + + const { byStatus, totals } = Store.queryAll([orders], ([orderStream]) => { + // Count by status + const byStatus = Store.materialize( + orderStream.pipe( + map( + ([_, order]) => + [`${order.name}-${order.status}`, order.quantity] as [ + string, + number, + ], + ), + ), + ) + + // Count total by fruit + const totals = Store.materialize( + orderStream.pipe( + map( + ([_, order]) => [order.name, order.quantity] as [string, number], + ), + reduce((values) => { + let sum = 0 + for (const [qty, diff] of values) { + sum += qty * diff + } + return [[sum, 1]] + }), + ), + ) + + return { byStatus, totals } + }) + + // Add initial orders + orders.transaction((tx) => { + tx.set('1', { name: 'apple', quantity: 100, status: 'packed' }) + tx.set('2', { name: 'banana', quantity: 150, status: 'packed' }) + }) + + expect(Array.from(byStatus.entries())).toEqual([ + ['apple-packed', 100], + ['banana-packed', 150], + ]) + + expect(Array.from(totals.entries())).toEqual([ + ['apple', 100], + ['banana', 150], + ]) + + // Update an order status + orders.set('1', { name: 'apple', quantity: 100, status: 'shipped' }) + + expect(Array.from(byStatus.entries())).toEqual([ + ['banana-packed', 150], + ['apple-shipped', 100], + ]) + + // Totals shouldn't change since only status changed + expect(Array.from(totals.entries())).toEqual([ + ['apple', 100], + ['banana', 150], + ]) + }) + }) + + describe('query', () => { + it('should allow querying a single store', () => { + const store = new Store( + new Map([ + ['a', 1], + ['b', 2], + ]), + ) + + const materialized = store.query((stream) => + Store.materialize( + stream.pipe( + map(([key, value]) => [key, value * 2] as [string, number]), + ), + ), + ) + + expect(Array.from(materialized.entries())).toEqual([ + ['a', 2], + ['b', 4], + ]) + + store.set('c', 3) + expect(Array.from(materialized.entries())).toEqual([ + ['a', 2], + ['b', 4], + ['c', 6], + ]) + }) + }) + + describe('transactionAll', () => { + it('should batch changes across multiple stores', () => { + const store1 = new Store() + const store2 = new Store() + + const changes1: ChangeSet[] = [] + const changes2: ChangeSet[] = [] + + const unsubscribe1 = store1.subscribe((change) => changes1.push(change)) + const unsubscribe2 = store2.subscribe((change) => changes2.push(change)) + + Store.transactionAll([store1, store2], ([s1, s2]) => { + s1.set('a', 1) + s2.set('b', 2) + s1.set('c', 3) + s2.set('d', 4) + }) + + expect(changes1).toHaveLength(1) + expect(changes1[0]).toEqual([ + { type: 'insert', key: 'a', value: 1 }, + { type: 'insert', key: 'c', value: 3 }, + ]) + + expect(changes2).toHaveLength(1) + expect(changes2[0]).toEqual([ + { type: 'insert', key: 'b', value: 2 }, + { type: 'insert', key: 'd', value: 4 }, + ]) + + unsubscribe1() + unsubscribe2() + }) + + it('should handle errors and still emit changes', () => { + const store1 = new Store() + const store2 = new Store() + + const changes: ChangeSet[] = [] + const unsubscribe = store1.subscribe((change) => changes.push(change)) + + expect(() => { + Store.transactionAll([store1, store2], ([s1, _s2]) => { + s1.set('a', 1) + throw new Error('test error') + }) + }).toThrow('test error') + + expect(changes).toHaveLength(1) + expect(changes[0]).toEqual([{ type: 'insert', key: 'a', value: 1 }]) + + unsubscribe() + }) + }) +})