@@ -89,8 +89,10 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
8989 readonly #changeStream: AsyncIterableStream < T > ;
9090 #error: FetchError | false = false ;
9191 #unsubscribe?: ( ) => void ;
92+ #isStreamClosed: boolean = false ;
9293
9394 stop ( ) {
95+ this . #isStreamClosed = true ;
9496 this . #unsubscribe?.( ) ;
9597 }
9698
@@ -101,52 +103,69 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
101103 const source = new ReadableStream < Message < T > [ ] > ( {
102104 start : ( controller ) => {
103105 this . #unsubscribe = this . #stream. subscribe (
104- ( messages ) => controller . enqueue ( messages ) ,
106+ ( messages ) => {
107+ if ( ! this . #isStreamClosed) {
108+ controller . enqueue ( messages ) ;
109+ }
110+ } ,
105111 this . #handleError. bind ( this )
106112 ) ;
107113 } ,
114+ cancel : ( ) => {
115+ this . #isStreamClosed = true ;
116+ this . #unsubscribe?.( ) ;
117+ }
108118 } ) ;
109119
110120 // Create the transformed stream that processes messages and emits complete rows
111121 this . #changeStream = createAsyncIterableStream ( source , {
112122 transform : ( messages , controller ) => {
113- const updatedKeys = new Set < string > ( ) ;
114-
115- for ( const message of messages ) {
116- if ( isChangeMessage ( message ) ) {
117- const key = message . key ;
118- switch ( message . headers . operation ) {
119- case "insert" : {
120- // New row entirely
121- this . #currentState. set ( key , message . value ) ;
122- updatedKeys . add ( key ) ;
123- break ;
123+ if ( this . #isStreamClosed) {
124+ return ;
125+ }
126+
127+ try {
128+ const updatedKeys = new Set < string > ( ) ;
129+
130+ for ( const message of messages ) {
131+ if ( isChangeMessage ( message ) ) {
132+ const key = message . key ;
133+ switch ( message . headers . operation ) {
134+ case "insert" : {
135+ this . #currentState. set ( key , message . value ) ;
136+ updatedKeys . add ( key ) ;
137+ break ;
138+ }
139+ case "update" : {
140+ const existingRow = this . #currentState. get ( key ) ;
141+ const updatedRow = existingRow
142+ ? { ...existingRow , ...message . value }
143+ : message . value ;
144+ this . #currentState. set ( key , updatedRow ) ;
145+ updatedKeys . add ( key ) ;
146+ break ;
147+ }
124148 }
125- case "update" : {
126- // Merge updates into existing row if any, otherwise treat as new
127- const existingRow = this . #currentState. get ( key ) ;
128- const updatedRow = existingRow
129- ? { ...existingRow , ...message . value }
130- : message . value ;
131- this . #currentState. set ( key , updatedRow ) ;
132- updatedKeys . add ( key ) ;
133- break ;
149+ } else if ( isControlMessage ( message ) ) {
150+ if ( message . headers . control === "must-refetch" ) {
151+ this . #currentState. clear ( ) ;
152+ this . #error = false ;
134153 }
135154 }
136- } else if ( isControlMessage ( message ) ) {
137- if ( message . headers . control === "must-refetch" ) {
138- this . #currentState. clear ( ) ;
139- this . #error = false ;
140- }
141155 }
142- }
143156
144- // Now enqueue only one updated row per key, after all messages have been processed.
145- for ( const key of updatedKeys ) {
146- const finalRow = this . #currentState. get ( key ) ;
147- if ( finalRow ) {
148- controller . enqueue ( finalRow ) ;
157+ // Now enqueue only one updated row per key, after all messages have been processed.
158+ if ( ! this . #isStreamClosed) {
159+ for ( const key of updatedKeys ) {
160+ const finalRow = this . #currentState. get ( key ) ;
161+ if ( finalRow ) {
162+ controller . enqueue ( finalRow ) ;
163+ }
164+ }
149165 }
166+ } catch ( error ) {
167+ console . error ( "Error processing stream messages:" , error ) ;
168+ this . #handleError( error as Error ) ;
150169 }
151170 } ,
152171 } ) ;
@@ -192,6 +211,8 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
192211 if ( e instanceof FetchError ) {
193212 this . #error = e ;
194213 }
214+ this . #isStreamClosed = true ;
215+ this . #unsubscribe?.( ) ;
195216 }
196217}
197218
0 commit comments