|
| 1 | +/** |
| 2 | + * @since 1.0.0 |
| 3 | + */ |
| 4 | +/// <reference lib="webworker" /> |
| 5 | +import { SqlError } from "@effect/sql/SqlError" |
| 6 | +import * as WaSqlite from "@effect/wa-sqlite" |
| 7 | +import SQLiteESMFactory from "@effect/wa-sqlite/dist/wa-sqlite.mjs" |
| 8 | +import { AccessHandlePoolVFS } from "@effect/wa-sqlite/src/examples/AccessHandlePoolVFS.js" |
| 9 | +import * as Effect from "effect/Effect" |
| 10 | +import type { OpfsWorkerMessage } from "./internal/opfsWorker.js" |
| 11 | + |
| 12 | +/** |
| 13 | + * @category models |
| 14 | + * @since 1.0.0 |
| 15 | + */ |
| 16 | +export interface OpfsWorkerConfig { |
| 17 | + readonly port: EventTarget & Pick<MessagePort, "postMessage" | "close"> |
| 18 | + readonly dbName: string |
| 19 | +} |
| 20 | + |
| 21 | +/** |
| 22 | + * @category constructor |
| 23 | + * @since 1.0.0 |
| 24 | + */ |
| 25 | +export const run = ( |
| 26 | + options: OpfsWorkerConfig |
| 27 | +): Effect.Effect<void, SqlError> => |
| 28 | + Effect.gen(function*() { |
| 29 | + const factory = yield* Effect.promise(() => SQLiteESMFactory()) |
| 30 | + const sqlite3 = WaSqlite.Factory(factory) |
| 31 | + const vfs = yield* Effect.promise(() => AccessHandlePoolVFS.create("opfs", factory)) |
| 32 | + sqlite3.vfs_register(vfs, false) |
| 33 | + const db = yield* Effect.acquireRelease( |
| 34 | + Effect.try({ |
| 35 | + try: () => sqlite3.open_v2(options.dbName, undefined, "opfs"), |
| 36 | + catch: (cause) => new SqlError({ cause, message: "Failed to open database" }) |
| 37 | + }), |
| 38 | + (db) => Effect.sync(() => sqlite3.close(db)) |
| 39 | + ) |
| 40 | + |
| 41 | + return yield* Effect.async<void>((resume) => { |
| 42 | + const onMessage = (event: any) => { |
| 43 | + let messageId: number |
| 44 | + const message = event.data as OpfsWorkerMessage |
| 45 | + try { |
| 46 | + switch (message[0]) { |
| 47 | + case "close": { |
| 48 | + options.port.close() |
| 49 | + return resume(Effect.void) |
| 50 | + } |
| 51 | + case "import": { |
| 52 | + const [, id, data] = message |
| 53 | + messageId = id |
| 54 | + sqlite3.deserialize(db, "main", data, data.length, data.length, 1 | 2) |
| 55 | + options.port.postMessage([id, void 0, void 0]) |
| 56 | + return |
| 57 | + } |
| 58 | + case "export": { |
| 59 | + const [, id] = message |
| 60 | + messageId = id |
| 61 | + const data = sqlite3.serialize(db, "main") |
| 62 | + options.port.postMessage([id, undefined, data], [data.buffer]) |
| 63 | + return |
| 64 | + } |
| 65 | + case "update_hook": { |
| 66 | + messageId = -1 |
| 67 | + sqlite3.update_hook(db, (_op, _db, table, rowid) => { |
| 68 | + if (!table) return |
| 69 | + options.port.postMessage(["update_hook", table, Number(rowid)]) |
| 70 | + }) |
| 71 | + return |
| 72 | + } |
| 73 | + default: { |
| 74 | + const [id, sql, params] = message |
| 75 | + messageId = id |
| 76 | + const results: Array<any> = [] |
| 77 | + let columns: Array<string> | undefined |
| 78 | + for (const stmt of sqlite3.statements(db, sql)) { |
| 79 | + sqlite3.bind_collection(stmt, params as any) |
| 80 | + while (sqlite3.step(stmt) === WaSqlite.SQLITE_ROW) { |
| 81 | + columns = columns ?? sqlite3.column_names(stmt) |
| 82 | + const row = sqlite3.row(stmt) |
| 83 | + results.push(row) |
| 84 | + } |
| 85 | + } |
| 86 | + options.port.postMessage([id, undefined, [columns, results]]) |
| 87 | + return |
| 88 | + } |
| 89 | + } |
| 90 | + } catch (e: any) { |
| 91 | + const message = "message" in e ? e.message : String(e) |
| 92 | + options.port.postMessage([messageId!, message, undefined]) |
| 93 | + } |
| 94 | + } |
| 95 | + options.port.addEventListener("message", onMessage) |
| 96 | + options.port.postMessage(["ready", undefined, undefined]) |
| 97 | + return Effect.sync(() => { |
| 98 | + options.port.removeEventListener("message", onMessage) |
| 99 | + }) |
| 100 | + }) |
| 101 | + }).pipe(Effect.scoped) |
0 commit comments