Skip to content

Commit b474516

Browse files
committed
Round 1 of cleaning up SpiTupleTable
- Create `SpiTupleTable::wrap(...)` to construct a new one from the global `pg_sys::SPI_tuptable`, update `SpiCursor` and `Query` impls accordingly. This gets rid of `SpiClient::prepare_tuple_table()`. - Make its members private to fix cross-module leakiness - When dropped, `SpiTupleTable` needs to free its internal `pg_sys::SPITupleTable` pointer Other changes: - Make the lifetime of what `SpiCursor::fetch()` returns explicit - There's no need to set `pg_sys::SPI_tuptable` to NULL before calling SPI_execute/SPI_cursor_fetch -- Postgres does that for us - add a test that passed before this work and still does
1 parent 3b20aa2 commit b474516

File tree

5 files changed

+93
-47
lines changed

5 files changed

+93
-47
lines changed

pgrx-tests/src/tests/spi_tests.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,31 @@ mod tests {
242242
})
243243
}
244244

245+
#[pg_test]
246+
fn two_cursors_at_the_same_time() -> Result<(), pgrx::spi::Error> {
247+
Spi::connect(|client| {
248+
let mut cursor1 = client.open_cursor("SELECT * FROM generate_series(1, 20)", None);
249+
let mut cursor2 = client.open_cursor("SELECT * FROM generate_series(40, 60)", None);
250+
251+
let first_5 = cursor1.fetch(5)?;
252+
let second_5 = cursor2.fetch(5)?;
253+
254+
let first_5 = Vec::from_iter(first_5.map(|row| row.get::<i32>(1)));
255+
let second_5 = Vec::from_iter(second_5.map(|row| row.get::<i32>(1)));
256+
257+
assert_eq!(
258+
first_5,
259+
vec![Ok(Some(1)), Ok(Some(2)), Ok(Some(3)), Ok(Some(4)), Ok(Some(5))]
260+
);
261+
assert_eq!(
262+
second_5,
263+
vec![Ok(Some(40)), Ok(Some(41)), Ok(Some(42)), Ok(Some(43)), Ok(Some(44))]
264+
);
265+
266+
Ok(())
267+
})
268+
}
269+
245270
#[pg_test]
246271
fn test_cursor_by_name() -> Result<(), pgrx::spi::Error> {
247272
let cursor_name = Spi::connect(|client| {

pgrx/src/spi/client.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::ptr::NonNull;
55
use crate::pg_sys::{self, PgOid};
66
use crate::spi::{PreparedStatement, Query, Spi, SpiCursor, SpiError, SpiResult, SpiTupleTable};
77

8+
#[derive(Debug)]
89
pub struct SpiClient {
910
// We need `SpiClient` to be publicly accessible but not constructable because we rely
1011
// on it being properly constructed in order for its Drop impl, which calles `pg_sys::SPI_finish()`,
@@ -76,29 +77,6 @@ impl SpiClient {
7677
query.execute(self, limit, args)
7778
}
7879

79-
pub(super) fn prepare_tuple_table(
80-
&self,
81-
status_code: i32,
82-
) -> std::result::Result<SpiTupleTable, SpiError> {
83-
Ok(SpiTupleTable {
84-
status_code: Spi::check_status(status_code)?,
85-
// SAFETY: no concurrent access
86-
table: unsafe { pg_sys::SPI_tuptable.as_mut()},
87-
#[cfg(any(feature = "pg11", feature = "pg12"))]
88-
size: unsafe { pg_sys::SPI_processed as usize },
89-
#[cfg(not(any(feature = "pg11", feature = "pg12")))]
90-
// SAFETY: no concurrent access
91-
size: unsafe {
92-
if pg_sys::SPI_tuptable.is_null() {
93-
pg_sys::SPI_processed as usize
94-
} else {
95-
(*pg_sys::SPI_tuptable).numvals as usize
96-
}
97-
},
98-
current: -1,
99-
})
100-
}
101-
10280
/// Set up a cursor that will execute the specified query
10381
///
10482
/// Rows may be then fetched using [`SpiCursor::fetch`].

pgrx/src/spi/cursor.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::ptr::NonNull;
33

44
use crate::pg_sys;
55

6-
use super::{SpiClient, SpiError, SpiOkCodes, SpiTupleTable};
6+
use super::{SpiClient, SpiOkCodes, SpiResult, SpiTupleTable};
77

88
type CursorName = String;
99

@@ -67,18 +67,14 @@ pub struct SpiCursor<'client> {
6767
pub(crate) client: &'client SpiClient,
6868
}
6969

70-
impl SpiCursor<'_> {
70+
impl<'client> SpiCursor<'client> {
7171
/// Fetch up to `count` rows from the cursor, moving forward
7272
///
7373
/// If `fetch` runs off the end of the available rows, an empty [`SpiTupleTable`] is returned.
74-
pub fn fetch(&mut self, count: libc::c_long) -> std::result::Result<SpiTupleTable, SpiError> {
75-
// SAFETY: no concurrent access
76-
unsafe {
77-
pg_sys::SPI_tuptable = std::ptr::null_mut();
78-
}
74+
pub fn fetch(&mut self, count: libc::c_long) -> SpiResult<SpiTupleTable<'client>> {
7975
// SAFETY: SPI functions to create/find cursors fail via elog, so self.ptr is valid if we successfully set it
8076
unsafe { pg_sys::SPI_cursor_fetch(self.ptr.as_mut(), true, count) }
81-
Ok(self.client.prepare_tuple_table(SpiOkCodes::Fetch as i32)?)
77+
SpiTupleTable::wrap(&self.client, SpiOkCodes::Fetch as i32)
8278
}
8379

8480
/// Consume the cursor, returning its name

pgrx/src/spi/query.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@ impl<'client> Query<'client> for &str {
6262
limit: Option<libc::c_long>,
6363
arguments: Self::Arguments,
6464
) -> SpiResult<SpiTupleTable<'client>> {
65-
// SAFETY: no concurrent access
66-
unsafe {
67-
pg_sys::SPI_tuptable = std::ptr::null_mut();
68-
}
69-
7065
let src = CString::new(self).expect("query contained a null byte");
7166
let status_code = match arguments {
7267
Some(args) => {
@@ -99,7 +94,7 @@ impl<'client> Query<'client> for &str {
9994
},
10095
};
10196

102-
Ok(client.prepare_tuple_table(status_code)?)
97+
SpiTupleTable::wrap(client, status_code)
10398
}
10499

105100
fn open_cursor(self, client: &'client SpiClient, args: Self::Arguments) -> SpiCursor<'client> {
@@ -238,7 +233,7 @@ impl<'client: 'stmt, 'stmt> Query<'client> for &'stmt PreparedStatement<'client>
238233
)
239234
};
240235

241-
Ok(client.prepare_tuple_table(status_code)?)
236+
SpiTupleTable::wrap(client, status_code)
242237
}
243238

244239
fn open_cursor(self, client: &'client SpiClient, args: Self::Arguments) -> SpiCursor<'client> {

pgrx/src/spi/tuple.rs

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,56 @@ use crate::memcxt::PgMemoryContexts;
88
use crate::pg_sys::panic::ErrorReportable;
99
use crate::pg_sys::{self, PgOid};
1010
use crate::prelude::*;
11+
use crate::spi::SpiClient;
1112

12-
use super::{SpiError, SpiErrorCodes, SpiOkCodes, SpiResult};
13+
use super::{SpiError, SpiErrorCodes, SpiResult};
1314

1415
#[derive(Debug)]
1516
pub struct SpiTupleTable<'client> {
16-
#[allow(dead_code)]
17-
pub(super) status_code: SpiOkCodes,
18-
pub(super) table: Option<&'client mut pg_sys::SPITupleTable>,
19-
pub(super) size: usize,
20-
pub(super) current: isize,
17+
// SpiTupleTable borrows global state setup by the active SpiClient. It doesn't use the client
18+
// directly, but we need to make sure we don't outlive it, so here it is
19+
_client: PhantomData<&'client SpiClient>,
20+
21+
// and this is that global state. In ::wrap(), this comes from whatever the current value of
22+
// `pg_sys::SPI_tuptable` happens to be. Postgres may change where SPI_tuptable points
23+
// throughout the lifetime of an active SpiClient, but it doesn't mutate (or deallocate) what
24+
// it happens to point to This allows us to have multiple active SpiTupleTables
25+
// within a Spi connection. Whatever this points to is freed via `pg_sys::SPI_freetuptable()`
26+
// when we're dropped.
27+
table: Option<NonNull<pg_sys::SPITupleTable>>,
28+
size: usize,
29+
current: isize,
2130
}
2231

2332
impl<'client> SpiTupleTable<'client> {
33+
/// Wraps the current global `pg_sys::SPI_tuptable` as a new [`SpiTupleTable`] instance, with
34+
/// a lifetime tied to the specified [`SpiClient`].
35+
pub(super) fn wrap(_client: &'client SpiClient, last_spi_status_code: i32) -> SpiResult<Self> {
36+
Spi::check_status(last_spi_status_code)?;
37+
38+
unsafe {
39+
//
40+
// SAFETY: The unsafeness here is that we're accessing static globals. Fortunately,
41+
// Postgres is not multi-threaded so we're okay to do this
42+
//
43+
44+
// different Postgres get the tuptable size different ways
45+
#[cfg(any(feature = "pg11", feature = "pg12"))]
46+
let size = pg_sys::SPI_processed as usize;
47+
48+
#[cfg(not(any(feature = "pg11", feature = "pg12")))]
49+
let size = if pg_sys::SPI_tuptable.is_null() {
50+
pg_sys::SPI_processed as usize
51+
} else {
52+
(*pg_sys::SPI_tuptable).numvals as usize
53+
};
54+
55+
let tuptable = pg_sys::SPI_tuptable;
56+
57+
Ok(Self { _client: PhantomData, table: NonNull::new(tuptable), size, current: -1 })
58+
}
59+
}
60+
2461
/// `SpiTupleTable`s are positioned before the start, for iteration purposes.
2562
///
2663
/// This method moves the position to the first row. If there are no rows, this
@@ -76,9 +113,12 @@ impl<'client> SpiTupleTable<'client> {
76113
fn get_spi_tuptable(
77114
&self,
78115
) -> SpiResult<(*mut pg_sys::SPITupleTable, *mut pg_sys::TupleDescData)> {
79-
let table = self.table.as_deref().ok_or(SpiError::NoTupleTable)?;
80-
// SAFETY: we just assured that `table` is not null
81-
Ok((table as *const _ as *mut _, table.tupdesc))
116+
let table = self.table.map(|table| table.as_ptr()).ok_or(SpiError::NoTupleTable)?;
117+
let tupdesc = unsafe {
118+
// SAFETY: we just assured that `table` is not null
119+
table.as_mut().unwrap().tupdesc
120+
};
121+
Ok((table, tupdesc))
82122
}
83123

84124
pub fn get_heap_tuple(&self) -> SpiResult<Option<SpiHeapTupleData<'client>>> {
@@ -298,6 +338,18 @@ impl<'client> Iterator for SpiTupleTable<'client> {
298338
}
299339
}
300340

341+
impl Drop for SpiTupleTable<'_> {
342+
fn drop(&mut self) {
343+
unsafe {
344+
// SAFETY: self.table was created by Postgres from whatever `pg_sys::SPI_tuptable` pointed
345+
// to at the time this SpiTupleTable was constructed
346+
if let Some(ptr) = self.table.take() {
347+
pg_sys::SPI_freetuptable(ptr.as_ptr())
348+
}
349+
}
350+
}
351+
}
352+
301353
/// Represents a single `pg_sys::Datum` inside a `SpiHeapTupleData`
302354
pub struct SpiHeapTupleDataEntry<'client> {
303355
datum: Option<pg_sys::Datum>,

0 commit comments

Comments
 (0)