11use database_strategy:: { DatabaseStrategy , QueryParams } ;
2+
23use devup_migrate:: generate_revision_sql:: GenerateRevisionActionSqlResult ;
34use devup_migrate:: get_all_revisions;
45use devup_migrate:: revision:: DevupSqlRevision ;
56use devup_migrate:: {
67 generate:: generate_models_from_revisions, generate_revision_sql:: generate_revision_action_sql,
78} ;
9+ use inflections:: case:: to_snake_case;
810use model:: { DevupModel , get_all_models} ;
9- use sqlx:: query;
11+ use serde_json:: Value ;
12+ use sqlx:: any:: AnyTransactionManager ;
13+ use sqlx:: { AnyConnection , TransactionManager , query} ;
1014use std:: time:: Duration ;
1115
1216use devup_database_diff:: diff_models;
@@ -160,7 +164,7 @@ impl DevupSqlPool {
160164 . execute ( & mut * tx)
161165 . await ?;
162166
163- self . commit ( tx) . await ?;
167+ self . commit ( & mut * tx) . await ?;
164168 return Ok ( ( ) ) ;
165169 }
166170
@@ -224,7 +228,7 @@ impl DevupSqlPool {
224228 . bind ( target_revision)
225229 . execute ( & mut * tx)
226230 . await ?;
227- self . commit ( tx) . await ?;
231+ self . commit ( & mut * tx) . await ?;
228232 Ok ( ( ) )
229233 }
230234
@@ -244,7 +248,7 @@ impl DevupSqlPool {
244248 . database_strategy
245249 . setup_revision ( & mut tx)
246250 . await ?;
247- self . commit ( tx) . await ?;
251+ self . commit ( & mut * tx) . await ?;
248252 Ok ( ( ) )
249253 }
250254
@@ -269,11 +273,66 @@ impl DevupSqlPool {
269273 }
270274 }
271275
272- pub async fn commit ( & self , tx : Transaction < ' _ , Any > ) -> Result < ( ) , Error > {
273- tx. commit ( ) . await
276+ pub async fn commit ( & self , tx : & mut AnyConnection ) -> Result < ( ) , Error > {
277+ AnyTransactionManager :: commit ( tx) . await ?;
278+ Ok ( ( ) )
279+ }
280+
281+ pub async fn rollback ( & self , tx : & mut AnyConnection ) -> Result < ( ) , Error > {
282+ AnyTransactionManager :: rollback ( tx) . await ?;
283+ Ok ( ( ) )
274284 }
275285
276- pub async fn rollback ( & self , tx : Transaction < ' _ , Any > ) -> Result < ( ) , Error > {
277- tx. rollback ( ) . await
286+ pub async fn save ( & self , obj : & Value , tx : Option < & mut AnyConnection > ) -> Result < ( ) , Error > {
287+ let table_name = obj
288+ . get :: < String > ( "__devup_name" . to_string ( ) )
289+ . ok_or ( Error :: ColumnNotFound ( "__devup_name" . to_string ( ) ) ) ?
290+ . as_str ( )
291+ . ok_or ( Error :: ColumnNotFound ( "__devup_name" . to_string ( ) ) ) ?;
292+
293+ let mut keys = Vec :: new ( ) ;
294+ let mut values = Vec :: new ( ) ;
295+
296+ for ( key, value) in obj
297+ . as_object ( )
298+ . ok_or ( Error :: ColumnNotFound ( "__devup_name" . to_string ( ) ) ) ?
299+ . iter ( )
300+ {
301+ if key. starts_with ( "__devup_" ) {
302+ continue ;
303+ }
304+ if let Some ( value) = value. as_str ( ) {
305+ keys. push ( to_snake_case ( key) ) ;
306+ values. push ( value) ;
307+ }
308+ }
309+
310+ let sql = format ! (
311+ "INSERT INTO {} ({}) VALUES ({})" ,
312+ to_snake_case( table_name) ,
313+ keys. join( ", " ) ,
314+ values
315+ . iter( )
316+ . enumerate( )
317+ . map(
318+ |( idx, _) | match self . options. database_strategy. get_query_params( ) {
319+ QueryParams :: Number => format!( "${}" , idx + 1 ) ,
320+ QueryParams :: QuestionMark => "?" . to_string( ) ,
321+ }
322+ )
323+ . collect:: <Vec <String >>( )
324+ . join( ", " )
325+ ) ;
326+
327+ let mut query = query ( & sql) ;
328+ for value in values {
329+ query = query. bind ( value) ;
330+ }
331+ if let Some ( tx) = tx {
332+ query. execute ( & mut * tx) . await ?;
333+ } else {
334+ query. execute ( self . pool . as_ref ( ) . unwrap ( ) ) . await ?;
335+ }
336+ Ok ( ( ) )
278337 }
279338}
0 commit comments