11import type { BucketState , Checkpoint } from '@powersync/service-core' ;
22import type { BucketOperationProgress , BucketStorage , SyncDataBatch } from './BucketStorage.js' ;
3+ import type { SyncOperation , SyncOperationsHandler } from './SyncOperationsHandler.js' ;
34import { constructKey , toStringOrNull } from './bucketHelpers.js' ;
4- import type { PSBucket } from './buckets/ps_buckets.js' ;
5- import type { PSCrud } from './buckets/ps_crud.js' ;
6- import type { PSKeyValue } from './buckets/ps_kv.js' ;
7- import type { PSOplog } from './buckets/ps_oplog.js' ;
8- import type { PSTx } from './buckets/ps_tx.js' ;
9- import type { PSUntyped } from './buckets/ps_untyped.js' ;
105import { addChecksums , normalizeChecksum , subtractChecksums } from './checksumUtils.js' ;
6+ import type { PSBucket } from './storage-types/ps_buckets.js' ;
7+ import type { PSCrud } from './storage-types/ps_crud.js' ;
8+ import type { PSKeyValue } from './storage-types/ps_kv.js' ;
9+ import type { PSOplog } from './storage-types/ps_oplog.js' ;
10+ import type { PSTx } from './storage-types/ps_tx.js' ;
11+ import type { PSUntyped } from './storage-types/ps_untyped.js' ;
1112
1213export type OpType = 'PUT' | 'REMOVE' | 'MOVE' | 'CLEAR' ;
1314
1415export const MAX_OP_ID = '9223372036854775807' ;
1516
16- export class BucketStorageImpl implements BucketStorage {
17+ export type MemoryBucketStorageImplOptions = {
18+ /** Array of handlers for processing sync operations collected from the protocol */
19+ operationsHandlers : SyncOperationsHandler [ ] ;
20+ } ;
21+
22+ export class MemoryBucketStorageImpl implements BucketStorage {
1723 protected ps_buckets : PSBucket [ ] ;
1824 protected ps_oplog : PSOplog [ ] ;
1925 protected ps_updated_rows : PSUntyped [ ] ;
@@ -29,7 +35,10 @@ export class BucketStorageImpl implements BucketStorage {
2935 // TODO: This should be properly managed when ps_crud is implemented
3036 protected ps_crud_seq : number = 0 ;
3137
32- constructor ( ) {
38+ /** Handlers for processing sync operations collected from the protocol */
39+ protected operationsHandlers : SyncOperationsHandler [ ] ;
40+
41+ constructor ( options : MemoryBucketStorageImplOptions ) {
3342 this . ps_buckets = [ ] ;
3443 this . ps_oplog = [ ] ;
3544 this . ps_tx = {
@@ -40,6 +49,7 @@ export class BucketStorageImpl implements BucketStorage {
4049 this . ps_crud = [ ] ;
4150 this . ps_kv = [ ] ;
4251 this . clientId = 'TODO' ;
52+ this . operationsHandlers = options . operationsHandlers ;
4353 }
4454
4555 async init ( ) : Promise < void > { }
@@ -70,6 +80,7 @@ export class BucketStorageImpl implements BucketStorage {
7080 async updateLocalTarget ( cb : ( ) => Promise < string > ) : Promise < boolean > {
7181 // Find the '$local' bucket and check if target_op = MAX_OP_ID
7282 // SQL: SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = CAST(? as INTEGER)
83+ // TODO: maybe store local state separately
7384 const localBucket = this . ps_buckets . find ( ( b ) => b . name === '$local' ) ;
7485 if ( ! localBucket ) {
7586 // Nothing to update
@@ -381,11 +392,12 @@ export class BucketStorageImpl implements BucketStorage {
381392 // Collect operations that need to be applied
382393 const operations = await this . collectFullOperations ( checkpoint , priority ) ;
383394
384- console . log ( 'operations' , operations ) ;
385-
386- // TODO: Apply operations to output collections (pluggable approach)
387- // For now, we just collect them but don't actually write to output collections
388- // This will be handled in a pluggable manner later
395+ // Process operations using all handlers if provided
396+ if ( this . operationsHandlers . length > 0 && operations . length > 0 ) {
397+ for ( const handler of this . operationsHandlers ) {
398+ await handler . processOperations ( operations ) ;
399+ }
400+ }
389401
390402 // Update last_applied_op for buckets
391403 await this . setLastAppliedOp ( checkpoint , priority ) ;
@@ -457,8 +469,8 @@ export class BucketStorageImpl implements BucketStorage {
457469 private async collectFullOperations (
458470 checkpoint : Checkpoint ,
459471 priority : number | undefined
460- ) : Promise < Array < { type : string ; id : string ; op : 'PUT' | 'REMOVE' ; data : string | null } > > {
461- const operations : Array < { type : string ; id : string ; op : 'PUT' | 'REMOVE' ; data : string | null } > = [ ] ;
472+ ) : Promise < Array < SyncOperation > > {
473+ const operations : Array < SyncOperation > = [ ] ;
462474
463475 if ( priority === undefined ) {
464476 // Complete sync - collect all updated rows
0 commit comments