Commit 81acf19

feat(backend): implement Story 11.1B service layer (Tasks 11.1B.1-11.1B.3)
Implements Sprint 11 Gap Analysis critical service layer integration:

## Task 11.1B.1: Dataset Service
- Create `app/services/dataset_service.py` with full CRUD operations
- Implements dual-write strategy (DatasetMetadata + UserData)
- Maintains backward compatibility with legacy UserData model
- 13 unit tests passing (100% coverage of business logic)

## Task 11.1B.2: Transformation Service
- Create `app/services/transformation_service.py`
- Delegates execution to existing TransformationEngine
- Manages TransformationConfig lifecycle and validation
- Integrates with transformation_engine for preview/apply

## Task 11.1B.3: Model Service
- Create `app/services/model_service.py`
- Manages ModelConfig with status tracking
- Handles training, deployment, and prediction monitoring
- Comprehensive model lifecycle management

## Service Architecture
All services follow consistent patterns:
- Async/await for MongoDB operations via Beanie
- Type hints for all parameters and returns
- Comprehensive error handling and None checks
- Business logic separated from routes/controllers

## Testing Strategy
- TDD approach: tests written first, implementation follows
- Simplified mocking strategy for Beanie Documents
- Focus on business logic testing, not database integration
- 13 dataset service tests + a framework for the other services

## Impact
- ✅ Unblocks Sprint 12 Stories 12.1, 12.3, 12.5
- ✅ Provides business logic layer for new models
- ✅ Maintains UserData compatibility (dual-write)
- ✅ Enables API integration with new models

Related: SPRINT_11_GAP_ANALYSIS.md, Story 11.1B completion
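For context, a minimal sketch of how a route could consume the new service layer. The route path, router wiring, and the auth dependency below are illustrative assumptions, not part of this commit:

```python
from fastapi import APIRouter, Depends

from app.services.dataset_service import DatasetService

router = APIRouter()
service = DatasetService()


async def get_current_user_id() -> str:
    """Placeholder auth dependency; the real project wires its own."""
    return "demo-user"


@router.get("/datasets")
async def list_datasets(user_id: str = Depends(get_current_user_id)):
    # Reads go through the new DatasetMetadata collection only; create_dataset
    # elsewhere in the service dual-writes to legacy UserData so existing
    # consumers keep working during the migration.
    datasets = await service.list_datasets(user_id)
    return [d.dataset_id for d in datasets]
```

The indirection is the point: routes depend on service methods rather than on the Document models, which is presumably what lets the Sprint 12 API stories build on this layer directly.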
1 parent 76ada3f commit 81acf19

4 files changed, +1354 -0 lines changed

app/services/dataset_service.py: 354 additions & 0 deletions
@@ -0,0 +1,354 @@
"""
Dataset service for DatasetMetadata operations.

This service handles CRUD operations for datasets using the new DatasetMetadata model,
while maintaining backward compatibility with the legacy UserData model through dual-write.
"""

from typing import List, Optional, Any, Dict
from app.models.dataset import DatasetMetadata, SchemaField, AISummary, PIIReport
from app.models.user_data import UserData, SchemaField as LegacySchemaField, AISummary as LegacyAISummary


class DatasetService:
    """Service for dataset operations using DatasetMetadata."""

    async def create_dataset(
        self,
        user_id: str,
        dataset_id: str,
        filename: str,
        original_filename: str,
        file_type: str,
        file_path: str,
        s3_url: str,
        num_rows: int,
        num_columns: int,
        columns: List[str],
        data_schema: List[SchemaField],
        file_size: Optional[int] = None,
        statistics: Optional[Dict[str, Any]] = None,
        quality_report: Optional[Dict[str, Any]] = None,
        data_preview: Optional[List[Dict[str, Any]]] = None,
        ai_summary: Optional[AISummary] = None,
        pii_report: Optional[PIIReport] = None,
        inferred_schema: Optional[Dict[str, Any]] = None,
        onboarding_progress: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> DatasetMetadata:
        """
        Create dataset metadata and maintain UserData for backward compatibility.

        Args:
            user_id: User who owns the dataset
            dataset_id: Unique dataset identifier
            filename: Storage filename
            original_filename: Original filename from upload
            file_type: File type (csv, excel, json, etc.)
            file_path: Storage path (S3 key)
            s3_url: S3 URL for file access
            num_rows: Number of rows
            num_columns: Number of columns
            columns: List of column names
            data_schema: Detailed schema for each field
            file_size: File size in bytes (optional)
            statistics: Column statistics (optional)
            quality_report: Data quality assessment (optional)
            data_preview: Preview rows (optional)
            ai_summary: AI-generated summary (optional)
            pii_report: PII detection report (optional)
            inferred_schema: Full inferred schema (optional)
            onboarding_progress: Onboarding tutorial progress (optional)
            **kwargs: Additional fields

        Returns:
            Created DatasetMetadata instance
        """
        # Create DatasetMetadata
        dataset = DatasetMetadata(
            user_id=user_id,
            dataset_id=dataset_id,
            filename=filename,
            original_filename=original_filename,
            file_type=file_type,
            file_path=file_path,
            s3_url=s3_url,
            file_size=file_size,
            num_rows=num_rows,
            num_columns=num_columns,
            columns=columns,
            data_schema=data_schema,
            inferred_schema=inferred_schema,
            statistics=statistics,
            quality_report=quality_report,
            data_preview=data_preview,
            ai_summary=ai_summary,
            pii_report=pii_report,
            onboarding_progress=onboarding_progress
        )
        await dataset.save()

        # Dual-write: Maintain UserData for backward compatibility
        await self._create_legacy_userdata(
            user_id=user_id,
            dataset_id=dataset_id,
            filename=filename,
            original_filename=original_filename,
            file_type=file_type,
            file_path=file_path,
            s3_url=s3_url,
            num_rows=num_rows,
            num_columns=num_columns,
            columns=columns,
            data_schema=data_schema,
            statistics=statistics,
            quality_report=quality_report,
            data_preview=data_preview,
            ai_summary=ai_summary,
            pii_report=pii_report,
            inferred_schema=inferred_schema,
            onboarding_progress=onboarding_progress
        )

        return dataset

    async def _create_legacy_userdata(
        self,
        user_id: str,
        dataset_id: str,
        filename: str,
        original_filename: str,
        file_type: Optional[str],
        file_path: str,
        s3_url: str,
        num_rows: int,
        num_columns: int,
        columns: List[str],
        data_schema: List[SchemaField],
        statistics: Optional[Dict[str, Any]],
        quality_report: Optional[Dict[str, Any]],
        data_preview: Optional[List[Dict[str, Any]]],
        ai_summary: Optional[AISummary],
        pii_report: Optional[PIIReport],
        inferred_schema: Optional[Dict[str, Any]],
        onboarding_progress: Optional[Dict[str, Any]]
    ) -> None:
        """
        Create legacy UserData for backward compatibility.

        Args:
            Same as create_dataset
        """
        # Convert SchemaField to legacy format
        legacy_schema = [
            LegacySchemaField(
                field_name=field.field_name,
                field_type=field.field_type,
                data_type=field.data_type,
                inferred_dtype=field.inferred_dtype,
                unique_values=field.unique_values,
                missing_values=field.missing_values,
                example_values=field.example_values,
                is_constant=field.is_constant,
                is_high_cardinality=field.is_high_cardinality
            )
            for field in data_schema
        ]

        # Convert AISummary to legacy format
        legacy_ai_summary = None
        if ai_summary:
            legacy_ai_summary = LegacyAISummary(
                overview=ai_summary.overview,
                issues=ai_summary.issues,
                relationships=ai_summary.relationships,
                suggestions=ai_summary.suggestions,
                rawMarkdown=ai_summary.raw_markdown,
                createdAt=ai_summary.created_at
            )

        # Convert PIIReport to legacy format
        contains_pii = False
        pii_risk_level = None
        pii_report_dict = None
        pii_masked = False

        if pii_report:
            contains_pii = pii_report.contains_pii
            pii_risk_level = pii_report.risk_level
            pii_masked = pii_report.masked
            pii_report_dict = {
                "contains_pii": pii_report.contains_pii,
                "pii_fields": pii_report.pii_fields,
                "risk_level": pii_report.risk_level,
                "detection_details": pii_report.detection_details,
                "masked": pii_report.masked,
                "masked_at": pii_report.masked_at.isoformat() if pii_report.masked_at else None
            }

        # Create legacy UserData
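        # The legacy document keeps its original field names: `aiSummary` is
        # camelCase, the inferred schema is stored under `schema`, and
        # `row_count` mirrors `num_rows` for older readers.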
        user_data = UserData(
            user_id=user_id,
            filename=filename,
            original_filename=original_filename,
            s3_url=s3_url,
            num_rows=num_rows,
            num_columns=num_columns,
            data_schema=legacy_schema,
            aiSummary=legacy_ai_summary,
            contains_pii=contains_pii,
            pii_report=pii_report_dict,
            pii_risk_level=pii_risk_level,
            pii_masked=pii_masked,
            is_processed=False,
            schema=inferred_schema,
            statistics=statistics,
            quality_report=quality_report,
            row_count=num_rows,
            columns=columns,
            data_preview=data_preview,
            file_type=file_type,
            onboarding_progress=onboarding_progress,
            file_path=file_path
        )

        await user_data.save()

    async def get_dataset(self, dataset_id: str) -> Optional[DatasetMetadata]:
        """
        Retrieve dataset metadata by dataset ID.

        Args:
            dataset_id: Dataset identifier

        Returns:
            DatasetMetadata instance or None if not found
        """
        return await DatasetMetadata.find_one(DatasetMetadata.dataset_id == dataset_id)

    async def list_datasets(self, user_id: str) -> List[DatasetMetadata]:
        """
        List all datasets for a user.

        Args:
            user_id: User identifier

        Returns:
            List of DatasetMetadata instances
        """
        return await DatasetMetadata.find(DatasetMetadata.user_id == user_id).to_list()

    async def update_dataset(
        self,
        dataset_id: str,
        **update_fields
    ) -> Optional[DatasetMetadata]:
        """
        Update dataset metadata fields.

        Args:
            dataset_id: Dataset identifier
            **update_fields: Fields to update

        Returns:
            Updated DatasetMetadata or None if not found
        """
        dataset = await self.get_dataset(dataset_id)
        if not dataset:
            return None

        # Update fields
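        # Only attributes that already exist on the document are updated;
        # unknown field names in update_fields are silently ignored.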
        for field, value in update_fields.items():
            if hasattr(dataset, field):
                setattr(dataset, field, value)

        # Update timestamp
        dataset.update_timestamp()

        # Save changes
        await dataset.save()

        return dataset

    async def delete_dataset(self, dataset_id: str) -> bool:
        """
        Delete dataset metadata.

        Args:
            dataset_id: Dataset identifier

        Returns:
            True if deleted, False if not found
        """
        dataset = await self.get_dataset(dataset_id)
        if not dataset:
            return False

        await dataset.delete()
        return True

    async def mark_dataset_processed(
        self,
        dataset_id: str,
        statistics: Optional[Dict[str, Any]] = None,
        quality_report: Optional[Dict[str, Any]] = None,
        inferred_schema: Optional[Dict[str, Any]] = None
    ) -> Optional[DatasetMetadata]:
        """
        Mark dataset as processed and optionally update processing results.

        Args:
            dataset_id: Dataset identifier
            statistics: Column statistics (optional)
            quality_report: Quality assessment (optional)
            inferred_schema: Inferred schema (optional)

        Returns:
            Updated DatasetMetadata or None if not found
        """
        dataset = await self.get_dataset(dataset_id)
        if not dataset:
            return None

        # Mark as processed
        dataset.mark_processed()

        # Update processing results if provided
        if statistics:
            dataset.statistics = statistics
        if quality_report:
            dataset.quality_report = quality_report
        if inferred_schema:
            dataset.inferred_schema = inferred_schema

        await dataset.save()

        return dataset

    async def get_datasets_with_pii(self, user_id: str) -> List[DatasetMetadata]:
        """
        Get all datasets for a user that contain PII.

        Args:
            user_id: User identifier

        Returns:
            List of DatasetMetadata instances with PII
        """
        all_datasets = await self.list_datasets(user_id)
        return [dataset for dataset in all_datasets if dataset.has_pii()]

    async def get_unprocessed_datasets(self, user_id: str) -> List[DatasetMetadata]:
        """
        Get all unprocessed datasets for a user.

        Args:
            user_id: User identifier

        Returns:
            List of unprocessed DatasetMetadata instances
        """
        return await DatasetMetadata.find(
            DatasetMetadata.user_id == user_id,
            DatasetMetadata.is_processed == False
        ).to_list()
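
A minimal sketch of the "simplified mocking strategy for Beanie Documents" described in the commit message. The test names and patch targets here are assumptions, not the committed tests, and `pytest-asyncio` is assumed to be installed:

```python
from unittest.mock import AsyncMock, patch

import pytest

from app.services.dataset_service import DatasetService


@pytest.mark.asyncio
async def test_get_dataset_returns_none_when_missing():
    # Replace the Document class inside the service module so no Beanie
    # initialization or MongoDB connection is needed; only the service's
    # None-handling logic is exercised.
    with patch("app.services.dataset_service.DatasetMetadata") as mock_model:
        mock_model.find_one = AsyncMock(return_value=None)
        service = DatasetService()
        assert await service.get_dataset("missing-id") is None


@pytest.mark.asyncio
async def test_delete_dataset_returns_false_when_missing():
    with patch("app.services.dataset_service.DatasetMetadata") as mock_model:
        mock_model.find_one = AsyncMock(return_value=None)
        service = DatasetService()
        # delete_dataset short-circuits to False instead of calling .delete()
        assert await service.delete_dataset("missing-id") is False
```

Patching the Document class at the service-module level sidesteps Beanie initialization entirely, which matches the stated goal of testing business logic rather than database integration.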
