@@ -103,6 +103,7 @@ export interface WatchOnChangeHandler {
103103
104104export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
105105 initialized : ( ) => void ;
106+ schemaChanged : ( schema : Schema ) => void ;
106107}
107108
108109export interface PowerSyncCloseOptions {
@@ -359,6 +360,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
359360 }
360361 this . _schema = schema ;
361362 await this . database . execute ( 'SELECT powersync_replace_schema(?)' , [ JSON . stringify ( this . schema . toJSON ( ) ) ] ) ;
363+ this . iterateListeners ( ( cb ) => cb . schemaChanged ?.( schema ) ) ;
362364 }
363365
364366 /**
@@ -756,7 +758,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
756758 throw new Error ( 'onResult is required' ) ;
757759 }
758760
759- ( async ( ) => {
761+ const watchQuery = async ( abortSignal : AbortSignal ) => {
760762 try {
761763 const resolvedTables = await this . resolveTables ( sql , parameters , options ) ;
762764
@@ -778,13 +780,43 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
778780 } ,
779781 {
780782 ...( options ?? { } ) ,
781- tables : resolvedTables
783+ tables : resolvedTables ,
784+ // Override the abort signal since we intercept it
785+ signal : abortSignal
782786 }
783787 ) ;
784788 } catch ( error ) {
785789 onError ?.( error ) ;
786790 }
787- } ) ( ) ;
791+ } ;
792+
793+ const triggerWatchedQuery = ( ) => {
794+ const abortController = new AbortController ( ) ;
795+
796+ let disposeSchemaListener : ( ( ) => void ) | null = null ;
797+
798+ const stopWatching = ( ) => {
799+ abortController . abort ( 'Abort triggered' ) ;
800+ disposeSchemaListener ?.( ) ;
801+ disposeSchemaListener = null ;
802+ // Stop listening to upstream abort for this watch
803+ options ?. signal ?. removeEventListener ( 'abort' , stopWatching ) ;
804+ } ;
805+
806+ options ?. signal ?. addEventListener ( 'abort' , stopWatching ) ;
807+
808+ disposeSchemaListener = this . registerListener ( {
809+ schemaChanged : ( ) => {
810+ stopWatching ( ) ;
811+ // Re trigger the watched query (recursively)
812+ triggerWatchedQuery ( ) ;
813+ }
814+ } ) ;
815+
816+ return watchQuery ( abortController . signal ) ;
817+ } ;
818+
819+ triggerWatchedQuery ( ) ;
788820 }
789821
790822 /**
0 commit comments