Skip to content

Commit d1d8184

Browse files
authored
Merge pull request #1900 from fermyon/mysql-v2
Move mysql interface to resources
2 parents 7954a73 + 8d30055 commit d1d8184

File tree

10 files changed

+175
-87
lines changed

10 files changed

+175
-87
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/outbound-mysql/Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@ doctest = false
1111
anyhow = "1.0"
1212
flate2 = "1.0.17"
1313
# Removing default features for mysql_async to remove flate2/zlib feature
14-
mysql_async = { version = "0.32.2", default-features = false, features = ["native-tls-tls"] }
14+
mysql_async = { version = "0.32.2", default-features = false, features = [
15+
"native-tls-tls",
16+
] }
1517
# Removing default features for mysql_common to remove flate2/zlib feature
1618
mysql_common = { version = "0.30.6", default-features = false }
1719
spin-core = { path = "../core" }
1820
spin-world = { path = "../world" }
19-
tokio = { version = "1", features = [ "rt-multi-thread" ] }
20-
tracing = { version = "0.1", features = [ "log" ] }
21+
table = { path = "../table" }
22+
tokio = { version = "1", features = ["rt-multi-thread"] }
23+
tracing = { version = "0.1", features = ["log"] }
2124
url = "2.3.1"

crates/outbound-mysql/src/lib.rs

Lines changed: 121 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
11
use anyhow::Result;
2-
pub use mysql::add_to_linker;
32
use mysql_async::{consts::ColumnType, from_value_opt, prelude::*, Opts, OptsBuilder, SslOpts};
3+
use spin_core::wasmtime::component::Resource;
44
use spin_core::{async_trait, HostComponent};
5-
use spin_world::v1::{
6-
mysql::{self, MysqlError},
7-
rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet},
8-
};
9-
use std::collections::HashMap;
5+
use spin_world::v1::mysql as v1;
6+
use spin_world::v1::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet};
7+
use spin_world::v2::mysql::{self as v2, Connection};
108
use std::sync::Arc;
119
use url::Url;
1210

1311
/// A simple implementation to support outbound mysql connection
1412
#[derive(Default)]
1513
pub struct OutboundMysql {
16-
pub connections: HashMap<String, mysql_async::Conn>,
14+
pub connections: table::Table<mysql_async::Conn>,
15+
}
16+
17+
impl OutboundMysql {
18+
async fn get_conn(
19+
&mut self,
20+
connection: Resource<Connection>,
21+
) -> Result<&mut mysql_async::Conn, v2::Error> {
22+
self.connections
23+
.get_mut(connection.rep())
24+
.ok_or_else(|| v2::Error::ConnectionFailed("no connection found".into()))
25+
}
1726
}
1827

1928
impl HostComponent for OutboundMysql {
@@ -23,37 +32,48 @@ impl HostComponent for OutboundMysql {
2332
linker: &mut spin_core::Linker<T>,
2433
get: impl Fn(&mut spin_core::Data<T>) -> &mut Self::Data + Send + Sync + Copy + 'static,
2534
) -> anyhow::Result<()> {
26-
mysql::add_to_linker(linker, get)
35+
v2::add_to_linker(linker, get)?;
36+
v1::add_to_linker(linker, get)
2737
}
2838

2939
fn build_data(&self) -> Self::Data {
3040
Default::default()
3141
}
3242
}
3343

