
Commit 798ba49

Merge pull request #86 from bellanov/feat/issue-85-examples-and-coverage
feat: refactor examples and increase code coverage
2 parents 9173fad + 6274290 commit 798ba49


42 files changed: +2937 additions, -1446 deletions

.flake8

Lines changed: 0 additions & 5 deletions
This file was deleted.

README.md

Lines changed: 48 additions & 3 deletions
@@ -1,8 +1,8 @@
 # SerenadeFlow
 
-*SerenadeFlow* is a powerful and flexible ETL (Extract, Transform, Load) pipeline framework designed to streamline data processing from both local and remote sources.
+**SerenadeFlow** is a powerful and flexible *ETL (Extract, Transform, Load)* pipeline framework designed to streamline data processing from both **local** and **remote** sources.
 
-It Extracts data from diverse sources, Transforms it through customizable and reusable operations, and Loads it into your desired destination with minimal configuration.
+It *Extracts* data from diverse sources, *Transforms* it through customizable and reusable operations, and *Loads* it into your desired destination with minimal configuration.
 
 Built to be the Swiss Army Knife of ETL solutions, SerenadeFlow offers a simple yet extensible architecture that makes data movement and transformation intuitive—whether you're a data engineer, analyst, or developer.

@@ -12,7 +12,37 @@ The project is configured to operate in _Python >= 3.8_ enviornments.
 
 ## Quickstart
 
-Coming Soon.
+The best way to get started with SerenadeFlow is to explore the examples. Each example is a self-contained recipe that demonstrates a specific use case:
+
+### Basic ETL Pipeline
+
+```bash
+python3 examples/basic_etl_pipeline.py --data-dir ./data --output-prefix output
+```
+
+This example demonstrates the core ETL workflow: extracting data from local JSON files, transforming it, and loading it to output files.
+
+### Remote Data Extraction
+
+```bash
+python3 examples/quickstart.py
+```
+
+This example shows how to extract data from a remote JSON API endpoint.
+
+### Using Plugins
+
+See the [Examples Documentation](examples/README.md) for more recipes and use cases.
+
+## Examples and Recipes
+
+SerenadeFlow is built around **examples** (recipes) that demonstrate how to use the framework. Each example is a self-contained script showing a specific ETL pattern:
+
+- **Basic Examples**: Core ETL workflows (`basic_etl_pipeline.py`, `hello_world.py`, `quickstart.py`)
+- **Cloud Integrations**: Examples for GCS, Firestore, and other cloud services
+- **Plugin Examples**: Demonstrations of using community plugins
+
+See the [Examples Documentation](examples/README.md) for a complete guide to available examples and how to create your own.
 
 ## Data Source Configuration

@@ -129,3 +159,18 @@ pipeline.load(transformed, output_prefix="fantasyace")
 ### Contributing Plugins
 
 See `serenade_flow/community/PLUGIN_TEMPLATE.md` for how to document and contribute your own plugins.
+
+## Architecture
+
+SerenadeFlow follows a plugin-based architecture where:
+
+- **Core Utilities**: The `pipeline` module provides basic utilities for data processing
+- **Plugins**: Extensible components for specific data sources and transformations
+- **Examples**: Self-contained recipes that demonstrate complete ETL workflows
+
+This architecture allows you to:
+- Use existing examples as templates for your own pipelines
+- Extend functionality through plugins
+- Build custom ETL workflows by combining plugins and utilities
+
+For new projects, we recommend starting with an example that matches your use case and customizing it as needed.

examples/README.md

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+# SerenadeFlow Examples
+
+Examples are recipes that demonstrate how to use SerenadeFlow with different plugins, data sources, and configurations.
+
+## Example Structure
+
+Each example should be:
+- Self-contained and runnable independently
+- Configurable via command-line arguments
+- Testable
+
+## Example Categories
+
+### Basic Examples
+- `hello_world.py`: Simple introduction
+- `quickstart.py`: Remote data extraction
+- `basic_etl_pipeline.py`: Core ETL pipeline
+- `sports_odds_processing.py`: Sports betting data processing
+
+### Cloud Integrations
+- `cloud_integrations/gcs/`: Google Cloud Storage integration examples
+- `cloud_integrations/Firestore/`: Firestore integration examples
+
+## Running Examples
+
+Most examples can be run directly:
+
+```bash
+python3 examples/basic_etl_pipeline.py
+```
+
+Some examples accept command-line arguments:
+
+```bash
+python3 examples/cloud_integrations/Firestore/fantasyace_cf_example.py --sport-key americanfootball_nfl --limit 10
+```
+
+## Contributing Examples
+
+When contributing a new example:
+1. Place it in the appropriate directory
+2. Add command-line argument support where useful
+3. Add tests in `tests/test_examples.py`
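The contributing guidelines above ask for command-line argument support in new examples. A minimal argparse skeleton for that, sketched here; the flag names (`--sport-key`, `--limit`) mirror the Firestore example's CLI shown above and are assumptions about that script, not its actual source:

```python
# Minimal argparse skeleton for a new example. Flag names are assumptions
# modeled on the fantasyace_cf_example.py invocation in this README.
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Example SerenadeFlow recipe")
    parser.add_argument("--sport-key", default="americanfootball_nfl",
                        help="Sport identifier to process")
    parser.add_argument("--limit", type=int, default=10,
                        help="Maximum number of records to fetch")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(f"Processing up to {args.limit} records for {args.sport_key}")
```

Keeping the parser in a `build_parser()` function (rather than parsing at import time) is what makes the "add tests in `tests/test_examples.py`" step straightforward.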

examples/cloud/aws/s3.py

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+import boto3
+
+
+def get_public_bucket_keys(bucket_name):
+    """
+    Lists the keys (object names) in a public S3 bucket.
+    """
+    s3 = boto3.client("s3")
+    keys = []
+    paginator = s3.get_paginator("list_objects_v2")
+    pages = paginator.paginate(Bucket=bucket_name)
+    for page in pages:
+        if "Contents" in page:
+            for obj in page["Contents"]:
+                keys.append(obj["Key"])
+    return keys
+
+
+# Example usage:
+public_bucket_name = "your-public-bucket-name"  # Replace with your bucket name
+object_keys = get_public_bucket_keys(public_bucket_name)
+for key in object_keys:
+    print(key)
File renamed without changes.
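The key-collection loop in `s3.py` above can be factored into a pure helper that is unit-testable without AWS credentials or a live bucket. This is an editorial sketch, not part of the commit; it assumes only the page shape (`{"Contents": [{"Key": ...}]}`) that `list_objects_v2` pagination yields:

```python
# Sketch: the key-collection loop from get_public_bucket_keys, factored into
# a pure function so it can be exercised with fake pages instead of boto3.
from typing import Any, Dict, Iterable, List


def collect_keys(pages: Iterable[Dict[str, Any]]) -> List[str]:
    """Flatten list_objects_v2-style pages into a list of object keys.

    A page may omit "Contents" entirely (boto3 does this for empty result
    pages), so .get() with a default list keeps the loop safe.
    """
    keys: List[str] = []
    for page in pages:
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys
```

Note also that for a genuinely public bucket, the default `boto3.client("s3")` still looks for credentials; anonymous access requires an unsigned config (`Config(signature_version=botocore.UNSIGNED)`).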

examples/cloud_integrations/gcs/gcs_batch_processing_example.py renamed to examples/cloud/gcs/gcs_batch_processing_example.py

Lines changed: 53 additions & 47 deletions
@@ -5,18 +5,20 @@
 for the organized bucket structure.
 """
 
-from serenade_flow import pipeline
+import statistics
 import time
-from typing import Dict, List, Any
+from collections import defaultdict
 from dataclasses import dataclass, field
 from datetime import datetime
-from collections import defaultdict
-import statistics
+from typing import Any, Dict, List
+
+from serenade_flow import pipeline
 
 
 @dataclass
 class ProcessingMetrics:
     """Comprehensive metrics for batch processing operations."""
+
     total_files: int
     successful_files: int
     failed_files: int
@@ -31,6 +33,7 @@ class ProcessingMetrics:
 @dataclass
 class DataQualityReport:
     """Data quality assessment for extracted data."""
+
     file_path: str
     record_count: int
     column_count: int
@@ -55,27 +58,30 @@ def _extract_dataframe(self, data: Dict[str, Any]) -> Any:
         if not data:
             return None
         for key, value in data.items():
-            if hasattr(value, 'shape') and value.shape[0] > 0:
+            if hasattr(value, "shape") and value.shape[0] > 0:
                 return value
         return None
 
     def _analyze_data_types(self, df: Any) -> Dict[str, str]:
         """Analyze data types of DataFrame columns."""
         data_types = {}
         for col in df.columns:
-            if df[col].dtype == 'object':
-                data_types[col] = 'string'
-            elif 'int' in str(df[col].dtype):
-                data_types[col] = 'integer'
-            elif 'float' in str(df[col].dtype):
-                data_types[col] = 'float'
+            if df[col].dtype == "object":
+                data_types[col] = "string"
+            elif "int" in str(df[col].dtype):
+                data_types[col] = "integer"
+            elif "float" in str(df[col].dtype):
+                data_types[col] = "float"
             else:
                 data_types[col] = str(df[col].dtype)
         return data_types
 
     def _calculate_quality_score(
-        self, record_count: int, column_count: int,
-        missing_values: int, duplicate_records: int
+        self,
+        record_count: int,
+        column_count: int,
+        missing_values: int,
+        duplicate_records: int,
     ) -> tuple[float, List[str]]:
         """Calculate quality score and identify issues."""
         quality_score = 100.0
@@ -113,7 +119,7 @@ def assess_data_quality(
                 duplicate_records=0,
                 data_types={},
                 quality_score=0.0,
-                issues=["No valid DataFrame found"]
+                issues=["No valid DataFrame found"],
             )
 
         record_count = len(df)
@@ -133,31 +139,33 @@ def assess_data_quality(
             duplicate_records=duplicate_records,
             data_types=data_types,
             quality_score=quality_score,
-            issues=issues
+            issues=issues,
         )
 
     def process_file_with_analytics(self, file_path: str) -> Dict[str, Any]:
         """Process a single file with comprehensive analytics."""
         start_time = time.time()
         try:
            print(f"🔍 Processing: {file_path}")
-            pipeline.configure({
-                "data_source": "remote",
-                "data_source_path": self.bucket_url + file_path,
-                "data_format": "json"
-            })
+            pipeline.configure(
+                {
+                    "data_source": "remote",
+                    "data_source_path": self.bucket_url + file_path,
+                    "data_format": "json",
+                }
+            )
             data = pipeline.extract()
             quality_report = self.assess_data_quality(data, file_path)
             self.quality_reports.append(quality_report)
             processing_time = time.time() - start_time
             result = {
-                'success': True,
-                'file_path': file_path,
-                'processing_time': processing_time,
-                'record_count': quality_report.record_count,
-                'quality_score': quality_report.quality_score,
-                'data': data,
-                'quality_report': quality_report
+                "success": True,
+                "file_path": file_path,
+                "processing_time": processing_time,
+                "record_count": quality_report.record_count,
+                "quality_score": quality_report.quality_score,
+                "data": data,
+                "quality_report": quality_report,
             }
             print(
                 f" ✅ {quality_report.record_count} records, "
@@ -171,12 +179,12 @@ def process_file_with_analytics(self, file_path: str) -> Dict[str, Any]:
             processing_time = time.time() - start_time
             print(f" ❌ Error: {str(e)}")
             return {
-                'success': False,
-                'file_path': file_path,
-                'processing_time': processing_time,
-                'error': str(e),
-                'record_count': 0,
-                'quality_score': 0
+                "success": False,
+                "file_path": file_path,
+                "processing_time": processing_time,
+                "error": str(e),
+                "record_count": 0,
+                "quality_score": 0,
             }
 
     def process_batch_with_analytics(self, file_paths: List[str]) -> ProcessingMetrics:
@@ -194,10 +202,10 @@ def process_batch_with_analytics(self, file_paths: List[str]) -> ProcessingMetri
             if i < len(file_paths) - 1:
                 time.sleep(0.5)
         total_time = time.time() - start_time
-        successful_files = sum(1 for r in results if r['success'])
+        successful_files = sum(1 for r in results if r["success"])
         failed_files = len(results) - successful_files
-        total_records = sum(r['record_count'] for r in results if r['success'])
-        quality_scores = [r['quality_score'] for r in results if r['success']]
+        total_records = sum(r["record_count"] for r in results if r["success"])
+        quality_scores = [r["quality_score"] for r in results if r["success"]]
         avg_quality = statistics.mean(quality_scores) if quality_scores else 0
         throughput = total_records / total_time if total_time > 0 else 0
         metrics = ProcessingMetrics(
@@ -211,7 +219,7 @@ def process_batch_with_analytics(self, file_paths: List[str]) -> ProcessingMetri
             ),
             data_quality_score=avg_quality,
             error_rate=failed_files / len(file_paths) * 100,
-            throughput=throughput
+            throughput=throughput,
         )
         self._print_analytics_summary(metrics, results)
         return metrics
@@ -235,16 +243,14 @@ def _print_analytics_summary(self, metrics: ProcessingMetrics, results: List[Dic
         print(f" • Throughput: {metrics.throughput:.1f} records/sec")
         print("\n🔍 Quality Metrics:")
         print(f" • Avg Quality Score: {metrics.data_quality_score:.1f}/100")
-        quality_scores = [r['quality_score'] for r in results if r['success']]
+        quality_scores = [r["quality_score"] for r in results if r["success"]]
         if quality_scores:
             print(
                 f" • Quality Range: {min(quality_scores):.1f} - "
                 f"{max(quality_scores):.1f}"
             )
             if len(quality_scores) > 1:
-                print(
-                    f" • Quality Std Dev: {statistics.stdev(quality_scores):.1f}"
-                )
+                print(f" • Quality Std Dev: {statistics.stdev(quality_scores):.1f}")
         if self.quality_reports:
             data_types = defaultdict(int)
             for report in self.quality_reports:
@@ -273,13 +279,14 @@ def get_all_odds_files_by_format():
         "decimal": [
             "odds/decimal/event_0089bc8773d8ce4ce20f9df90723cac9.json",
             # ... add more as needed ...
-        ]
+        ],
     }
 
 
 # --- REPLACE CATEGORIES with new odds file discovery ---
 odds_files_by_format = get_all_odds_files_by_format()
 
+
 # Flatten for batch processing, tagging each with its format
 def get_tagged_odds_file_list():
     tagged = []
@@ -293,14 +300,13 @@ def demonstrate_batch_processing():
     """Demonstrate batch processing with analytics for both odds formats."""
     print("\n🎯 Batch Processing with SerenadeFlow (American & Decimal Odds)")
     print("=" * 70)
-    processor = BatchProcessor(
-        bucket_url=BUCKET_BASE_URL,
-        max_concurrent=3
-    )
+    processor = BatchProcessor(bucket_url=BUCKET_BASE_URL, max_concurrent=3)
     tagged_files = get_tagged_odds_file_list()
     all_metrics = {}
     for odds_format in odds_files_by_format:
-        format_files = [f["file_path"] for f in tagged_files if f["odds_format"] == odds_format]
+        format_files = [
+            f["file_path"] for f in tagged_files if f["odds_format"] == odds_format
+        ]
         print(f"\n🎯 Processing {odds_format.upper()} Odds Format")
         print("-" * 50)
         metrics = processor.process_batch_with_analytics(format_files)
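The `process_batch_with_analytics` diff above derives success counts, error rate, throughput, and an average quality score from per-file result dicts. The same aggregation, reduced to a pure function for clarity; this is an editorial sketch using the field names from the diff, not code from the commit:

```python
# Sketch of the aggregate metrics computed in process_batch_with_analytics,
# reduced to a pure function over per-file result dicts. Field names match
# the diff; ProcessingMetrics itself is not reproduced here.
import statistics
from typing import Any, Dict, List


def summarize(results: List[Dict[str, Any]], total_time: float) -> Dict[str, float]:
    """Derive success/error/throughput/quality aggregates from batch results."""
    successful = [r for r in results if r["success"]]
    failed = len(results) - len(successful)
    total_records = sum(r["record_count"] for r in successful)
    quality_scores = [r["quality_score"] for r in successful]
    return {
        "successful_files": len(successful),
        "failed_files": failed,
        "total_records": total_records,
        "data_quality_score": (
            statistics.mean(quality_scores) if quality_scores else 0
        ),
        "error_rate": failed / len(results) * 100 if results else 0,
        "throughput": total_records / total_time if total_time > 0 else 0,
    }
```

The `if quality_scores`/`if total_time > 0` guards mirror the diff's own handling of empty batches and zero elapsed time.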
