25 changes: 25 additions & 0 deletions pgrx-tests/src/tests/spi_tests.rs
@@ -242,6 +242,31 @@ mod tests {
})
}

#[pg_test]
fn two_cursors_at_the_same_time() -> Result<(), pgrx::spi::Error> {
Spi::connect(|client| {
let mut cursor1 = client.open_cursor("SELECT * FROM generate_series(1, 20)", None);
let mut cursor2 = client.open_cursor("SELECT * FROM generate_series(40, 60)", None);

let first_5 = cursor1.fetch(5)?;
let second_5 = cursor2.fetch(5)?;

let first_5 = Vec::from_iter(first_5.map(|row| row.get::<i32>(1)));
let second_5 = Vec::from_iter(second_5.map(|row| row.get::<i32>(1)));

assert_eq!(
first_5,
vec![Ok(Some(1)), Ok(Some(2)), Ok(Some(3)), Ok(Some(4)), Ok(Some(5))]
);
assert_eq!(
second_5,
vec![Ok(Some(40)), Ok(Some(41)), Ok(Some(42)), Ok(Some(43)), Ok(Some(44))]
);

Ok(())
})
}

#[pg_test]
fn test_cursor_by_name() -> Result<(), pgrx::spi::Error> {
let cursor_name = Spi::connect(|client| {
24 changes: 1 addition & 23 deletions pgrx/src/spi/client.rs
@@ -5,6 +5,7 @@ use std::ptr::NonNull;
use crate::pg_sys::{self, PgOid};
use crate::spi::{PreparedStatement, Query, Spi, SpiCursor, SpiError, SpiResult, SpiTupleTable};

#[derive(Debug)]
pub struct SpiClient {
// We need `SpiClient` to be publicly accessible but not constructable because we rely
// on it being properly constructed in order for its Drop impl, which calls `pg_sys::SPI_finish()`,
@@ -76,29 +77,6 @@ impl SpiClient {
query.execute(self, limit, args)
}

pub(super) fn prepare_tuple_table(
&self,
status_code: i32,
) -> std::result::Result<SpiTupleTable, SpiError> {
Ok(SpiTupleTable {
status_code: Spi::check_status(status_code)?,
// SAFETY: no concurrent access
table: unsafe { pg_sys::SPI_tuptable.as_mut()},
#[cfg(any(feature = "pg11", feature = "pg12"))]
size: unsafe { pg_sys::SPI_processed as usize },
#[cfg(not(any(feature = "pg11", feature = "pg12")))]
// SAFETY: no concurrent access
size: unsafe {
if pg_sys::SPI_tuptable.is_null() {
pg_sys::SPI_processed as usize
} else {
(*pg_sys::SPI_tuptable).numvals as usize
}
},
current: -1,
})
}

Comment on lines -79 to -101
Owner Author

This code was moved over into the new SpiTupleTable::wrap() function. Doing so allows us to set its members private, which is much easier to reason about.
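
A minimal sketch of the pattern this comment describes, using hypothetical stand-in types (`TupleTable`, `StatusError`, `execute`) rather than the real pgrx ones. The assumption is only that a single `wrap()` constructor, visible to the parent module, is the one place that builds the struct, so the fields themselves no longer need `pub(super)`:

```rust
// Hypothetical stand-ins for illustration; these are not the pgrx types.
mod spi {
    #[derive(Debug, PartialEq)]
    pub struct StatusError(pub i32);

    pub mod tuple {
        use super::StatusError;

        #[derive(Debug)]
        pub struct TupleTable {
            // Private: only code in this module can read or write these.
            size: usize,
            current: isize,
        }

        impl TupleTable {
            /// The only way for sibling modules to build a `TupleTable`.
            pub(super) fn wrap(status_code: i32, size: usize) -> Result<Self, StatusError> {
                // Stand-in for a status check: negative codes are errors.
                if status_code < 0 {
                    return Err(StatusError(status_code));
                }
                Ok(Self { size, current: -1 })
            }

            pub fn len(&self) -> usize {
                self.size
            }

            pub fn position(&self) -> isize {
                self.current
            }
        }
    }

    /// Stand-in for a query function living in a sibling module.
    pub fn execute(status_code: i32, rows: usize) -> Result<tuple::TupleTable, StatusError> {
        tuple::TupleTable::wrap(status_code, rows)
    }
}
```

With this layout, code outside `spi::tuple` cannot put the table into an inconsistent state (for example a `current` past `size`), which is the "easier to reason about" part.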

/// Set up a cursor that will execute the specified query
///
/// Rows may be then fetched using [`SpiCursor::fetch`].
12 changes: 4 additions & 8 deletions pgrx/src/spi/cursor.rs
@@ -3,7 +3,7 @@ use std::ptr::NonNull;

use crate::pg_sys;

use super::{SpiClient, SpiError, SpiOkCodes, SpiTupleTable};
use super::{SpiClient, SpiOkCodes, SpiResult, SpiTupleTable};

type CursorName = String;

@@ -67,18 +67,14 @@ pub struct SpiCursor<'client> {
pub(crate) client: &'client SpiClient,
}

impl SpiCursor<'_> {
impl<'client> SpiCursor<'client> {
/// Fetch up to `count` rows from the cursor, moving forward
///
/// If `fetch` runs off the end of the available rows, an empty [`SpiTupleTable`] is returned.
pub fn fetch(&mut self, count: libc::c_long) -> std::result::Result<SpiTupleTable, SpiError> {
// SAFETY: no concurrent access
unsafe {
pg_sys::SPI_tuptable = std::ptr::null_mut();
}
Comment on lines -74 to -78
Owner Author

There were a couple of these (when I suppose there "should" have been three!). It's not necessary, however. Postgres does this for us in its SPI_cursor_fetch and SPI_execute operation functions.

pub fn fetch(&mut self, count: libc::c_long) -> SpiResult<SpiTupleTable<'client>> {
Owner Author

I think 'client would have ended up being used here anyhow, but I wanted it to be explicit.

Note that SpiCursor::fetch() doesn't return anything that's owned by itself. It's returning a SpiTupleTable whose lifetime is governed by the active SpiClient.

@workingjubilee Jul 19, 2023

I'm feeling a bit suspicious about the lifetimes here: is SpiCursor itself even actually bound to the lifetime of the SpiClient? https://www.postgresql.org/docs/current/spi-spi-cursor-open.html

Not that we cannot or even should not use a shorter lifetime, but it feels unclear here if we are.

Owner Author

I haven't quite gotten to thinking about cursor. It does seem right tho.

The results of SpiCursor::fetch() (the SpiTupleTable) are indeed tied to the active SpiClient.

And I think we need to assume the cursor (Portal) is too, unless SpiCursor::detach_into_name() is called. Then the magic happens inside Postgres to let it live longer.

I don't think you can read those docs as "the PortalData pointer returned by SPI_cursor_open can outlive the active SPI connection"

at least that's my understanding. I have not looked at the Postgres sources around this yet.

but detach_into_name is just allocating a String?

I don't mind if we handle the name-indexing case specially, mind, instead of using a lifetime binding for it, as it seems to be better for our sanity to handle it that way, I just want to figure out what's going on.
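
To make the Rust side of this lifetime discussion concrete, here is a small sketch with mock types (`Client`, `Cursor`, `Table` are hypothetical stand-ins, and the "rows" are just integers). It only models the signature `fetch(&mut self) -> Table<'client>`; it says nothing about how long a Postgres Portal actually lives, which is the open question in this thread:

```rust
use std::marker::PhantomData;

// Mock stand-ins for illustration only; not the pgrx types.
struct Client;

struct Cursor<'client> {
    next: i32,
    end: i32,
    _client: &'client Client,
}

struct Table<'client> {
    rows: Vec<i32>,
    _client: PhantomData<&'client Client>,
}

impl Client {
    /// Open a mock cursor over the inclusive range `start..=end`.
    fn open_cursor(&self, start: i32, end: i32) -> Cursor<'_> {
        Cursor { next: start, end, _client: self }
    }
}

impl<'client> Cursor<'client> {
    /// The result borrows from `'client`, not from `&mut self`.
    fn fetch(&mut self, count: i32) -> Table<'client> {
        let stop = (self.next + count).min(self.end + 1);
        let rows: Vec<i32> = (self.next..stop).collect();
        self.next = stop;
        Table { rows, _client: PhantomData }
    }
}

/// Two cursors and two result tables alive at once, as in the new test.
fn two_cursors(client: &Client) -> (Vec<i32>, Vec<i32>) {
    let mut cursor1 = client.open_cursor(1, 20);
    let mut cursor2 = client.open_cursor(40, 60);
    let first = cursor1.fetch(5);
    let second = cursor2.fetch(5);
    (first.rows, second.rows)
}

/// The table stays usable after the cursor is gone, because its lifetime
/// is tied to the client rather than to the cursor.
fn fetch_then_drop_cursor(client: &Client) -> Vec<i32> {
    let mut cursor = client.open_cursor(1, 20);
    let table = cursor.fetch(5);
    drop(cursor);
    table.rows
}
```

Had `fetch` been declared as `fn fetch(&mut self, ..) -> Table<'_>`, the elided lifetime would come from `&mut self`, and `fetch_then_drop_cursor` would be rejected by the borrow checker.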

// SAFETY: SPI functions to create/find cursors fail via elog, so self.ptr is valid if we successfully set it
unsafe { pg_sys::SPI_cursor_fetch(self.ptr.as_mut(), true, count) }
Ok(self.client.prepare_tuple_table(SpiOkCodes::Fetch as i32)?)
SpiTupleTable::wrap(&self.client, SpiOkCodes::Fetch as i32)
}

/// Consume the cursor, returning its name
9 changes: 2 additions & 7 deletions pgrx/src/spi/query.rs
@@ -62,11 +62,6 @@ impl<'client> Query<'client> for &str {
limit: Option<libc::c_long>,
arguments: Self::Arguments,
) -> SpiResult<SpiTupleTable<'client>> {
// SAFETY: no concurrent access
unsafe {
pg_sys::SPI_tuptable = std::ptr::null_mut();
}

let src = CString::new(self).expect("query contained a null byte");
let status_code = match arguments {
Some(args) => {
@@ -99,7 +94,7 @@ impl<'client> Query<'client> for &str {
},
};

Ok(client.prepare_tuple_table(status_code)?)
SpiTupleTable::wrap(client, status_code)
}

fn open_cursor(self, client: &'client SpiClient, args: Self::Arguments) -> SpiCursor<'client> {
@@ -238,7 +233,7 @@ impl<'client: 'stmt, 'stmt> Query<'client> for &'stmt PreparedStatement<'client>
)
};

Ok(client.prepare_tuple_table(status_code)?)
SpiTupleTable::wrap(client, status_code)
}

fn open_cursor(self, client: &'client SpiClient, args: Self::Arguments) -> SpiCursor<'client> {
70 changes: 61 additions & 9 deletions pgrx/src/spi/tuple.rs
@@ -8,19 +8,56 @@ use crate::memcxt::PgMemoryContexts;
use crate::pg_sys::panic::ErrorReportable;
use crate::pg_sys::{self, PgOid};
use crate::prelude::*;
use crate::spi::SpiClient;

use super::{SpiError, SpiErrorCodes, SpiOkCodes, SpiResult};
use super::{SpiError, SpiErrorCodes, SpiResult};

#[derive(Debug)]
pub struct SpiTupleTable<'client> {
#[allow(dead_code)]
pub(super) status_code: SpiOkCodes,
pub(super) table: Option<&'client mut pg_sys::SPITupleTable>,
pub(super) size: usize,
pub(super) current: isize,
// SpiTupleTable borrows global state set up by the active SpiClient. It doesn't use the client
// directly, but we need to make sure we don't outlive it, so here it is
_client: PhantomData<&'client SpiClient>,
Owner Author

I had first written this as _client: &'client SpiClient, but since we don't use it I don't suppose we need to hold a real reference to it.
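
A short sketch of the trade-off mentioned here, with hypothetical types (`Client`, `WithReference`, `WithPhantom`, `table_for`). Both variants carry the `'client` lifetime, but `PhantomData<&'client Client>` is zero-sized, so the struct stores no pointer it never reads:

```rust
use std::marker::PhantomData;

// Hypothetical stand-ins for illustration; not the pgrx types.
struct Client;

/// Holds a real reference that is never used.
struct WithReference<'client> {
    _client: &'client Client,
    size: usize,
}

/// Carries the same lifetime without storing anything.
struct WithPhantom<'client> {
    _client: PhantomData<&'client Client>,
    size: usize,
}

/// The returned value cannot outlive `client`, even though it holds no
/// pointer to it: the lifetime parameter alone enforces that.
fn table_for<'client>(_client: &'client Client, size: usize) -> WithPhantom<'client> {
    WithPhantom { _client: PhantomData, size }
}
```

Returning the result of `table_for(&client, n)` from a scope that owns `client` fails to compile in the same way it would for `WithReference`, so the borrow-checking guarantee is unchanged.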


// and this is that global state. In ::wrap(), this comes from whatever the current value of
// `pg_sys::SPI_tuptable` happens to be. Postgres may change where SPI_tuptable points
// throughout the lifetime of an active SpiClient, but it doesn't mutate (or deallocate) what
// it happens to point to. This allows us to have multiple active SpiTupleTables
// within a Spi connection. Whatever this points to is freed via `pg_sys::SPI_freetuptable()`
// when we're dropped.
table: Option<NonNull<pg_sys::SPITupleTable>>,
size: usize,
current: isize,
}

impl<'client> SpiTupleTable<'client> {
/// Wraps the current global `pg_sys::SPI_tuptable` as a new [`SpiTupleTable`] instance, with
/// a lifetime tied to the specified [`SpiClient`].
pub(super) fn wrap(_client: &'client SpiClient, last_spi_status_code: i32) -> SpiResult<Self> {
Spi::check_status(last_spi_status_code)?;

unsafe {
//
// SAFETY: The unsafeness here is that we're accessing static globals. Fortunately,
// Postgres is not multi-threaded so we're okay to do this
//
Comment on lines +39 to +42

nervous laughter

Owner Author

Future Eric is gonna love this comment, if this lands.


// different Postgres versions get the tuptable size in different ways
#[cfg(any(feature = "pg11", feature = "pg12"))]
let size = pg_sys::SPI_processed as usize;

#[cfg(not(any(feature = "pg11", feature = "pg12")))]
let size = if pg_sys::SPI_tuptable.is_null() {
pg_sys::SPI_processed as usize
} else {
(*pg_sys::SPI_tuptable).numvals as usize
};

let tuptable = pg_sys::SPI_tuptable;

Ok(Self { _client: PhantomData, table: NonNull::new(tuptable), size, current: -1 })
}
}

/// `SpiTupleTable`s are positioned before the start, for iteration purposes.
///
/// This method moves the position to the first row. If there are no rows, this
@@ -76,9 +113,12 @@ impl<'client> SpiTupleTable<'client> {
fn get_spi_tuptable(
&self,
) -> SpiResult<(*mut pg_sys::SPITupleTable, *mut pg_sys::TupleDescData)> {
let table = self.table.as_deref().ok_or(SpiError::NoTupleTable)?;
// SAFETY: we just assured that `table` is not null
Ok((table as *const _ as *mut _, table.tupdesc))
let table = self.table.map(|table| table.as_ptr()).ok_or(SpiError::NoTupleTable)?;
let tupdesc = unsafe {
// SAFETY: we just assured that `table` is not null
table.as_mut().unwrap().tupdesc
};
Ok((table, tupdesc))
Comment on lines +116 to +121
Owner Author

was just trying to make this function readable, esp since self.table looks different now.

Is it actually possible for this pointer to be NULL?

Owner Author

yes. It is possible for pg_sys::SPI_tuptable to be NULL at the end of a successful statement. There's some comments around that here:

pgrx/pgrx/src/spi/tuple.rs

Lines 126 to 130 in b474516

// a query like "SELECT 1 LIMIT 0" is a valid "select"-style query that will not produce
// a SPI_tuptable. So are utility queries such as "CREATE INDEX" or "VACUUM". We might
// think that in the latter cases we'd want to produce an error here, but there's no
// way to distinguish from the former. As such, we take a gentle approach and
// proceed with "no, we don't have one, but it's okay"

Maybe they're in the wrong place.

Owner Author

oh, but I suppose... to actually answer your question, in the cases where this function is called, no, it shouldn't be possible for self.table to be NULL. Sorry.

But I don't know that we can make that true such that we can elide the Option around the NonNull

Owner Author

It would be an exceptional condition for it to be NULL -- maybe it should just panic in that case?

Not sure if you're asking because you see/know something I don't or just forcing me to think about it even more.

Just trying to figure out for myself if the Option is actually mandatory, here.
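
For reference while weighing whether the `Option` is mandatory, a standalone sketch of how `Option<NonNull<T>>` behaves. `RawTable`, `TableError`, `wrap_ptr`, and `numvals` are hypothetical stand-ins, not pgrx or Postgres items. `NonNull::new` maps a null pointer to `None`, and the `Option` wrapper costs no extra space:

```rust
use std::ptr::NonNull;

// Hypothetical stand-in for a C struct reached through a raw pointer.
struct RawTable {
    numvals: u64,
}

#[derive(Debug, PartialEq)]
enum TableError {
    NoTupleTable,
}

/// A null pointer becomes `None`; anything else becomes `Some(NonNull)`.
fn wrap_ptr(ptr: *mut RawTable) -> Option<NonNull<RawTable>> {
    NonNull::new(ptr)
}

/// Reads through the pointer, or reports that there is no table.
///
/// # Safety
/// If `table` is `Some`, it must point to a live, properly aligned `RawTable`.
unsafe fn numvals(table: Option<NonNull<RawTable>>) -> Result<u64, TableError> {
    let ptr = table.ok_or(TableError::NoTupleTable)?;
    // SAFETY: the caller guarantees the pointer is valid, and `NonNull`
    // guarantees it is not null.
    Ok(unsafe { ptr.as_ref().numvals })
}
```

Because of the null-pointer niche, `Option<NonNull<RawTable>>` has the same size as `*mut RawTable`, so keeping the `Option` is a question of API shape (error versus panic), not of memory cost.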

}

pub fn get_heap_tuple(&self) -> SpiResult<Option<SpiHeapTupleData<'client>>> {
@@ -298,6 +338,18 @@ impl<'client> Iterator for SpiTupleTable<'client> {
}
}

impl Drop for SpiTupleTable<'_> {
fn drop(&mut self) {
unsafe {
// SAFETY: self.table was created by Postgres from whatever `pg_sys::SPI_tuptable` pointed
// to at the time this SpiTupleTable was constructed
if let Some(ptr) = self.table.take() {
pg_sys::SPI_freetuptable(ptr.as_ptr())
}
}
}
}

Comment on lines +341 to +352
Owner Author

we've been leaking pg_sys::SPI_tuptable values from SPI_execute and SPI_cursor_fetch for the duration of Spi::connect(|client| ...). This cleans that up.

This addresses part of the last bit of this comment: pgcentralfoundation#1209 (comment)
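
A rough model of the cleanup described above, with a mock allocator in place of Postgres. `alloc_table`, `free_table`, `freed_count`, and `Table` are hypothetical; in the real code the free call is `pg_sys::SPI_freetuptable`. The sketch shows each wrapper releasing its own table exactly once on drop, and a wrapper around a null pointer releasing nothing:

```rust
use std::cell::Cell;
use std::ptr::NonNull;

// Hypothetical stand-in for the C-side table.
struct RawTable {
    numvals: u64,
}

thread_local! {
    // Counts how many tables the mock "free" function has released.
    static FREED: Cell<usize> = Cell::new(0);
}

/// Mock allocation, standing in for whatever produced the table pointer.
fn alloc_table(numvals: u64) -> *mut RawTable {
    Box::into_raw(Box::new(RawTable { numvals }))
}

/// Mock release function.
///
/// # Safety
/// `ptr` must come from `alloc_table` and must not have been freed already.
unsafe fn free_table(ptr: *mut RawTable) {
    // SAFETY: guaranteed by the caller, see above.
    drop(unsafe { Box::from_raw(ptr) });
    FREED.with(|freed| freed.set(freed.get() + 1));
}

fn freed_count() -> usize {
    FREED.with(|freed| freed.get())
}

struct Table {
    raw: Option<NonNull<RawTable>>,
}

impl Table {
    /// # Safety
    /// `ptr` must be null or a pointer from `alloc_table` owned by no one else.
    unsafe fn wrap(ptr: *mut RawTable) -> Self {
        Table { raw: NonNull::new(ptr) }
    }

    fn numvals(&self) -> Option<u64> {
        // SAFETY: a `Some` pointer is valid until this wrapper is dropped.
        self.raw.map(|ptr| unsafe { ptr.as_ref().numvals })
    }
}

impl Drop for Table {
    fn drop(&mut self) {
        // `take()` leaves `None` behind, so the pointer cannot be freed twice.
        if let Some(ptr) = self.raw.take() {
            // SAFETY: `ptr` was handed to `wrap` and is freed only here.
            unsafe { free_table(ptr.as_ptr()) }
        }
    }
}
```

Without the `Drop` impl, every table created inside the scope would stay allocated until the surrounding owner (here, the process; in pgrx, the SPI connection) went away, which matches the leak the comment describes.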

/// Represents a single `pg_sys::Datum` inside a `SpiHeapTupleData`
pub struct SpiHeapTupleDataEntry<'client> {
datum: Option<pg_sys::Datum>,