-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paths3_scanner.py
More file actions
99 lines (76 loc) · 3.16 KB
/
s3_scanner.py
File metadata and controls
99 lines (76 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import boto3
import json
import os
import google.generativeai as genai
def lambda_handler(event, context):
"""Scan S3 buckets for encryption and use AI to explain risks"""
# Initialize S3 client
s3_client = boto3.client('s3')
print("Scanning S3 buckets for encryption...")
# Get all S3 buckets
response = s3_client.list_buckets()
buckets = response['Buckets']
print(f"Found {len(buckets)} buckets to scan\n")
# Store results
scan_results = []
for bucket in buckets:
bucket_name = bucket['Name']
# We'll add the encryption check logic next
# Check if bucket has encryption enabled
try:
encryption = s3_client.get_bucket_encryption(Bucket=bucket_name)
encrypted = True
encryption_type = encryption['ServerSideEncryptionConfiguration']['Rules'][0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
except s3_client.exceptions.ServerSideEncryptionConfigurationNotFoundError:
encrypted = False
encryption_type = 'None'
# Store result
scan_results.append({
'bucket_name': bucket_name,
'encrypted': encrypted,
'encryption_type': encryption_type
})
status = "Encrypted" if encrypted else "Not Encrypted"
print(f"{status}: {bucket_name} ({encryption_type})")
# Count unencrypted buckets
unencrypted_count = len([r for r in scan_results if not r['encrypted']])
unencrypted_buckets = [r['bucket_name'] for r in scan_results if not r['encrypted']]
# Use AI to analyze security findings
print("\nAnalyzing security findings with Gemini AI...")
# Configure Gemini
api_key = os.environ.get('GOOGLE_API_KEY')
if not api_key:
ai_analysis = "AI analysis skipped: GOOGLE_API_KEY not configured"
else:
genai.configure(api_key=api_key)
model = genai.GenerativeModel("gemini-2.0-flash-exp")
prompt = f"""You are an AWS security expert. Analyze this S3 encryption scan and provide a brief security assessment.
Scan Results:
- Total Buckets: {len(buckets)}
- Encrypted: {len(buckets) - unencrypted_count}
- Unencrypted: {unencrypted_count}
- Unencrypted Bucket Names: {', '.join(unencrypted_buckets) if unencrypted_buckets else 'None'}
Provide a 2-3 sentence analysis:
1. What's the security risk of unencrypted buckets?
2. What encryption should be enabled? (AES256 or aws:kms)
3. What action should the user take immediately?
Be concise and actionable."""
try:
response = model.generate_content(prompt)
ai_analysis = response.text
except Exception as e:
ai_analysis = f"AI analysis failed: {str(e)}"
# Build final result
result = {
'total_buckets': len(buckets),
'unencrypted_buckets': unencrypted_count,
'encrypted_buckets': len(buckets) - unencrypted_count,
'scan_results': scan_results,
'ai_analysis': ai_analysis,
'alert': unencrypted_count > 0
}
print(f"\nScan complete: {unencrypted_count}/{len(buckets)} buckets need encryption")
return {
'statusCode': 200,
'body': json.dumps(result)
}