import {
  AbstractPowerSyncDatabase,
  AbstractRemote,
  BucketChecksum,
  Checkpoint,
  ColumnType,
  DBAdapter,
  isStreamingSyncCheckpoint,
  isStreamingSyncCheckpointDiff,
  isStreamingSyncData,
  PowerSyncControlCommand,
  SqliteBucketStorage,
  StreamingSyncLine,
  SyncDataBucket
} from '@powersync/web';
import { DynamicSchemaManager } from './DynamicSchemaManager';

/**
 * Bucket storage adapter that sits between the Rust sync client and SQLite.
 * Every control command is forwarded to the regular SqliteBucketStorage
 * implementation first; the sync lines are then re-parsed to record
 * per-bucket download metrics and to feed the dynamic schema.
 */
export class RustClientInterceptor extends SqliteBucketStorage {
  private rdb: DBAdapter;
  private lastStartedCheckpoint: Checkpoint | null = null;

  public tables: Record<string, Record<string, ColumnType>> = {};

  constructor(
    db: DBAdapter,
    private remote: AbstractRemote,
    private schemaManager: DynamicSchemaManager
  ) {
    super(db, (AbstractPowerSyncDatabase as any).transactionMutex);
    this.rdb = db;
  }

  async control(op: PowerSyncControlCommand, payload: string | Uint8Array | ArrayBuffer | null): Promise<string> {
    // Let the real bucket storage handle the command first, then inspect the
    // sync line ourselves for metrics and schema tracking.
    const response = await super.control(op, payload);

    if (op === PowerSyncControlCommand.PROCESS_TEXT_LINE) {
      await this.processTextLine(payload as string);
    } else if (op === PowerSyncControlCommand.PROCESS_BSON_LINE) {
      await this.processBinaryLine(payload as Uint8Array);
    }

    return response;
  }

  private processTextLine(line: string) {
    return this.processParsedLine(JSON.parse(line) as StreamingSyncLine);
  }

  private async processBinaryLine(line: Uint8Array) {
    // Binary sync lines are BSON-encoded; decode them with the remote's BSON implementation.
    const bson = await this.remote.getBSON();
    await this.processParsedLine(bson.deserialize(line) as StreamingSyncLine);
  }

  private async processParsedLine(line: StreamingSyncLine) {
    if (isStreamingSyncCheckpoint(line)) {
      // A full checkpoint replaces whatever we were tracking before.
      this.lastStartedCheckpoint = line.checkpoint;
      await this.trackCheckpoint(line.checkpoint);
    } else if (isStreamingSyncCheckpointDiff(line) && this.lastStartedCheckpoint) {
      // A checkpoint diff only lists changed buckets, so merge it into the
      // last full checkpoint to reconstruct the new one.
      const diff = line.checkpoint_diff;
      const newBuckets = new Map<string, BucketChecksum>();
      for (const checksum of this.lastStartedCheckpoint.buckets) {
        newBuckets.set(checksum.bucket, checksum);
      }
      for (const checksum of diff.updated_buckets) {
        newBuckets.set(checksum.bucket, checksum);
      }
      for (const bucket of diff.removed_buckets) {
        newBuckets.delete(bucket);
      }

      const newCheckpoint: Checkpoint = {
        last_op_id: diff.last_op_id,
        buckets: [...newBuckets.values()],
        write_checkpoint: diff.write_checkpoint
      };
      this.lastStartedCheckpoint = newCheckpoint;
      await this.trackCheckpoint(newCheckpoint);
    } else if (isStreamingSyncData(line)) {
      const batch = { buckets: [SyncDataBucket.fromRow(line.data)] };

      await this.rdb.writeTransaction(async (tx) => {
        for (const bucket of batch.buckets) {
          // Record download metrics for this bucket.
          const size = JSON.stringify(bucket.data).length;
          await tx.execute(
            `UPDATE local_bucket_data SET
               download_size = IFNULL(download_size, 0) + ?,
               last_op = ?,
               downloading = ?,
               downloaded_operations = IFNULL(downloaded_operations, 0) + ?
             WHERE id = ?`,
            [size, bucket.next_after, bucket.has_more, bucket.data.length, bucket.bucket]
          );
        }
      });

      // Update the dynamic schema from the operations in this batch.
      await this.schemaManager.updateFromOperations(batch);
    }
  }

  private async trackCheckpoint(checkpoint: Checkpoint) {
    await this.rdb.writeTransaction(async (tx) => {
      for (const bucket of checkpoint.buckets) {
        // Upsert the bucket row, preserving any progress already recorded.
        // downloaded_operations is a count, so it defaults to 0 rather than TRUE.
        await tx.execute(
          `INSERT OR REPLACE INTO local_bucket_data(id, total_operations, last_op, download_size, downloading, downloaded_operations)
           VALUES (
             ?,
             ?,
             IFNULL((SELECT last_op FROM local_bucket_data WHERE id = ?), '0'),
             IFNULL((SELECT download_size FROM local_bucket_data WHERE id = ?), 0),
             IFNULL((SELECT downloading FROM local_bucket_data WHERE id = ?), TRUE),
             IFNULL((SELECT downloaded_operations FROM local_bucket_data WHERE id = ?), 0)
           )`,
          [bucket.bucket, bucket.count, bucket.bucket, bucket.bucket, bucket.bucket, bucket.bucket]
        );
      }
    });
  }
}
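
// --- Usage sketch (illustrative only, not part of the adapter above) ---
// A minimal way to hand this adapter to a manually driven sync loop. It assumes
// the installed @powersync/web version exports WebRemote and
// WebStreamingSyncImplementation with the option shape shown here, and that
// DynamicSchemaManager takes no constructor arguments; `connector` is the
// application's backend connector and `connectWithInterceptor` is a hypothetical
// helper name. The overridden control() is only invoked when the Rust sync
// client is enabled on connect (e.g. via the SDK's SyncClientImplementation.RUST
// option), so verify where that option is passed in your SDK version.
// The import below would normally live at the top of the file.
import { PowerSyncBackendConnector, WebRemote, WebStreamingSyncImplementation } from '@powersync/web';

export async function connectWithInterceptor(db: AbstractPowerSyncDatabase, connector: PowerSyncBackendConnector) {
  const schemaManager = new DynamicSchemaManager();
  const remote = new WebRemote(connector);
  const adapter = new RustClientInterceptor(db.database, remote, schemaManager);

  const sync = new WebStreamingSyncImplementation({
    adapter,
    remote,
    uploadCrud: async () => {
      // CRUD upload is out of scope for this sketch.
    },
    identifier: 'intercepted-sync'
  });
  await sync.connect();
  return sync;
}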