Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/beige-clocks-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Internal: Improve types for oplog data
2 changes: 1 addition & 1 deletion packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
}
}

function transformLegacyResponse(bucketData: util.SyncBucketData): any {
function transformLegacyResponse(bucketData: util.SyncBucketData): util.SyncBucketData<util.ProtocolOplogData> {
return {
...bucketData,
data: bucketData.data.map((entry) => {
Expand Down
25 changes: 15 additions & 10 deletions packages/service-core/src/util/protocol-types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as t from 'ts-codec';
import { BucketDescription, BucketPriority, SqliteJsonValue } from '@powersync/service-sync-rules';
import { BucketDescription, BucketPriority, SqliteJsonRow } from '@powersync/service-sync-rules';
import { JsonContainer } from '@powersync/service-jsonbig';

export const BucketRequest = t.object({
name: t.string,
Expand Down Expand Up @@ -65,7 +66,7 @@ export interface StreamingSyncCheckpointDiff {
}

export interface StreamingSyncData {
data: SyncBucketData;
data: SyncBucketData<ProtocolOplogData>;
}

export interface StreamingSyncCheckpointComplete {
Expand Down Expand Up @@ -109,13 +110,9 @@ export interface BucketState {
op_id: string;
}

export interface SyncDataBatch {
buckets: SyncBucketData[];
}

export interface SyncBucketData {
export interface SyncBucketData<Data extends ProtocolOplogData = StoredOplogData> {
bucket: string;
data: OplogEntry[];
data: OplogEntry<Data>[];
/**
* True if there _could_ be more data for this bucket, and another request must be made.
*/
Expand All @@ -130,12 +127,20 @@ export interface SyncBucketData {
next_after: ProtocolOpId;
}

export interface OplogEntry {
export type StoredOplogData = string | null;

// Note: When clients have both raw_data and binary_data disabled (this only affects legacy
// clients), data is actually a `Record<string, SqliteJsonValue>`. Oplog entries are always
// stored as a serialized (JSON) string so that they don't have to be parsed in the sync service,
// this representation only exists on the way out for legacy clients.
export type ProtocolOplogData = SqliteJsonRow | JsonContainer | StoredOplogData;

export interface OplogEntry<Data extends ProtocolOplogData = StoredOplogData> {
op_id: ProtocolOpId;
op: 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR';
object_type?: string;
object_id?: string;
data?: Record<string, SqliteJsonValue> | string | null;
data?: Data;
checksum: number | bigint;
subkey?: string;
}
Expand Down
12 changes: 7 additions & 5 deletions test-client/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type * as types from '@powersync/service-core';

export type BucketData = Record<string, types.OplogEntry[]>;
export type TestOplogEntry = types.OplogEntry<types.ProtocolOplogData>;

export type BucketData = Record<string, TestOplogEntry[]>;

/**
* Combine all chunks of received data, excluding any data after the checkpoint.
Expand Down Expand Up @@ -57,8 +59,8 @@ export function isCheckpoint(line: types.StreamingSyncLine): line is types.Strea
*
* This is the function $r(B)$, as described in /docs/bucket-properties.md.
*/
export function reduceBucket(operations: types.OplogEntry[]) {
let rowState = new Map<string, types.OplogEntry>();
export function reduceBucket(operations: TestOplogEntry[]) {
let rowState = new Map<string, TestOplogEntry>();
let otherChecksum = 0;

for (let op of operations) {
Expand Down Expand Up @@ -90,7 +92,7 @@ export function reduceBucket(operations: types.OplogEntry[]) {
return Number(BigInt(a.op_id) - BigInt(b.op_id));
});

let finalState: types.OplogEntry[] = [
let finalState: TestOplogEntry[] = [
// Special operation to indiciate the checksum remainder
{ op_id: '0', op: 'CLEAR', checksum: otherChecksum },
...puts
Expand All @@ -99,7 +101,7 @@ export function reduceBucket(operations: types.OplogEntry[]) {
return finalState;
}

function rowKey(entry: types.OplogEntry) {
function rowKey(entry: TestOplogEntry) {
return `${entry.object_type}/${entry.object_id}/${entry.subkey}`;
}

Expand Down