44+
impl v2::Host for OutboundMysql {}
45+
3446
#[async_trait]
35-
impl mysql::Host for OutboundMysql {
47+
impl v2::HostConnection for OutboundMysql {
48+
async fn open(&mut self, address: String) -> Result<Result<Resource<Connection>, v2::Error>> {
49+
Ok(async {
50+
self.connections
51+
.push(
52+
build_conn(&address)
53+
.await
54+
.map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?,
55+
)
56+
.map_err(|_| v2::Error::ConnectionFailed("too many connections".into()))
57+
.map(Resource::new_own)
58+
}
59+
.await)
60+
}
61+
3662
async fn execute(
3763
&mut self,
38-
address: String,
64+
connection: Resource<Connection>,
3965
statement: String,
4066
params: Vec<ParameterValue>,
41-
) -> Result<Result<(), MysqlError>> {
67+
) -> Result<Result<(), v2::Error>> {
4268
Ok(async {
43-
let db_params = params
44-
.iter()
45-
.map(to_sql_parameter)
46-
.collect::<anyhow::Result<Vec<_>>>()
47-
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;
48-
69+
let db_params = params.into_iter().map(to_sql_parameter).collect::<Vec<_>>();
4970
let parameters = mysql_async::Params::Positional(db_params);
5071

51-
self.get_conn(&address)
52-
.await
53-
.map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))?
72+
self.get_conn(connection)
73+
.await?
5474
.exec_batch(&statement, &[parameters])
5575
.await
56-
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;
76+
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
5777

5878
Ok(())
5979
}
@@ -62,63 +82,105 @@ impl mysql::Host for OutboundMysql {
6282

6383
async fn query(
6484
&mut self,
65-
address: String,
85+
connection: Resource<Connection>,
6686
statement: String,
6787
params: Vec<ParameterValue>,
68-
) -> Result<Result<RowSet, MysqlError>> {
88+
) -> Result<Result<RowSet, v2::Error>> {
6989
Ok(async {
70-
let db_params = params
71-
.iter()
72-
.map(to_sql_parameter)
73-
.collect::<anyhow::Result<Vec<_>>>()
74-
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;
75-
90+
let db_params = params.into_iter().map(to_sql_parameter).collect::<Vec<_>>();
7691
let parameters = mysql_async::Params::Positional(db_params);
7792

7893
let mut query_result = self
79-
.get_conn(&address)
80-
.await
81-
.map_err(|e| MysqlError::ConnectionFailed(format!("{:?}", e)))?
94+
.get_conn(connection)
95+
.await?
8296
.exec_iter(&statement, parameters)
8397
.await
84-
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;
98+
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
8599

86100
// We have to get these before collect() destroys them
87101
let columns = convert_columns(query_result.columns());
88102

89103
match query_result.collect::<mysql_async::Row>().await {
90-
Err(e) => Err(MysqlError::OtherError(format!("{:?}", e))),
104+
Err(e) => Err(v2::Error::Other(e.to_string())),
91105
Ok(result_set) => {
92106
let rows = result_set
93107
.into_iter()
94108
.map(|row| convert_row(row, &columns))
95-
.collect::<Result<Vec<_>, _>>()
96-
.map_err(|e| MysqlError::QueryFailed(format!("{:?}", e)))?;
109+
.collect::<Result<Vec<_>, _>>()?;
97110

98111
Ok(RowSet { columns, rows })
99112
}
100113
}
101114
}
102115
.await)
103116
}
117+
118+
fn drop(&mut self, connection: Resource<Connection>) -> Result<()> {
119+
self.connections.remove(connection.rep());
120+
Ok(())
121+
}
122+
}
123+
124+
/// Delegate a function call to the v2::HostConnection implementation
125+
macro_rules! delegate {
126+
($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
127+
let connection = match <Self as v2::HostConnection>::open($self, $address).await? {
128+
Ok(c) => c,
129+
Err(e) => return Ok(Err(to_legacy_error(e))),
130+
};
131+
Ok(<Self as v2::HostConnection>::$name($self, connection, $($arg),*)
132+
.await?
133+
.map_err(|e| to_legacy_error(e)))
134+
}};
135+
}
136+
137+
#[async_trait]
138+
impl v1::Host for OutboundMysql {
139+
async fn execute(
140+
&mut self,
141+
address: String,
142+
statement: String,
143+
params: Vec<ParameterValue>,
144+
) -> Result<Result<(), v1::MysqlError>> {
145+
delegate!(self.execute(address, statement, params))
146+
}
147+
148+
async fn query(
149+
&mut self,
150+
address: String,
151+
statement: String,
152+
params: Vec<ParameterValue>,
153+
) -> Result<Result<RowSet, v1::MysqlError>> {
154+
delegate!(self.query(address, statement, params))
155+
}
156+
}
157+
158+
fn to_legacy_error(error: v2::Error) -> v1::MysqlError {
159+
match error {
160+
v2::Error::ConnectionFailed(e) => v1::MysqlError::ConnectionFailed(e),
161+
v2::Error::BadParameter(e) => v1::MysqlError::BadParameter(e),
162+
v2::Error::QueryFailed(e) => v1::MysqlError::QueryFailed(e),
163+
v2::Error::ValueConversionFailed(e) => v1::MysqlError::ValueConversionFailed(e),
164+
v2::Error::Other(e) => v1::MysqlError::OtherError(e),
165+
}
104166
}
105167

106-
fn to_sql_parameter(value: &ParameterValue) -> anyhow::Result<mysql_async::Value> {
168+
fn to_sql_parameter(value: ParameterValue) -> mysql_async::Value {
107169
match value {
108-
ParameterValue::Boolean(v) => Ok(mysql_async::Value::from(v)),
109-
ParameterValue::Int32(v) => Ok(mysql_async::Value::from(v)),
110-
ParameterValue::Int64(v) => Ok(mysql_async::Value::from(v)),
111-
ParameterValue::Int8(v) => Ok(mysql_async::Value::from(v)),
112-
ParameterValue::Int16(v) => Ok(mysql_async::Value::from(v)),
113-
ParameterValue::Floating32(v) => Ok(mysql_async::Value::from(v)),
114-
ParameterValue::Floating64(v) => Ok(mysql_async::Value::from(v)),
115-
ParameterValue::Uint8(v) => Ok(mysql_async::Value::from(v)),
116-
ParameterValue::Uint16(v) => Ok(mysql_async::Value::from(v)),
117-
ParameterValue::Uint32(v) => Ok(mysql_async::Value::from(v)),
118-
ParameterValue::Uint64(v) => Ok(mysql_async::Value::from(v)),
119-
ParameterValue::Str(v) => Ok(mysql_async::Value::from(v)),
120-
ParameterValue::Binary(v) => Ok(mysql_async::Value::from(v)),
121-
ParameterValue::DbNull => Ok(mysql_async::Value::NULL),
170+
ParameterValue::Boolean(v) => mysql_async::Value::from(v),
171+
ParameterValue::Int32(v) => mysql_async::Value::from(v),
172+
ParameterValue::Int64(v) => mysql_async::Value::from(v),
173+
ParameterValue::Int8(v) => mysql_async::Value::from(v),
174+
ParameterValue::Int16(v) => mysql_async::Value::from(v),
175+
ParameterValue::Floating32(v) => mysql_async::Value::from(v),
176+
ParameterValue::Floating64(v) => mysql_async::Value::from(v),
177+
ParameterValue::Uint8(v) => mysql_async::Value::from(v),
178+
ParameterValue::Uint16(v) => mysql_async::Value::from(v),
179+
ParameterValue::Uint32(v) => mysql_async::Value::from(v),
180+
ParameterValue::Uint64(v) => mysql_async::Value::from(v),
181+
ParameterValue::Str(v) => mysql_async::Value::from(v),
182+
ParameterValue::Binary(v) => mysql_async::Value::from(v),
183+
ParameterValue::DbNull => mysql_async::Value::NULL,
122184
}
123185
}
124186

@@ -130,7 +192,7 @@ fn convert_columns(columns: Option<Arc<[mysql_async::Column]>>) -> Vec<Column> {
130192
}
131193

132194
fn convert_column(column: &mysql_async::Column) -> Column {
133-
let name = column.name_str().to_string();
195+
let name = column.name_str().into_owned();
134196
let data_type = convert_data_type(column);
135197

136198
Column { name, data_type }
@@ -192,7 +254,7 @@ fn is_binary(column: &mysql_async::Column) -> bool {
192254
.contains(mysql_async::consts::ColumnFlags::BINARY_FLAG)
193255
}
194256

195-
fn convert_row(mut row: mysql_async::Row, columns: &[Column]) -> Result<Vec<DbValue>, MysqlError> {
257+
fn convert_row(mut row: mysql_async::Row, columns: &[Column]) -> Result<Vec<DbValue>, v2::Error> {
196258
let mut result = Vec::with_capacity(row.len());
197259
for index in 0..row.len() {
198260
result.push(convert_entry(&mut row, index, columns)?);
@@ -204,10 +266,10 @@ fn convert_entry(
204266
row: &mut mysql_async::Row,
205267
index: usize,
206268
columns: &[Column],
207-
) -> Result<DbValue, MysqlError> {
269+
) -> Result<DbValue, v2::Error> {
208270
match (row.take(index), columns.get(index)) {
209271
(None, _) => Ok(DbValue::DbNull), // TODO: is this right or is this an "index out of range" thing
210-
(_, None) => Err(MysqlError::OtherError(format!(
272+
(_, None) => Err(v2::Error::Other(format!(
211273
"Can't get column at index {}",
212274
index
213275
))),
@@ -216,7 +278,7 @@ fn convert_entry(
216278
}
217279
}
218280

219-
fn convert_value(value: mysql_async::Value, column: &Column) -> Result<DbValue, MysqlError> {
281+
fn convert_value(value: mysql_async::Value, column: &Column) -> Result<DbValue, v2::Error> {
220282
match column.data_type {
221283
DbDataType::Binary => convert_value_to::<Vec<u8>>(value).map(DbValue::Binary),
222284
DbDataType::Boolean => convert_value_to::<bool>(value).map(DbValue::Boolean),
@@ -231,23 +293,13 @@ fn convert_value(value: mysql_async::Value, column: &Column) -> Result<DbValue,
231293
DbDataType::Uint16 => convert_value_to::<u16>(value).map(DbValue::Uint16),
232294
DbDataType::Uint32 => convert_value_to::<u32>(value).map(DbValue::Uint32),
233295
DbDataType::Uint64 => convert_value_to::<u64>(value).map(DbValue::Uint64),
234-
DbDataType::Other => Err(MysqlError::ValueConversionFailed(format!(
296+
DbDataType::Other => Err(v2::Error::ValueConversionFailed(format!(
235297
"Cannot convert value {:?} in column {} data type {:?}",
236298
value, column.name, column.data_type
237299
))),
238300
}
239301
}
240302

241-
impl OutboundMysql {
242-
async fn get_conn(&mut self, address: &str) -> anyhow::Result<&mut mysql_async::Conn> {
243-
let client = match self.connections.entry(address.to_owned()) {
244-
std::collections::hash_map::Entry::Occupied(o) => o.into_mut(),
245-
std::collections::hash_map::Entry::Vacant(v) => v.insert(build_conn(address).await?),
246-
};
247-
Ok(client)
248-
}
249-
}
250-
251303
async fn build_conn(address: &str) -> Result<mysql_async::Conn, mysql_async::Error> {
252304
tracing::log::debug!("Build new connection: {}", address);
253305

@@ -295,8 +347,8 @@ fn build_opts(address: &str) -> Result<Opts, mysql_async::Error> {
295347
.into())
296348
}
297349

298-
fn convert_value_to<T: FromValue>(value: mysql_async::Value) -> Result<T, MysqlError> {
299-
from_value_opt::<T>(value).map_err(|e| MysqlError::ValueConversionFailed(format!("{}", e)))
350+
fn convert_value_to<T: FromValue>(value: mysql_async::Value) -> Result<T, v2::Error> {
351+
from_value_opt::<T>(value).map_err(|e| v2::Error::ValueConversionFailed(format!("{}", e)))
300352
}
301353

302354
#[cfg(test)]

crates/outbound-pg/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl v2::HostConnection for OutboundPg {
5656
.await
5757
.map_err(|e| v2::Error::ConnectionFailed(format!("{e:?}")))?,
5858
)
59-
.map_err(|_| v2::Error::Other("too many connections".into()))
59+
.map_err(|_| v2::Error::ConnectionFailed("too many connections".into()))
6060
.map(Resource::new_own)
6161
}
6262
.await)

0 commit comments

Comments
 (0)