Skip to content

Commit 99c4b66

Browse files
authored
fix: drop fdw state when callback failed (#516)
1 parent 36ce529 commit 99c4b66

File tree

2 files changed

+156
-51
lines changed

2 files changed

+156
-51
lines changed

supabase-wrappers/src/modify.rs

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use super::utils;
2323
// Fdw private state for modify
2424
struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
2525
// foreign data wrapper instance
26-
instance: W,
26+
instance: Option<W>,
2727

2828
// row id attribute number and type id
2929
rowid_name: String,
@@ -45,7 +45,7 @@ struct FdwModifyState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
4545
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
4646
unsafe fn new(foreigntableid: Oid, tmp_ctx: MemoryContext) -> Self {
4747
Self {
48-
instance: instance::create_fdw_instance_from_table_id(foreigntableid),
48+
instance: Some(instance::create_fdw_instance_from_table_id(foreigntableid)),
4949
rowid_name: String::default(),
5050
rowid_attno: 0,
5151
rowid_typid: Oid::INVALID,
@@ -58,28 +58,69 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwModifyState<E, W> {
5858
}
5959

6060
fn begin_modify(&mut self) -> Result<(), E> {
61-
self.instance.begin_modify(&self.opts)
61+
if let Some(ref mut instance) = self.instance {
62+
instance.begin_modify(&self.opts)
63+
} else {
64+
Ok(())
65+
}
6266
}
6367

6468
fn insert(&mut self, row: &Row) -> Result<(), E> {
65-
self.instance.insert(row)
69+
if let Some(ref mut instance) = self.instance {
70+
instance.insert(row)
71+
} else {
72+
Ok(())
73+
}
6674
}
6775

6876
fn update(&mut self, rowid: &Cell, new_row: &Row) -> Result<(), E> {
69-
self.instance.update(rowid, new_row)
77+
if let Some(ref mut instance) = self.instance {
78+
instance.update(rowid, new_row)
79+
} else {
80+
Ok(())
81+
}
7082
}
7183

7284
fn delete(&mut self, rowid: &Cell) -> Result<(), E> {
73-
self.instance.delete(rowid)
85+
if let Some(ref mut instance) = self.instance {
86+
instance.delete(rowid)
87+
} else {
88+
Ok(())
89+
}
7490
}
7591

7692
fn end_modify(&mut self) -> Result<(), E> {
77-
self.instance.end_modify()
93+
if let Some(ref mut instance) = self.instance {
94+
instance.end_modify()
95+
} else {
96+
Ok(())
97+
}
7898
}
7999
}
80100

81101
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> utils::SerdeList for FdwModifyState<E, W> {}
82102

103+
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> Drop for FdwModifyState<E, W> {
104+
fn drop(&mut self) {
105+
// drop foreign data wrapper instance
106+
self.instance.take();
107+
108+
// remove the allocated memory context
109+
unsafe {
110+
memctx::delete_wrappers_memctx(self.tmp_ctx);
111+
self.tmp_ctx = ptr::null::<MemoryContextData>() as _;
112+
}
113+
}
114+
}
115+
116+
// drop the modify state, so the inner fdw instance can be dropped too
117+
unsafe fn drop_fdw_modify_state<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
118+
fdw_state: *mut FdwModifyState<E, W>,
119+
) {
120+
let boxed_fdw_state = Box::from_raw(fdw_state);
121+
drop(boxed_fdw_state);
122+
}
123+
83124
// find rowid column in relation description
84125
unsafe fn find_rowid_column(
85126
target_relation: pg_sys::Relation,
@@ -291,7 +332,11 @@ pub(super) extern "C-unwind" fn begin_foreign_modify<
291332
state.rowid_attno =
292333
pg_sys::ExecFindJunkAttributeInTlist((*subplan).targetlist, rowid_name_c);
293334

294-
state.begin_modify().report_unwrap();
335+
let result = state.begin_modify();
336+
if result.is_err() {
337+
drop_fdw_modify_state(state.as_ptr());
338+
result.report_unwrap();
339+
}
295340

296341
(*rinfo).ri_FdwState = state.into_pg() as _;
297342
}
@@ -315,7 +360,12 @@ pub(super) extern "C-unwind" fn exec_foreign_insert<
315360

316361
PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
317362
let row = utils::tuple_table_slot_to_row(slot);
318-
state.insert(&row).report_unwrap();
363+
let result = state.insert(&row);
364+
if result.is_err() {
365+
drop_fdw_modify_state(state.as_ptr());
366+
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
367+
result.report_unwrap();
368+
}
319369
});
320370
}
321371

