@@ -25,8 +25,9 @@ pub struct DbPools {
2525
2626/// Create both the primary and maintenance database pools.
2727///
28- /// The schema is applied once via the primary pool. The maintenance pool
29- /// shares the same connection string but has its own, smaller budget.
28+ /// Pending migrations are applied via the primary pool on startup.
29+ /// The maintenance pool shares the same connection string but has its
30+ /// own, smaller budget.
3031pub async fn create_database_pools ( config : & AppConfig ) -> Result < DbPools > {
3132 tracing:: info!(
3233 "Initializing PostgreSQL connections with URL: {}" ,
@@ -48,16 +49,16 @@ pub async fn create_database_pools(config: &AppConfig) -> Result<DbPools> {
4849 )
4950 . await ?;
5051
51- // Apply schema through the primary pool (idempotent )
52- tracing:: info!( "Applying database schema ..." ) ;
53- if let Err ( e) = apply_schema ( & primary) . await {
52+ // Run pending migrations (idempotent, tracked in _sqlx_migrations table )
53+ tracing:: info!( "Running database migrations ..." ) ;
54+ if let Err ( e) = run_migrations ( & primary) . await {
5455 return Err ( DbError ( format ! (
55- "Database schema could not be applied : {}. \
56- Run manually: psql -f db/schema.sql ",
56+ "Database migrations failed : {}. \
57+ Check the migrations/ directory for issues. ",
5758 e
5859 ) ) ) ;
5960 }
60- tracing:: info!( "Database schema applied successfully " ) ;
61+ tracing:: info!( "Database migrations complete " ) ;
6162
6263 // --- maintenance pool ---
6364 let maintenance = create_pool_with_retries (
@@ -156,172 +157,14 @@ async fn create_pool_with_retries(
156157 ) ) )
157158}
158159
159- /// Apply the embedded schema.sql to the database.
160- /// First tries `raw_sql` (simple query protocol). If that fails, falls back
161- /// to splitting the SQL into individual statements and executing them one by one.
162- async fn apply_schema ( pool : & PgPool ) -> Result < ( ) > {
163- let schema_sql = include_str ! ( "../../db/schema.sql" ) ;
164-
165- // Attempt 1: raw_sql sends the entire script via the simple query protocol
166- match sqlx:: raw_sql ( schema_sql) . execute ( pool) . await {
167- Ok ( _) => return Ok ( ( ) ) ,
168- Err ( e) => {
169- tracing:: warn!(
170- "raw_sql failed ({}), falling back to statement-by-statement execution" ,
171- e
172- ) ;
173- }
174- }
175-
176- // Attempt 2: split into individual statements respecting dollar-quoting
177- let statements = split_sql_statements ( schema_sql) ;
178- for ( i, stmt) in statements. iter ( ) . enumerate ( ) {
179- let trimmed = stmt. trim ( ) ;
180- if trimmed. is_empty ( ) || trimmed == ";" {
181- continue ;
182- }
183- if let Err ( e) = sqlx:: raw_sql ( trimmed) . execute ( pool) . await {
184- let preview = if trimmed. len ( ) > 200 {
185- & trimmed[ ..200 ]
186- } else {
187- trimmed
188- } ;
189- tracing:: error!(
190- "Schema statement {} failed: {}\n --- SQL ---\n {}\n -----------" ,
191- i + 1 ,
192- e,
193- preview
194- ) ;
195- return Err ( DbError ( format ! ( "Schema statement {} failed: {}" , i + 1 , e) ) ) ;
196- }
197- }
198-
199- Ok ( ( ) )
200- }
201-
202- /// Split a SQL script into individual statements, correctly handling:
203- /// - Dollar-quoted blocks (`$BODY$...$BODY$`, `$$...$$`)
204- /// - Single-quoted strings (`'...'`)
205- /// - Line comments (`-- ...`)
206- /// - Block comments (`/* ... */`)
160+ /// Run pending migrations from the `migrations/` directory.
207161///
208- /// Uses byte-level iteration over the `&str` directly — no intermediate
209- /// `Vec<char>` allocation (saves ~4× the input size in heap memory).
210- /// SQL is ASCII-safe, so byte comparison is sufficient for all delimiters.
211- fn split_sql_statements ( sql : & str ) -> Vec < String > {
212- let mut statements = Vec :: new ( ) ;
213- let mut current = String :: new ( ) ;
214- let bytes = sql. as_bytes ( ) ;
215- let len = bytes. len ( ) ;
216- let mut i = 0 ;
217-
218- while i < len {
219- let b = bytes[ i] ;
220-
221- // Line comment
222- if b == b'-' && i + 1 < len && bytes[ i + 1 ] == b'-' {
223- while i < len && bytes[ i] != b'\n' {
224- current. push ( bytes[ i] as char ) ;
225- i += 1 ;
226- }
227- continue ;
228- }
229-
230- // Block comment
231- if b == b'/' && i + 1 < len && bytes[ i + 1 ] == b'*' {
232- current. push ( '/' ) ;
233- current. push ( '*' ) ;
234- i += 2 ;
235- while i + 1 < len && !( bytes[ i] == b'*' && bytes[ i + 1 ] == b'/' ) {
236- current. push ( bytes[ i] as char ) ;
237- i += 1 ;
238- }
239- if i + 1 < len {
240- current. push ( '*' ) ;
241- current. push ( '/' ) ;
242- i += 2 ;
243- }
244- continue ;
245- }
246-
247- // Single-quoted string
248- if b == b'\'' {
249- current. push ( '\'' ) ;
250- i += 1 ;
251- while i < len {
252- current. push ( bytes[ i] as char ) ;
253- if bytes[ i] == b'\'' {
254- if i + 1 < len && bytes[ i + 1 ] == b'\'' {
255- current. push ( '\'' ) ;
256- i += 2 ;
257- } else {
258- i += 1 ;
259- break ;
260- }
261- } else {
262- i += 1 ;
263- }
264- }
265- continue ;
266- }
267-
268- // Dollar-quoted string ($tag$...$tag$ or $$...$$)
269- if b == b'$' {
270- i += 1 ;
271- let mut tag = String :: from ( "$" ) ;
272- while i < len && ( bytes[ i] . is_ascii_alphanumeric ( ) || bytes[ i] == b'_' ) {
273- tag. push ( bytes[ i] as char ) ;
274- i += 1 ;
275- }
276- if i < len && bytes[ i] == b'$' {
277- tag. push ( '$' ) ;
278- i += 1 ;
279- // We have a dollar-quote tag, find the closing tag
280- current. push_str ( & tag) ;
281- let tag_bytes = tag. as_bytes ( ) ;
282- loop {
283- if i >= len {
284- break ;
285- }
286- if bytes[ i] == b'$'
287- && i + tag_bytes. len ( ) <= len
288- && & bytes[ i..i + tag_bytes. len ( ) ] == tag_bytes
289- {
290- current. push_str ( & tag) ;
291- i += tag_bytes. len ( ) ;
292- break ;
293- }
294- current. push ( bytes[ i] as char ) ;
295- i += 1 ;
296- }
297- } else {
298- // Not a valid dollar-quote, push what we consumed
299- current. push_str ( & tag) ;
300- }
301- continue ;
302- }
303-
304- // Statement separator
305- if b == b';' {
306- current. push ( ';' ) ;
307- let trimmed = current. trim ( ) . to_string ( ) ;
308- if !trimmed. is_empty ( ) && trimmed != ";" {
309- statements. push ( trimmed) ;
310- }
311- current. clear ( ) ;
312- i += 1 ;
313- continue ;
314- }
315-
316- current. push ( b as char ) ;
317- i += 1 ;
318- }
319-
320- // Trailing statement without semicolon
321- let trimmed = current. trim ( ) . to_string ( ) ;
322- if !trimmed. is_empty ( ) && trimmed != ";" {
323- statements. push ( trimmed) ;
162+ /// Uses sqlx's built-in migration system which tracks applied migrations
163+ /// in a `_sqlx_migrations` table. Each migration runs in its own transaction.
164+ /// Migration files are embedded at compile time via `sqlx::migrate!()`.
165+ async fn run_migrations ( pool : & PgPool ) -> Result < ( ) > {
166+ match sqlx:: migrate!( ) . run ( pool) . await {
167+ Ok ( ( ) ) => Ok ( ( ) ) ,
168+ Err ( e) => Err ( DbError ( format ! ( "Migration error: {}" , e) ) ) ,
324169 }
325-
326- statements
327170}
0 commit comments