@@ -213,7 +213,7 @@ impl PerConnectionState {
213213 lock. remove ( & id. table_name ) ;
214214
215215 if lock. is_empty ( ) {
216- conn. preupdate_hook ( NO_HOOK ) ;
216+ conn. preupdate_hook ( NO_HOOK ) . expect ( "owned conn" ) ;
217217 }
218218 }
219219 } ) ;
@@ -224,59 +224,63 @@ impl PerConnectionState {
224224 let conn = api. conn ( ) . clone ( ) ;
225225 let state = self . clone ( ) ;
226226
227- api. conn ( ) . write_lock ( ) . preupdate_hook ( Some (
228- move |action : Action , db : & str , table_name : & str , case : & PreUpdateCase | {
229- let action: RecordAction = match action {
230- Action :: SQLITE_UPDATE | Action :: SQLITE_INSERT | Action :: SQLITE_DELETE => action. into ( ) ,
231- a => {
232- error ! ( "Unknown action: {a:?}" ) ;
227+ api
228+ . conn ( )
229+ . write_lock ( )
230+ . preupdate_hook ( Some (
231+ move |action : Action , db : & str , table_name : & str , case : & PreUpdateCase | {
232+ let action: RecordAction = match action {
233+ Action :: SQLITE_UPDATE | Action :: SQLITE_INSERT | Action :: SQLITE_DELETE => action. into ( ) ,
234+ a => {
235+ error ! ( "Unknown action: {a:?}" ) ;
236+ return ;
237+ }
238+ } ;
239+
240+ let Some ( rowid) = extract_row_id ( case) else {
241+ error ! ( "Failed to extract row id" ) ;
233242 return ;
234- }
235- } ;
243+ } ;
236244
237- let Some ( rowid ) = extract_row_id ( case ) else {
238- error ! ( "Failed to extract row id" ) ;
239- return ;
240- } ;
245+ let qualified_table_name = QualifiedName {
246+ name : table_name . to_string ( ) ,
247+ database_schema : Some ( db . to_string ( ) ) ,
248+ } ;
241249
242- let qualified_table_name = QualifiedName {
243- name : table_name. to_string ( ) ,
244- database_schema : Some ( db. to_string ( ) ) ,
245- } ;
250+ // If there are no matching subscriptions, skip.
251+ {
252+ let lock = state. subscriptions . read ( ) ;
253+ let Some ( subscriptions) = lock. get ( & qualified_table_name) . map ( |r| r. read ( ) ) else {
254+ return ;
255+ } ;
246256
247- // If there are no matching subscriptions, skip.
248- {
249- let lock = state. subscriptions . read ( ) ;
250- let Some ( subscriptions) = lock. get ( & qualified_table_name) . map ( |r| r. read ( ) ) else {
251- return ;
252- } ;
257+ if subscriptions. table . is_empty ( ) && !subscriptions. record . contains_key ( & rowid) {
258+ return ;
259+ }
260+ }
253261
254- if subscriptions. table . is_empty ( ) && !subscriptions. record . contains_key ( & rowid) {
262+ let Some ( record_values) = extract_record_values ( case) else {
263+ error ! ( "Failed to extract values" ) ;
255264 return ;
256- }
257- }
265+ } ;
258266
259- let Some ( record_values) = extract_record_values ( case) else {
260- error ! ( "Failed to extract values" ) ;
261- return ;
262- } ;
267+ let s = ContinuationState {
268+ state : state. clone ( ) ,
269+ table_name : qualified_table_name,
270+ action,
271+ rowid,
272+ record_values,
273+ } ;
263274
264- let s = ContinuationState {
265- state : state. clone ( ) ,
266- table_name : qualified_table_name,
267- action,
268- rowid,
269- record_values,
270- } ;
271-
272- // TODO: Optimization: in cases where there's only table-level access restrictions, we
273- // could avoid the continuation and even dispatch the subscription handling to a
274- // different thread entirely to take more work off the SQLite thread.
275- conn. call_and_forget ( move |conn| {
276- hook_continuation ( conn, s) ;
277- } ) ;
278- } ,
279- ) ) ;
275+ // TODO: Optimization: in cases where there's only table-level access restrictions, we
276+ // could avoid the continuation and even dispatch the subscription handling to a
277+ // different thread entirely to take more work off the SQLite thread.
278+ conn. call_and_forget ( move |conn| {
279+ hook_continuation ( conn, s) ;
280+ } ) ;
281+ } ,
282+ ) )
283+ . expect ( "owned conn" ) ;
280284 }
281285
282286 async fn add_record_subscription (
@@ -408,9 +412,9 @@ impl PerConnectionState {
408412impl Drop for PerConnectionState {
409413 fn drop ( & mut self ) {
410414 if let Some ( first) = self . record_apis . read ( ) . values ( ) . nth ( 0 ) {
411- first
412- . conn ( )
413- . call_and_forget ( |conn| conn . preupdate_hook ( NO_HOOK ) ) ;
415+ first. conn ( ) . call_and_forget ( |conn| {
416+ conn. preupdate_hook ( NO_HOOK ) . expect ( "owned conn" ) ;
417+ } ) ;
414418 }
415419 }
416420}
@@ -643,7 +647,7 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) {
643647 let mut subscriptions = state. subscriptions . write ( ) ;
644648 subscriptions. remove ( & table_name) ;
645649 if subscriptions. is_empty ( ) {
646- conn. preupdate_hook ( NO_HOOK ) ;
650+ conn. preupdate_hook ( NO_HOOK ) . expect ( "owned conn" ) ;
647651 }
648652
649653 return ;
@@ -753,7 +757,7 @@ fn hook_continuation(conn: &rusqlite::Connection, s: ContinuationState) {
753757 if lock. get ( & table_name) . is_some_and ( |e| e. read ( ) . is_empty ( ) ) {
754758 lock. remove ( & table_name) ;
755759 if lock. is_empty ( ) {
756- conn. preupdate_hook ( NO_HOOK ) ;
760+ conn. preupdate_hook ( NO_HOOK ) . expect ( "owned conn" ) ;
757761 }
758762 }
759763 } ) ;
0 commit comments