Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ async function* streamResponseInner(
bucketsByPriority.sort((a, b) => a[0] - b[0]); // Sort from high to lower priorities
const lowestPriority = bucketsByPriority.at(-1)?.[0];

// Ensure that we have at least one priority batch: After sending the checkpoint line, clients expect to
// receive a sync complete message after the synchronization is done (which happens in the last
// bucketDataInBatches iteration). Without any batch, the line is missing and clients might not complete their
// sync properly.
const priorityBatches: [BucketPriority | null, BucketDescription[]][] = bucketsByPriority;
if (priorityBatches.length == 0) {
priorityBatches.push([null, []]);
}

function maybeRaceForNewCheckpoint() {
if (syncedOperations >= 1000 && nextCheckpointPromise === undefined) {
nextCheckpointPromise = (async () => {
Expand All @@ -179,7 +188,7 @@ async function* streamResponseInner(

// This incrementally updates dataBuckets with each individual bucket position.
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
for (const [priority, buckets] of bucketsByPriority) {
for (const [priority, buckets] of priorityBatches) {
const isLast = priority === lowestPriority;
if (abortCheckpointSignal.aborted) {
break;
Expand All @@ -198,7 +207,7 @@ async function* streamResponseInner(
user_id: syncParams.user_id,
// Passing undefined will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
// sync complete message.
forPriority: !isLast ? priority : undefined
forPriority: !isLast ? priority : null
});
}

Expand Down Expand Up @@ -227,7 +236,7 @@ interface BucketDataRequest {
*/
abort_batch: AbortSignal;
user_id?: string;
forPriority?: BucketPriority;
forPriority: BucketPriority | null;
onRowsSent: (amount: number) => void;
}

Expand Down Expand Up @@ -372,7 +381,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<Buck
// More data should be available immediately for a new checkpoint.
yield { data: null, done: true };
} else {
if (request.forPriority !== undefined) {
if (request.forPriority != null) {
const line: util.StreamingSyncCheckpointPartiallyComplete = {
partial_checkpoint_complete: {
last_op_id: checkpoint,
Expand Down