diff --git a/.changeset/pretty-eagles-fail.md b/.changeset/pretty-eagles-fail.md
new file mode 100644
index 000000000..359ac30a2
--- /dev/null
+++ b/.changeset/pretty-eagles-fail.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+Fix has_more and other data batch metadata
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 8c6945b99..354b4aab5 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -288,7 +288,7 @@ export class MongoSyncBucketStorage
     checkpoint: utils.InternalOpId,
     dataBuckets: Map,
     options?: storage.BucketDataBatchOptions
-  ): AsyncIterable<storage.SyncBucketDataBatch> {
+  ): AsyncIterable<storage.SyncBucketDataChunk> {
     if (dataBuckets.size == 0) {
       return;
     }
@@ -315,8 +315,13 @@ export class MongoSyncBucketStorage
       });
     }

-    const limit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
-    const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
+    // Internal naming:
+    // We do a query for one "batch", which may consist of multiple "chunks".
+    // Each chunk is limited to a single bucket, and is limited in length and size.
+    // There are also overall batch length and size limits.
+
+    const batchLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
+    const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

     const cursor = this.db.bucket_data.find(
       {
@@ -325,10 +330,10 @@ export class MongoSyncBucketStorage
       {
         session: undefined,
         sort: { _id: 1 },
-        limit: limit,
+        limit: batchLimit,
         // Increase batch size above the default 101, so that we can fill an entire batch in
         // one go.
-        batchSize: limit,
+        batchSize: batchLimit,
         // Raw mode is returns an array of Buffer instead of parsed documents.
         // We use it so that:
         // 1. We can calculate the document size accurately without serializing again.
@@ -343,14 +348,14 @@ export class MongoSyncBucketStorage
     // to the lower of the batch count and size limits.
     // This is similar to using `singleBatch: true` in the find options, but allows
    // detecting "hasMore".
-    let { data, hasMore } = await readSingleBatch(cursor);
-    if (data.length == limit) {
+    let { data, hasMore: batchHasMore } = await readSingleBatch(cursor);
+    if (data.length == batchLimit) {
       // Limit reached - could have more data, despite the cursor being drained.
-      hasMore = true;
+      batchHasMore = true;
     }

-    let batchSize = 0;
-    let currentBatch: utils.SyncBucketData | null = null;
+    let chunkSizeBytes = 0;
+    let currentChunk: utils.SyncBucketData | null = null;
     let targetOp: InternalOpId | null = null;

     // Ordered by _id, meaning buckets are grouped together
@@ -358,18 +363,23 @@ export class MongoSyncBucketStorage
       const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
       const bucket = row._id.b;

-      if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
+      if (currentChunk == null || currentChunk.bucket != bucket || chunkSizeBytes >= chunkSizeLimitBytes) {
+        // We need to start a new chunk
         let start: ProtocolOpId | undefined = undefined;
-        if (currentBatch != null) {
-          if (currentBatch.bucket == bucket) {
-            currentBatch.has_more = true;
+        if (currentChunk != null) {
+          // There is an existing chunk we need to yield
+          if (currentChunk.bucket == bucket) {
+            // The current and new chunk have the same bucket, so we need has_more on the current one.
+            // If currentChunk.bucket != bucket, then we reached the end of the previous bucket,
+            // and has_more = false in that case.
+            currentChunk.has_more = true;
+            start = currentChunk.next_after;
           }
-          const yieldBatch = currentBatch;
-          start = currentBatch.after;
-          currentBatch = null;
-          batchSize = 0;
-          yield { batch: yieldBatch, targetOp: targetOp };
+          const yieldChunk = currentChunk;
+          currentChunk = null;
+          chunkSizeBytes = 0;
+          yield { chunkData: yieldChunk, targetOp: targetOp };
           targetOp = null;
         }
@@ -380,10 +390,10 @@ export class MongoSyncBucketStorage
           }
           start = internalToExternalOpId(startOpId);
         }
-        currentBatch = {
+        currentChunk = {
           bucket,
           after: start,
-          has_more: hasMore,
+          has_more: false,
           data: [],
           next_after: start
         };
@@ -399,16 +409,19 @@ export class MongoSyncBucketStorage
         }
       }

-      currentBatch.data.push(entry);
-      currentBatch.next_after = entry.op_id;
+      currentChunk.data.push(entry);
+      currentChunk.next_after = entry.op_id;

-      batchSize += rawData.byteLength;
+      chunkSizeBytes += rawData.byteLength;
     }

-    if (currentBatch != null) {
-      const yieldBatch = currentBatch;
-      currentBatch = null;
-      yield { batch: yieldBatch, targetOp: targetOp };
+    if (currentChunk != null) {
+      const yieldChunk = currentChunk;
+      currentChunk = null;
+      // This is the final chunk in the batch.
+      // There may be more data if and only if the batch we retrieved isn't complete.
+      yieldChunk.has_more = batchHasMore;
+      yield { chunkData: yieldChunk, targetOp: targetOp };
       targetOp = null;
     }
   }
diff --git a/modules/module-mongodb-storage/test/src/storage_sync.test.ts b/modules/module-mongodb-storage/test/src/storage_sync.test.ts
index a6a2056e5..d75c35e8f 100644
--- a/modules/module-mongodb-storage/test/src/storage_sync.test.ts
+++ b/modules/module-mongodb-storage/test/src/storage_sync.test.ts
@@ -87,7 +87,11 @@ describe('sync - mongodb', () => {
     });

     const batch2 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch2)).toEqual([
       { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -99,7 +103,11 @@ describe('sync - mongodb', () => {
     });

     const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch3)).toEqual([
       { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }
diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index bdff3da83..aa7b669ae 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -138,7 +138,7 @@ export class ChangeStreamTestContext {
       chunkLimitBytes: options?.chunkLimitBytes
     });
     const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
   }

   async getChecksums(buckets: string[], options?: { timeout?: number }) {
diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts
index 7844c8e85..575f50aea 100644
--- a/modules/module-mysql/test/src/BinlogStreamUtils.ts
+++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts
@@ -157,7 +157,7 @@ export class BinlogStreamTestContext {
     const map = new Map([[bucket, start]]);
     const batch = this.storage!.getBucketDataBatch(checkpoint, map);
     const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
   }
 }
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 7aa1bd730..7da3e59e7 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -387,24 +387,31 @@ export class PostgresSyncRulesStorage
     checkpoint: InternalOpId,
     dataBuckets: Map,
     options?: storage.BucketDataBatchOptions
-  ): AsyncIterable<storage.SyncBucketDataBatch> {
+  ): AsyncIterable<storage.SyncBucketDataChunk> {
     if (dataBuckets.size == 0) {
       return;
     }

+    // Internal naming:
+    // We do a query for one "batch", which may be returned in multiple "chunks".
+    // Each chunk is limited to a single bucket, and is limited in length and size.
+    // There are also overall batch length and size limits.
+    // The batch query results are streamed in separate sets of rows, which may or may
+    // not match up with chunks.
+
     const end = checkpoint ?? BIGINT_MAX;
     const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({
       bucket_name: name,
       start: start
     }));

-    const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
-    const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
+    const batchRowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
+    const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;

-    let batchSize = 0;
-    let currentBatch: utils.SyncBucketData | null = null;
+    let chunkSizeBytes = 0;
+    let currentChunk: utils.SyncBucketData | null = null;
     let targetOp: InternalOpId | null = null;
-    let rowCount = 0;
+    let batchRowCount = 0;

     /**
      * It is possible to perform this query with JSONB join. e.g.
@@ -458,7 +465,7 @@ export class PostgresSyncRulesStorage
         params: [
           { type: 'int4', value: this.group_id },
           { type: 'int8', value: end },
-          { type: 'int4', value: rowLimit + 1 },
+          { type: 'int4', value: batchRowLimit },
           ...filters.flatMap((f) => [
             { type: 'varchar' as const, value: f.bucket_name },
             { type: 'int8' as const, value: f.start } satisfies StatementParam
@@ -469,28 +476,27 @@ export class PostgresSyncRulesStorage
       for (const row of decodedRows) {
         const { bucket_name } = row;

-        const rowSize = row.data ? row.data.length : 0;
-
-        if (
-          currentBatch == null ||
-          currentBatch.bucket != bucket_name ||
-          batchSize >= sizeLimit ||
-          (currentBatch?.data.length && batchSize + rowSize > sizeLimit) ||
-          currentBatch.data.length >= rowLimit
-        ) {
+        const rowSizeBytes = row.data ? row.data.length : 0;
+
+        const sizeExceeded =
+          chunkSizeBytes >= chunkSizeLimitBytes ||
+          (currentChunk?.data.length && chunkSizeBytes + rowSizeBytes > chunkSizeLimitBytes) ||
+          (currentChunk?.data.length ?? 0) >= batchRowLimit;
+
+        if (currentChunk == null || currentChunk.bucket != bucket_name || sizeExceeded) {
           let start: string | undefined = undefined;
-          if (currentBatch != null) {
-            if (currentBatch.bucket == bucket_name) {
-              currentBatch.has_more = true;
+          if (currentChunk != null) {
+            if (currentChunk.bucket == bucket_name) {
+              currentChunk.has_more = true;
+              start = currentChunk.next_after;
             }
-            const yieldBatch = currentBatch;
-            start = currentBatch.after;
-            currentBatch = null;
-            batchSize = 0;
-            yield { batch: yieldBatch, targetOp: targetOp };
+            const yieldChunk = currentChunk;
+            currentChunk = null;
+            chunkSizeBytes = 0;
+            yield { chunkData: yieldChunk, targetOp: targetOp };
             targetOp = null;
-            if (rowCount >= rowLimit) {
+            if (batchRowCount >= batchRowLimit) {
              // We've yielded all the requested rows
               break;
             }
@@ -503,11 +509,13 @@ export class PostgresSyncRulesStorage
           }
           start = internalToExternalOpId(startOpId);
         }
-        currentBatch = {
+        currentChunk = {
           bucket: bucket_name,
           after: start,
+          // this is updated when we yield the chunk
           has_more: false,
           data: [],
+          // this is updated incrementally
           next_after: start
         };
         targetOp = null;
@@ -527,20 +535,25 @@ export class PostgresSyncRulesStorage
           }
         }

-        currentBatch.data.push(entry);
-        currentBatch.next_after = entry.op_id;
+        currentChunk.data.push(entry);
+        currentChunk.next_after = entry.op_id;

-        batchSize += rowSize;
+        chunkSizeBytes += rowSizeBytes;
         // Manually track the total rows yielded
-        rowCount++;
+        batchRowCount++;
       }
     }

-    if (currentBatch != null) {
-      const yieldBatch = currentBatch;
-      currentBatch = null;
-      yield { batch: yieldBatch, targetOp: targetOp };
+    if (currentChunk != null) {
+      const yieldChunk = currentChunk;
+      currentChunk = null;
+      // This is the final chunk in the batch.
+      // There may be more data if and only if the batch we retrieved isn't complete.
+      // If batchRowCount == batchRowLimit, we don't actually know whether there is more data,
+      // but it is safe to return true in that case.
+      yieldChunk.has_more = batchRowCount >= batchRowLimit;
+      yield { chunkData: yieldChunk, targetOp: targetOp };
       targetOp = null;
     }
   }
diff --git a/modules/module-postgres-storage/test/src/storage.test.ts b/modules/module-postgres-storage/test/src/storage.test.ts
index 42a2a935c..5551b9f88 100644
--- a/modules/module-postgres-storage/test/src/storage.test.ts
+++ b/modules/module-postgres-storage/test/src/storage.test.ts
@@ -93,7 +93,11 @@ describe('Postgres Sync Bucket Storage', () => {
     });

     const batch2 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch2)).toEqual([
       { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 }
@@ -105,7 +109,11 @@ describe('Postgres Sync Bucket Storage', () => {
     });

     const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch3)).toEqual([
       { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -117,7 +125,11 @@ describe('Postgres Sync Bucket Storage', () => {
     });

     const batch4 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch3[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch3[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch4)).toEqual([
       { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }
diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts
index 74c2663f9..992a3765f 100644
--- a/modules/module-postgres/test/src/wal_stream_utils.ts
+++ b/modules/module-postgres/test/src/wal_stream_utils.ts
@@ -161,11 +161,11 @@ export class WalStreamTestContext implements AsyncDisposable {
       const batch = this.storage!.getBucketDataBatch(checkpoint, map);
       const batches = await test_utils.fromAsync(batch);

-      data = data.concat(batches[0]?.batch.data ?? []);
-      if (batches.length == 0 || !batches[0]!.batch.has_more) {
+      data = data.concat(batches[0]?.chunkData.data ?? []);
+      if (batches.length == 0 || !batches[0]!.chunkData.has_more) {
         break;
       }
-      map.set(bucket, BigInt(batches[0]!.batch.next_after));
+      map.set(bucket, BigInt(batches[0]!.chunkData.next_after));
     }
     return data;
   }
@@ -182,6 +182,6 @@ export class WalStreamTestContext implements AsyncDisposable {
     const map = new Map([[bucket, start]]);
     const batch = this.storage!.getBucketDataBatch(checkpoint, map);
     const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
   }
 }
diff --git a/packages/service-core-tests/src/test-utils/general-utils.ts b/packages/service-core-tests/src/test-utils/general-utils.ts
index faff7adfa..03a818799 100644
--- a/packages/service-core-tests/src/test-utils/general-utils.ts
+++ b/packages/service-core-tests/src/test-utils/general-utils.ts
@@ -47,7 +47,7 @@ export function makeTestTable(name: string, replicaIdColumns?: string[] | undefi
 }

 export function getBatchData(
-  batch: utils.SyncBucketData[] | storage.SyncBucketDataBatch[] | storage.SyncBucketDataBatch
+  batch: utils.SyncBucketData[] | storage.SyncBucketDataChunk[] | storage.SyncBucketDataChunk
 ) {
   const first = getFirst(batch);
   if (first == null) {
@@ -64,7 +64,7 @@ export function getBatchMeta(
-  batch: utils.SyncBucketData[] | storage.SyncBucketDataBatch[] | storage.SyncBucketDataBatch
+  batch: utils.SyncBucketData[] | storage.SyncBucketDataChunk[] | storage.SyncBucketDataChunk
 ) {
   const first = getFirst(batch);
   if (first == null) {
@@ -78,17 +78,17 @@
 }

 function getFirst(
-  batch: utils.SyncBucketData[] | storage.SyncBucketDataBatch[] | storage.SyncBucketDataBatch
+  batch: utils.SyncBucketData[] | storage.SyncBucketDataChunk[] | storage.SyncBucketDataChunk
 ): utils.SyncBucketData | null {
   if (!Array.isArray(batch)) {
-    return batch.batch;
+    return batch.chunkData;
   }
   if (batch.length == 0) {
     return null;
   }
   let first = batch[0];
-  if ((first as storage.SyncBucketDataBatch).batch != null) {
-    return (first as storage.SyncBucketDataBatch).batch;
+  if ((first as storage.SyncBucketDataChunk).chunkData != null) {
+    return (first as storage.SyncBucketDataChunk).chunkData;
   } else {
     return first as utils.SyncBucketData;
   }
diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts
index e5abe124b..31985f477 100644
--- a/packages/service-core-tests/src/tests/register-compacting-tests.ts
+++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts
@@ -60,7 +60,7 @@ bucket_definitions:
     const batchBefore = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
     );
-    const dataBefore = batchBefore.batch.data;
+    const dataBefore = batchBefore.chunkData.data;
     const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']);

     expect(dataBefore).toMatchObject([
@@ -93,7 +93,7 @@ bucket_definitions:
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
     );
-    const dataAfter = batchAfter.batch.data;
+    const dataAfter = batchAfter.chunkData.data;
     const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']);

     expect(batchAfter.targetOp).toEqual(3n);
@@ -175,7 +175,7 @@ bucket_definitions:
     const batchBefore = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
     );
-    const dataBefore = batchBefore.batch.data;
+    const dataBefore = batchBefore.chunkData.data;
     const checksumBefore = await bucketStorage.getChecksums(checkpoint, ['global[]']);

     expect(dataBefore).toMatchObject([
@@ -214,7 +214,7 @@ bucket_definitions:
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
     );
-    const dataAfter = batchAfter.batch.data;
+    const dataAfter = batchAfter.chunkData.data;
     const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['global[]']);
     expect(batchAfter.targetOp).toEqual(4n);
@@ -299,7 +299,7 @@ bucket_definitions:
     const batchAfter = await test_utils.oneFromAsync(
       bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', 0n]]))
     );
-    const dataAfter = batchAfter.batch.data;
+    const dataAfter = batchAfter.chunkData.data;
     const checksumAfter = await bucketStorage.getChecksums(checkpoint2, ['global[]']);

     expect(batchAfter.targetOp).toEqual(4n);
@@ -414,7 +414,7 @@ bucket_definitions:
         ])
       )
     );
-    const dataAfter = batchAfter.flatMap((b) => b.batch.data);
+    const dataAfter = batchAfter.flatMap((b) => b.chunkData.data);

     // The op_ids will vary between MongoDB and Postgres storage
     expect(dataAfter).toMatchObject(
diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
index 21ab89868..d64631294 100644
--- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts
+++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts
@@ -1,6 +1,12 @@
-import { getUuidReplicaIdentityBson, OplogEntry, storage } from '@powersync/service-core';
+import {
+  BucketDataBatchOptions,
+  getUuidReplicaIdentityBson,
+  InternalOpId,
+  OplogEntry,
+  storage
+} from '@powersync/service-core';
 import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules';
-import { expect, test } from 'vitest';
+import { expect, test, describe, beforeEach } from 'vitest';
 import * as test_utils from '../test-utils/test-utils-index.js';

 export const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
@@ -338,7 +344,7 @@ bucket_definitions:
     const checkpoint = result!.flushed_op;
     const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])));

-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -635,7 +641,7 @@ bucket_definitions:
     });
     const checkpoint = result!.flushed_op;
     const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])));
-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id
@@ -699,7 +705,7 @@ bucket_definitions:
     const checkpoint = result!.flushed_op;
     const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])));

-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -815,7 +821,7 @@ bucket_definitions:

     const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]])));

-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -1014,7 +1020,7 @@ bucket_definitions:
       bucketStorage.getBucketDataBatch(checkpoint2, new Map([['global[]', checkpoint1]]))
     );

-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -1113,7 +1119,7 @@ bucket_definitions:
     const batch = await test_utils.fromAsync(
       bucketStorage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]]))
     );
-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -1221,7 +1227,7 @@ bucket_definitions:
     const batch = await test_utils.fromAsync(
       bucketStorage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]]))
     );
-    const data = batch[0].batch.data.map((d) => {
+    const data = batch[0].chunkData.data.map((d) => {
       return {
         op: d.op,
         object_id: d.object_id,
@@ -1332,7 +1338,11 @@ bucket_definitions:
     });

     const batch2 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch2)).toEqual([
       { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1795508474 },
@@ -1345,7 +1355,11 @@ bucket_definitions:
     });

     const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
+        options
+      )
     );
     expect(test_utils.getBatchData(batch3)).toEqual([]);
     expect(test_utils.getBatchMeta(batch3)).toEqual(null);
@@ -1400,7 +1414,7 @@ bucket_definitions:
     });

     const batch2 = await test_utils.oneFromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1.batch.next_after)]]), {
+      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1.chunkData.next_after)]]), {
         limit: 4
       })
     );
@@ -1416,7 +1430,7 @@ bucket_definitions:
     });

     const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2.batch.next_after)]]), {
+      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2.chunkData.next_after)]]), {
         limit: 4
       })
     );
@@ -1425,6 +1439,159 @@ bucket_definitions:
     expect(test_utils.getBatchData(batch3)).toEqual([]);
     expect(test_utils.getBatchMeta(batch3)).toEqual(null);
   });

+  describe('batch has_more', () => {
+    const setup = async (options: BucketDataBatchOptions) => {
+      const sync_rules = test_utils.testRules(
+        `
+bucket_definitions:
+  global1:
+    data:
+      - SELECT id, description FROM test WHERE bucket = 'global1'
+  global2:
+    data:
+      - SELECT id, description FROM test WHERE bucket = 'global2'
+`
+      );
+      await using factory = await generateStorageFactory();
+      const bucketStorage = factory.getInstance(sync_rules);
+
+      const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+        const sourceTable = TEST_TABLE;
+
+        for (let i = 1; i <= 10; i++) {
+          await batch.save({
+            sourceTable,
+            tag: storage.SaveOperationTag.INSERT,
+            after: {
+              id: `test${i}`,
+              description: `test${i}`,
+              bucket: i == 1 ? 'global1' : 'global2'
+            },
+            afterReplicaId: `test${i}`
+          });
+        }
+      });
+
+      const checkpoint = result!.flushed_op;
+      return await test_utils.fromAsync(
+        bucketStorage.getBucketDataBatch(
+          checkpoint,
+          new Map([
+            ['global1[]', 0n],
+            ['global2[]', 0n]
+          ]),
+          options
+        )
+      );
+    };
+
+    test('batch has_more (1)', async () => {
+      const batch = await setup({ limit: 5 });
+      expect(batch.length).toEqual(2);
+
+      expect(batch[0].chunkData.bucket).toEqual('global1[]');
+      expect(batch[1].chunkData.bucket).toEqual('global2[]');
+
+      expect(test_utils.getBatchData(batch[0])).toEqual([
+        { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }
+      ]);
+
+      expect(test_utils.getBatchData(batch[1])).toEqual([
+        { op_id: '2', op: 'PUT', object_id: 'test2', checksum: 730027011 },
+        { op_id: '3', op: 'PUT', object_id: 'test3', checksum: 1359888332 },
+        { op_id: '4', op: 'PUT', object_id: 'test4', checksum: 2049153252 },
+        { op_id: '5', op: 'PUT', object_id: 'test5', checksum: 3686902721 }
+      ]);
+
+      expect(test_utils.getBatchMeta(batch[0])).toEqual({
+        after: '0',
+        has_more: false,
+        next_after: '1'
+      });
+
+      expect(test_utils.getBatchMeta(batch[1])).toEqual({
+        after: '0',
+        has_more: true,
+        next_after: '5'
+      });
+    });
+
+    test('batch has_more (2)', async () => {
+      const batch = await setup({ limit: 11 });
+      expect(batch.length).toEqual(2);
+
+      expect(batch[0].chunkData.bucket).toEqual('global1[]');
+      expect(batch[1].chunkData.bucket).toEqual('global2[]');
+
+      expect(test_utils.getBatchData(batch[0])).toEqual([
+        { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }
+      ]);
+
+      expect(test_utils.getBatchData(batch[1])).toEqual([
+        { op_id: '2', op: 'PUT', object_id: 'test2', checksum: 730027011 },
+        { op_id: '3', op: 'PUT', object_id: 'test3', checksum: 1359888332 },
+        { op_id: '4', op: 'PUT', object_id: 'test4', checksum: 2049153252 },
+        { op_id: '5', op: 'PUT', object_id: 'test5', checksum: 3686902721 },
+        { op_id: '6', op: 'PUT', object_id: 'test6', checksum: 1974820016 },
+        { op_id: '7', op: 'PUT', object_id: 'test7', checksum: 2477637855 },
+        { op_id: '8', op: 'PUT', object_id: 'test8', checksum: 3644033632 },
+        { op_id: '9', op: 'PUT', object_id: 'test9', checksum: 1011055869 },
+        { op_id: '10', op: 'PUT', object_id: 'test10', checksum: 1331456365 }
+      ]);
+
+      expect(test_utils.getBatchMeta(batch[0])).toEqual({
+        after: '0',
+        has_more: false,
+        next_after: '1'
+      });
+
+      expect(test_utils.getBatchMeta(batch[1])).toEqual({
+        after: '0',
+        has_more: false,
+        next_after: '10'
+      });
+    });
+
+    test('batch has_more (3)', async () => {
+      // 50 bytes is more than 1 row, less than 2 rows
+      const batch = await setup({ limit: 3, chunkLimitBytes: 50 });
+
+      expect(batch.length).toEqual(3);
+      expect(batch[0].chunkData.bucket).toEqual('global1[]');
+      expect(batch[1].chunkData.bucket).toEqual('global2[]');
+      expect(batch[2].chunkData.bucket).toEqual('global2[]');
+
+      expect(test_utils.getBatchData(batch[0])).toEqual([
+        { op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 }
+      ]);
+
+      expect(test_utils.getBatchData(batch[1])).toEqual([
+        { op_id: '2', op: 'PUT', object_id: 'test2', checksum: 730027011 }
+      ]);
+      expect(test_utils.getBatchData(batch[2])).toEqual([
+        { op_id: '3', op: 'PUT', object_id: 'test3', checksum: 1359888332 }
+      ]);
+
+      expect(test_utils.getBatchMeta(batch[0])).toEqual({
+        after: '0',
+        has_more: false,
+        next_after: '1'
+      });
+
+      expect(test_utils.getBatchMeta(batch[1])).toEqual({
+        after: '0',
+        has_more: true,
+        next_after: '2'
+      });
+
+      expect(test_utils.getBatchMeta(batch[2])).toEqual({
+        after: '2',
+        has_more: true,
+        next_after: '3'
+      });
+    });
+  });
+
   test('empty storage metrics', async () => {
     await using f = await generateStorageFactory({ dropAll: true });
     const metrics = await f.getStorageMetrics();
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 148f3be17..1d85a3a00 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -107,7 +107,7 @@ export interface SyncRulesBucketStorage
     checkpoint: util.InternalOpId,
     dataBuckets: Map,
     options?: BucketDataBatchOptions
-  ): AsyncIterable<SyncBucketDataBatch>;
+  ): AsyncIterable<SyncBucketDataChunk>;

   /**
    * Compute checksums for a given list of buckets.
@@ -223,8 +223,8 @@ export interface BucketDataBatchOptions {
   chunkLimitBytes?: number;
 }

-export interface SyncBucketDataBatch {
-  batch: util.SyncBucketData;
+export interface SyncBucketDataChunk {
+  chunkData: util.SyncBucketData;
   targetOp: util.InternalOpId | null;
 }
diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts
index e5bcc54a7..8824aa9fa 100644
--- a/packages/service-core/src/sync/sync.ts
+++ b/packages/service-core/src/sync/sync.ts
@@ -327,7 +327,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator
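Illustrative sketch (not part of the patch): one way a caller could page through the renamed SyncBucketDataChunk results for a single bucket, following the has_more / next_after handling used by the test helpers in this diff. The helper name fetchAllBucketData is hypothetical, and it is an assumption that SyncRulesBucketStorage, SyncBucketDataChunk and OplogEntry are reachable through these imports.

import { InternalOpId, OplogEntry, storage } from '@powersync/service-core';

// Hypothetical helper: drains one bucket by requesting batch after batch.
async function fetchAllBucketData(
  bucketStorage: storage.SyncRulesBucketStorage,
  checkpoint: InternalOpId,
  bucket: string
): Promise<OplogEntry[]> {
  const data: OplogEntry[] = [];
  let start = 0n;

  while (true) {
    let lastChunk: storage.SyncBucketDataChunk | null = null;
    // One call = one "batch", yielded as one or more chunks; each chunk covers a single bucket.
    for await (const chunk of bucketStorage.getBucketDataBatch(checkpoint, new Map([[bucket, start]]))) {
      data.push(...chunk.chunkData.data);
      lastChunk = chunk;
    }
    // With the fix, has_more on the final chunk indicates whether the batch was truncated,
    // i.e. whether another request starting at next_after is needed.
    if (lastChunk == null || !lastChunk.chunkData.has_more) {
      return data;
    }
    start = BigInt(lastChunk.chunkData.next_after);
  }
}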