@@ -350,7 +400,12 @@ pub(super) extern "C-unwind" fn exec_foreign_delete<
350400
PgMemoryContexts::For(state.tmp_ctx).switch_to(|_| {
351401
let cell = get_rowid_cell(&state, plan_slot);
352402
if let Some(rowid) = cell {
353-
state.delete(&rowid).report_unwrap();
403+
let result = state.delete(&rowid);
404+
if result.is_err() {
405+
drop_fdw_modify_state(state.as_ptr());
406+
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
407+
result.report_unwrap();
408+
}
354409
}
355410
});
356411
}
@@ -399,7 +454,12 @@ pub(super) extern "C-unwind" fn exec_foreign_update<
399454
}
400455
});
401456

402-
state.update(&rowid, &new_row).report_unwrap();
457+
let result = state.update(&rowid, &new_row);
458+
if result.is_err() {
459+
drop_fdw_modify_state(state.as_ptr());
460+
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
461+
result.report_unwrap();
462+
}
403463
}
404464
});
405465
}
@@ -426,16 +486,10 @@ pub(super) extern "C-unwind" fn end_foreign_modify<
426486
// here just to tell PgBox don't free the state, instead we will handle
427487
// drop the state by ourselves
428488
let mut state = PgBox::<FdwModifyState<E, W>>::from_pg(fdw_state);
429-
state.end_modify().report_unwrap();
430-
431-
// remove the allocated memory context
432-
memctx::delete_wrappers_memctx(state.tmp_ctx);
433-
state.tmp_ctx = ptr::null::<MemoryContextData>() as _;
434-
489+
let result = state.end_modify();
490+
drop_fdw_modify_state(state.as_ptr());
435491
(*rinfo).ri_FdwState = ptr::null::<FdwModifyState<E, W>>() as _;
436492

437-
// drop the scan state, so the fdw instance can be dropped too
438-
let boxed_fdw_state = Box::from_raw(fdw_state);
439-
drop(boxed_fdw_state);
493+
result.report_unwrap();
440494
}
441495
}

supabase-wrappers/src/scan.rs

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::utils::{self, report_error, ReportableError, SerdeList};
2727
// Fdw private state for scan
2828
struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
2929
// foreign data wrapper instance
30-
instance: W,
30+
instance: Option<W>,
3131

