|
| 1 | +from typing import Union, List |
1 | 2 | import os |
2 | 3 | import sqlite3 |
3 | 4 | import pandas as pd |
4 | 5 | import logging |
5 | 6 | from pathlib import Path |
6 | | -from typing import Union, List |
| 7 | +from tqdm import tqdm |
| 8 | +import time |
7 | 9 |
|
8 | 10 |
|
9 | 11 | def run_sql_queries( |
10 | | - query_dir: Union[str, os.PathLike], |
11 | | - db_file: Union[str, os.PathLike], |
12 | | - output_dir: Union[str, os.PathLike], |
13 | | - rerun_all: bool = False, |
14 | | - rerun_queries: List[str] = None |
| 12 | + query_dir: Union[str, os.PathLike], |
| 13 | + db_file: Union[str, os.PathLike], |
| 14 | + output_dir: Union[str, os.PathLike], |
| 15 | + rerun_all: bool = False, |
| 16 | + rerun_queries: Union[List[str], None] = None
15 | 17 | ) -> None: |
16 | 18 | """ |
17 | 19 | Execute all SQL queries in a directory (including subdirectories) on a |
@@ -44,78 +46,110 @@ def run_sql_queries( |
44 | 46 | rerun_queries=rerun_queries |
45 | 47 | ) |
46 | 48 |
|
47 | | - # Normalize rerun_queries for comparison |
48 | 49 | rerun_queries = set(rerun_queries or []) |
49 | | - |
50 | | - # Configure logging |
51 | 50 | logging.basicConfig( |
52 | 51 | filename='query_manager.log', |
53 | 52 | level=logging.INFO, |
54 | 53 | format='%(asctime)s - %(levelname)s - %(message)s' |
55 | 54 | ) |
56 | 55 | logger = logging.getLogger() |
57 | | - |
58 | 56 | logger.info("Starting SQL query execution process.") |
59 | 57 |
|
60 | | - # Connect to SQLite database |
| 58 | + # Collect all SQL query files |
| 59 | + query_files = [] |
| 60 | + for root, _, files in os.walk(query_dir): |
| 61 | + for file_name in files: |
| 62 | + if file_name.endswith('.sql'): |
| 63 | + sql_file_path = os.path.join( |
| 64 | + root, |
| 65 | + file_name |
| 66 | + ) |
| 67 | + query_files.append(sql_file_path) |
| 68 | + |
| 69 | + # Filter queries based on rerun criteria |
| 70 | + queries_to_execute = [] |
| 71 | + for sql_file_path in query_files: |
| 72 | + relative_path = os.path.relpath( |
| 73 | + os.path.dirname(sql_file_path), |
| 74 | + query_dir |
| 75 | + ) |
| 76 | + |
| 77 | + target_output_dir = os.path.join( |
| 78 | + output_dir, |
| 79 | + relative_path |
| 80 | + ) |
| 81 | + |
| 82 | + os.makedirs( |
| 83 | + target_output_dir, |
| 84 | + exist_ok=True |
| 85 | + ) |
| 86 | + output_file_path = os.path.join( |
| 87 | + target_output_dir, |
| 88 | + f"{Path(sql_file_path).stem}.csv" |
| 89 | + ) |
| 90 | + if ( |
| 91 | + not os.path.exists(output_file_path) or |
| 92 | + rerun_all or |
| 93 | + Path(sql_file_path).name in rerun_queries |
| 94 | + ): |
| 95 | + queries_to_execute.append(( |
| 96 | + sql_file_path, |
| 97 | + output_file_path |
| 98 | + )) |
| 99 | + |
| 100 | + if not queries_to_execute: |
| 101 | + logger.info("No queries to execute. Exiting.") |
| 102 | + print("No queries to execute.") |
| 103 | + return |
| 104 | + |
| 105 | + # Connect to the SQLite database |
61 | 106 | try: |
62 | 107 | conn = sqlite3.connect(db_file) |
63 | 108 | logger.info(f"Connected to database: {db_file}") |
64 | 109 | except Exception as e: |
65 | 110 | logger.error(f"Failed to connect to database: {db_file}. Error: {e}") |
66 | 111 | return |
67 | 112 |
|
68 | | - # Walk through the SQL directory structure |
69 | | - for root, _, files in os.walk(query_dir): |
70 | | - relative_path = os.path.relpath(root, query_dir) |
71 | | - target_output_dir = os.path.join(output_dir, relative_path) |
| 113 | + # Progress bar setup |
| 114 | + progress = tqdm( |
| 115 | + total=len(queries_to_execute), |
| 116 | + desc="Running queries", |
| 117 | + unit=" query", |
| 118 | + unit_scale=True |
| 119 | + ) |
72 | 120 |
|
73 | | - # Ensure output subdirectory exists |
74 | | - os.makedirs(target_output_dir, exist_ok=True) |
| 121 | + execution_times = [] |
75 | 122 |
|
76 | | - for file_name in files: |
77 | | - if file_name.endswith('.sql'): # Process only .sql files |
78 | | - sql_file_path = os.path.join(root, file_name) |
79 | | - output_file_path = os.path.join( |
80 | | - target_output_dir, |
81 | | - f"{Path(file_name).stem}.csv" |
82 | | - ) |
| 123 | + for sql_file_path, output_file_path in queries_to_execute: |
| 124 | + start_query_time = time.time() |
| 125 | + try: |
| 126 | + with open(sql_file_path, 'r') as query_file: |
| 127 | + query = query_file.read() |
| 128 | + |
| 129 | + logger.info(f"Executing query: {sql_file_path}") |
| 130 | + df = pd.read_sql_query(query, conn) |
| 131 | + df.to_csv(output_file_path, index=False) |
| 132 | + logger.info( |
| 133 | + f"Query executed successfully. Output saved to:" |
| 134 | + f" {output_file_path}" |
| 135 | + ) |
| 136 | + |
| 137 | + except Exception as e: |
| 138 | + logger.error(f"Error executing query: {sql_file_path}. Error: {e}") |
| 139 | + |
| 140 | + # Update progress and estimate remaining time |
| 141 | + query_duration = time.time() - start_query_time |
| 142 | + execution_times.append(query_duration) |
| 143 | + avg_time_per_query = sum(execution_times) / len(execution_times) |
| 144 | + remaining_queries = len(queries_to_execute) - progress.n - 1 |
| 145 | + estimated_time_left = avg_time_per_query * remaining_queries |
| 146 | + progress.set_postfix_str(f"ETA: {estimated_time_left:.2f}s") |
| 147 | + progress.update(1) |
83 | 148 |
|
84 | | - # Skip if output already exists and rerun_all is False, |
85 | | - # unless in rerun_queries |
86 | | - if (os.path.exists(output_file_path) |
87 | | - and not rerun_all |
88 | | - and file_name not in rerun_queries): |
89 | | - logger.info( |
90 | | - f"Skipping query (output exists):" |
91 | | - f" {sql_file_path}" |
92 | | - ) |
93 | | - continue |
94 | | - |
95 | | - # Read and execute the SQL query |
96 | | - try: |
97 | | - with open(sql_file_path, 'r') as query_file: |
98 | | - query = query_file.read() |
99 | | - |
100 | | - logger.info(f"Executing query: {sql_file_path}") |
101 | | - df = pd.read_sql_query(query, conn) |
102 | | - |
103 | | - # Save result to CSV |
104 | | - df.to_csv(output_file_path, index=False) |
105 | | - logger.info( |
106 | | - f"Query executed successfully. Output saved to:" |
107 | | - f" {output_file_path}" |
108 | | - ) |
109 | | - |
110 | | - except Exception as e: |
111 | | - logger.error( |
112 | | - f"Error executing query:" |
113 | | - f" {sql_file_path}. Error: {e}" |
114 | | - ) |
115 | | - |
116 | | - # Close the database connection |
| 149 | + progress.close() |
117 | 150 | conn.close() |
118 | 151 | logger.info("SQL query execution process completed.") |
| 152 | + print("SQL query execution process completed.") |
119 | 153 |
|
120 | 154 |
|
121 | 155 | def validate_inputs( |
@@ -184,7 +218,7 @@ def validate_inputs( |
184 | 218 |
|
185 | 219 | # No need to check if output_dir exists, as it may be created later |
186 | 220 |
|
187 | | - # Validate force_rerun |
| 221 | + # Validate rerun_all |
188 | 222 | if not isinstance(rerun_all, bool): |
189 | 223 | raise TypeError( |
190 | 224 | f"'rerun_all' must be a boolean."
|
0 commit comments