Skip to content

Commit 4105912

Browse files
committed
Revert migration; check is_called instead.
1 parent 3661908 commit 4105912

File tree

3 files changed

+20
-42
lines changed

3 files changed

+20
-42
lines changed

modules/module-postgres-storage/src/migrations/scripts/1746700486312-op-id-sequence.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ export class PostgresSyncRulesStorage
336336

337337
await callback(batch);
338338
await batch.flush();
339-
if (batch.last_flushed_op) {
339+
if (batch.last_flushed_op != null) {
340340
return { flushed_op: batch.last_flushed_op };
341341
} else {
342342
return null;

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,7 @@ export class PostgresBucketBatch
208208
return null;
209209
}
210210

211-
const currentSequence = await this.db.sql`
212-
SELECT
213-
LAST_VALUE AS value
214-
FROM
215-
op_id_sequence;
216-
`.first<{ value: bigint }>();
217-
return currentSequence!.value;
211+
return this.getLastOpIdSequence(this.db);
218212
}
219213

220214
async drop(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
@@ -262,13 +256,7 @@ export class PostgresBucketBatch
262256
const lastOp = await this.withReplicationTransaction(async (db) => {
263257
resumeBatch = await this.replicateBatch(db, batch);
264258

265-
const sequence = await db.sql`
266-
SELECT
267-
LAST_VALUE AS value
268-
FROM
269-
op_id_sequence;
270-
`.first<{ value: bigint }>();
271-
return sequence!.value;
259+
return this.getLastOpIdSequence(db);
272260
});
273261

274262
// null if done, set if we need another flush
@@ -895,6 +883,23 @@ export class PostgresBucketBatch
895883
`.execute();
896884
}
897885
}
886+
887+
private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
888+
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
889+
// To cater for this case, we check is_called, and default to 0 if no value has been generated.
890+
const sequence = await db.sql`
891+
SELECT
892+
(
893+
CASE
894+
WHEN is_called THEN last_value
895+
ELSE 0
896+
END
897+
) AS value
898+
FROM
899+
op_id_sequence;
900+
`.first<{ value: bigint }>();
901+
return sequence!.value;
902+
}
898903
}
899904

900905
/**

0 commit comments

Comments
 (0)