Skip to content

Commit 7c85251

Browse files
committed
PostgreSQL UUID and JSONB support
Signed-off-by: itowlson <[email protected]>
1 parent 7f2a6bf commit 7c85251

File tree

14 files changed

+729
-133
lines changed

14 files changed

+729
-133
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-pg/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@ anyhow = { workspace = true }
99
chrono = { workspace = true }
1010
native-tls = "0.2"
1111
postgres-native-tls = "0.5"
12+
serde_json = { workspace = true }
1213
spin-core = { path = "../core" }
1314
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
1415
spin-factors = { path = "../factors" }
1516
spin-resource-table = { path = "../table" }
1617
spin-world = { path = "../world" }
1718
tokio = { workspace = true, features = ["rt-multi-thread"] }
18-
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
19+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
1920
tracing = { workspace = true }
21+
uuid = "1"
2022

2123
[dev-dependencies]
2224
spin-factor-variables = { path = "../factor-variables" }

crates/factor-outbound-pg/src/client.rs

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use anyhow::{anyhow, Result};
1+
use anyhow::{anyhow, Context, Result};
22
use native_tls::TlsConnector;
33
use postgres_native_tls::MakeTlsConnector;
44
use spin_world::async_trait;
5-
use spin_world::spin::postgres::postgres::{
6-
self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet,
5+
use spin_world::spin::postgres4_0_0::postgres::{
6+
self as v4, Column, DbDataType, DbValue, ParameterValue, RowSet,
77
};
88
use tokio_postgres::types::Type;
99
use tokio_postgres::{config::SslMode, types::ToSql, Row};
@@ -19,13 +19,13 @@ pub trait Client {
1919
&self,
2020
statement: String,
2121
params: Vec<ParameterValue>,
22-
) -> Result<u64, v3::Error>;
22+
) -> Result<u64, v4::Error>;
2323

2424
async fn query(
2525
&self,
2626
statement: String,
2727
params: Vec<ParameterValue>,
28-
) -> Result<RowSet, v3::Error>;
28+
) -> Result<RowSet, v4::Error>;
2929
}
3030

3131
#[async_trait]
@@ -55,12 +55,12 @@ impl Client for TokioClient {
5555
&self,
5656
statement: String,
5757
params: Vec<ParameterValue>,
58-
) -> Result<u64, v3::Error> {
58+
) -> Result<u64, v4::Error> {
5959
let params = params
6060
.iter()
6161
.map(to_sql_parameter)
6262
.collect::<Result<Vec<_>>>()
63-
.map_err(|e| v3::Error::ValueConversionFailed(format!("{e:?}")))?;
63+
.map_err(|e| v4::Error::ValueConversionFailed(format!("{e:?}")))?;
6464

6565
let params_refs: Vec<&(dyn ToSql + Sync)> = params
6666
.iter()
@@ -69,19 +69,19 @@ impl Client for TokioClient {
6969

7070
self.execute(&statement, params_refs.as_slice())
7171
.await
72-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))
72+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))
7373
}
7474

7575
async fn query(
7676
&self,
7777
statement: String,
7878
params: Vec<ParameterValue>,
79-
) -> Result<RowSet, v3::Error> {
79+
) -> Result<RowSet, v4::Error> {
8080
let params = params
8181
.iter()
8282
.map(to_sql_parameter)
8383
.collect::<Result<Vec<_>>>()
84-
.map_err(|e| v3::Error::BadParameter(format!("{e:?}")))?;
84+
.map_err(|e| v4::Error::BadParameter(format!("{e:?}")))?;
8585

8686
let params_refs: Vec<&(dyn ToSql + Sync)> = params
8787
.iter()
@@ -91,7 +91,7 @@ impl Client for TokioClient {
9191
let results = self
9292
.query(&statement, params_refs.as_slice())
9393
.await
94-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))?;
94+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))?;
9595

9696
if results.is_empty() {
9797
return Ok(RowSet {
@@ -105,7 +105,7 @@ impl Client for TokioClient {
105105
.iter()
106106
.map(convert_row)
107107
.collect::<Result<Vec<_>, _>>()
108-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))?;
108+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))?;
109109

