Skip to content

Commit c80c557

Browse files
feat: added wrapper connection implementation
1 parent 18dfc5d commit c80c557

File tree

5 files changed

+227
-16
lines changed

5 files changed

+227
-16
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ diesel = { git = "https://github.com/juspay/diesel.git", branch = "dynamic-schem
4545
"chrono",
4646
"uuid",
4747
"postgres_backend",
48+
"i-implement-a-third-party-backend-and-opt-into-breaking-changes"
4849
] }
4950
fred = { version = "9.2.1" }
5051
futures-util = "0.3.28"

crates/context_aware_config/src/api/default_config/handlers.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHel
1010
use jsonschema::{Draft, JSONSchema, ValidationError};
1111
use serde_json::Value;
1212
use service_utils::{
13+
db::types::ConnectionImpl,
1314
helpers::{parse_config_tags, validation_err_to_str},
1415
service::types::{AppHeader, AppState, CustomHeaders, DbConnection, SchemaName},
1516
};
@@ -317,38 +318,30 @@ fn fetch_default_key(
317318

318319
#[get("")]
319320
async fn get(
320-
db_conn: DbConnection,
321+
mut db_conn: ConnectionImpl,
321322
filters: Query<PaginationParams>,
322-
schema_name: SchemaName,
323323
) -> superposition::Result<Json<PaginatedResponse<DefaultConfig>>> {
324-
let DbConnection(mut conn) = db_conn;
325-
326324
if let Some(true) = filters.all {
327-
let result: Vec<DefaultConfig> = dsl::default_configs
328-
.schema_name(&schema_name)
329-
.get_results(&mut conn)?;
325+
let result: Vec<DefaultConfig> =
326+
dsl::default_configs.get_results(&mut db_conn)?;
330327
return Ok(Json(PaginatedResponse {
331328
total_pages: 1,
332329
total_items: result.len() as i64,
333330
data: result,
334331
}));
335332
}
336333

337-
let n_default_configs: i64 = dsl::default_configs
338-
.count()
339-
.schema_name(&schema_name)
340-
.get_result(&mut conn)?;
334+
let n_default_configs: i64 = dsl::default_configs.count().get_result(&mut db_conn)?;
341335
let limit = filters.count.unwrap_or(10);
342336
let mut builder = dsl::default_configs
343337
.order(dsl::created_at.desc())
344338
.limit(limit)
345-
.schema_name(&schema_name)
346339
.into_boxed();
347340
if let Some(page) = filters.page {
348341
let offset = (page - 1) * limit;
349342
builder = builder.offset(offset);
350343
}
351-
let result: Vec<DefaultConfig> = builder.load(&mut conn)?;
344+
let result: Vec<DefaultConfig> = builder.load(&mut db_conn)?;
352345
let total_pages = (n_default_configs as f64 / limit as f64).ceil() as i64;
353346
Ok(Json(PaginatedResponse {
354347
total_pages,

crates/service_utils/src/db.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ use diesel::{
44
};
55

66
pub mod utils;
7+
pub mod types;
78

89
pub type PgSchemaConnectionPool = Pool<ConnectionManager<PgConnection>>;
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
use actix_web::web::Data;
2+
use actix_web::{FromRequest, HttpMessage};
3+
use derive_more::{Deref, DerefMut};
4+
use diesel::connection::{
5+
AnsiTransactionManager, Connection, ConnectionSealed, DefaultLoadingMode,
6+
LoadConnection, SimpleConnection, TransactionManager,
7+
};
8+
use diesel::pg::{Pg, PgQueryBuilder};
9+
use diesel::r2d2::{ConnectionManager, PooledConnection};
10+
use diesel::PgConnection;
11+
use diesel::RunQueryDsl;
12+
13+
use crate::service::types::{AppState, SchemaName};
14+
15+
pub struct TransactionManagerImpl;
16+
17+
impl TransactionManager<ConnectionImpl> for TransactionManagerImpl {
18+
type TransactionStateData = <AnsiTransactionManager as TransactionManager<
19+
PgConnection,
20+
>>::TransactionStateData;
21+
22+
fn begin_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
23+
AnsiTransactionManager::begin_transaction(&mut *conn.conn)?;
24+
let result = diesel::sql_query("SELECT set_config('search_path', $1, true)")
25+
.bind::<diesel::sql_types::Text, _>(&conn.namespace)
26+
.execute(&mut *conn.conn)?;
27+
log::info!("{:?}", result);
28+
Ok(())
29+
}
30+
31+
fn rollback_transaction(
32+
conn: &mut ConnectionImpl,
33+
) -> diesel::prelude::QueryResult<()> {
34+
AnsiTransactionManager::rollback_transaction(&mut *conn.conn)
35+
}
36+
37+
fn commit_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
38+
AnsiTransactionManager::commit_transaction(&mut *conn.conn)
39+
}
40+
41+
fn transaction_manager_status_mut(
42+
conn: &mut ConnectionImpl,
43+
) -> &mut diesel::connection::TransactionManagerStatus {
44+
AnsiTransactionManager::transaction_manager_status_mut(&mut *conn.conn)
45+
}
46+
}
47+
48+
pub struct ConnectionImpl {
49+
namespace: String,
50+
conn: PooledConnection<ConnectionManager<PgConnection>>,
51+
}
52+
53+
impl ConnectionImpl {
54+
pub fn new(
55+
namespace: String,
56+
mut conn: PooledConnection<ConnectionManager<PgConnection>>,
57+
) -> Self {
58+
conn.set_prepared_statement_cache_size(diesel::connection::CacheSize::Disabled);
59+
ConnectionImpl { namespace, conn }
60+
}
61+
62+
pub fn set_namespace(&mut self, namespace: String) {
63+
self.namespace = namespace;
64+
}
65+
66+
pub fn from_request_override(
67+
req: &actix_web::HttpRequest,
68+
schema_name: String,
69+
) -> Result<Self, actix_web::Error> {
70+
let app_state = match req.app_data::<Data<AppState>>() {
71+
Some(state) => state,
72+
None => {
73+
log::info!(
74+
"DbConnection-FromRequest: Unable to get app_data from request"
75+
);
76+
return Err(actix_web::error::ErrorInternalServerError(""));
77+
}
78+
};
79+
80+
match app_state.db_pool.get() {
81+
Ok(conn) => Ok(ConnectionImpl::new(schema_name, conn)),
82+
Err(e) => {
83+
log::info!("Unable to get db connection from pool, error: {e}");
84+
Err(actix_web::error::ErrorInternalServerError(""))
85+
}
86+
}
87+
}
88+
}
89+
90+
impl ConnectionSealed for ConnectionImpl {}
91+
92+
impl SimpleConnection for ConnectionImpl {
93+
fn batch_execute(&mut self, query: &str) -> diesel::prelude::QueryResult<()> {
94+
self.conn.batch_execute(query)
95+
}
96+
}
97+
98+
impl Connection for ConnectionImpl {
99+
type Backend = Pg;
100+
type TransactionManager = TransactionManagerImpl;
101+
102+
// NOTE: this function will never be used, so namespace here doesn't matter
103+
fn establish(database_url: &str) -> diesel::prelude::ConnectionResult<Self> {
104+
let conn = PooledConnection::establish(database_url)?;
105+
Ok(ConnectionImpl {
106+
namespace: String::new(),
107+
conn,
108+
})
109+
}
110+
111+
fn execute_returning_count<T>(
112+
&mut self,
113+
source: &T,
114+
) -> diesel::prelude::QueryResult<usize>
115+
where
116+
T: diesel::query_builder::QueryFragment<Self::Backend>
117+
+ diesel::query_builder::QueryId,
118+
{
119+
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
120+
self.transaction::<usize, diesel::result::Error, _>(|conn| {
121+
(*conn.conn).execute_returning_count(source)
122+
})
123+
}
124+
125+
fn transaction_state(&mut self,) -> &mut<Self::TransactionManager as diesel::connection::TransactionManager<Self>>::TransactionStateData{
126+
self.conn.transaction_state()
127+
}
128+
129+
fn set_prepared_statement_cache_size(&mut self, size: diesel::connection::CacheSize) {
130+
self.conn.set_prepared_statement_cache_size(size)
131+
}
132+
133+
fn set_instrumentation(
134+
&mut self,
135+
instrumentation: impl diesel::connection::Instrumentation,
136+
) {
137+
self.conn.set_instrumentation(instrumentation)
138+
}
139+
140+
fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation {
141+
self.conn.instrumentation()
142+
}
143+
}
144+
145+
impl LoadConnection<DefaultLoadingMode> for ConnectionImpl {
146+
type Cursor<'conn, 'query> =
147+
<PgConnection as LoadConnection<DefaultLoadingMode>>::Cursor<'conn, 'query>;
148+
type Row<'conn, 'query> =
149+
<PgConnection as LoadConnection<DefaultLoadingMode>>::Row<'conn, 'query>;
150+
151+
fn load<'conn, 'query, T>(
152+
&'conn mut self,
153+
source: T,
154+
) -> diesel::prelude::QueryResult<Self::Cursor<'conn, 'query>>
155+
where
156+
T: diesel::query_builder::Query
157+
+ diesel::query_builder::QueryFragment<Self::Backend>
158+
+ diesel::query_builder::QueryId
159+
+ 'query,
160+
Self::Backend: diesel::expression::QueryMetadata<T::SqlType>,
161+
{
162+
self.transaction::<Self::Cursor<'conn, 'query>, diesel::result::Error, _>(
163+
|conn| {
164+
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
165+
<PgConnection as LoadConnection<DefaultLoadingMode>>::load::<T>(
166+
&mut *conn.conn,
167+
source,
168+
)
169+
},
170+
)
171+
}
172+
}
173+
174+
impl FromRequest for ConnectionImpl {
175+
type Error = actix_web::Error;
176+
type Future = std::future::Ready<Result<ConnectionImpl, Self::Error>>;
177+
178+
fn from_request(
179+
req: &actix_web::HttpRequest,
180+
_: &mut actix_web::dev::Payload,
181+
) -> Self::Future {
182+
let schema_name = req.extensions().get::<SchemaName>().cloned().unwrap().0;
183+
std::future::ready(ConnectionImpl::from_request_override(req, schema_name))
184+
}
185+
}
186+
187+
#[derive(Deref, DerefMut)]
188+
pub struct PublicConnection(pub ConnectionImpl);
189+
impl FromRequest for PublicConnection {
190+
type Error = actix_web::Error;
191+
type Future = std::future::Ready<Result<PublicConnection, Self::Error>>;
192+
193+
fn from_request(
194+
req: &actix_web::HttpRequest,
195+
_: &mut actix_web::dev::Payload,
196+
) -> Self::Future {
197+
std::future::ready(
198+
ConnectionImpl::from_request_override(req, String::from("public"))
199+
.map(|conn| PublicConnection(conn)),
200+
)
201+
}
202+
}
203+
204+
#[derive(Deref, DerefMut)]
205+
pub struct SuperpositionConnection(pub ConnectionImpl);
206+
impl FromRequest for SuperpositionConnection {
207+
type Error = actix_web::Error;
208+
type Future = std::future::Ready<Result<SuperpositionConnection, Self::Error>>;
209+
210+
fn from_request(
211+
req: &actix_web::HttpRequest,
212+
_: &mut actix_web::dev::Payload,
213+
) -> Self::Future {
214+
std::future::ready(
215+
ConnectionImpl::from_request_override(req, String::from("superposition"))
216+
.map(|conn| SuperpositionConnection(conn)),
217+
)
218+
}
219+
}

crates/service_utils/src/service/types.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ pub struct DbConnection(pub PooledConnection<ConnectionManager<PgConnection>>);
171171
impl FromRequest for DbConnection {
172172
type Error = Error;
173173
type Future = Ready<Result<DbConnection, Self::Error>>;
174-
175174
fn from_request(
176175
req: &actix_web::HttpRequest,
177176
_: &mut actix_web::dev::Payload,
@@ -185,15 +184,13 @@ impl FromRequest for DbConnection {
185184
return ready(Err(error::ErrorInternalServerError("")));
186185
}
187186
};
188-
189187
let result = match app_state.db_pool.get() {
190188
Ok(conn) => Ok(DbConnection(conn)),
191189
Err(e) => {
192190
log::info!("Unable to get db connection from pool, error: {e}");
193191
Err(error::ErrorInternalServerError(""))
194192
}
195193
};
196-
197194
ready(result)
198195
}
199196
}

0 commit comments

Comments
 (0)