3232
// query conditions
3333
quals: Vec<Qual>,
@@ -58,7 +58,7 @@ struct FdwState<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> {
5858
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> {
5959
unsafe fn new(foreigntableid: Oid, tmp_ctx: MemoryContext) -> Self {
6060
Self {
61-
instance: instance::create_fdw_instance_from_table_id(foreigntableid),
61+
instance: Some(instance::create_fdw_instance_from_table_id(foreigntableid)),
6262
quals: Vec::new(),
6363
tgts: Vec::new(),
6464
sorts: Vec::new(),
@@ -74,44 +74,85 @@ impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> FdwState<E, W> {
7474

7575
#[inline]
7676
fn get_rel_size(&mut self) -> Result<(i64, i32), E> {
77-
self.instance.get_rel_size(
78-
&self.quals,
79-
&self.tgts,
80-
&self.sorts,
81-
&self.limit,
82-
&self.opts,
83-
)
77+
if let Some(ref mut instance) = self.instance {
78+
instance.get_rel_size(
79+
&self.quals,
80+
&self.tgts,
81+
&self.sorts,
82+
&self.limit,
83+
&self.opts,
84+
)
85+
} else {
86+
Ok((0, 0))
87+
}
8488
}
8589

8690
#[inline]
8791
fn begin_scan(&mut self) -> Result<(), E> {
88-
self.instance.begin_scan(
89-
&self.quals,
90-
&self.tgts,
91-
&self.sorts,
92-
&self.limit,
93-
&self.opts,
94-
)
92+
if let Some(ref mut instance) = self.instance {
93+
instance.begin_scan(
94+
&self.quals,
95+
&self.tgts,
96+
&self.sorts,
97+
&self.limit,
98+
&self.opts,
99+
)
100+
} else {
101+
Ok(())
102+
}
95103
}
96104

97105
#[inline]
98106
fn iter_scan(&mut self) -> Result<Option<()>, E> {
99-
self.instance.iter_scan(&mut self.row)
107+
if let Some(ref mut instance) = self.instance {
108+
instance.iter_scan(&mut self.row)
109+
} else {
110+
Ok(None)
111+
}
100112
}
101113

102114
#[inline]
103115
fn re_scan(&mut self) -> Result<(), E> {
104-
self.instance.re_scan()
116+
if let Some(ref mut instance) = self.instance {
117+
instance.re_scan()
118+
} else {
119+
Ok(())
120+
}
105121
}
106122

107123
#[inline]
108124
fn end_scan(&mut self) -> Result<(), E> {
109-
self.instance.end_scan()
125+
if let Some(ref mut instance) = self.instance {
126+
instance.end_scan()
127+
} else {
128+
Ok(())
129+
}
110130
}
111131
}
112132

113133
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> utils::SerdeList for FdwState<E, W> {}
114134

135+
impl<E: Into<ErrorReport>, W: ForeignDataWrapper<E>> Drop for FdwState<E, W> {
136+
fn drop(&mut self) {
137+
// drop foreign data wrapper instance
138+
self.instance.take();
139+
140+
// remove the allocated memory context
141+
unsafe {
142+
memctx::delete_wrappers_memctx(self.tmp_ctx);
143+
self.tmp_ctx = ptr::null::<MemoryContextData>() as _;
144+
}
145+
}
146+
}
147+
148+
// drop the scan state, so the inner fdw instance can be dropped too
149+
unsafe fn drop_fdw_state<E: Into<ErrorReport>, W: ForeignDataWrapper<E>>(
150+
fdw_state: *mut FdwState<E, W>,
151+
) {
152+
let boxed_fdw_state = Box::from_raw(fdw_state);
153+
drop(boxed_fdw_state);
154+
}
155+
115156
#[pg_guard]
116157
pub(super) extern "C-unwind" fn get_foreign_rel_size<
117158
E: Into<ErrorReport>,
@@ -350,7 +391,12 @@ pub(super) extern "C-unwind" fn begin_foreign_scan<
350391

351392
// begin scan if it is not EXPLAIN statement
352393
if eflags & pg_sys::EXEC_FLAG_EXPLAIN_ONLY as c_int <= 0 {
353-
state.begin_scan().report_unwrap();
394+
let result = state.begin_scan();
395+
if result.is_err() {
396+
drop_fdw_state(state.as_ptr());
397+
(*plan).fdw_private = ptr::null::<FdwState<E, W>>() as _;
398+
result.report_unwrap();
399+
}
354400

355401
let rel = scan_state.ss_currentRelation;
356402
let tup_desc = (*rel).rd_att;
@@ -387,7 +433,13 @@ pub(super) extern "C-unwind" fn iterate_foreign_scan<
387433
polyfill::exec_clear_tuple(slot);
388434

389435
state.row.clear();
390-
if state.iter_scan().report_unwrap().is_some() {
436+
437+
let result = state.iter_scan();
438+
if result.is_err() {
439+
drop_fdw_state(state.as_ptr());
440+
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
441+
}
442+
if result.report_unwrap().is_some() {
391443
if state.row.cols.len() != state.tgts.len() {
392444
report_error(
393445
PgSqlErrorCode::ERRCODE_FDW_INVALID_COLUMN_NUMBER,
@@ -431,7 +483,12 @@ pub(super) extern "C-unwind" fn re_scan_foreign_scan<
431483
let fdw_state = (*node).fdw_state as *mut FdwState<E, W>;
432484
if !fdw_state.is_null() {
433485
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
434-
state.re_scan().report_unwrap();
486+
let result = state.re_scan();
487+
if result.is_err() {
488+
drop_fdw_state(state.as_ptr());
489+
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
490+
result.report_unwrap();
491+
}
435492
}
436493
}
437494
}
@@ -451,16 +508,10 @@ pub(super) extern "C-unwind" fn end_foreign_scan<E: Into<ErrorReport>, W: Foreig
451508
// here just to tell PgBox don't free the state, instead we will handle
452509
// drop the state by ourselves
453510
let mut state = PgBox::<FdwState<E, W>>::from_pg(fdw_state);
454-
state.end_scan().report_unwrap();
455-
456-
// remove the allocated memory context
457-
memctx::delete_wrappers_memctx(state.tmp_ctx);
458-
state.tmp_ctx = ptr::null::<MemoryContextData>() as _;
459-
511+
let result = state.end_scan();
512+
drop_fdw_state(state.as_ptr());
460513
(*node).fdw_state = ptr::null::<FdwState<E, W>>() as _;
461514

462-
// drop the scan state, so the fdw instance can be dropped too
463-
let boxed_fdw_state = Box::from_raw(fdw_state);
464-
drop(boxed_fdw_state);
515+
result.report_unwrap();
465516
}
466517
}

0 commit comments

Comments
 (0)