Skip to content

Commit 447de0a

Browse files
Move Differential diff out of state to a listener.
1 parent 43f24fd commit 447de0a

File tree

4 files changed

+104
-45
lines changed

4 files changed

+104
-45
lines changed

demos/yjs-react-supabase-text-collab/src/library/powersync/PowerSyncYjsProvider.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
3535
public readonly documentId: string
3636
) {
3737
super();
38-
3938
/**
4039
* Watch for changes to the `document_updates` table for this document.
4140
* This will be used to apply updates from other editors.
@@ -71,8 +70,8 @@ export class PowerSyncYjsProvider extends ObservableV2<PowerSyncYjsEvents> {
7170
let synced = false;
7271

7372
updateQuery.registerListener({
74-
onStateChange: async () => {
75-
for (const added of updateQuery.state.diff.added) {
73+
onDiff: async (diff) => {
74+
for (const added of diff.added) {
7675
Y.applyUpdateV2(doc, b64ToUint8Array(added.update_b64));
7776
}
7877
if (!synced) {

packages/common/src/client/watched/WatchedQuery.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,11 @@ export const DEFAULT_WATCH_QUERY_OPTIONS: WatchedQueryOptions = {
8282
reportFetching: true
8383
};
8484

85-
export interface WatchedQuery<Data = unknown, Settings extends WatchedQueryOptions = WatchedQueryOptions>
86-
extends MetaBaseObserverInterface<WatchedQueryListener<Data>> {
85+
export interface WatchedQuery<
86+
Data = unknown,
87+
Settings extends WatchedQueryOptions = WatchedQueryOptions,
88+
Listener extends WatchedQueryListener<Data> = WatchedQueryListener<Data>
89+
> extends MetaBaseObserverInterface<Listener> {
8790
/**
8891
* Current state of the watched query.
8992
*/
@@ -95,7 +98,7 @@ export interface WatchedQuery<Data = unknown, Settings extends WatchedQueryOptio
9598
* Subscribe to watched query events.
9699
* @returns A function to unsubscribe from the events.
97100
*/
98-
registerListener(listener: WatchedQueryListener<Data>): () => void;
101+
registerListener(listener: Listener): () => void;
99102

100103
/**
101104
* Updates the underlying query options.

packages/common/src/client/watched/processors/DifferentialQueryProcessor.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { WatchCompatibleQuery, WatchedQuery, WatchedQueryOptions, WatchedQueryState } from '../WatchedQuery.js';
1+
import {
2+
WatchCompatibleQuery,
3+
WatchedQuery,
4+
WatchedQueryListener,
5+
WatchedQueryOptions,
6+
WatchedQueryState
7+
} from '../WatchedQuery.js';
28
import {
39
AbstractQueryProcessor,
410
AbstractQueryProcessorOptions,
@@ -95,11 +101,17 @@ type MutableDifferentialWatchedQueryState<RowType> = MutableWatchedQueryState<Ro
95101
diff: WatchedQueryDifferential<RowType>;
96102
};
97103

98-
export interface DifferentialWatchedQuery<RowType>
99-
extends WatchedQuery<ReadonlyArray<Readonly<RowType>>, DifferentialWatchedQuerySettings<RowType>> {
100-
readonly state: DifferentialWatchedQueryState<RowType>;
104+
export interface DifferentialWatchedQueryListener<RowType>
105+
extends WatchedQueryListener<ReadonlyArray<Readonly<RowType>>> {
106+
onDiff?: (diff: WatchedQueryDifferential<RowType>) => void | Promise<void>;
101107
}
102108

109+
export type DifferentialWatchedQuery<RowType> = WatchedQuery<
110+
ReadonlyArray<Readonly<RowType>>,
111+
DifferentialWatchedQuerySettings<RowType>,
112+
DifferentialWatchedQueryListener<RowType>
113+
>;
114+
103115
/**
104116
* @internal
105117
*/
@@ -286,9 +298,9 @@ export class DifferentialQueryProcessor<RowType>
286298
currentMap = map;
287299

288300
if (hasChanged) {
301+
await this.iterateAsyncListenersWithError((l) => l.onDiff?.(diff));
289302
Object.assign(partialStateUpdate, {
290-
data: diff.all,
291-
diff
303+
data: diff.all
292304
});
293305
}
294306

packages/web/tests/watch.test.ts

Lines changed: 78 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { AbstractPowerSyncDatabase, ArrayComparator, GetAllQuery, WatchedQueryState } from '@powersync/common';
1+
import {
2+
AbstractPowerSyncDatabase,
3+
ArrayComparator,
4+
GetAllQuery,
5+
WatchedQueryDifferential,
6+
WatchedQueryState
7+
} from '@powersync/common';
28
import { PowerSyncDatabase } from '@powersync/web';
39
import { v4 as uuid } from 'uuid';
410
import { afterEach, beforeEach, describe, expect, it, onTestFinished, vi } from 'vitest';
@@ -495,6 +501,14 @@ describe('Watch Tests', { sequential: true }, () => {
495501
})
496502
.differentialWatch();
497503

504+
const diffs: WatchedQueryDifferential<{ id: string; make: string }>[] = [];
505+
506+
watch.registerListener({
507+
onDiff: (diff) => {
508+
diffs.push(diff);
509+
}
510+
});
511+
498512
// Create sample data
499513
await powersync.execute(
500514
/* sql */ `
@@ -508,7 +522,7 @@ describe('Watch Tests', { sequential: true }, () => {
508522

509523
await vi.waitFor(
510524
() => {
511-
expect(watch.state.diff.added[0]?.make).equals('test1');
525+
expect(diffs[0].added[0]?.make).equals('test1');
512526
},
513527
{ timeout: 1000 }
514528
);
@@ -525,12 +539,13 @@ describe('Watch Tests', { sequential: true }, () => {
525539

526540
await vi.waitFor(
527541
() => {
542+
expect(diffs).toHaveLength(2);
528543
// This should now reflect that we had one change since the last event
529-
expect(watch.state.diff.added).toHaveLength(1);
530-
expect(watch.state.diff.added[0]?.make).equals('test2');
544+
expect(diffs[1].added).toHaveLength(1);
545+
expect(diffs[1].added[0]?.make).equals('test2');
531546

532-
expect(watch.state.diff.removed).toHaveLength(0);
533-
expect(watch.state.diff.all).toHaveLength(2);
547+
expect(diffs[1].removed).toHaveLength(0);
548+
expect(diffs[1].all).toHaveLength(2);
534549
},
535550
{ timeout: 1000 }
536551
);
@@ -546,13 +561,14 @@ describe('Watch Tests', { sequential: true }, () => {
546561

547562
await vi.waitFor(
548563
() => {
549-
expect(watch.state.diff.added).toHaveLength(0);
550-
expect(watch.state.diff.all).toHaveLength(1);
551-
expect(watch.state.diff.unchanged).toHaveLength(1);
552-
expect(watch.state.diff.unchanged[0]?.make).equals('test1');
553-
554-
expect(watch.state.diff.removed).toHaveLength(1);
555-
expect(watch.state.diff.removed[0]?.make).equals('test2');
564+
expect(diffs).toHaveLength(3);
565+
expect(diffs[2].added).toHaveLength(0);
566+
expect(diffs[2].all).toHaveLength(1);
567+
expect(diffs[2].unchanged).toHaveLength(1);
568+
expect(diffs[2].unchanged[0]?.make).equals('test1');
569+
570+
expect(diffs[2].removed).toHaveLength(1);
571+
expect(diffs[2].removed[0]?.make).equals('test2');
556572
},
557573
{ timeout: 1000 }
558574
);
@@ -592,9 +608,18 @@ describe('Watch Tests', { sequential: true }, () => {
592608
['test1', uuid()]
593609
);
594610

611+
const diffs: WatchedQueryDifferential<{ id: string; make: string }>[] = [];
612+
613+
watch.registerListener({
614+
onDiff: (diff) => {
615+
diffs.push(diff);
616+
}
617+
});
618+
595619
await vi.waitFor(
596620
() => {
597-
expect(watch.state.diff.added[0]?.make).equals('test1');
621+
expect(diffs).toHaveLength(1);
622+
expect(diffs[0].added[0]?.make).equals('test1');
598623
},
599624
{ timeout: 1000 }
600625
);
@@ -611,12 +636,13 @@ describe('Watch Tests', { sequential: true }, () => {
611636

612637
await vi.waitFor(
613638
() => {
639+
expect(diffs).toHaveLength(2);
614640
// This should now reflect that we had one change since the last event
615-
expect(watch.state.diff.added).toHaveLength(1);
616-
expect(watch.state.diff.added[0]?.make).equals('test2');
641+
expect(diffs[1].added).toHaveLength(1);
642+
expect(diffs[1].added[0]?.make).equals('test2');
617643

618-
expect(watch.state.diff.removed).toHaveLength(0);
619-
expect(watch.state.diff.all).toHaveLength(2);
644+
expect(diffs[1].removed).toHaveLength(0);
645+
expect(diffs[1].all).toHaveLength(2);
620646
},
621647
{ timeout: 1000 }
622648
);
@@ -741,6 +767,14 @@ describe('Watch Tests', { sequential: true }, () => {
741767
// It should have the initial value
742768
expect(watch.state.data).toHaveLength(1);
743769

770+
const diffs: WatchedQueryDifferential<{ id: string; make: string }>[] = [];
771+
772+
watch.registerListener({
773+
onDiff: (diff) => {
774+
diffs.push(diff);
775+
}
776+
});
777+
744778
await powersync.execute(
745779
/* sql */ `
746780
INSERT INTO
@@ -753,12 +787,13 @@ describe('Watch Tests', { sequential: true }, () => {
753787

754788
await vi.waitFor(
755789
() => {
790+
expect(diffs).toHaveLength(1);
756791
// This should now reflect that we had one change since the last event
757-
expect(watch.state.diff.added).toHaveLength(1);
758-
expect(watch.state.diff.added[0]?.make).equals('test2');
792+
expect(diffs[0].added).toHaveLength(1);
793+
expect(diffs[0].added[0]?.make).equals('test2');
759794

760-
expect(watch.state.diff.removed).toHaveLength(0);
761-
expect(watch.state.diff.all).toHaveLength(2);
795+
expect(diffs[0].removed).toHaveLength(0);
796+
expect(diffs[0].all).toHaveLength(2);
762797
},
763798
{ timeout: 1000 }
764799
);
@@ -774,13 +809,14 @@ describe('Watch Tests', { sequential: true }, () => {
774809

775810
await vi.waitFor(
776811
() => {
777-
expect(watch.state.diff.added).toHaveLength(0);
778-
expect(watch.state.diff.all).toHaveLength(1);
779-
expect(watch.state.diff.unchanged).toHaveLength(1);
780-
expect(watch.state.diff.unchanged[0]?.make).equals('test1');
781-
782-
expect(watch.state.diff.removed).toHaveLength(1);
783-
expect(watch.state.diff.removed[0]?.make).equals('test2');
812+
expect(diffs).toHaveLength(2);
813+
expect(diffs[1].added).toHaveLength(0);
814+
expect(diffs[1].all).toHaveLength(1);
815+
expect(diffs[1].unchanged).toHaveLength(1);
816+
expect(diffs[1].unchanged[0]?.make).equals('test1');
817+
818+
expect(diffs[1].removed).toHaveLength(1);
819+
expect(diffs[1].removed[0]?.make).equals('test2');
784820
},
785821
{ timeout: 1000 }
786822
);
@@ -823,6 +859,14 @@ describe('Watch Tests', { sequential: true }, () => {
823859
{ timeout: 1000, interval: 100 }
824860
);
825861

862+
const diffs: WatchedQueryDifferential<{ id: string; make: string }>[] = [];
863+
864+
watch.registerListener({
865+
onDiff: (diff) => {
866+
diffs.push(diff);
867+
}
868+
});
869+
826870
await powersync.execute(
827871
/* sql */ `
828872
UPDATE assets
@@ -836,16 +880,17 @@ describe('Watch Tests', { sequential: true }, () => {
836880

837881
await vi.waitFor(
838882
() => {
839-
expect(watch.state.diff.added).toHaveLength(0);
840-
const updated = watch.state.diff.updated[0];
883+
expect(diffs).toHaveLength(1);
884+
expect(diffs[0].added).toHaveLength(0);
885+
const updated = diffs[0].updated[0];
841886

842887
// The update should contain previous and current values of changed rows
843888
expect(updated).toBeDefined();
844889
expect(updated.previous.make).equals('test1');
845890
expect(updated.current.make).equals('test2');
846891

847-
expect(watch.state.diff.removed).toHaveLength(0);
848-
expect(watch.state.diff.all).toHaveLength(1);
892+
expect(diffs[0].removed).toHaveLength(0);
893+
expect(diffs[0].all).toHaveLength(1);
849894
},
850895
{ timeout: 1000 }
851896
);

0 commit comments

Comments
 (0)