110110
Ok(RowSet { columns, rows })
111111
}
@@ -158,6 +158,15 @@ fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Syn
158158
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
159159
Ok(Box::new(ts))
160160
}
161+
ParameterValue::Uuid(v) => {
162+
let u = uuid::Uuid::parse_str(v).with_context(|| format!("invalid UUID {v}"))?;
163+
Ok(Box::new(u))
164+
}
165+
ParameterValue::Jsonb(v) => {
166+
let j: serde_json::Value = serde_json::from_slice(v)
167+
.with_context(|| format!("invalid JSON {}", String::from_utf8_lossy(v)))?;
168+
Ok(Box::new(j))
169+
}
161170
ParameterValue::DbNull => Ok(Box::new(PgNull)),
162171
}
163172
}
@@ -190,6 +199,8 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
190199
Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp,
191200
Type::DATE => DbDataType::Date,
192201
Type::TIME => DbDataType::Time,
202+
Type::UUID => DbDataType::Uuid,
203+
Type::JSONB => DbDataType::Jsonb,
193204
_ => {
194205
tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),);
195206
DbDataType::Other
@@ -285,6 +296,22 @@ fn convert_entry(row: &Row, index: usize) -> anyhow::Result<DbValue> {
285296
None => DbValue::DbNull,
286297
}
287298
}
299+
&Type::UUID => {
300+
let value: Option<uuid::Uuid> = row.try_get(index)?;
301+
match value {
302+
Some(v) => DbValue::Uuid(v.to_string()),
303+
None => DbValue::DbNull,
304+
}
305+
}
306+
&Type::JSONB => {
307+
let value: Option<serde_json::Value> = row.try_get(index)?;
308+
match value {
309+
Some(v) => {
310+
DbValue::Jsonb(serde_json::to_vec(&v).context("invalid JSON from database")?)
311+
}
312+
None => DbValue::DbNull,
313+
}
314+
}
288315
t => {
289316
tracing::debug!(
290317
"Couldn't convert Postgres type {} in column {}",

crates/factor-outbound-pg/src/host.rs

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Result;
22
use spin_core::wasmtime::component::Resource;
3-
use spin_world::spin::postgres::postgres::{self as v3};
3+
use spin_world::spin::postgres3_0_0::postgres::{self as v3};
4+
use spin_world::spin::postgres4_0_0::postgres::{self as v4};
45
use spin_world::v1::postgres as v1;
56
use spin_world::v1::rdbms_types as v1_types;
67
use spin_world::v2::postgres::{self as v2};
@@ -16,24 +17,24 @@ impl<C: Client> InstanceState<C> {
1617
async fn open_connection<Conn: 'static>(
1718
&mut self,
1819
address: &str,
19-
) -> Result<Resource<Conn>, v3::Error> {
20+
) -> Result<Resource<Conn>, v4::Error> {
2021
self.connections
2122
.push(
2223
C::build_client(address)
2324
.await
24-
.map_err(|e| v3::Error::ConnectionFailed(format!("{e:?}")))?,
25+
.map_err(|e| v4::Error::ConnectionFailed(format!("{e:?}")))?,
2526
)
26-
.map_err(|_| v3::Error::ConnectionFailed("too many connections".into()))
27+
.map_err(|_| v4::Error::ConnectionFailed("too many connections".into()))
2728
.map(Resource::new_own)
2829
}
2930

3031
async fn get_client<Conn: 'static>(
3132
&mut self,
3233
connection: Resource<Conn>,
33-
) -> Result<&C, v3::Error> {
34+
) -> Result<&C, v4::Error> {
3435
self.connections
3536
.get(connection.rep())
36-
.ok_or_else(|| v3::Error::ConnectionFailed("no connection found".into()))
37+
.ok_or_else(|| v4::Error::ConnectionFailed("no connection found".into()))
3738
}
3839

3940
async fn is_address_allowed(&self, address: &str) -> Result<bool> {
@@ -67,11 +68,15 @@ impl<C: Client> InstanceState<C> {
6768

6869
fn v2_params_to_v3(
6970
params: Vec<v2_types::ParameterValue>,
70-
) -> Result<Vec<v3::ParameterValue>, v2::Error> {
71+
) -> Result<Vec<v4::ParameterValue>, v2::Error> {
7172
params.into_iter().map(|p| p.try_into()).collect()
7273
}
7374

74-
impl<C: Send + Sync + Client> spin_world::spin::postgres::postgres::HostConnection
75+
fn v3_params_to_v4(params: Vec<v3::ParameterValue>) -> Vec<v4::ParameterValue> {
76+
params.into_iter().map(|p| p.into()).collect()
77+
}
78+
79+
impl<C: Send + Sync + Client> spin_world::spin::postgres3_0_0::postgres::HostConnection
7580
for InstanceState<C>
7681
{
7782
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
@@ -87,7 +92,7 @@ impl<C: Send + Sync + Client> spin_world::spin::postgres::postgres::HostConnecti
8792
"address {address} is not permitted"
8893
)));
8994
}
90-
self.open_connection(&address).await
95+
Ok(self.open_connection(&address).await?)
9196
}
9297

9398
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
@@ -97,10 +102,11 @@ impl<C: Send + Sync + Client> spin_world::spin::postgres::postgres::HostConnecti
97102
statement: String,
98103
params: Vec<v3::ParameterValue>,
99104
) -> Result<u64, v3::Error> {
100-
self.get_client(connection)
105+
Ok(self
106+
.get_client(connection)
101107
.await?
102-
.execute(statement, params)
103-
.await
108+
.execute(statement, v3_params_to_v4(params))
109+
.await?)
104110
}
105111

