Skip to content

Commit e14e1e4

Browse files
committed
Initial commit (untested) of Postgres date-time types
Signed-off-by: itowlson <[email protected]>
1 parent 1154682 commit e14e1e4

File tree

11 files changed

+646
-53
lines changed

11 files changed

+646
-53
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-pg/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = { workspace = true }
66

77
[dependencies]
88
anyhow = { workspace = true }
9+
chrono = "0.4"
910
native-tls = "0.2"
1011
postgres-native-tls = "0.5"
1112
spin-core = { path = "../core" }
@@ -14,7 +15,7 @@ spin-factors = { path = "../factors" }
1415
spin-resource-table = { path = "../table" }
1516
spin-world = { path = "../world" }
1617
tokio = { workspace = true, features = ["rt-multi-thread"] }
17-
tokio-postgres = "0.7"
18+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
1819
tracing = { workspace = true }
1920

2021
[dev-dependencies]

crates/factor-outbound-pg/src/client.rs

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use anyhow::{anyhow, Result};
22
use native_tls::TlsConnector;
33
use postgres_native_tls::MakeTlsConnector;
44
use spin_world::async_trait;
5-
use spin_world::v2::postgres::{self as v2};
6-
use spin_world::v2::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet};
5+
use spin_world::spin::postgres::rdbms_types::{
6+
self as v2, Column, DbDataType, DbValue, ParameterValue, RowSet,
7+
};
78
use tokio_postgres::types::Type;
89
use tokio_postgres::{config::SslMode, types::ToSql, Row};
910
use tokio_postgres::{Client as TokioClient, NoTls, Socket};
@@ -55,13 +56,18 @@ impl Client for TokioClient {
5556
statement: String,
5657
params: Vec<ParameterValue>,
5758
) -> Result<u64, v2::Error> {
58-
let params: Vec<&(dyn ToSql + Sync)> = params
59+
let params = params
5960
.iter()
6061
.map(to_sql_parameter)
6162
.collect::<Result<Vec<_>>>()
6263
.map_err(|e| v2::Error::ValueConversionFailed(format!("{:?}", e)))?;
6364

64-
self.execute(&statement, params.as_slice())
65+
let params_refs: Vec<&(dyn ToSql + Sync)> = params
66+
.iter()
67+
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
68+
.collect();
69+
70+
self.execute(&statement, params_refs.as_slice())
6571
.await
6672
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))
6773
}
@@ -71,14 +77,19 @@ impl Client for TokioClient {
7177
statement: String,
7278
params: Vec<ParameterValue>,
7379
) -> Result<RowSet, v2::Error> {
74-
let params: Vec<&(dyn ToSql + Sync)> = params
80+
let params = params
7581
.iter()
7682
.map(to_sql_parameter)
7783
.collect::<Result<Vec<_>>>()
7884
.map_err(|e| v2::Error::BadParameter(format!("{:?}", e)))?;
7985

86+
let params_refs: Vec<&(dyn ToSql + Sync)> = params
87+
.iter()
88+
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
89+
.collect();
90+
8091
let results = self
81-
.query(&statement, params.as_slice())
92+
.query(&statement, params_refs.as_slice())
8293
.await
8394
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
8495

@@ -111,22 +122,47 @@ where
111122
});
112123
}
113124

