9 changes: 9 additions & 0 deletions .changeset/pretty-eagles-fail.md
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix has_more and other data batch metadata
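
For orientation, here is a minimal sketch of the value shape yielded by `getBucketDataBatch` after this change, reconstructed from field names visible in the diffs below. The real definitions (referenced as `utils.SyncBucketData` and `storage.SyncBucketDataChunk` in the code) are not part of this view and may differ in detail:

```ts
// Simplified, assumed shapes inferred from the diffs below - not the actual definitions.
interface OplogEntry {
  op_id: string;
  op: string; // e.g. 'PUT'
  object_id?: string;
  checksum: number;
}

interface SyncBucketData {
  bucket: string;
  after: string; // op_id this chunk starts after
  next_after: string; // op_id to use as the start for the next request
  has_more: boolean; // true if more data may follow for this bucket
  data: OplogEntry[];
}

// Each storage implementation now yields chunks of this form:
interface SyncBucketDataChunk {
  chunkData: SyncBucketData;
  targetOp: bigint | null; // InternalOpId; populated for some operations, otherwise null
}
```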
@@ -288,7 +288,7 @@ export class MongoSyncBucketStorage
checkpoint: utils.InternalOpId,
dataBuckets: Map<string, InternalOpId>,
options?: storage.BucketDataBatchOptions
): AsyncIterable<storage.SyncBucketDataBatch> {
): AsyncIterable<storage.SyncBucketDataChunk> {
if (dataBuckets.size == 0) {
return;
}
@@ -315,8 +315,13 @@ export class MongoSyncBucketStorage
});
}

const limit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
// Internal naming:
// We do a query for one "batch", which may consist of multiple "chunks".
// Each chunk is limited to a single bucket, and is limited in length and size.
// There are also overall batch length and size limits.

const batchLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

const cursor = this.db.bucket_data.find(
{
@@ -325,10 +330,10 @@
{
session: undefined,
sort: { _id: 1 },
limit: limit,
limit: batchLimit,
// Increase batch size above the default 101, so that we can fill an entire batch in
// one go.
batchSize: limit,
batchSize: batchLimit,
// Raw mode returns an array of Buffers instead of parsed documents.
// We use it so that:
// 1. We can calculate the document size accurately without serializing again.
@@ -343,33 +348,38 @@
// to the lower of the batch count and size limits.
// This is similar to using `singleBatch: true` in the find options, but allows
// detecting "hasMore".
let { data, hasMore } = await readSingleBatch(cursor);
if (data.length == limit) {
let { data, hasMore: batchHasMore } = await readSingleBatch(cursor);
if (data.length == batchLimit) {
// Limit reached - could have more data, despite the cursor being drained.
hasMore = true;
batchHasMore = true;
}

let batchSize = 0;
let currentBatch: utils.SyncBucketData | null = null;
let chunkSizeBytes = 0;
let currentChunk: utils.SyncBucketData | null = null;
let targetOp: InternalOpId | null = null;

// Ordered by _id, meaning buckets are grouped together
for (let rawData of data) {
const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
const bucket = row._id.b;

if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
if (currentChunk == null || currentChunk.bucket != bucket || chunkSizeBytes >= chunkSizeLimitBytes) {
// We need to start a new chunk
let start: ProtocolOpId | undefined = undefined;
if (currentBatch != null) {
if (currentBatch.bucket == bucket) {
currentBatch.has_more = true;
if (currentChunk != null) {
// There is an existing chunk we need to yield
if (currentChunk.bucket == bucket) {
// Current and new chunk have the same bucket, so we need has_more on the current one.
// If currentChunk.bucket != bucket, then we reached the end of the previous bucket,
// and has_more = false in that case.
currentChunk.has_more = true;
start = currentChunk.next_after;
}

const yieldBatch = currentBatch;
start = currentBatch.after;
currentBatch = null;
batchSize = 0;
yield { batch: yieldBatch, targetOp: targetOp };
const yieldChunk = currentChunk;
currentChunk = null;
chunkSizeBytes = 0;
yield { chunkData: yieldChunk, targetOp: targetOp };
targetOp = null;
}

@@ -380,10 +390,10 @@
}
start = internalToExternalOpId(startOpId);
}
currentBatch = {
currentChunk = {
bucket,
after: start,
has_more: hasMore,
has_more: false,
data: [],
next_after: start
};
@@ -399,16 +409,19 @@
}
}

currentBatch.data.push(entry);
currentBatch.next_after = entry.op_id;
currentChunk.data.push(entry);
currentChunk.next_after = entry.op_id;

batchSize += rawData.byteLength;
chunkSizeBytes += rawData.byteLength;
}

if (currentBatch != null) {
const yieldBatch = currentBatch;
currentBatch = null;
yield { batch: yieldBatch, targetOp: targetOp };
if (currentChunk != null) {
const yieldChunk = currentChunk;
currentChunk = null;
// This is the final chunk in the batch.
// There may be more data if and only if the batch we retrieved isn't complete.
yieldChunk.has_more = batchHasMore;
yield { chunkData: yieldChunk, targetOp: targetOp };
targetOp = null;
}
}
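
To make the new chunk metadata concrete, here is a minimal consumer-side sketch of paging through a single bucket with `has_more` and `next_after`, modelled on the `WalStreamTestContext.getBucketData` helper further down in this PR. The `BucketStorageLike` interface and `readAllBucketData` function are illustrative assumptions, not the service's actual sync code path:

```ts
// Assumed minimal view of the storage API used below; mirrors the diff above.
interface ChunkData {
  bucket: string;
  data: unknown[];
  has_more: boolean;
  next_after: string;
}
interface Chunk {
  chunkData: ChunkData;
}
interface BucketStorageLike {
  getBucketDataBatch(checkpoint: bigint, buckets: Map<string, bigint>): AsyncIterable<Chunk>;
}

// Collect all data for one bucket, issuing a new batch query whenever the
// previous chunk reports has_more = true.
async function readAllBucketData(
  storage: BucketStorageLike,
  checkpoint: bigint,
  bucket: string,
  start: bigint
): Promise<unknown[]> {
  const map = new Map<string, bigint>([[bucket, start]]);
  let data: unknown[] = [];
  while (true) {
    const chunks: Chunk[] = [];
    for await (const chunk of storage.getBucketDataBatch(checkpoint, map)) {
      chunks.push(chunk);
    }
    if (chunks.length == 0) {
      break;
    }
    // Like the test helper, only the first chunk per query is consumed here;
    // has_more decides whether to re-query from next_after.
    data = data.concat(chunks[0].chunkData.data);
    if (!chunks[0].chunkData.has_more) {
      break;
    }
    map.set(bucket, BigInt(chunks[0].chunkData.next_after));
  }
  return data;
}
```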
12 changes: 10 additions & 2 deletions modules/module-mongodb-storage/test/src/storage_sync.test.ts
@@ -87,7 +87,11 @@ describe('sync - mongodb', () => {
});

const batch2 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
options
)
);
expect(test_utils.getBatchData(batch2)).toEqual([
{ op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -99,7 +103,11 @@
});

const batch3 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
options
)
);
expect(test_utils.getBatchData(batch3)).toEqual([
{ op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
@@ -138,7 +138,7 @@ export class ChangeStreamTestContext
chunkLimitBytes: options?.chunkLimitBytes
});
const batches = await test_utils.fromAsync(batch);
return batches[0]?.batch.data ?? [];
return batches[0]?.chunkData.data ?? [];
}

async getChecksums(buckets: string[], options?: { timeout?: number }) {
2 changes: 1 addition & 1 deletion modules/module-mysql/test/src/BinlogStreamUtils.ts
@@ -157,7 +157,7 @@ export class BinlogStreamTestContext
const map = new Map<string, InternalOpId>([[bucket, start]]);
const batch = this.storage!.getBucketDataBatch(checkpoint, map);
const batches = await test_utils.fromAsync(batch);
return batches[0]?.batch.data ?? [];
return batches[0]?.chunkData.data ?? [];
}
}

@@ -387,24 +387,31 @@ export class PostgresSyncRulesStorage
checkpoint: InternalOpId,
dataBuckets: Map<string, InternalOpId>,
options?: storage.BucketDataBatchOptions
): AsyncIterable<storage.SyncBucketDataBatch> {
): AsyncIterable<storage.SyncBucketDataChunk> {
if (dataBuckets.size == 0) {
return;
}

// Internal naming:
// We do a query for one "batch", which may be returned in multiple "chunks".
// Each chunk is limited to a single bucket, and is limited in length and size.
// There are also overall batch length and size limits.
// The rows for each batch query are streamed back in separate sets, which may or may
// not line up with chunk boundaries.

const end = checkpoint ?? BIGINT_MAX;
const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({
bucket_name: name,
start: start
}));

const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
const batchRowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

let batchSize = 0;
let currentBatch: utils.SyncBucketData | null = null;
let chunkSizeBytes = 0;
let currentChunk: utils.SyncBucketData | null = null;
let targetOp: InternalOpId | null = null;
let rowCount = 0;
let batchRowCount = 0;

/**
* It is possible to perform this query with JSONB join. e.g.
@@ -458,7 +465,7 @@
params: [
{ type: 'int4', value: this.group_id },
{ type: 'int8', value: end },
{ type: 'int4', value: rowLimit + 1 },
{ type: 'int4', value: batchRowLimit },
...filters.flatMap((f) => [
{ type: 'varchar' as const, value: f.bucket_name },
{ type: 'int8' as const, value: f.start } satisfies StatementParam
@@ -469,28 +476,27 @@

for (const row of decodedRows) {
const { bucket_name } = row;
const rowSize = row.data ? row.data.length : 0;

if (
currentBatch == null ||
currentBatch.bucket != bucket_name ||
batchSize >= sizeLimit ||
(currentBatch?.data.length && batchSize + rowSize > sizeLimit) ||
currentBatch.data.length >= rowLimit
) {
const rowSizeBytes = row.data ? row.data.length : 0;

const sizeExceeded =
chunkSizeBytes >= chunkSizeLimitBytes ||
(currentChunk?.data.length && chunkSizeBytes + rowSizeBytes > chunkSizeLimitBytes) ||
(currentChunk?.data.length ?? 0) >= batchRowLimit;

if (currentChunk == null || currentChunk.bucket != bucket_name || sizeExceeded) {
let start: string | undefined = undefined;
if (currentBatch != null) {
if (currentBatch.bucket == bucket_name) {
currentBatch.has_more = true;
if (currentChunk != null) {
if (currentChunk.bucket == bucket_name) {
currentChunk.has_more = true;
start = currentChunk.next_after;
}

const yieldBatch = currentBatch;
start = currentBatch.after;
currentBatch = null;
batchSize = 0;
yield { batch: yieldBatch, targetOp: targetOp };
const yieldChunk = currentChunk;
currentChunk = null;
chunkSizeBytes = 0;
yield { chunkData: yieldChunk, targetOp: targetOp };
targetOp = null;
if (rowCount >= rowLimit) {
if (batchRowCount >= batchRowLimit) {
// We've yielded all the requested rows
break;
}
@@ -503,11 +509,13 @@
}
start = internalToExternalOpId(startOpId);
}
currentBatch = {
currentChunk = {
bucket: bucket_name,
after: start,
// this is updated when we yield the chunk
has_more: false,
data: [],
// this is updated incrementally
next_after: start
};
targetOp = null;
@@ -527,20 +535,25 @@
}
}

currentBatch.data.push(entry);
currentBatch.next_after = entry.op_id;
currentChunk.data.push(entry);
currentChunk.next_after = entry.op_id;

batchSize += rowSize;
chunkSizeBytes += rowSizeBytes;

// Manually track the total rows yielded
rowCount++;
batchRowCount++;
}
}

if (currentBatch != null) {
const yieldBatch = currentBatch;
currentBatch = null;
yield { batch: yieldBatch, targetOp: targetOp };
if (currentChunk != null) {
const yieldChunk = currentChunk;
currentChunk = null;
// This is the final chunk in the batch.
// There may be more data if and only if the batch we retrieved isn't complete.
// If batchRowCount == batchRowLimit, we don't actually know whether there is more data,
// but it is safe to return true in that case.
yieldChunk.has_more = batchRowCount >= batchRowLimit;
yield { chunkData: yieldChunk, targetOp: targetOp };
targetOp = null;
}
}
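
As a summary of the chunking rules both storage implementations now share, here is a standalone sketch of the chunk-boundary check and the final `has_more` decision. `shouldStartNewChunk` and `finalChunkHasMore` are illustrative names introduced here, not functions from this PR:

```ts
// Illustrative only: mirrors the conditions in the Postgres implementation above.
interface CurrentChunkState {
  bucket: string;
  rowCount: number; // rows already in the current chunk
  sizeBytes: number; // accumulated serialized size of the current chunk
}

function shouldStartNewChunk(
  current: CurrentChunkState | null,
  rowBucket: string,
  rowSizeBytes: number,
  batchRowLimit: number,
  chunkSizeLimitBytes: number
): boolean {
  if (current == null) {
    // First row of the batch.
    return true;
  }
  if (current.bucket != rowBucket) {
    // Chunks never span buckets.
    return true;
  }
  return (
    current.sizeBytes >= chunkSizeLimitBytes ||
    (current.rowCount > 0 && current.sizeBytes + rowSizeBytes > chunkSizeLimitBytes) ||
    current.rowCount >= batchRowLimit
  );
}

// The final chunk of a batch only reports has_more when the batch itself may
// have been truncated by the row limit (over-reporting here is safe).
function finalChunkHasMore(batchRowCount: number, batchRowLimit: number): boolean {
  return batchRowCount >= batchRowLimit;
}
```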
18 changes: 15 additions & 3 deletions modules/module-postgres-storage/test/src/storage.test.ts
@@ -93,7 +93,11 @@ describe('Postgres Sync Bucket Storage', () => {
});

const batch2 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
options
)
);
expect(test_utils.getBatchData(batch2)).toEqual([
{ op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 }
@@ -105,7 +109,11 @@
});

const batch3 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
options
)
);
expect(test_utils.getBatchData(batch3)).toEqual([
{ op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -117,7 +125,11 @@
});

const batch4 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch3[0].batch.next_after)]]), options)
bucketStorage.getBucketDataBatch(
checkpoint,
new Map([['global[]', BigInt(batch3[0].chunkData.next_after)]]),
options
)
);
expect(test_utils.getBatchData(batch4)).toEqual([
{ op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }
8 changes: 4 additions & 4 deletions modules/module-postgres/test/src/wal_stream_utils.ts
@@ -161,11 +161,11 @@ export class WalStreamTestContext implements AsyncDisposable {
const batch = this.storage!.getBucketDataBatch(checkpoint, map);

const batches = await test_utils.fromAsync(batch);
data = data.concat(batches[0]?.batch.data ?? []);
if (batches.length == 0 || !batches[0]!.batch.has_more) {
data = data.concat(batches[0]?.chunkData.data ?? []);
if (batches.length == 0 || !batches[0]!.chunkData.has_more) {
break;
}
map.set(bucket, BigInt(batches[0]!.batch.next_after));
map.set(bucket, BigInt(batches[0]!.chunkData.next_after));
}
return data;
}
@@ -182,6 +182,6 @@
const map = new Map<string, InternalOpId>([[bucket, start]]);
const batch = this.storage!.getBucketDataBatch(checkpoint, map);
const batches = await test_utils.fromAsync(batch);
return batches[0]?.batch.data ?? [];
return batches[0]?.chunkData.data ?? [];
}
}