Commit 016b64a

Author: Harish Cherian (committed)
feat: Add complete Apache Iceberg integration with AWS Glue Catalog support
- Add comprehensive Iceberg functions library (libs/glue_functions/iceberg_glue_functions.py)
- Implement production-ready Lambda handlers for Iceberg table operations
- Add time travel queries and metadata access capabilities
- Include advanced features: schema evolution, table history, snapshots
- Provide complete ETL pipeline with data quality checks
- Add CloudFormation infrastructure templates for AWS deployment
- Include comprehensive usage examples and documentation
- Support for reading Iceberg tables from AWS Glue Catalog
- Full S3 integration with proper AWS credentials handling
- Production deployment scripts and cleanup utilities

Key Features:
- Read Iceberg tables with time travel support
- Query table metadata (history, snapshots, schema)
- Complete Spark session configuration for Lambda
- Error handling and comprehensive logging
- Multiple Lambda handler templates for different use cases
- Infrastructure as Code with CloudFormation
- End-to-end testing and validation scripts
1 parent fbff227 commit 016b64a

18 files changed: +3028 -57 lines

.vscode/settings.json

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
{
  "kiroAgent.configureMCP": "Enabled"
}

ICEBERG_INTEGRATION_SUMMARY.md

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
# Iceberg Integration Summary

## Core Integration Files

### Essential Library
- `libs/glue_functions/iceberg_glue_functions.py` - Core Iceberg functions for Glue Catalog integration

### Production Code
- `lambda-deployment/spark-iceberg-reader.py` - Production Lambda handler
- `lambda-deployment/deploy-production-lambda.sh` - Deployment script

### Key Examples (Kept)
- `examples/advanced-iceberg-features.py` - Time travel and metadata queries
- `examples/lambda-handler-templates.py` - Production Lambda templates
- `examples/production-etl-pipeline.py` - Complete ETL pipeline
- `examples/USAGE_GUIDE.md` - Usage documentation

### Infrastructure (Kept)
- `test-infrastructure/iceberg-test-setup.yaml` - CloudFormation template
- `test-infrastructure/create-sample-iceberg-table.py` - Table creation
- `test-infrastructure/deploy-test-environment.sh` - Environment deployment
- `test-infrastructure/cleanup-test-environment.sh` - Cleanup script

## Removed Files
- All redundant example scripts
- Test and demo scripts
- Duplicate functionality files
- Temporary files and guides

## Usage
Your Iceberg integration is now streamlined to only the essential files needed for production use.

examples/USAGE_GUIDE.md

Lines changed: 337 additions & 0 deletions
@@ -0,0 +1,337 @@
# Iceberg Integration Usage Guide

This guide shows you how to use the Iceberg integration code in your Spark on AWS Lambda applications.

## 🚀 Quick Start

### 1. Basic Setup

```python
# In your Lambda function
import sys
sys.path.append('/home/glue_functions')

# create_iceberg_spark_session is the provided session helper (assumed to live
# alongside the other helpers in iceberg_glue_functions)
from iceberg_glue_functions import create_iceberg_spark_session, read_iceberg_table_with_spark
from pyspark.sql import SparkSession

# Create Spark session (use the provided helper)
spark = create_iceberg_spark_session()

# Read Iceberg table
df = spark.read.format("iceberg").load("glue_catalog.your_database.your_table")
```

### 2. Environment Variables

Set these in your Lambda function:

```bash
SCRIPT_BUCKET=your-s3-bucket
SPARK_SCRIPT=your-script.py
DATABASE_NAME=your_database
TABLE_NAME=your_table
AWS_REGION=us-east-1
```

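A minimal sketch of how a handler can pick these values up at runtime with `os.environ`; the variable names match the list above, and the region fallback is only an example:

```python
import os

# Required settings: raise a KeyError early if they are missing
DATABASE_NAME = os.environ["DATABASE_NAME"]
TABLE_NAME = os.environ["TABLE_NAME"]

# Optional setting with a default
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

# Fully qualified Iceberg table identifier used in the examples below
table_identifier = f"glue_catalog.{DATABASE_NAME}.{TABLE_NAME}"
```
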
## 📖 Usage Examples

### Example 1: Simple Data Reading

```python
import json

from pyspark.sql.functions import col

from iceberg_glue_functions import create_iceberg_spark_session


def lambda_handler(event, context):
    spark = create_iceberg_spark_session()

    try:
        # Read table
        df = spark.read.format("iceberg").load("glue_catalog.analytics.customer_data")

        # Basic operations
        print(f"Row count: {df.count()}")
        df.show(10)

        # Filter data
        recent_data = df.filter(col("created_date") >= "2024-01-01")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'total_rows': df.count(),
                'recent_rows': recent_data.count()
            })
        }
    finally:
        spark.stop()
```

### Example 2: Time Travel Queries

```python
import json
from datetime import datetime, timezone

from iceberg_glue_functions import create_iceberg_spark_session


def lambda_handler(event, context):
    spark = create_iceberg_spark_session()

    try:
        # Current data
        current_df = spark.read.format("iceberg").load("glue_catalog.sales.transactions")

        # Historical data (yesterday); the as-of-timestamp option expects
        # milliseconds since the epoch, so convert the wall-clock time first
        as_of_millis = int(datetime(2024, 1, 20, tzinfo=timezone.utc).timestamp() * 1000)
        historical_df = spark.read.format("iceberg") \
            .option("as-of-timestamp", str(as_of_millis)) \
            .load("glue_catalog.sales.transactions")

        # Compare
        current_count = current_df.count()
        historical_count = historical_df.count()

        return {
            'statusCode': 200,
            'body': json.dumps({
                'current_transactions': current_count,
                'historical_transactions': historical_count,
                'new_transactions': current_count - historical_count
            })
        }
    finally:
        spark.stop()
```

### Example 3: Data Processing Pipeline

```python
import json

from pyspark.sql.functions import col, count, current_date
from pyspark.sql.functions import sum as spark_sum

from iceberg_glue_functions import create_iceberg_spark_session


def lambda_handler(event, context):
    spark = create_iceberg_spark_session()

    try:
        # Read source data
        raw_df = spark.read.format("iceberg").load("glue_catalog.raw.events")

        # Process data
        processed_df = raw_df \
            .filter(col("event_type") == "purchase") \
            .withColumn("processing_date", current_date()) \
            .groupBy("customer_id", "product_category") \
            .agg(
                spark_sum("amount").alias("total_spent"),
                count("*").alias("purchase_count")
            )

        # Write to target table
        processed_df.write \
            .format("iceberg") \
            .mode("overwrite") \
            .save("glue_catalog.analytics.customer_purchases")

        return {
            'statusCode': 200,
            'body': json.dumps({
                'processed_customers': processed_df.count(),
                'message': 'Processing completed successfully'
            })
        }
    finally:
        spark.stop()
```

## 🔧 Configuration Options

### Spark Session Configuration

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Your-App-Name") \
    .master("local[*]") \
    .config("spark.driver.memory", "5g") \
    .config("spark.executor.memory", "5g") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()
```

### Lambda Function Settings

- **Memory**: 3008 MB (recommended for Spark workloads)
- **Timeout**: 15 minutes (maximum)
- **Runtime**: Use container image with Iceberg support
- **Environment Variables**: Set database and table names

One way to apply these settings with boto3 is sketched after this list.

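This is a minimal sketch, not part of the repository's deployment scripts; the function name is hypothetical and the call assumes the function already exists:

```python
import boto3

lambda_client = boto3.client("lambda")

# Apply the recommended memory, timeout, and environment settings
lambda_client.update_function_configuration(
    FunctionName="spark-iceberg-reader",  # hypothetical function name
    MemorySize=3008,                      # MB, recommended for Spark workloads
    Timeout=900,                          # seconds (the 15-minute maximum)
    Environment={
        "Variables": {
            "DATABASE_NAME": "your_database",
            "TABLE_NAME": "your_table",
        }
    },
)
```
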
## 📊 Available Functions

### Basic Operations

```python
# Read table
df = read_iceberg_table_with_spark(spark, "database", "table")

# Get table metadata
metadata = get_iceberg_table_metadata("database", "table", "us-east-1")

# Get table location
location = get_iceberg_table_location(metadata)
```

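The guide does not show how the metadata helpers are implemented; a plausible sketch, assuming they wrap the Glue `GetTable` API via boto3 (the `metadata['Table'][...]` access in the Debug Commands section matches that response shape):

```python
import boto3


def get_iceberg_table_metadata(database: str, table: str, region: str) -> dict:
    """Fetch table metadata from the AWS Glue Catalog (illustrative sketch)."""
    glue = boto3.client("glue", region_name=region)
    return glue.get_table(DatabaseName=database, Name=table)


def get_iceberg_table_location(metadata: dict) -> str:
    """Extract the table's S3 location from a GetTable response (illustrative sketch)."""
    return metadata["Table"]["StorageDescriptor"]["Location"]
```
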
### Advanced Operations

```python
# Time travel
historical_df = read_iceberg_table_at_timestamp(spark, "db", "table", "2024-01-01 00:00:00")

# Snapshot queries
snapshot_df = read_iceberg_table_at_snapshot(spark, "db", "table", "snapshot_id")

# Table history
history_df = query_iceberg_table_history(spark, "db", "table")

# Table snapshots
snapshots_df = query_iceberg_table_snapshots(spark, "db", "table")
```

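Independently of these helpers, Iceberg exposes history and snapshots as metadata tables that Spark can read directly; a minimal sketch, assuming the `glue_catalog` catalog configured earlier:

```python
# History and snapshots are exposed as Iceberg metadata tables
history_df = spark.read.format("iceberg").load("glue_catalog.db.table.history")
snapshots_df = spark.read.format("iceberg").load("glue_catalog.db.table.snapshots")

# The same data is reachable through Spark SQL
spark.sql("SELECT snapshot_id, committed_at, operation FROM glue_catalog.db.table.snapshots").show()
```
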
## 🎯 Event Formats

### Simple Read Event

```json
{
  "handler_type": "simple_reader",
  "database": "analytics",
  "table": "customer_data",
  "limit": 100
}
```

### Analytics Event

```json
{
  "handler_type": "analytics",
  "database": "sales",
  "table": "transactions",
  "filters": ["date >= '2024-01-01'", "amount > 100"],
  "aggregations": {
    "group_by": ["product_category"],
    "metrics": ["sum(amount) as total_sales", "count(*) as transaction_count"]
  }
}
```

### Time Travel Event

```json
{
  "handler_type": "time_travel",
  "database": "analytics",
  "table": "customer_data",
  "timestamp": "2024-01-15 10:00:00.000",
  "compare_with_current": true
}
```

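The full routing logic lives in `examples/lambda-handler-templates.py`; the following is only a minimal sketch of dispatching on `handler_type`, with just the simple-read path filled in:

```python
import json

from iceberg_glue_functions import create_iceberg_spark_session


def lambda_handler(event, context):
    handler_type = event.get("handler_type", "simple_reader")
    database = event["database"]
    table = event["table"]

    spark = create_iceberg_spark_session()
    try:
        df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}")

        if handler_type == "simple_reader":
            # Return up to `limit` rows as JSON-serialisable dictionaries
            limit = event.get("limit", 100)
            rows = [row.asDict() for row in df.limit(limit).collect()]
            return {"statusCode": 200, "body": json.dumps({"rows": rows}, default=str)}

        return {"statusCode": 400, "body": f"Unsupported handler_type: {handler_type}"}
    finally:
        spark.stop()
```
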
## 🔍 Error Handling

```python
import json

from iceberg_glue_functions import create_iceberg_spark_session


def lambda_handler(event, context):
    spark = None

    try:
        spark = create_iceberg_spark_session()

        # Your processing logic here
        df = spark.read.format("iceberg").load("glue_catalog.db.table")

        return {'statusCode': 200, 'body': 'Success'}

    except Exception as e:
        print(f"Error: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
    finally:
        if spark:
            spark.stop()
```

## 📈 Performance Tips

1. **Use appropriate filters** to reduce data volume
2. **Set proper memory allocation** (3008 MB recommended)
3. **Enable adaptive query execution** (see the config sketch after this list)
4. **Use columnar operations** when possible
5. **Consider partitioning** for large tables

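For tip 3, adaptive query execution is controlled by standard Spark SQL settings; a minimal sketch (combine these with the Iceberg catalog options from the session configuration above):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Your-App-Name")
    .config("spark.sql.adaptive.enabled", "true")                     # adaptive query execution
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions
    .getOrCreate()
)
```
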
## 🔗 Integration Patterns

### Event-Driven Processing

```python
# Triggered by S3 events
def process_new_data(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # Process new file and update the Iceberg table
        # (process_file is your own ingestion function)
        process_file(f"s3a://{bucket}/{key}")
```

### Scheduled Processing

```python
from datetime import datetime, timedelta

from pyspark.sql.functions import sum as spark_sum

from iceberg_glue_functions import create_iceberg_spark_session


# Triggered by CloudWatch Events (EventBridge schedule)
def daily_aggregation(event, context):
    spark = create_iceberg_spark_session()
    yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    try:
        # Process yesterday's data
        df = spark.read.format("iceberg") \
            .load("glue_catalog.raw.events") \
            .filter(f"date = '{yesterday}'")

        # Aggregate and save
        aggregated = df.groupBy("category").agg(spark_sum("amount"))
        aggregated.write.format("iceberg").mode("append").save("glue_catalog.analytics.daily_summary")
    finally:
        spark.stop()
```

## 🛠️ Troubleshooting

### Common Issues

1. **"Table not found"**
   - Check database and table names
   - Verify Glue Catalog permissions

2. **"Access Denied"**
   - Check S3 permissions for the table location
   - Verify the IAM role has Glue access

3. **Memory errors**
   - Increase Lambda memory allocation
   - Add filters to reduce data volume

4. **Timeout errors**
   - Optimize queries with filters
   - Consider breaking the work into smaller chunks

### Debug Commands

```python
import boto3

# Check the table exists and is registered as an Iceberg table
metadata = get_iceberg_table_metadata("db", "table", "us-east-1")
print(f"Table type: {metadata['Table']['Parameters'].get('table_type')}")

# Check table location
location = get_iceberg_table_location(metadata)
print(f"Location: {location}")

# Test S3 access (the call raises an exception if the bucket is not accessible)
s3_client = boto3.client('s3')
response = s3_client.list_objects_v2(Bucket='bucket', Prefix='prefix', MaxKeys=1)
print(f"S3 accessible: objects found = {response.get('KeyCount', 0)}")
```

## 📚 Next Steps

1. **Start with simple examples** and gradually add complexity
2. **Test with small datasets** before scaling up
3. **Monitor CloudWatch logs** for debugging
4. **Set up proper error handling** and retry logic (a minimal sketch follows this list)
5. **Consider cost optimization** with appropriate filtering
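
For step 4, a minimal, illustrative retry sketch (not part of the library) that wraps a table read in simple backoff:

```python
import time


def read_with_retries(spark, table_identifier, attempts=3, base_delay=2.0):
    """Read an Iceberg table, retrying transient failures with linear backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return spark.read.format("iceberg").load(table_identifier)
        except Exception as exc:
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed: {exc}; retrying")
            time.sleep(base_delay * attempt)
```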