114-
fn to_sql_parameter(value: &ParameterValue) -> Result<&(dyn ToSql + Sync)> {
125+
fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Sync>> {
115126
match value {
116-
ParameterValue::Boolean(v) => Ok(v),
117-
ParameterValue::Int32(v) => Ok(v),
118-
ParameterValue::Int64(v) => Ok(v),
119-
ParameterValue::Int8(v) => Ok(v),
120-
ParameterValue::Int16(v) => Ok(v),
121-
ParameterValue::Floating32(v) => Ok(v),
122-
ParameterValue::Floating64(v) => Ok(v),
127+
ParameterValue::Boolean(v) => Ok(Box::new(*v)),
128+
ParameterValue::Int32(v) => Ok(Box::new(*v)),
129+
ParameterValue::Int64(v) => Ok(Box::new(*v)),
130+
ParameterValue::Int8(v) => Ok(Box::new(*v)),
131+
ParameterValue::Int16(v) => Ok(Box::new(*v)),
132+
ParameterValue::Floating32(v) => Ok(Box::new(*v)),
133+
ParameterValue::Floating64(v) => Ok(Box::new(*v)),
123134
ParameterValue::Uint8(_)
124135
| ParameterValue::Uint16(_)
125136
| ParameterValue::Uint32(_)
126137
| ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
127-
ParameterValue::Str(v) => Ok(v),
128-
ParameterValue::Binary(v) => Ok(v),
129-
ParameterValue::DbNull => Ok(&PgNull),
138+
ParameterValue::Str(v) => Ok(Box::new(v.clone())),
139+
ParameterValue::Binary(v) => Ok(Box::new(v.clone())),
140+
ParameterValue::Date((y, mon, d)) => {
141+
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
142+
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
143+
Ok(Box::new(naive_date))
144+
}
145+
ParameterValue::Time((h, min, s, ns)) => {
146+
let naive_time =
147+
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
148+
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
149+
Ok(Box::new(naive_time))
150+
}
151+
ParameterValue::Datetime((y, mon, d, h, min, s, ns)) => {
152+
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
153+
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
154+
let naive_time =
155+
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
156+
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
157+
let dt = chrono::NaiveDateTime::new(naive_date, naive_time);
158+
Ok(Box::new(dt))
159+
}
160+
ParameterValue::Timestamp(v) => {
161+
let ts = chrono::DateTime::<chrono::Utc>::from_timestamp(*v, 0)
162+
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
163+
Ok(Box::new(ts))
164+
}
165+
ParameterValue::DbNull => Ok(Box::new(PgNull)),
130166
}
131167
}
132168

crates/factor-outbound-pg/src/host.rs

Lines changed: 101 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use anyhow::Result;
22
use spin_core::{async_trait, wasmtime::component::Resource};
3+
use spin_world::spin::postgres::{self as v3};
34
use spin_world::v1::postgres as v1;
45
use spin_world::v1::rdbms_types as v1_types;
5-
use spin_world::v2::postgres::{self as v2, Connection};
6-
use spin_world::v2::rdbms_types;
7-
use spin_world::v2::rdbms_types::{ParameterValue, RowSet};
6+
use spin_world::v2::postgres::{self as v2};
7+
use spin_world::v2::rdbms_types as v2types;
88
use tracing::field::Empty;
99
use tracing::instrument;
1010
use tracing::Level;
@@ -13,21 +13,27 @@ use crate::client::Client;
1313
use crate::InstanceState;
1414

1515
impl<C: Client> InstanceState<C> {
16-
async fn open_connection(&mut self, address: &str) -> Result<Resource<Connection>, v2::Error> {
16+
async fn open_connection<Conn: 'static>(
17+
&mut self,
18+
address: &str,
19+
) -> Result<Resource<Conn>, v3::rdbms_types::Error> {
1720
self.connections
1821
.push(
1922
C::build_client(address)
2023
.await
21-
.map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?,
24+
.map_err(|e| v3::rdbms_types::Error::ConnectionFailed(format!("{e:?}")))?,
2225
)
23-
.map_err(|_| v2::Error::ConnectionFailed("too many connections".into()))
26+
.map_err(|_| v3::rdbms_types::Error::ConnectionFailed("too many connections".into()))
2427
.map(Resource::new_own)
2528
}
2629

27-
async fn get_client(&mut self, connection: Resource<Connection>) -> Result<&C, v2::Error> {
30+
async fn get_client<Conn: 'static>(
31+
&mut self,
32+
connection: Resource<Conn>,
33+
) -> Result<&C, v3::rdbms_types::Error> {
2834
self.connections
2935
.get(connection.rep())
30-
.ok_or_else(|| v2::Error::ConnectionFailed("no connection found".into()))
36+
.ok_or_else(|| v3::rdbms_types::Error::ConnectionFailed("no connection found".into()))
3137
}
3238

3339
async fn is_address_allowed(&self, address: &str) -> Result<bool> {
@@ -60,20 +66,29 @@ impl<C: Client> InstanceState<C> {
6066
}
6167

6268
#[async_trait]
63-
impl<C: Send + Sync + Client> v2::Host for InstanceState<C> {}
69+
impl<C: Send + Sync + Client> v3::postgres::Host for InstanceState<C> {}
70+
71+
fn v2_params_to_v3(params: Vec<v2types::ParameterValue>) -> Vec<v3::rdbms_types::ParameterValue> {
72+
params.into_iter().map(|p| p.into()).collect()
73+
}
6474

6575
#[async_trait]
66-
impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
76+
impl<C: Send + Sync + Client> spin_world::spin::postgres::postgres::HostConnection
77+
for InstanceState<C>
78+
{
6779
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
68-
async fn open(&mut self, address: String) -> Result<Resource<Connection>, v2::Error> {
80+
async fn open(
81+
&mut self,
82+
address: String,
83+
) -> Result<Resource<v3::postgres::Connection>, v3::rdbms_types::Error> {
6984
spin_factor_outbound_networking::record_address_fields(&address);
7085

7186
if !self
7287
.is_address_allowed(&address)
7388
.await
74-
.map_err(|e| v2::Error::Other(e.to_string()))?
89+
.map_err(|e| v3::rdbms_types::Error::Other(e.to_string()))?
7590
{
76-
return Err(v2::Error::ConnectionFailed(format!(
91+
return Err(v3::rdbms_types::Error::ConnectionFailed(format!(
7792
"address {address} is not permitted"
7893
)));
7994
}
@@ -83,10 +98,10 @@ impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
8398
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
8499
async fn execute(
85100
&mut self,
86-
connection: Resource<Connection>,
101+
connection: Resource<v3::postgres::Connection>,
87102
statement: String,
88-
params: Vec<ParameterValue>,
89-
) -> Result<u64, v2::Error> {
103+
params: Vec<v3::rdbms_types::ParameterValue>,
104+
) -> Result<u64, v3::rdbms_types::Error> {
90105
Ok(self
91106
.get_client(connection)
92107
.await?
@@ -97,33 +112,39 @@ impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
97112
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
98113
async fn query(
99114
&mut self,
100-
connection: Resource<Connection>,
115+
connection: Resource<v3::postgres::Connection>,
101116
statement: String,
102-
params: Vec<ParameterValue>,
103-
) -> Result<RowSet, v2::Error> {
117+
params: Vec<v3::rdbms_types::ParameterValue>,
118+
) -> Result<v3::rdbms_types::RowSet, v3::rdbms_types::Error> {
104119
Ok(self
105120
.get_client(connection)
106121
.await?
107122
.query(statement, params)
108123
.await?)
109124
}
110125

111-
async fn drop(&mut self, connection: Resource<Connection>) -> anyhow::Result<()> {
126+
async fn drop(&mut self, connection: Resource<v3::postgres::Connection>) -> anyhow::Result<()> {
112127
self.connections.remove(connection.rep());
113128
Ok(())
114129
}
115130
}
116131

117-
impl<C: Send> rdbms_types::Host for InstanceState<C> {
132+
impl<C: Send> v2types::Host for InstanceState<C> {
118133
fn convert_error(&mut self, error: v2::Error) -> Result<v2::Error> {
119134
Ok(error)
120135
}
121136
}
122137

123-
/// Delegate a function call to the v2::HostConnection implementation
138+
impl<C: Send> v3::rdbms_types::Host for InstanceState<C> {
139+
fn convert_error(&mut self, error: v3::rdbms_types::Error) -> Result<v3::rdbms_types::Error> {
140+
Ok(error)
141+
}
142+
}
143+
144+
/// Delegate a function call to the v3::HostConnection implementation
124145
macro_rules! delegate {
125146
($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
126-
if !$self.is_address_allowed(&$address).await.map_err(|e| v2::Error::Other(e.to_string()))? {
147+
if !$self.is_address_allowed(&$address).await.map_err(|e| v3::rdbms_types::Error::Other(e.to_string()))? {
127148
return Err(v1::PgError::ConnectionFailed(format!(
128149
"address {} is not permitted", $address
129150
)));
@@ -132,12 +153,68 @@ macro_rules! delegate {
132153
Ok(c) => c,
133154
Err(e) => return Err(e.into()),
134155
};
135-
<Self as v2::HostConnection>::$name($self, connection, $($arg),*)
156+
<Self as v3::postgres::HostConnection>::$name($self, connection, $($arg),*)
136157
.await
137158
.map_err(|e| e.into())
138159
}};
139160
}
140161

162+
#[async_trait]
163+
impl<C: Send + Sync + Client> v2::Host for InstanceState<C> {}
164+
165+
#[async_trait]
166+
impl<C: Send + Sync + Client> v2::HostConnection for InstanceState<C> {
167+
#[instrument(name = "spin_outbound_pg.open", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", db.address = Empty, server.port = Empty, db.namespace = Empty))]
168+
async fn open(&mut self, address: String) -> Result<Resource<v2::Connection>, v2::Error> {
169+
spin_factor_outbound_networking::record_address_fields(&address);
170+
171+
if !self
172+
.is_address_allowed(&address)
173+
.await
174+
.map_err(|e| v2::Error::Other(e.to_string()))?
175+
{
176+
return Err(v2::Error::ConnectionFailed(format!(
177+
"address {address} is not permitted"
178+
)));
179+
}
180+
Ok(self.open_connection(&address).await?)
181+
}
182+
183+
#[instrument(name = "spin_outbound_pg.execute", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
184+
async fn execute(
185+
&mut self,
186+
connection: Resource<v2::Connection>,
187+
statement: String,
188+
params: Vec<v2types::ParameterValue>,
189+
) -> Result<u64, v2::Error> {
190+
Ok(self
191+
.get_client(connection)
192+
.await?
193+
.execute(statement, v2_params_to_v3(params))
194+
.await?)
195+
}
196+
197+
#[instrument(name = "spin_outbound_pg.query", skip(self, connection, params), err(level = Level::INFO), fields(otel.kind = "client", db.system = "postgresql", otel.name = statement))]
198+
async fn query(
199+
&mut self,
200+
connection: Resource<v2::Connection>,
201+
statement: String,
202+
params: Vec<v2types::ParameterValue>,
203+
) -> Result<v2types::RowSet, v2::Error> {
204+
Ok(self
205+
.get_client(connection)
206+
.await?
207+
.query(statement, v2_params_to_v3(params))
208+
.await?
209+
.into())
210+
}
211+
212+
async fn drop(&mut self, connection: Resource<v2::Connection>) -> anyhow::Result<()> {
213+
self.connections.remove(connection.rep());
214+
Ok(())
215+
}
216+
}
217+
141218
#[async_trait]
142219
impl<C: Send + Sync + Client> v1::Host for InstanceState<C> {
143220
async fn execute(

crates/factor-outbound-pg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ impl<C: Send + Sync + Client + 'static> Factor for OutboundPgFactor<C> {
2323
) -> anyhow::Result<()> {
2424
ctx.link_bindings(spin_world::v1::postgres::add_to_linker)?;
2525
ctx.link_bindings(spin_world::v2::postgres::add_to_linker)?;
26+
ctx.link_bindings(spin_world::spin::postgres::postgres::add_to_linker)?;
2627
Ok(())
2728
}
2829

crates/factor-outbound-pg/tests/factor_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use spin_factor_variables::VariablesFactor;
66
use spin_factors::{anyhow, RuntimeFactors};
77
use spin_factors_test::{toml, TestEnvironment};
88
use spin_world::async_trait;
9-
use spin_world::v2::postgres::HostConnection;
10-
use spin_world::v2::postgres::{self as v2};
11-
use spin_world::v2::rdbms_types::Error as PgError;
12-
use spin_world::v2::rdbms_types::{ParameterValue, RowSet};
9+
use spin_world::spin::postgres::postgres::HostConnection;
10+
use spin_world::spin::postgres::postgres::{self as v2};
11+
use spin_world::spin::postgres::rdbms_types::Error as PgError;
12+
use spin_world::spin::postgres::rdbms_types::{ParameterValue, RowSet};
1313

1414
#[derive(RuntimeFactors)]
1515
struct TestFactors {

0 commit comments

Comments
 (0)