|
1 | 1 | import { WorkerRunner } from "@effect/platform"; |
2 | 2 | import { BrowserWorkerRunner } from "@effect/platform-browser"; |
3 | | -import { Snapshot } from "@local/sync"; |
4 | | -import { liveQuery } from "dexie"; |
5 | | -import { |
6 | | - Array, |
7 | | - Effect, |
8 | | - Layer, |
9 | | - Number, |
10 | | - Schema, |
11 | | - Stream, |
12 | | - SynchronizedRef, |
13 | | -} from "effect"; |
14 | | -import { Dexie } from "../lib/dexie"; |
15 | | -import { RuntimeClient } from "../lib/runtime-client"; |
16 | | -import { Sync } from "../lib/services/sync"; |
17 | | -import { WorkspaceManager } from "../lib/services/workspace-manager"; |
18 | | -import { type LiveQuery } from "./schema"; |
| 3 | +import { RuntimeLib, SyncWorker } from "@local/client-lib"; |
| 4 | +import { Effect, Layer } from "effect"; |
19 | 5 |
|
20 | | -const main = (params: { workspaceId: string }) => |
21 | | - Effect.gen(function* () { |
22 | | - const manager = yield* WorkspaceManager; |
23 | | - const { db } = yield* Dexie; |
24 | | - const { push } = yield* Sync; |
25 | | - |
26 | | - const snapshotEq = Array.getEquivalence(Number.Equivalence); |
27 | | - |
28 | | - yield* Effect.log(`Live query workspace '${params.workspaceId}'`); |
29 | | - |
30 | | - const workspace = yield* manager |
31 | | - .getById({ workspaceId: params.workspaceId }) |
32 | | - .pipe(Effect.flatMap(Effect.fromNullable)); |
33 | | - |
34 | | - const live = liveQuery(() => |
35 | | - db.temp_workspace |
36 | | - .where("workspaceId") |
37 | | - .equals(params.workspaceId) |
38 | | - .toArray() |
39 | | - ); |
40 | | - |
41 | | - yield* Effect.forkScoped( |
42 | | - Effect.acquireRelease( |
43 | | - Effect.gen(function* () { |
44 | | - yield* Effect.log("Subscribing"); |
45 | | - |
46 | | - const ref = yield* SynchronizedRef.make(0); |
47 | | - return live.subscribe((payload) => |
48 | | - Effect.runPromise( |
49 | | - Effect.gen(function* () { |
50 | | - yield* Effect.log(`Change detected`); |
51 | | - |
52 | | - const id = yield* ref.pipe( |
53 | | - SynchronizedRef.updateAndGet((n) => n + 1) |
54 | | - ); |
55 | | - |
56 | | - yield* Stream.runDrain( |
57 | | - Stream.make(...payload).pipe( |
58 | | - Stream.changesWith((a, b) => |
59 | | - snapshotEq(a.snapshot, b.snapshot) |
60 | | - ), |
61 | | - Stream.debounce("3 seconds"), |
62 | | - Stream.tap((message) => |
63 | | - Effect.gen(function* () { |
64 | | - const streamId = yield* ref.get; |
65 | | - if (streamId === id) { |
66 | | - yield* Effect.log( |
67 | | - `Syncing ${payload.length} changes` |
68 | | - ); |
69 | | - |
70 | | - const snapshot = yield* Schema.decode(Snapshot)( |
71 | | - message.snapshot |
72 | | - ); |
73 | | - |
74 | | - yield* push({ |
75 | | - snapshot, |
76 | | - snapshotId: message.snapshotId, |
77 | | - workspaceId: workspace.workspaceId, |
78 | | - }); |
79 | | - } |
80 | | - }) |
81 | | - ) |
82 | | - ) |
83 | | - ); |
84 | | - }) |
85 | | - ) |
86 | | - ); |
87 | | - }), |
88 | | - (subscription) => |
89 | | - Effect.gen(function* () { |
90 | | - yield* Effect.log("Live query unsubscribing"); |
91 | | - return subscription.unsubscribe(); |
92 | | - }) |
93 | | - ) |
94 | | - ); |
95 | | - |
96 | | - return true; |
97 | | - }); |
98 | | - |
99 | | -const WorkerLive = WorkerRunner.layer((params: LiveQuery) => |
| 6 | +const WorkerLive = WorkerRunner.layer((params: SyncWorker.LiveQuery) => |
100 | 7 | Effect.scoped( |
101 | 8 | Effect.gen(function* () { |
| 9 | + const worker = yield* SyncWorker.SyncWorker; |
102 | 10 | yield* Effect.log("Startup live query connection"); |
103 | 11 |
|
104 | 12 | yield* Effect.addFinalizer(() => |
105 | 13 | Effect.log("Closed live query connection") |
106 | 14 | ); |
107 | 15 |
|
108 | | - yield* Effect.fork(main({ workspaceId: params.workspaceId })); |
| 16 | + yield* Effect.fork(worker.liveSync({ workspaceId: params.workspaceId })); |
109 | 17 | yield* Effect.never; |
110 | 18 | }).pipe(Effect.mapError(() => "Live query error")) |
111 | 19 | ) |
112 | 20 | ).pipe(Layer.provide(BrowserWorkerRunner.layer)); |
113 | 21 |
|
114 | | -RuntimeClient.runFork(WorkerRunner.launch(WorkerLive)); |
| 22 | +RuntimeLib.runFork(WorkerRunner.launch(WorkerLive)); |
0 commit comments