Commit 2e17504

Fix aborting low-priority syncs
1 parent 4d9d4a0 commit 2e17504

3 files changed: +231 −4 lines

modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap

Lines changed: 74 additions & 0 deletions

```diff
@@ -127,6 +127,80 @@ exports[`sync - mongodb > expiring token 2`] = `
 ]
 `;
 
+exports[`sync - mongodb > sync buckets in order 1`] = `
+[
+  {
+    "checkpoint": {
+      "buckets": [
+        {
+          "bucket": "b0[]",
+          "checksum": 920318466,
+          "count": 1,
+          "priority": 2,
+        },
+        {
+          "bucket": "b1[]",
+          "checksum": -1382098757,
+          "count": 1,
+          "priority": 1,
+        },
+      ],
+      "last_op_id": "2",
+      "write_checkpoint": undefined,
+    },
+  },
+  {
+    "data": {
+      "after": "0",
+      "bucket": "b1[]",
+      "data": [
+        {
+          "checksum": 2912868539n,
+          "data": "{"id":"earlier","description":"Test 2"}",
+          "object_id": "earlier",
+          "object_type": "test",
+          "op": "PUT",
+          "op_id": "2",
+          "subkey": "0dfe86bd-d15b-5fd0-9c7b-a31693030ee0",
+        },
+      ],
+      "has_more": false,
+      "next_after": "2",
+    },
+  },
+  {
+    "partial_checkpoint_complete": {
+      "last_op_id": "2",
+      "priority": 1,
+    },
+  },
+  {
+    "data": {
+      "after": "0",
+      "bucket": "b0[]",
+      "data": [
+        {
+          "checksum": 920318466n,
+          "data": "{"id":"t1","description":"Test 1"}",
+          "object_id": "t1",
+          "object_type": "test",
+          "op": "PUT",
+          "op_id": "1",
+          "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
+        },
+      ],
+      "has_more": false,
+      "next_after": "1",
+    },
+  },
+  {
+    "checkpoint_complete": {
+      "last_op_id": "2",
+    },
+  },
+]
+`;
+
 exports[`sync - mongodb > sync global data 1`] = `
 [
   {
```
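The new snapshot documents the priority-ordered stream: the checkpoint line lists both buckets, the priority-1 bucket `b1[]` streams first, `partial_checkpoint_complete` marks everything at priority 1 as consistent, and only then do the priority-2 data and the final `checkpoint_complete` follow. Below is a rough sketch of a consumer that exploits this ordering — the `SyncLine` type is condensed from the snapshot above, and `applyBucketData`/`markConsistent` are hypothetical placeholders, not part of this repo:

```typescript
// Condensed from the snapshot above; real sync lines carry more fields.
type SyncLine =
  | { checkpoint: { buckets: { bucket: string; priority: number }[]; last_op_id: string } }
  | { data: { bucket: string; data: unknown[] } }
  | { partial_checkpoint_complete: { last_op_id: string; priority: number } }
  | { checkpoint_complete: { last_op_id: string } };

// Hypothetical placeholders for a client-side store.
declare function applyBucketData(bucket: string, rows: unknown[]): void;
declare function markConsistent(upToPriority?: number): void;

async function consume(lines: AsyncIterable<SyncLine>) {
  for await (const line of lines) {
    if ('data' in line) {
      applyBucketData(line.data.bucket, line.data.data);
    } else if ('partial_checkpoint_complete' in line) {
      // High-priority buckets are already consistent here, so the app can
      // render them before the (possibly large) low-priority data arrives.
      markConsistent(line.partial_checkpoint_complete.priority);
    } else if ('checkpoint_complete' in line) {
      markConsistent(); // the full checkpoint is now consistent
    }
  }
}
```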

packages/service-core-tests/src/tests/register-sync-tests.ts

Lines changed: 153 additions & 0 deletions

```diff
@@ -83,6 +83,159 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
     expect(lines).toMatchSnapshot();
   });
 
+  test('sync buckets in order', async () => {
+    await using f = await factory();
+
+    const syncRules = await f.updateSyncRules({
+      content: `
+bucket_definitions:
+  b0:
+    priority: 2
+    data:
+      - SELECT * FROM test WHERE LENGTH(id) <= 2;
+  b1:
+    priority: 1
+    data:
+      - SELECT * FROM test WHERE LENGTH(id) > 2;
+`
+    });
+
+    const bucketStorage = f.getInstance(syncRules);
+    await bucketStorage.autoActivate();
+
+    const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 't1',
+          description: 'Test 1'
+        },
+        afterReplicaId: 't1'
+      });
+
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 'earlier',
+          description: 'Test 2'
+        },
+        afterReplicaId: 'earlier'
+      });
+
+      await batch.commit('0/1');
+    });
+
+    const stream = sync.streamResponse({
+      storage: f,
+      params: {
+        buckets: [],
+        include_checksum: true,
+        raw_data: true
+      },
+      parseOptions: test_utils.PARSE_OPTIONS,
+      tracker,
+      syncParams: new RequestParameters({ sub: '' }, {}),
+      token: { exp: Date.now() / 1000 + 10 } as any
+    });
+
+    const lines = await consumeCheckpointLines(stream);
+    expect(lines).toMatchSnapshot();
+  });
+
+  test('sync interrupts low-priority buckets on new checkpoints', async () => {
+    await using f = await factory();
+
+    const syncRules = await f.updateSyncRules({
+      content: `
+bucket_definitions:
+  b0:
+    priority: 2
+    data:
+      - SELECT * FROM test WHERE LENGTH(id) <= 5;
+  b1:
+    priority: 1
+    data:
+      - SELECT * FROM test WHERE LENGTH(id) > 5;
+`
+    });
+
+    const bucketStorage = f.getInstance(syncRules);
+    await bucketStorage.autoActivate();
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      // Initial data: add one high-priority row and 10k low-priority rows.
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 'highprio',
+          description: 'High priority row'
+        },
+        afterReplicaId: 'highprio'
+      });
+      for (let i = 0; i < 10_000; i++) {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: `${i}`,
+            description: 'low prio'
+          },
+          afterReplicaId: `${i}`
+        });
+      }
+
+      await batch.commit('0/1');
+    });
+
+    const stream = sync.streamResponse({
+      storage: f,
+      params: {
+        buckets: [],
+        include_checksum: true,
+        raw_data: true
+      },
+      parseOptions: test_utils.PARSE_OPTIONS,
+      tracker,
+      syncParams: new RequestParameters({ sub: '' }, {}),
+      token: { exp: Date.now() / 1000 + 10 } as any
+    });
+
+    let sentCheckpoints = 0;
+    for await (const next of stream) {
+      if (typeof next === 'object' && next !== null) {
+        if ('partial_checkpoint_complete' in next) {
+          expect(sentCheckpoints).toBe(1);
+
+          await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+            // Add another high-priority row. This should interrupt the long-running low-priority sync.
+            await batch.save({
+              sourceTable: TEST_TABLE,
+              tag: storage.SaveOperationTag.INSERT,
+              after: {
+                id: 'highprio2',
+                description: 'Another high-priority row'
+              },
+              afterReplicaId: 'highprio2'
+            });
+
+            await batch.commit('0/2');
+          });
+        }
+        if ('checkpoint' in next || 'checkpoint_diff' in next) {
+          sentCheckpoints += 1;
+        }
+        if ('checkpoint_complete' in next) {
+          break;
+        }
+      }
+    }
+
+    expect(sentCheckpoints).toBe(2);
+  });
+
   test('sync legacy non-raw data', async () => {
     const f = await factory();
 
```
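In other words, the second test pins down one invariant: when a new commit lands while the 10k low-priority rows are still streaming, the server must emit a second `checkpoint` (or `checkpoint_diff`) line before `checkpoint_complete`. A standalone sketch of that invariant, assuming only that the stream yields plain objects shaped like the snapshot lines — this helper is illustrative, not part of the repo:

```typescript
// Illustrative invariant check, not part of the repo: an interrupted
// low-priority sync must surface a second checkpoint before completing.
function validateInterruption(lines: Record<string, unknown>[]) {
  const checkpoints = lines.filter((l) => 'checkpoint' in l || 'checkpoint_diff' in l);
  if (checkpoints.length < 2) {
    throw new Error('new checkpoint did not interrupt the low-priority sync');
  }
  const completeIndex = lines.findIndex((l) => 'checkpoint_complete' in l);
  if (completeIndex !== lines.length - 1) {
    throw new Error('checkpoint_complete must be the final line of the stream');
  }
}
```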
packages/service-core/src/sync/sync.ts

Lines changed: 4 additions & 4 deletions

```diff
@@ -241,7 +241,7 @@ async function* streamResponseInner(
   const abortCheckpointSignal = AbortSignal.any([abortCheckpointController.signal, signal]);
 
   const bucketsByPriority = [...Map.groupBy(bucketsToFetch, (bucket) => bucket.priority).entries()];
-  bucketsByPriority.sort((a, b) => b[0] - a[0]); // Inverting sort order, high priority buckets have smaller priority values
+  bucketsByPriority.sort((a, b) => a[0] - b[0]); // Sort from high to lower priorities
   const lowestPriority = bucketsByPriority.at(-1)?.[0];
 
   function maybeRaceForNewCheckpoint() {
```
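This one-line comparator change is the core of the fix: priority values are numeric and lower numbers are more urgent, so the old descending sort handed out the low-priority groups first. A standalone illustration using the bucket shapes from the tests above (`Map.groupBy` requires Node 21+ / ES2024):

```typescript
// Lower `priority` values are more urgent, so the groups must sort ascending.
const bucketsToFetch = [
  { bucket: 'b0[]', priority: 2 },
  { bucket: 'b1[]', priority: 1 }
];

const bucketsByPriority = [...Map.groupBy(bucketsToFetch, (b) => b.priority).entries()];

bucketsByPriority.sort((a, b) => a[0] - b[0]); // fixed: ascending
console.log(bucketsByPriority.map(([priority]) => priority)); // [1, 2]

// The previous comparator, (a, b) => b[0] - a[0], produced [2, 1]:
// low-priority buckets were streamed before high-priority ones.
```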
```diff
@@ -267,11 +267,11 @@
   // This incrementally updates dataBuckets with each individual bucket position.
   // At the end of this, we can be sure that all buckets have data up to the checkpoint.
   for (const [priority, buckets] of bucketsByPriority) {
+    const isLast = priority === lowestPriority;
     if (abortCheckpointSignal.aborted) {
       break;
     }
 
-    const isLast = priority === lowestPriority;
     yield* bucketDataInBatches({
       storage,
       checkpoint,
```
```diff
@@ -323,7 +323,7 @@ interface BucketDataRequest {
 
 async function* bucketDataInBatches(request: BucketDataRequest) {
   let isDone = false;
-  while (!request.abort_connection.aborted && !isDone) {
+  while (!request.abort_batch.aborted && !isDone) {
     // The code below is functionally the same as this for-await loop below.
     // However, the for-await loop appears to have a memory leak, so we avoid it.
     // for await (const { done, data } of bucketDataBatch(storage, checkpoint, dataBuckets, raw_data, signal)) {
```
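The loop condition matters because, judging by the identifiers in this diff, the request carries two distinct signals: `abort_connection` for the client going away entirely, and `abort_batch` for abandoning just the current checkpoint. Given the `AbortSignal.any([abortCheckpointController.signal, signal])` line in the first hunk, the batch signal presumably already folds in the connection signal. A sketch of the distinction, with the interface shape inferred rather than copied from the repo:

```typescript
// Shape inferred from the identifiers in the diff; the real
// BucketDataRequest has more fields.
interface InferredAbortSignals {
  abort_connection: AbortSignal; // the whole sync stream is closing
  abort_batch: AbortSignal; // only the current checkpoint is superseded
}

function shouldFetchMore(signals: InferredAbortSignals, isDone: boolean): boolean {
  // The old condition checked abort_connection only, so a new checkpoint
  // could not stop an in-flight low-priority batch loop.
  return !signals.abort_batch.aborted && !isDone;
}
```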
```diff
@@ -453,7 +453,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
     // Check if syncing bucket data is supposed to stop before fetching more data
     // from storage.
     if (abort_batch.aborted) {
-      break;
+      return;
     }
   }
 
```
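The last hunk swaps `break` for `return`. Inside a generator, `break` only exits the enclosing loop and lets any code after the loop run, while `return` finishes the generator immediately — which is what an aborted batch needs. A self-contained illustration of the difference:

```typescript
// `break` exits the loop but the generator keeps running;
// `return` ends the generator outright.
async function* withBreak() {
  for (const op of [1, 2]) {
    if (op === 1) break; // leaves the loop...
  }
  yield 'emitted after break'; // ...so this still runs
}

async function* withReturn() {
  for (const op of [1, 2]) {
    if (op === 1) return; // ends the generator: nothing more is yielded
  }
  yield 'never emitted';
}

async function demo() {
  for await (const line of withBreak()) console.log(line); // logs once
  for await (const line of withReturn()) console.log(line); // logs nothing
}
demo();
```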