-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
352 lines (292 loc) · 12 KB
/
main.py
File metadata and controls
352 lines (292 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
"""
AI QSR Consolidator - Main ETL Script
Extracts project data from business unit workbooks and consolidates into master Excel file.
"""
import argparse
from pathlib import Path
from datetime import datetime
import sys
# Add src to path
sys.path.insert(0, str(Path(__file__).parent))
from src.utils.config_loader import ConfigLoader
from src.utils.logger import ETLLogger
from src.utils.error_collector import ErrorCollector
from src.utils.helpers import generate_extraction_id, get_submission_week
from src.extractors.file_validator import FileValidator
from src.extractors.project_extractor import ProjectExtractor
from src.transformers.data_cleaner import DataCleaner
from src.transformers.data_validator import DataValidator
from src.transformers.deduplicator import Deduplicator
from src.loaders.excel_loader import ExcelLoader
from src.loaders.archive_manager import ArchiveManager
from src.loaders.log_writer import LogWriter
class AIQSRConsolidator:
    """
    Main ETL orchestrator for AI QSR data consolidation.

    Pipeline:
        1. Validation: Validate workbook (7 validation gates)
        2. Extraction: Extract 76 fields from P1-P10 sheets
        3. Transformation: Clean, validate, deduplicate
        4. Loading: Append to master file, archive, log
    """

    def __init__(
        self,
        config_dir: Path = None,
        master_file: Path = None,
        archive_dir: Path = None,
        log_level: str = "INFO",
        environment: str = "test"
    ):
        """
        Initialize ETL consolidator.

        Args:
            config_dir: Configuration directory (default: ./config)
            master_file: Master consolidator file (default: ./data/{env}/AI_QSR_Consolidator.xlsx)
            archive_dir: Archive directory (default: ./data/{env}/archive)
            log_level: Logging level
            environment: Environment to use - 'test' or 'live' (default: test)

        Raises:
            ValueError: If ConfigLoader.load_all() reports a failed validation.
            Exception: Any error raised while loading configuration is logged
                and re-raised unchanged.
        """
        # Set default paths based on environment.
        # All environment-specific data (master file, archive, logs, extraction
        # log) lives under ./data/{environment}; explicit arguments override
        # the defaults for config_dir/master_file/archive_dir only.
        base_dir = Path(__file__).parent
        env_dir = base_dir / "data" / environment
        self.environment = environment
        self.config_dir = config_dir or base_dir / "config"
        self.master_file = master_file or env_dir / "AI_QSR_Consolidator.xlsx"
        self.archive_dir = archive_dir or env_dir / "archive"
        self.log_dir = env_dir / "logs"
        self.extraction_log = env_dir / "extraction_log.csv"
        # Initialize logger first so config-load failures below can be logged.
        self.logger = ETLLogger(log_level=log_level, log_dir=str(self.log_dir))
        # Load configuration (field mappings + business definitions).
        self.logger.info("Loading configuration...")
        self.config = ConfigLoader(self.config_dir)
        try:
            if not self.config.load_all():
                raise ValueError("Configuration validation failed")
            self.logger.info(
                f"Configuration loaded: {len(self.config.field_mappings)} fields, "
                f"{len(self.config.businesses)} businesses"
            )
        except Exception as e:
            # Log and re-raise: the consolidator is unusable without config.
            self.logger.error(f"Failed to load configuration: {str(e)}")
            raise
        # Initialize pipeline components, one per ETL stage.
        self.validator = FileValidator()
        self.extractor = ProjectExtractor(self.config, self.logger)
        self.cleaner = DataCleaner()
        self.data_validator = DataValidator()
        self.deduplicator = Deduplicator(self.master_file, self.logger)
        self.loader = ExcelLoader(self.master_file, self.logger)
        self.archiver = ArchiveManager(self.archive_dir, self.logger)
        self.log_writer = LogWriter(self.extraction_log, self.logger)

    def process_workbook(
        self,
        file_path: Path,
        business_id: str,
        skip_deduplication: bool = False
    ) -> dict:
        """
        Process a single workbook through the ETL pipeline.

        Args:
            file_path: Path to Excel workbook
            business_id: Business unit identifier
            skip_deduplication: Skip duplicate detection (for testing)

        Returns:
            Result dictionary with keys: extraction_id, business_id,
            file_path, project_count, error_count, status
            ('SUCCESS' | 'PARTIAL' | 'FAILED'), and errors (list of str).
        """
        extraction_id = generate_extraction_id()
        # NOTE(review): naive local time; presumably acceptable for log/week
        # bucketing — confirm if UTC is expected downstream.
        submission_date = datetime.now()
        submission_week = get_submission_week(submission_date)
        self.logger.info(f"="*80)
        self.logger.info(f"Starting ETL process")
        self.logger.info(f"Extraction ID: {extraction_id}")
        self.logger.info(f"Business: {business_id}")
        self.logger.info(f"File: {file_path}")
        self.logger.info(f"="*80)
        # Result skeleton; mutated in place as each stage completes.
        # Status stays 'FAILED' unless loading succeeds (see bottom).
        result = {
            'extraction_id': extraction_id,
            'business_id': business_id,
            'file_path': str(file_path),
            'project_count': 0,
            'error_count': 0,
            'status': 'FAILED',
            'errors': []
        }
        # Step 1: Validation — a failed gate aborts the whole run.
        self.logger.info("Step 1/4: Validating workbook...")
        is_valid, error_message = self.validator.validate(file_path)
        if not is_valid:
            self.logger.error(f"Validation failed: {error_message}")
            result['errors'].append(f"Validation failed: {error_message}")
            result['status'] = 'FAILED'
            return result
        self.logger.info("Validation passed")
        project_sheets = self.validator.get_project_sheets(file_path)
        self.logger.info(f"Found {len(project_sheets)} project sheets: {project_sheets}")
        # Step 2: Extraction — per-field errors are collected, not raised.
        self.logger.info("Step 2/4: Extracting project data...")
        projects, error_collector = self.extractor.extract_workbook(
            file_path=file_path,
            business_id=business_id,
            sheet_names=project_sheets,
            submission_date=submission_date
        )
        # Replaces (not appends to) result['errors']; safe because the only
        # earlier append happens on the validation-failure return path above.
        result['error_count'] = error_collector.error_count()
        result['errors'] = [str(e) for e in error_collector.get_all_errors()]
        if not projects:
            self.logger.error("No projects extracted")
            result['status'] = 'FAILED'
            return result
        self.logger.info(f"Extracted {len(projects)} projects")
        # Step 3: Transformation
        self.logger.info("Step 3/4: Cleaning and validating data...")
        # Clean data
        projects = self.cleaner.clean_projects(projects)
        self.logger.info("Data cleaning complete")
        # Validate data — invalid projects are logged and dropped from the
        # load set; their errors are appended to the result for reporting.
        valid_projects, invalid_projects = self.data_validator.validate_projects(projects)
        if invalid_projects:
            self.logger.warning(f"{len(invalid_projects)} projects failed validation")
            for idx, validation_result in invalid_projects:
                for error in validation_result.errors:
                    self.logger.warning(f"  Project {idx}: {error}")
                    result['errors'].append(f"Validation error: {error}")
        self.logger.info(f"Validated {len(valid_projects)} projects")
        # Deduplicate (mark duplicates but don't skip them)
        if not skip_deduplication:
            self.logger.info("Loading existing hashes for deduplication...")
            self.deduplicator.load_existing_hashes()
            all_projects, duplicate_projects = self.deduplicator.detect_duplicates(
                valid_projects,
                mark_only=True  # Mark duplicates but don't filter them out
            )
            if duplicate_projects:
                self.logger.warning(
                    f"Detected {len(duplicate_projects)} duplicate projects "
                    f"(will be loaded but marked as duplicates in logs)"
                )
                duplicate_summary = self.deduplicator.get_duplicate_summary(duplicate_projects)
                self.logger.info(f"Duplicate summary: {duplicate_summary}")
            # Duplicates are intentionally kept: the full marked set is loaded.
            valid_projects = all_projects
            self.logger.info(f"{len(valid_projects)} projects ready for loading (includes {len(duplicate_projects)} duplicates)")
        # Step 4: Loading — write to master, archive source, log the run.
        self.logger.info("Step 4/4: Loading data...")
        if valid_projects:
            # Load to Excel master file (append).
            loaded_count = self.loader.load_projects(valid_projects)
            result['project_count'] = loaded_count
            self.logger.info(f"Loaded {loaded_count} projects to master file")
            # Archive the source workbook alongside its extraction metadata.
            archive_path = self.archiver.archive_workbook(
                source_path=file_path,
                business_id=business_id,
                extraction_id=extraction_id,
                submission_date=submission_date
            )
            self.logger.info(f"Archived workbook to: {archive_path}")
            # Write an extraction-log CSV entry; fall back to the raw id
            # when the business is not found in config.
            business = self.config.get_business_by_id(business_id)
            business_name = business.business_name if business else business_id
            self.log_writer.write_log_entry(
                extraction_id=extraction_id,
                business_id=business_id,
                business_name=business_name,
                workbook_filename=file_path.name,
                project_count=result['project_count'],
                error_count=result['error_count'],
                success_rate=error_collector.success_rate(),
                notes=f"Processed {len(project_sheets)} sheets"
            )
        # Determine final status: clean run → SUCCESS; errors but some rows
        # loaded → PARTIAL; nothing loaded → FAILED.
        if result['error_count'] == 0:
            result['status'] = 'SUCCESS'
        elif result['project_count'] > 0:
            result['status'] = 'PARTIAL'
        else:
            result['status'] = 'FAILED'
        self.logger.info(f"="*80)
        self.logger.info(f"ETL complete: {result['status']}")
        self.logger.info(f"Projects loaded: {result['project_count']}")
        self.logger.info(f"Errors: {result['error_count']}")
        self.logger.info(f"="*80)
        return result
def main():
    """Command-line interface for ETL consolidator."""
    # CLI definition: two required identifiers plus optional tuning flags.
    parser = argparse.ArgumentParser(
        description="AI QSR Consolidator - Extract project data from business unit workbooks"
    )
    parser.add_argument('--file', type=str, required=True,
                        help='Path to Excel workbook')
    parser.add_argument('--business', type=str, required=True,
                        help='Business unit ID (e.g., BU_001)')
    parser.add_argument('--skip-dedup', action='store_true',
                        help='Skip deduplication (for testing)')
    parser.add_argument('--log-level', type=str, default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help='Logging level')
    parser.add_argument('--environment', type=str, default='test',
                        choices=['test', 'live'],
                        help='Environment to use - test or live (default: test)')
    args = parser.parse_args()

    # Fail fast before building the consolidator if the workbook is missing.
    workbook = Path(args.file)
    if not workbook.exists():
        print(f"Error: File not found: {workbook}")
        sys.exit(1)

    try:
        etl = AIQSRConsolidator(
            log_level=args.log_level,
            environment=args.environment
        )
        outcome = etl.process_workbook(
            file_path=workbook,
            business_id=args.business,
            skip_deduplication=args.skip_dedup
        )

        # Human-readable run summary.
        banner = "=" * 80
        print("\n" + banner)
        print(f"ETL Result: {outcome['status']}")
        print(f"Extraction ID: {outcome['extraction_id']}")
        print(f"Projects Loaded: {outcome['project_count']}")
        print(f"Errors: {outcome['error_count']}")
        print(banner)

        # Show at most five errors, then a count of the remainder.
        if outcome['errors']:
            print("\nErrors encountered:")
            for err in outcome['errors'][:5]:
                print(f" - {err}")
            if len(outcome['errors']) > 5:
                print(f" ... and {len(outcome['errors']) - 5} more errors")

        # Exit code mirrors status: 0 success, 2 partial, 1 failure.
        sys.exit({'SUCCESS': 0, 'PARTIAL': 2}.get(outcome['status'], 1))
    except Exception as e:
        # Last-resort handler: surface the traceback and exit non-zero.
        print(f"\nFatal error: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
if __name__ == '__main__':
main()