Skip to content

Commit e2d25df

Browse files
committed
PostgreSQL UUID and JSONB support
Signed-off-by: itowlson <[email protected]>
1 parent 0a26f5d commit e2d25df

File tree

14 files changed

+728
-132
lines changed

14 files changed

+728
-132
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-pg/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ deadpool-postgres = { version = "0.14", features = ["rt_tokio_1"] }
1111
moka = { version = "0.12", features = ["sync"] }
1212
native-tls = "0.2"
1313
postgres-native-tls = "0.5"
14+
serde_json = { workspace = true }
1415
spin-core = { path = "../core" }
1516
spin-factor-outbound-networking = { path = "../factor-outbound-networking" }
1617
spin-factors = { path = "../factors" }
1718
spin-resource-table = { path = "../table" }
1819
spin-world = { path = "../world" }
1920
tokio = { workspace = true, features = ["rt-multi-thread"] }
20-
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
21+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
2122
tracing = { workspace = true }
23+
uuid = "1"
2224

2325
[dev-dependencies]
2426
spin-factor-variables = { path = "../factor-variables" }

crates/factor-outbound-pg/src/client.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use anyhow::{anyhow, Context, Result};
22
use native_tls::TlsConnector;
33
use postgres_native_tls::MakeTlsConnector;
44
use spin_world::async_trait;
5-
use spin_world::spin::postgres::postgres::{
6-
self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet,
5+
use spin_world::spin::postgres4_0_0::postgres::{
6+
self as v4, Column, DbDataType, DbValue, ParameterValue, RowSet,
77
};
88
use tokio_postgres::types::Type;
99
use tokio_postgres::{config::SslMode, types::ToSql, NoTls, Row};
@@ -88,13 +88,13 @@ pub trait Client: Send + Sync + 'static {
8888
&self,
8989
statement: String,
9090
params: Vec<ParameterValue>,
91-
) -> Result<u64, v3::Error>;
91+
) -> Result<u64, v4::Error>;
9292

9393
async fn query(
9494
&self,
9595
statement: String,
9696
params: Vec<ParameterValue>,
97-
) -> Result<RowSet, v3::Error>;
97+
) -> Result<RowSet, v4::Error>;
9898
}
9999

100100
#[async_trait]
@@ -103,12 +103,12 @@ impl Client for deadpool_postgres::Object {
103103
&self,
104104
statement: String,
105105
params: Vec<ParameterValue>,
106-
) -> Result<u64, v3::Error> {
106+
) -> Result<u64, v4::Error> {
107107
let params = params
108108
.iter()
109109
.map(to_sql_parameter)
110110
.collect::<Result<Vec<_>>>()
111-
.map_err(|e| v3::Error::ValueConversionFailed(format!("{e:?}")))?;
111+
.map_err(|e| v4::Error::ValueConversionFailed(format!("{e:?}")))?;
112112

113113
let params_refs: Vec<&(dyn ToSql + Sync)> = params
114114
.iter()
@@ -118,19 +118,19 @@ impl Client for deadpool_postgres::Object {
118118
self.as_ref()
119119
.execute(&statement, params_refs.as_slice())
120120
.await
121-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))
121+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))
122122
}
123123

124124
async fn query(
125125
&self,
126126
statement: String,
127127
params: Vec<ParameterValue>,
128-
) -> Result<RowSet, v3::Error> {
128+
) -> Result<RowSet, v4::Error> {
129129
let params = params
130130
.iter()
131131
.map(to_sql_parameter)
132132
.collect::<Result<Vec<_>>>()
133-
.map_err(|e| v3::Error::BadParameter(format!("{e:?}")))?;
133+
.map_err(|e| v4::Error::BadParameter(format!("{e:?}")))?;
134134

135135
let params_refs: Vec<&(dyn ToSql + Sync)> = params
136136
.iter()
@@ -141,7 +141,7 @@ impl Client for deadpool_postgres::Object {
141141
.as_ref()
142142
.query(&statement, params_refs.as_slice())
143143
.await
144-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))?;
144+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))?;
145145

