@@ -25,8 +25,11 @@ import {
2525 CompatibilityContext ,
2626 DatabaseInputRow ,
2727 SqliteInputRow ,
28+ SqliteInputValue ,
29+ SqliteRow ,
2830 SqlSyncRules ,
2931 TablePattern ,
32+ ToastableSqliteRow ,
3033 toSyncRulesRow
3134} from '@powersync/service-sync-rules' ;
3235
@@ -635,7 +638,8 @@ WHERE oid = $1::regclass`,
635638 hasRemainingData = true ;
636639 }
637640
638- for ( const record of WalStream . getQueryData ( rows ) ) {
641+ for ( const inputRecord of WalStream . getQueryData ( rows ) ) {
642+ const record = this . syncRulesRecord ( inputRecord ) ;
639643 // This auto-flushes when the batch reaches its size limit
640644 await batch . save ( {
641645 tag : storage . SaveOperationTag . INSERT ,
@@ -787,6 +791,20 @@ WHERE oid = $1::regclass`,
787791 return table ;
788792 }
789793
794+ private syncRulesRecord ( row : SqliteInputRow ) : SqliteRow ;
795+ private syncRulesRecord ( row : SqliteInputRow | undefined ) : SqliteRow | undefined ;
796+
797+ private syncRulesRecord ( row : SqliteInputRow | undefined ) : SqliteRow | undefined {
798+ if ( row == null ) {
799+ return undefined ;
800+ }
801+ return this . sync_rules . applyRowContext < never > ( row ) ;
802+ }
803+
804+ private toastableSyncRulesRecord ( row : ToastableSqliteRow < SqliteInputValue > ) : ToastableSqliteRow {
805+ return this . sync_rules . applyRowContext ( row ) ;
806+ }
807+
790808 async writeChange (
791809 batch : storage . BucketStorageBatch ,
792810 msg : pgwire . PgoutputMessage
@@ -803,7 +821,7 @@ WHERE oid = $1::regclass`,
803821
804822 if ( msg . tag == 'insert' ) {
805823 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
806- const baseRecord = this . connections . types . constructAfterRecord ( msg ) ;
824+ const baseRecord = this . syncRulesRecord ( this . connections . types . constructAfterRecord ( msg ) ) ;
807825 return await batch . save ( {
808826 tag : storage . SaveOperationTag . INSERT ,
809827 sourceTable : table ,
@@ -816,8 +834,8 @@ WHERE oid = $1::regclass`,
816834 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
817835 // "before" may be null if the replica id columns are unchanged
818836 // It's fine to treat that the same as an insert.
819- const before = this . connections . types . constructBeforeRecord ( msg ) ;
820- const after = this . connections . types . constructAfterRecord ( msg ) ;
837+ const before = this . syncRulesRecord ( this . connections . types . constructBeforeRecord ( msg ) ) ;
838+ const after = this . toastableSyncRulesRecord ( this . connections . types . constructAfterRecord ( msg ) ) ;
821839 return await batch . save ( {
822840 tag : storage . SaveOperationTag . UPDATE ,
823841 sourceTable : table ,
@@ -828,7 +846,7 @@ WHERE oid = $1::regclass`,
828846 } ) ;
829847 } else if ( msg . tag == 'delete' ) {
830848 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
831- const before = this . connections . types . constructBeforeRecord ( msg ) ! ;
849+ const before = this . syncRulesRecord ( this . connections . types . constructBeforeRecord ( msg ) ! ) ;
832850
833851 return await batch . save ( {
834852 tag : storage . SaveOperationTag . DELETE ,
0 commit comments