106112
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
@@ -110,13 +116,64 @@ impl<C: Send + Sync + Client> spin_world::spin::postgres::postgres::HostConnecti
110116
statement: String,
111117
params: Vec<v3::ParameterValue>,
112118
) -> Result<v3::RowSet, v3::Error> {
119+
Ok(self
120+
.get_client(connection)
121+
.await?
122+
.query(statement, v3_params_to_v4(params))
123+
.await?
124+
.into())
125+
}
126+
127+
async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
128+
self.connections.remove(connection.rep());
129+
Ok(())
130+
}
131+
}
132+
133+
impl<C: Send + Sync + Client> v4::HostConnection for InstanceState<C> {
134+
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
135+
async fn open(&mut self, address: String) -> Result<Resource<v4::Connection>, v4::Error> {
136+
spin_factor_outbound_networking::record_address_fields(&address);
137+
138+
if !self
139+
.is_address_allowed(&address)
140+
.await
141+
.map_err(|e| v4::Error::Other(e.to_string()))?
142+
{
143+
return Err(v4::Error::ConnectionFailed(format!(
144+
"address {address} is not permitted"
145+
)));
146+
}
147+
self.open_connection(&address).await
148+
}
149+
150+
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
151+
async fn execute(
152+
&mut self,
153+
connection: Resource<v4::Connection>,
154+
statement: String,
155+
params: Vec<v4::ParameterValue>,
156+
) -> Result<u64, v4::Error> {
157+
self.get_client(connection)
158+
.await?
159+
.execute(statement, params)
160+
.await
161+
}
162+
163+
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
164+
async fn query(
165+
&mut self,
166+
connection: Resource<v4::Connection>,
167+
statement: String,
168+
params: Vec<v4::ParameterValue>,
169+
) -> Result<v4::RowSet, v4::Error> {
113170
self.get_client(connection)
114171
.await?
115172
.query(statement, params)
116173
.await
117174
}
118175

119-
async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
176+
async fn drop(&mut self, connection: Resource<v4::Connection>) -> anyhow::Result<()> {
120177
self.connections.remove(connection.rep());
121178
Ok(())
122179
}
@@ -134,10 +191,16 @@ impl<C: Send + Sync + Client> v3::Host for InstanceState<C> {
134191
}
135192
}
136193

194+
impl<C: Send + Sync + Client> v4::Host for InstanceState<C> {
195+
fn convert_error(&mut self, error: v4::Error) -> Result<v4::Error> {
196+
Ok(error)
197+
}
198+
}
199+
137200
/// Delegate a function call to the v3::HostConnection implementation
138201
macro_rules! delegate {
139202
($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
140-
if !$self.is_address_allowed(&$address).await.map_err(|e| v3::Error::Other(e.to_string()))? {
203+
if !$self.is_address_allowed(&$address).await.map_err(|e| v4::Error::Other(e.to_string()))? {
141204
return Err(v1::PgError::ConnectionFailed(format!(
142205
"address {} is not permitted", $address
143206
)));
@@ -146,7 +209,7 @@ macro_rules! delegate {
146209
Ok(c) => c,
147210
Err(e) => return Err(e.into()),
148211
};
149-
<Self as v3::HostConnection>::$name($self, connection, $($arg),*)
212+
<Self as v4::HostConnection>::$name($self, connection, $($arg),*)
150213
.await
151214
.map_err(|e| e.into())
152215
}};

crates/factor-outbound-pg/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ impl<C: Send + Sync + Client + 'static> Factor for OutboundPgFactor<C> {
2424
ctx.link_bindings(spin_world::v1::postgres::add_to_linker::<_, FactorData<Self>>)?;
2525
ctx.link_bindings(spin_world::v2::postgres::add_to_linker::<_, FactorData<Self>>)?;
2626
ctx.link_bindings(
27-
spin_world::spin::postgres::postgres::add_to_linker::<_, FactorData<Self>>,
27+
spin_world::spin::postgres3_0_0::postgres::add_to_linker::<_, FactorData<Self>>,
28+
)?;
29+
ctx.link_bindings(
30+
spin_world::spin::postgres4_0_0::postgres::add_to_linker::<_, FactorData<Self>>,
2831
)?;
2932
Ok(())
3033
}

crates/factor-outbound-pg/tests/factor_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use spin_factor_variables::VariablesFactor;
66
use spin_factors::{anyhow, RuntimeFactors};
77
use spin_factors_test::{toml, TestEnvironment};
88
use spin_world::async_trait;
9-
use spin_world::spin::postgres::postgres::Error as PgError;
10-
use spin_world::spin::postgres::postgres::HostConnection;
11-
use spin_world::spin::postgres::postgres::{self as v2};
12-
use spin_world::spin::postgres::postgres::{ParameterValue, RowSet};
9+
use spin_world::spin::postgres4_0_0::postgres::Error as PgError;
10+
use spin_world::spin::postgres4_0_0::postgres::HostConnection;
11+
use spin_world::spin::postgres4_0_0::postgres::{self as v2};
12+
use spin_world::spin::postgres4_0_0::postgres::{ParameterValue, RowSet};
1313

1414
#[derive(RuntimeFactors)]
1515
struct TestFactors {

0 commit comments

Comments
 (0)