146146
if results.is_empty() {
147147
return Ok(RowSet {
@@ -155,7 +155,7 @@ impl Client for deadpool_postgres::Object {
155155
.iter()
156156
.map(convert_row)
157157
.collect::<Result<Vec<_>, _>>()
158-
.map_err(|e| v3::Error::QueryFailed(format!("{e:?}")))?;
158+
.map_err(|e| v4::Error::QueryFailed(format!("{e:?}")))?;
159159

160160
Ok(RowSet { columns, rows })
161161
}
@@ -197,6 +197,15 @@ fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Syn
197197
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
198198
Ok(Box::new(ts))
199199
}
200+
ParameterValue::Uuid(v) => {
201+
let u = uuid::Uuid::parse_str(v).with_context(|| format!("invalid UUID {v}"))?;
202+
Ok(Box::new(u))
203+
}
204+
ParameterValue::Jsonb(v) => {
205+
let j: serde_json::Value = serde_json::from_slice(v)
206+
.with_context(|| format!("invalid JSON {}", String::from_utf8_lossy(v)))?;
207+
Ok(Box::new(j))
208+
}
200209
ParameterValue::DbNull => Ok(Box::new(PgNull)),
201210
}
202211
}
@@ -229,6 +238,8 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
229238
Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp,
230239
Type::DATE => DbDataType::Date,
231240
Type::TIME => DbDataType::Time,
241+
Type::UUID => DbDataType::Uuid,
242+
Type::JSONB => DbDataType::Jsonb,
232243
_ => {
233244
tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),);
234245
DbDataType::Other
@@ -324,6 +335,22 @@ fn convert_entry(row: &Row, index: usize) -> anyhow::Result<DbValue> {
324335
None => DbValue::DbNull,
325336
}
326337
}
338+
&Type::UUID => {
339+
let value: Option<uuid::Uuid> = row.try_get(index)?;
340+
match value {
341+
Some(v) => DbValue::Uuid(v.to_string()),
342+
None => DbValue::DbNull,
343+
}
344+
}
345+
&Type::JSONB => {
346+
let value: Option<serde_json::Value> = row.try_get(index)?;
347+
match value {
348+
Some(v) => {
349+
DbValue::Jsonb(serde_json::to_vec(&v).context("invalid JSON from database")?)
350+
}
351+
None => DbValue::DbNull,
352+
}
353+
}
327354
t => {
328355
tracing::debug!(
329356
"Couldn't convert Postgres type {} in column {}",

crates/factor-outbound-pg/src/host.rs

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use anyhow::Result;
22
use spin_core::wasmtime::component::Resource;
3-
use spin_world::spin::postgres::postgres::{self as v3};
3+
use spin_world::spin::postgres3_0_0::postgres::{self as v3};
4+
use spin_world::spin::postgres4_0_0::postgres::{self as v4};
45
use spin_world::v1::postgres as v1;
56
use spin_world::v1::rdbms_types as v1_types;
67
use spin_world::v2::postgres::{self as v2};
@@ -16,25 +17,25 @@ impl<CF: ClientFactory> InstanceState<CF> {
1617
async fn open_connection<Conn: 'static>(
1718
&mut self,
1819
address: &str,
19-
) -> Result<Resource<Conn>, v3::Error> {
20+
) -> Result<Resource<Conn>, v4::Error> {
2021
self.connections
2122
.push(
2223
self.client_factory
2324
.get_client(address)
2425
.await
25-
.map_err(|e| v3::Error::ConnectionFailed(format!("{e:?}")))?,
26+
.map_err(|e| v4::Error::ConnectionFailed(format!("{e:?}")))?,
2627
)
27-
.map_err(|_| v3::Error::ConnectionFailed("too many connections".into()))
28+
.map_err(|_| v4::Error::ConnectionFailed("too many connections".into()))
2829
.map(Resource::new_own)
2930
}
3031

3132
async fn get_client<Conn: 'static>(
3233
&self,
3334
connection: Resource<Conn>,
34-
) -> Result<&CF::Client, v3::Error> {
35+
) -> Result<&CF::Client, v4::Error> {
3536
self.connections
3637
.get(connection.rep())
37-
.ok_or_else(|| v3::Error::ConnectionFailed("no connection found".into()))
38+
.ok_or_else(|| v4::Error::ConnectionFailed("no connection found".into()))
3839
}
3940

4041
async fn is_address_allowed(&self, address: &str) -> Result<bool> {
@@ -68,11 +69,15 @@ impl<CF: ClientFactory> InstanceState<CF> {
6869

6970
fn v2_params_to_v3(
7071
params: Vec<v2_types::ParameterValue>,
71-
) -> Result<Vec<v3::ParameterValue>, v2::Error> {
72+
) -> Result<Vec<v4::ParameterValue>, v2::Error> {
7273
params.into_iter().map(|p| p.try_into()).collect()
7374
}
7475

75-
impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for InstanceState<CF> {
76+
fn v3_params_to_v4(params: Vec<v3::ParameterValue>) -> Vec<v4::ParameterValue> {
77+
params.into_iter().map(|p| p.into()).collect()
78+
}
79+
80+
impl<CF: ClientFactory> v3::HostConnection for InstanceState<CF> {
7681
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
7782
async fn open(&mut self, address: String) -> Result<Resource<v3::Connection>, v3::Error> {
7883
spin_factor_outbound_networking::record_address_fields(&address);
@@ -86,7 +91,7 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
8691
"address {address} is not permitted"
8792
)));
8893
}
89-
self.open_connection(&address).await
94+
Ok(self.open_connection(&address).await?)
9095
}
9196

9297
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
@@ -96,10 +101,11 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
96101
statement: String,
97102
params: Vec<v3::ParameterValue>,
98103
) -> Result<u64, v3::Error> {
99-
self.get_client(connection)
104+
Ok(self
105+
.get_client(connection)
100106
.await?
101-
.execute(statement, params)
102-
.await
107+
.execute(statement, v3_params_to_v4(params))
108+
.await?)
103109
}
104110

