|
| 1 | +import { WatchCompatibleQuery, WatchedQuery, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js'; |
| 2 | +import { AbstractQueryProcessor, AbstractQueryProcessorOptions, LinkQueryOptions } from './AbstractQueryProcessor.js'; |
| 3 | + |
| 4 | +export interface Differential<RowType> { |
| 5 | + current: RowType; |
| 6 | + previous: RowType; |
| 7 | +} |
| 8 | + |
| 9 | +export interface WatchedQueryDifferential<RowType> { |
| 10 | + added: RowType[]; |
| 11 | + all: RowType[]; |
| 12 | + removed: RowType[]; |
| 13 | + updated: Differential<RowType>[]; |
| 14 | + unchanged: RowType[]; |
| 15 | +} |
| 16 | + |
| 17 | +export interface Differentiator<RowType> { |
| 18 | + identify: (item: RowType) => string; |
| 19 | + compareBy: (item: RowType) => string; |
| 20 | +} |
| 21 | + |
| 22 | +export interface DifferentialWatchedQuerySettings<RowType> |
| 23 | + extends WatchedQueryOptions<WatchedQueryDifferential<RowType>> { |
| 24 | + query: WatchCompatibleQuery<RowType[]>; |
| 25 | +} |
| 26 | + |
| 27 | +/** |
| 28 | + * @internal |
| 29 | + */ |
| 30 | +export interface DifferentialQueryProcessorOptions<RowType> |
| 31 | + extends AbstractQueryProcessorOptions<WatchedQueryDifferential<RowType>, DifferentialWatchedQuerySettings<RowType>> { |
| 32 | + differentiator: Differentiator<RowType>; |
| 33 | +} |
| 34 | + |
| 35 | +type DataHashMap<RowType> = Map<string, { hash: string; item: RowType }>; |
| 36 | + |
| 37 | +export const EMPTY_DIFFERENTIAL = { |
| 38 | + added: [], |
| 39 | + all: [], |
| 40 | + removed: [], |
| 41 | + updated: [], |
| 42 | + unchanged: [] |
| 43 | +}; |
| 44 | + |
| 45 | +/** |
| 46 | + * Uses the PowerSync onChange event to trigger watched queries. |
| 47 | + * Results are emitted on every change of the relevant tables. |
| 48 | + * @internal |
| 49 | + */ |
| 50 | +export class DifferentialQueryProcessor<RowType> |
| 51 | + extends AbstractQueryProcessor<WatchedQueryDifferential<RowType>, DifferentialWatchedQuerySettings<RowType>> |
| 52 | + implements WatchedQuery<WatchedQueryDifferential<RowType>, DifferentialWatchedQuerySettings<RowType>> |
| 53 | +{ |
| 54 | + constructor(protected options: DifferentialQueryProcessorOptions<RowType>) { |
| 55 | + super(options); |
| 56 | + } |
| 57 | + |
| 58 | + /* |
| 59 | + * @returns If the sets are equal |
| 60 | + */ |
| 61 | + protected differentiate( |
| 62 | + current: RowType[], |
| 63 | + previousMap: DataHashMap<RowType> |
| 64 | + ): { diff: WatchedQueryDifferential<RowType>; map: DataHashMap<RowType>; hasChanged: boolean } { |
| 65 | + const { identify, compareBy } = this.options.differentiator; |
| 66 | + |
| 67 | + let hasChanged = false; |
| 68 | + const currentMap = new Map<string, { hash: string; item: RowType }>(); |
| 69 | + current.forEach((item) => { |
| 70 | + currentMap.set(identify(item), { |
| 71 | + hash: compareBy(item), |
| 72 | + item |
| 73 | + }); |
| 74 | + }); |
| 75 | + |
| 76 | + const removedTracker = new Set(previousMap.keys()); |
| 77 | + |
| 78 | + const diff: WatchedQueryDifferential<RowType> = { |
| 79 | + all: current, |
| 80 | + added: [], |
| 81 | + removed: [], |
| 82 | + updated: [], |
| 83 | + unchanged: [] |
| 84 | + }; |
| 85 | + |
| 86 | + for (const [key, { hash, item }] of currentMap) { |
| 87 | + const previousItem = previousMap.get(key); |
| 88 | + if (!previousItem) { |
| 89 | + // New item |
| 90 | + hasChanged = true; |
| 91 | + diff.added.push(item); |
| 92 | + } else { |
| 93 | + // Existing item |
| 94 | + if (hash == previousItem.hash) { |
| 95 | + diff.unchanged.push(item); |
| 96 | + } else { |
| 97 | + hasChanged = true; |
| 98 | + diff.updated.push({ current: item, previous: previousItem.item }); |
| 99 | + } |
| 100 | + } |
| 101 | + // The item is present, we don't consider it removed |
| 102 | + removedTracker.delete(key); |
| 103 | + } |
| 104 | + |
| 105 | + diff.removed = Array.from(removedTracker).map((key) => previousMap.get(key)!.item); |
| 106 | + hasChanged = hasChanged || diff.removed.length > 0; |
| 107 | + |
| 108 | + return { |
| 109 | + diff, |
| 110 | + hasChanged, |
| 111 | + map: currentMap |
| 112 | + }; |
| 113 | + } |
| 114 | + |
| 115 | + protected async linkQuery(options: LinkQueryOptions<WatchedQueryDifferential<RowType>>): Promise<void> { |
| 116 | + const { db, watchOptions } = this.options; |
| 117 | + const { abortSignal } = options; |
| 118 | + |
| 119 | + const compiledQuery = watchOptions.query.compile(); |
| 120 | + const tables = await db.resolveTables(compiledQuery.sql, compiledQuery.parameters as any[]); |
| 121 | + |
| 122 | + let currentMap: DataHashMap<RowType> = new Map(); |
| 123 | + |
| 124 | + // populate the currentMap from the placeholder data |
| 125 | + if (this.state.data) { |
| 126 | + this.state.data.all.forEach((item) => { |
| 127 | + currentMap.set(this.options.differentiator.identify(item), { |
| 128 | + hash: this.options.differentiator.compareBy(item), |
| 129 | + item |
| 130 | + }); |
| 131 | + }); |
| 132 | + } |
| 133 | + |
| 134 | + db.onChangeWithCallback( |
| 135 | + { |
| 136 | + onChange: async () => { |
| 137 | + if (this.closed) { |
| 138 | + return; |
| 139 | + } |
| 140 | + // This fires for each change of the relevant tables |
| 141 | + try { |
| 142 | + if (this.reportFetching && !this.state.isFetching) { |
| 143 | + await this.updateState({ isFetching: true }); |
| 144 | + } |
| 145 | + |
| 146 | + const partialStateUpdate: Partial<WatchedQueryState<WatchedQueryDifferential<RowType>>> = {}; |
| 147 | + |
| 148 | + // Always run the query if an underlying table has changed |
| 149 | + const result = await watchOptions.query.execute({ |
| 150 | + sql: compiledQuery.sql, |
| 151 | + // Allows casting from ReadOnlyArray[unknown] to Array<unknown> |
| 152 | + // This allows simpler compatibility with PowerSync queries |
| 153 | + parameters: [...compiledQuery.parameters], |
| 154 | + db: this.options.db |
| 155 | + }); |
| 156 | + |
| 157 | + if (this.reportFetching) { |
| 158 | + partialStateUpdate.isFetching = false; |
| 159 | + } |
| 160 | + |
| 161 | + if (this.state.isLoading) { |
| 162 | + partialStateUpdate.isLoading = false; |
| 163 | + } |
| 164 | + |
| 165 | + const { diff, hasChanged, map } = this.differentiate(result, currentMap); |
| 166 | + // Update for future comparisons |
| 167 | + currentMap = map; |
| 168 | + |
| 169 | + if (hasChanged) { |
| 170 | + partialStateUpdate.data = diff; |
| 171 | + } |
| 172 | + |
| 173 | + if (Object.keys(partialStateUpdate).length > 0) { |
| 174 | + await this.updateState(partialStateUpdate); |
| 175 | + } |
| 176 | + } catch (error) { |
| 177 | + await this.updateState({ error }); |
| 178 | + } |
| 179 | + }, |
| 180 | + onError: async (error) => { |
| 181 | + await this.updateState({ error }); |
| 182 | + } |
| 183 | + }, |
| 184 | + { |
| 185 | + signal: abortSignal, |
| 186 | + tables, |
| 187 | + throttleMs: watchOptions.throttleMs, |
| 188 | + triggerImmediate: true // used to emit the initial state |
| 189 | + } |
| 190 | + ); |
| 191 | + } |
| 192 | +} |
0 commit comments