@@ -11,7 +11,7 @@ import {
   ReplicationAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
-import { deserializeBson, SaveOperationTag, storage, utils } from '@powersync/service-core';
+import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core';
 import * as timers from 'node:timers/promises';
 import { PowerSyncMongo } from './db.js';
 import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
@@ -39,7 +39,7 @@ export interface MongoBucketBatchOptions {
   groupId: number;
   slotName: string;
   lastCheckpointLsn: string | null;
-  keepaliveOp: string | null;
+  keepaliveOp: InternalOpId | null;
   noCheckpointBeforeLsn: string;
   storeCurrentData: boolean;
   /**
@@ -77,12 +77,12 @@ export class MongoBucketBatch
 
   private no_checkpoint_before_lsn: string;
 
-  private persisted_op: bigint | null = null;
+  private persisted_op: InternalOpId | null = null;
 
   /**
    * For tests only - not for persistence logic.
    */
-  public last_flushed_op: bigint | null = null;
+  public last_flushed_op: InternalOpId | null = null;
 
   constructor(options: MongoBucketBatchOptions) {
     super();
@@ -98,9 +98,7 @@ export class MongoBucketBatch
     this.skipExistingRows = options.skipExistingRows;
     this.batch = new OperationBatch();
 
-    if (options.keepaliveOp) {
-      this.persisted_op = BigInt(options.keepaliveOp);
-    }
+    this.persisted_op = options.keepaliveOp ?? null;
   }
 
   addCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): void {
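
Review note, not part of the diff: the constructor now assigns `options.keepaliveOp ?? null` directly instead of parsing a string behind a truthiness guard. Assuming InternalOpId is bigint-based (the removed BigInt() conversion suggests as much), nullish coalescing is also the safer guard here, since a truthiness check would silently drop a zero op id:

// Illustration only; shows why ?? beats a truthiness check for bigint ids.
const zeroOp: bigint | null = 0n;
const viaTruthy = zeroOp ? zeroOp : null; // null: 0n is falsy, so a zero id would be lost
const viaNullish = zeroOp ?? null;        // 0n: only null/undefined are filtered out
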
@@ -135,7 +133,7 @@ export class MongoBucketBatch
       return null;
     }
 
-    let last_op: bigint | null = null;
+    let last_op: InternalOpId | null = null;
     let resumeBatch: OperationBatch | null = null;
 
     await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
@@ -153,7 +151,7 @@ export class MongoBucketBatch
 
     this.persisted_op = last_op;
     this.last_flushed_op = last_op;
-    return { flushed_op: String(last_op) };
+    return { flushed_op: last_op };
   }
 
   private async replicateBatch(
@@ -776,22 +774,23 @@ export class MongoBucketBatch
   async truncate(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
     await this.flush();
 
-    let last_op: bigint | null = null;
+    let last_op: InternalOpId | null = null;
     for (let table of sourceTables) {
       last_op = await this.truncateSingle(table);
     }
 
     if (last_op) {
       this.persisted_op = last_op;
+      return {
+        flushed_op: last_op
+      };
+    } else {
+      return null;
     }
-
-    return {
-      flushed_op: String(last_op!)
-    };
   }
 
-  async truncateSingle(sourceTable: storage.SourceTable): Promise<bigint> {
-    let last_op: bigint | null = null;
+  async truncateSingle(sourceTable: storage.SourceTable): Promise<InternalOpId> {
+    let last_op: InternalOpId | null = null;
 
     // To avoid too large transactions, we limit the amount of data we delete per transaction.
     // Since we don't use the record data here, we don't have explicit size limits per batch.
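
For context, here is a minimal sketch of the pattern this commit applies end-to-end, assuming InternalOpId is an alias for bigint (the removed BigInt()/String() conversions point that way; the real definition lives in @powersync/service-core). Keeping one id type throughout removes the string round-trips, and the restructured truncate() also makes the empty case explicit: previously, String(last_op!) would have produced the string 'null' whenever no table yielded an op. All names below are hypothetical stand-ins:

type InternalOpId = bigint; // assumption: a bigint alias, per the removed conversions

interface FlushedResult {
  flushed_op: InternalOpId; // previously a string that callers re-parsed with BigInt()
}

class ExampleBatch {
  private persisted_op: InternalOpId | null = null;

  // Mirrors the new truncate()/flush() shape: null when nothing was flushed,
  // otherwise the op id itself, with no String()/BigInt() round-trip.
  finish(lastOp: InternalOpId | null): FlushedResult | null {
    if (lastOp == null) {
      return null;
    }
    this.persisted_op = lastOp;
    return { flushed_op: lastOp };
  }
}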