105111
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
@@ -109,13 +115,64 @@ impl<CF: ClientFactory> spin_world::spin::postgres::postgres::HostConnection for
109115
statement: String,
110116
params: Vec<v3::ParameterValue>,
111117
) -> Result<v3::RowSet, v3::Error> {
118+
Ok(self
119+
.get_client(connection)
120+
.await?
121+
.query(statement, v3_params_to_v4(params))
122+
.await?
123+
.into())
124+
}
125+
126+
async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
127+
self.connections.remove(connection.rep());
128+
Ok(())
129+
}
130+
}
131+
132+
impl<CF: ClientFactory> v4::HostConnection for InstanceState<CF> {
133+
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
134+
async fn open(&mut self, address: String) -> Result<Resource<v4::Connection>, v4::Error> {
135+
spin_factor_outbound_networking::record_address_fields(&address);
136+
137+
if !self
138+
.is_address_allowed(&address)
139+
.await
140+
.map_err(|e| v4::Error::Other(e.to_string()))?
141+
{
142+
return Err(v4::Error::ConnectionFailed(format!(
143+
"address {address} is not permitted"
144+
)));
145+
}
146+
self.open_connection(&address).await
147+
}
148+
149+
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
150+
async fn execute(
151+
&mut self,
152+
connection: Resource<v4::Connection>,
153+
statement: String,
154+
params: Vec<v4::ParameterValue>,
155+
) -> Result<u64, v4::Error> {
156+
self.get_client(connection)
157+
.await?
158+
.execute(statement, params)
159+
.await
160+
}
161+
162+
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
163+
async fn query(
164+
&mut self,
165+
connection: Resource<v4::Connection>,
166+
statement: String,
167+
params: Vec<v4::ParameterValue>,
168+
) -> Result<v4::RowSet, v4::Error> {
112169
self.get_client(connection)
113170
.await?
114171
.query(statement, params)
115172
.await
116173
}
117174

118-
async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
175+
async fn drop(&mut self, connection: Resource<v4::Connection>) -> anyhow::Result<()> {
119176
self.connections.remove(connection.rep());
120177
Ok(())
121178
}
@@ -133,10 +190,16 @@ impl<CF: ClientFactory> v3::Host for InstanceState<CF> {
133190
}
134191
}
135192

193+
impl<CF: ClientFactory> v4::Host for InstanceState<CF> {
194+
fn convert_error(&mut self, error: v4::Error) -> Result<v4::Error> {
195+
Ok(error)
196+
}
197+
}
198+
136199
/// Delegate a function call to the v3::HostConnection implementation
137200
macro_rules! delegate {
138201
($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
139-
if !$self.is_address_allowed(&$address).await.map_err(|e| v3::Error::Other(e.to_string()))? {
202+
if !$self.is_address_allowed(&$address).await.map_err(|e| v4::Error::Other(e.to_string()))? {
140203
return Err(v1::PgError::ConnectionFailed(format!(
141204
"address {} is not permitted", $address
142205
)));
@@ -145,7 +208,7 @@ macro_rules! delegate {
145208
Ok(c) => c,
146209
Err(e) => return Err(e.into()),
147210
};
148-
<Self as v3::HostConnection>::$name($self, connection, $($arg),*)
211+
<Self as v4::HostConnection>::$name($self, connection, $($arg),*)
149212
.await
150213
.map_err(|e| e.into())
151214
}};

crates/factor-outbound-pg/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ impl<CF: ClientFactory> Factor for OutboundPgFactor<CF> {
2525
ctx.link_bindings(spin_world::v1::postgres::add_to_linker::<_, FactorData<Self>>)?;
2626
ctx.link_bindings(spin_world::v2::postgres::add_to_linker::<_, FactorData<Self>>)?;
2727
ctx.link_bindings(
28-
spin_world::spin::postgres::postgres::add_to_linker::<_, FactorData<Self>>,
28+
spin_world::spin::postgres3_0_0::postgres::add_to_linker::<_, FactorData<Self>>,
29+
)?;
30+
ctx.link_bindings(
31+
spin_world::spin::postgres4_0_0::postgres::add_to_linker::<_, FactorData<Self>>,
2932
)?;
3033
Ok(())
3134
}

crates/factor-outbound-pg/tests/factor_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ use spin_factor_variables::VariablesFactor;
77
use spin_factors::{anyhow, RuntimeFactors};
88
use spin_factors_test::{toml, TestEnvironment};
99
use spin_world::async_trait;
10-
use spin_world::spin::postgres::postgres::Error as PgError;
11-
use spin_world::spin::postgres::postgres::HostConnection;
12-
use spin_world::spin::postgres::postgres::{self as v2};
13-
use spin_world::spin::postgres::postgres::{ParameterValue, RowSet};
10+
use spin_world::spin::postgres4_0_0::postgres::Error as PgError;
11+
use spin_world::spin::postgres4_0_0::postgres::HostConnection;
12+
use spin_world::spin::postgres4_0_0::postgres::{self as v2};
13+
use spin_world::spin::postgres4_0_0::postgres::{ParameterValue, RowSet};
1414

1515
#[derive(RuntimeFactors)]
1616
struct TestFactors {

0 commit comments

Comments
 (0)