.changeset/pretty-eagles-fail.md (9 additions, 0 deletions)
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix has_more and other data batch metadata
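
For context, this metadata drives paging over bucket data: a consumer keeps requesting a bucket's data while chunks report `has_more`, resuming from `next_after`. A minimal consumer sketch, assuming a simplified chunk shape and a hypothetical `requestBatch` helper rather than the actual service API (the real code uses `utils.SyncBucketData`, which carries more fields than shown here):

```ts
// Illustrative types only; not the actual service types.
interface BucketChunk {
  bucket: string;
  after: string;
  next_after: string;
  has_more: boolean;
  data: { op_id: string }[];
}

// Page through a single bucket until a chunk reports has_more: false.
async function readBucket(
  requestBatch: (bucket: string, after: string) => Promise<BucketChunk[]>,
  bucket: string
): Promise<string[]> {
  const opIds: string[] = [];
  let after = '0';

  while (true) {
    const chunks = await requestBatch(bucket, after);
    if (chunks.length == 0) {
      break;
    }
    for (const chunk of chunks) {
      opIds.push(...chunk.data.map((entry) => entry.op_id));
    }
    const last = chunks[chunks.length - 1];
    after = last.next_after;
    if (!last.has_more) {
      // With the fix below, a final chunk only reports has_more when the
      // underlying batch was truncated, so this loop terminates correctly.
      break;
    }
  }
  return opIds;
}
```
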
@@ -315,8 +315,13 @@ export class MongoSyncBucketStorage
});
}

const limit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
// Internal naming:
// We do a query for one "batch", which may consist of multiple "chunks".
// Each chunk is limited to a single bucket, and is limited in length and size.
// There are also overall batch length and size limits.

const batchLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

const cursor = this.db.bucket_data.find(
{
@@ -325,10 +330,10 @@ export class MongoSyncBucketStorage
{
session: undefined,
sort: { _id: 1 },
limit: limit,
limit: batchLimit,
// Increase batch size above the default 101, so that we can fill an entire batch in
// one go.
batchSize: limit,
batchSize: batchLimit,
// Raw mode returns an array of Buffers instead of parsed documents.
// We use it so that:
// 1. We can calculate the document size accurately without serializing again.
@@ -343,32 +348,36 @@ export class MongoSyncBucketStorage
// to the lower of the batch count and size limits.
// This is similar to using `singleBatch: true` in the find options, but allows
// detecting "hasMore".
let { data, hasMore } = await readSingleBatch(cursor);
if (data.length == limit) {
let { data, hasMore: batchHasMore } = await readSingleBatch(cursor);
if (data.length == batchLimit) {
// Limit reached - could have more data, despite the cursor being drained.
hasMore = true;
batchHasMore = true;
}

let batchSize = 0;
let currentBatch: utils.SyncBucketData | null = null;
let chunkSizeBytes = 0;
let currentChunk: utils.SyncBucketData | null = null;
let targetOp: InternalOpId | null = null;

// Ordered by _id, meaning buckets are grouped together
for (let rawData of data) {
const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
const bucket = row._id.b;

if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
if (currentChunk == null || currentChunk.bucket != bucket || chunkSizeBytes >= chunkSizeLimitBytes) {
// We need to start a new chunk
let start: ProtocolOpId | undefined = undefined;
if (currentBatch != null) {
if (currentBatch.bucket == bucket) {
currentBatch.has_more = true;
if (currentChunk != null) {
// There is an existing chunk we need to yield
if (currentChunk.bucket == bucket) {
// The current and new chunk have the same bucket, so we need has_more on the current one.
// If currentChunk.bucket != bucket, then we reached the end of the previous bucket.
currentChunk.has_more = true;
start = currentChunk.next_after;
}

const yieldBatch = currentBatch;
start = currentBatch.after;
currentBatch = null;
batchSize = 0;
const yieldBatch = currentChunk;
currentChunk = null;
chunkSizeBytes = 0;
yield { batch: yieldBatch, targetOp: targetOp };
targetOp = null;
}
@@ -380,10 +389,10 @@ export class MongoSyncBucketStorage
}
start = internalToExternalOpId(startOpId);
}
currentBatch = {
currentChunk = {
bucket,
after: start,
has_more: hasMore,
has_more: false,
data: [],
next_after: start
};
@@ -399,16 +408,21 @@ export class MongoSyncBucketStorage
}
}

currentBatch.data.push(entry);
currentBatch.next_after = entry.op_id;
currentChunk.data.push(entry);
currentChunk.next_after = entry.op_id;

batchSize += rawData.byteLength;
chunkSizeBytes += rawData.byteLength;
}

if (currentBatch != null) {
const yieldBatch = currentBatch;
currentBatch = null;
yield { batch: yieldBatch, targetOp: targetOp };
if (currentChunk != null) {
const yieldChunk = currentChunk;
currentChunk = null;
// This is the final chunk in the batch.
// There may be more data if and only if the batch we retrieved isn't complete.
if (batchHasMore) {
yieldChunk.has_more = true;
}
yield { batch: yieldChunk, targetOp: targetOp };
targetOp = null;
}
}
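
Both storage implementations now follow the chunking pattern shown above: walk the batch's rows in order (buckets grouped together), start a new chunk whenever the bucket changes or the chunk size limit is reached, set `has_more` on a chunk only when the same bucket continues into the next chunk, and mark the final chunk with `has_more` only when the batch hit its row limit (so more data may remain). A condensed sketch of that pattern, with simplified row and chunk types; the names here are illustrative stand-ins, not the actual service types:

```ts
interface Row {
  bucket: string;
  opId: string;
  sizeBytes: number;
}

interface Chunk {
  bucket: string;
  after: string;
  next_after: string;
  has_more: boolean;
  data: string[]; // op_ids, for illustration
}

function* chunkBatch(
  rows: Row[], // one "batch", ordered so that buckets are grouped together
  batchHasMore: boolean, // true if the batch query hit its row limit
  chunkSizeLimitBytes: number,
  startAfter: (bucket: string) => string // the request's starting op_id per bucket
): Generator<Chunk> {
  let current: Chunk | null = null;
  let chunkSize = 0;

  for (const row of rows) {
    if (current == null || current.bucket != row.bucket || chunkSize >= chunkSizeLimitBytes) {
      let after = startAfter(row.bucket);
      if (current != null) {
        if (current.bucket == row.bucket) {
          // Same bucket split across chunks: more data definitely follows.
          current.has_more = true;
          after = current.next_after;
        }
        yield current;
      }
      current = { bucket: row.bucket, after, next_after: after, has_more: false, data: [] };
      chunkSize = 0;
    }
    current.data.push(row.opId);
    current.next_after = row.opId;
    chunkSize += row.sizeBytes;
  }

  if (current != null) {
    // Final chunk: more data may exist only if the batch itself was truncated.
    current.has_more = batchHasMore;
    yield current;
  }
}
```
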
@@ -392,19 +392,26 @@ export class PostgresSyncRulesStorage
return;
}

// Internal naming:
// We do a query for one "batch", which may be returned in multiple "chunks".
// Each chunk is limited to a single bucket, and is limited in length and size.
// There are also overall batch length and size limits.
// The batch query results are streamed in separate sets of rows, which may or may
// not match up with chunks.

const end = checkpoint ?? BIGINT_MAX;
const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({
bucket_name: name,
start: start
}));

const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
const batchRowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

let batchSize = 0;
let currentBatch: utils.SyncBucketData | null = null;
let chunkSizeBytes = 0;
let currentChunk: utils.SyncBucketData | null = null;
let targetOp: InternalOpId | null = null;
let rowCount = 0;
let batchRowCount = 0;

/**
* It is possible to perform this query with JSONB join. e.g.
@@ -458,7 +465,7 @@ export class PostgresSyncRulesStorage
params: [
{ type: 'int4', value: this.group_id },
{ type: 'int8', value: end },
{ type: 'int4', value: rowLimit + 1 },
{ type: 'int4', value: batchRowLimit },
...filters.flatMap((f) => [
{ type: 'varchar' as const, value: f.bucket_name },
{ type: 'int8' as const, value: f.start } satisfies StatementParam
@@ -469,28 +476,27 @@ export class PostgresSyncRulesStorage

for (const row of decodedRows) {
const { bucket_name } = row;
const rowSize = row.data ? row.data.length : 0;

if (
currentBatch == null ||
currentBatch.bucket != bucket_name ||
batchSize >= sizeLimit ||
(currentBatch?.data.length && batchSize + rowSize > sizeLimit) ||
currentBatch.data.length >= rowLimit
) {
const rowSizeBytes = row.data ? row.data.length : 0;

const sizeExceeded =
chunkSizeBytes >= chunkSizeLimitBytes ||
(currentChunk?.data.length && chunkSizeBytes + rowSizeBytes > chunkSizeLimitBytes) ||
(currentChunk?.data.length ?? 0) >= batchRowLimit;

if (currentChunk == null || currentChunk.bucket != bucket_name || sizeExceeded) {
let start: string | undefined = undefined;
if (currentBatch != null) {
if (currentBatch.bucket == bucket_name) {
currentBatch.has_more = true;
if (currentChunk != null) {
if (currentChunk.bucket == bucket_name) {
currentChunk.has_more = true;
start = currentChunk.next_after;
}

const yieldBatch = currentBatch;
start = currentBatch.after;
currentBatch = null;
batchSize = 0;
yield { batch: yieldBatch, targetOp: targetOp };
const yieldChunk = currentChunk;
currentChunk = null;
chunkSizeBytes = 0;
yield { batch: yieldChunk, targetOp: targetOp };
targetOp = null;
if (rowCount >= rowLimit) {
if (batchRowCount >= batchRowLimit) {
// We've yielded all the requested rows
break;
}
@@ -503,11 +509,13 @@ export class PostgresSyncRulesStorage
}
start = internalToExternalOpId(startOpId);
}
currentBatch = {
currentChunk = {
bucket: bucket_name,
after: start,
// this is updated when we yield the chunk
has_more: false,
data: [],
// this is updated incrementally
next_after: start
};
targetOp = null;
@@ -527,20 +535,25 @@ export class PostgresSyncRulesStorage
}
}

currentBatch.data.push(entry);
currentBatch.next_after = entry.op_id;
currentChunk.data.push(entry);
currentChunk.next_after = entry.op_id;

batchSize += rowSize;
chunkSizeBytes += rowSizeBytes;

// Manually track the total rows yielded
rowCount++;
batchRowCount++;
}
}

if (currentBatch != null) {
const yieldBatch = currentBatch;
currentBatch = null;
yield { batch: yieldBatch, targetOp: targetOp };
if (currentChunk != null) {
const yieldChunk = currentChunk;
currentChunk = null;
// This is the final chunk in the batch.
// There may be more data if and only if the batch we retrieved isn't complete.
if (batchRowCount >= batchRowLimit) {
yieldChunk.has_more = true;
}
yield { batch: yieldChunk, targetOp: targetOp };
targetOp = null;
}
}
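
The rule both implementations converge on can be summarized as a small predicate. This is a hypothetical helper for illustration, not part of the change:

```ts
// has_more for a chunk that is about to be yielded.
function chunkHasMore(params: {
  isFinalChunkOfBatch: boolean;
  nextRowSameBucket: boolean; // only meaningful for mid-batch chunks
  batchTruncated: boolean; // the batch query hit its row limit
}): boolean {
  if (!params.isFinalChunkOfBatch) {
    // A chunk closed because the size limit was hit mid-bucket definitely has
    // more data; a chunk closed because the bucket changed does not.
    return params.nextRowSameBucket;
  }
  // The final chunk may have more data only if the batch itself was cut off
  // by the row limit.
  return params.batchTruncated;
}
```

Previously, the MongoDB implementation initialized `has_more` from the batch-level `hasMore` flag for every chunk, which could report more data for buckets that were in fact complete; that is the metadata error this change fixes.
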