@@ -5,7 +5,7 @@ import async from 'async';
55import { ColumnDescriptor , framework , getUuidReplicaIdentityBson , Metrics , storage } from '@powersync/service-core' ;
66import mysql , { FieldPacket } from 'mysql2' ;
77
8- import { BinLogEvent } from '@powersync/mysql-zongji' ;
8+ import { BinLogEvent , TableMapEntry } from '@powersync/mysql-zongji' ;
99import * as common from '../common/common-index.js' ;
1010import * as zongji_utils from './zongji/zongji-utils.js' ;
1111import { MySQLConnectionManager } from './MySQLConnectionManager.js' ;
@@ -31,6 +31,7 @@ interface WriteChangePayload {
3131 database : string ;
3232 table : string ;
3333 sourceTable : storage . SourceTable ;
34+ columns : Map < string , ColumnDescriptor > ;
3435}
3536
3637export type Data = Record < string , any > ;
@@ -408,14 +409,7 @@ AND table_type = 'BASE TABLE';`,
408409 await this . writeChanges ( batch , {
409410 type : storage . SaveOperationTag . INSERT ,
410411 data : evt . rows ,
411- database : writeTableInfo . parentSchema ,
412- table : writeTableInfo . tableName ,
413- sourceTable : this . getTable (
414- getMysqlRelId ( {
415- schema : writeTableInfo . parentSchema ,
416- name : writeTableInfo . tableName
417- } )
418- )
412+ tableEntry : writeTableInfo
419413 } ) ;
420414 break ;
421415 case zongji_utils . eventIsUpdateMutation ( evt ) :
@@ -424,14 +418,7 @@ AND table_type = 'BASE TABLE';`,
424418 type : storage . SaveOperationTag . UPDATE ,
425419 data : evt . rows . map ( ( row ) => row . after ) ,
426420 previous_data : evt . rows . map ( ( row ) => row . before ) ,
427- database : updateTableInfo . parentSchema ,
428- table : updateTableInfo . tableName ,
429- sourceTable : this . getTable (
430- getMysqlRelId ( {
431- schema : updateTableInfo . parentSchema ,
432- name : updateTableInfo . tableName
433- } )
434- )
421+ tableEntry : updateTableInfo
435422 } ) ;
436423 break ;
437424 case zongji_utils . eventIsDeleteMutation ( evt ) :
@@ -440,15 +427,7 @@ AND table_type = 'BASE TABLE';`,
440427 await this . writeChanges ( batch , {
441428 type : storage . SaveOperationTag . DELETE ,
442429 data : evt . rows ,
443- database : deleteTableInfo . parentSchema ,
444- table : deleteTableInfo . tableName ,
445- // TODO cleanup
446- sourceTable : this . getTable (
447- getMysqlRelId ( {
448- schema : deleteTableInfo . parentSchema ,
449- name : deleteTableInfo . tableName
450- } )
451- )
430+ tableEntry : deleteTableInfo
452431 } ) ;
453432 break ;
454433 case zongji_utils . eventIsXid ( evt ) :
@@ -524,14 +503,26 @@ AND table_type = 'BASE TABLE';`,
524503 type : storage . SaveOperationTag ;
525504 data : Data [ ] ;
526505 previous_data ?: Data [ ] ;
527- database : string ;
528- table : string ;
529- sourceTable : storage . SourceTable ;
506+ tableEntry : TableMapEntry ;
530507 }
531508 ) : Promise < storage . FlushedResult | null > {
509+ const columns = new Map < string , ColumnDescriptor > ( ) ;
510+ msg . tableEntry . columns . forEach ( ( column ) => {
511+ columns . set ( column . name , { name : column . name , typeId : column . type } ) ;
512+ } ) ;
513+
532514 for ( const [ index , row ] of msg . data . entries ( ) ) {
533515 await this . writeChange ( batch , {
534- ...msg ,
516+ type : msg . type ,
517+ database : msg . tableEntry . parentSchema ,
518+ sourceTable : this . getTable (
519+ getMysqlRelId ( {
520+ schema : msg . tableEntry . parentSchema ,
521+ name : msg . tableEntry . tableName
522+ } )
523+ ) ,
524+ table : msg . tableEntry . tableName ,
525+ columns : columns ,
535526 data : row ,
536527 previous_data : msg . previous_data ?. [ index ]
537528 } ) ;
@@ -546,7 +537,7 @@ AND table_type = 'BASE TABLE';`,
546537 switch ( payload . type ) {
547538 case storage . SaveOperationTag . INSERT :
548539 Metrics . getInstance ( ) . rows_replicated_total . add ( 1 ) ;
549- const record = common . toSQLiteRow ( payload . data ) ;
540+ const record = common . toSQLiteRow ( payload . data , payload . columns ) ;
550541 return await batch . save ( {
551542 tag : storage . SaveOperationTag . INSERT ,
552543 sourceTable : payload . sourceTable ,
0 commit comments