22from kafka import KafkaProducer
33import json
44import os
5+ import time
6+ import requests # 자동 구독 확인을 위해 requests 라이브러리 추가
57
68app = Flask (__name__ )
79
10+ # =========================
11+ # 환경 변수 및 초기화
12+ # =========================
13+
814KAFKA_BOOTSTRAP = os .getenv ("KAFKA_BOOTSTRAP" , "kafka:29092" )
915KAFKA_TOPIC = os .getenv ("KAFKA_TOPIC" , "s3-events" )
1016
11- producer = KafkaProducer (
12- bootstrap_servers = [KAFKA_BOOTSTRAP ],
13- value_serializer = lambda v : json .dumps (v ).encode ("utf-8" )
14- )
17+ # Kafka Producer 연결 재시도
18+ while True :
19+ try :
20+ producer = KafkaProducer (
21+ bootstrap_servers = [KAFKA_BOOTSTRAP ],
22+ value_serializer = lambda v : json .dumps (v ).encode ("utf-8" )
23+ )
24+ print ("Kafka Producer 연결 성공..." )
25+ break
26+ except Exception as e :
27+ print (f"Kafka Producer 연결 실패, 5초 후 재시도... 오류: { e } " )
28+ time .sleep (5 )
29+
1530
1631@app .route ("/sns" , methods = ["POST" ])
1732def sns_listener ():
33+ # force=True 옵션으로 Content-Type이 application/json이 아니어도 JSON으로 파싱 시도
1834 data = request .get_json (force = True )
1935
20- # SNS 인증 요청 (SubscriptionConfirmation)
36+ # =========================================================
37+ # 1. SNS 인증 요청 (SubscriptionConfirmation) - 자동 확인 로직
38+ # =========================================================
2139 if "Type" in data and data ["Type" ] == "SubscriptionConfirmation" :
22- print ("SNS SubscriptionConfirmation received" )
23- print (f"Confirm URL: { data ['SubscribeURL' ]} " )
24- return jsonify ({"message" : "Subscription confirmation received" }), 200
40+ print ("SNS SubscriptionConfirmation received. Initiating auto-confirmation." )
41+ subscribe_url = data .get ('SubscribeURL' )
42+
43+ if not subscribe_url :
44+ print ("❌ SubscribeURL이 메시지에 없습니다. 수동 확인이 필요할 수 있습니다." )
45+ return jsonify ({"message" : "Subscription confirmation received, but URL missing" }), 200
2546
26- # 실제 S3 이벤트 메시지
47+ try :
48+ # SubscribeURL로 GET 요청을 보내 구독을 자동으로 확인합니다.
49+ response = requests .get (subscribe_url )
50+ # 2xx 응답이 아니면 예외 발생
51+ response .raise_for_status ()
52+ print (f"✅ Subscription Confirmed successfully! URL: { subscribe_url } " )
53+ return jsonify ({"message" : "Subscription automatically confirmed" }), 200
54+ except requests .exceptions .RequestException as e :
55+ print (f"❌ ERROR confirming subscription: { e } " )
56+ # 자동 확인 실패하더라도 500 에러를 반환하여 SNS가 재시도할 수 있도록 합니다.
57+ return jsonify ({"message" : "Subscription confirmation failed" }), 500
58+
59+ # =========================================================
60+ # 2. 실제 S3 이벤트 메시지 (Notification)
61+ # =========================================================
2762 if "Type" in data and data ["Type" ] == "Notification" :
28- message = json .loads (data ["Message" ])
29- print (f"📦 Received S3 event: { json .dumps (message , indent = 2 )} " )
30- producer .send (KAFKA_TOPIC , message )
31- producer .flush ()
32- return jsonify ({"status" : "sent to kafka" }), 200
63+ try :
64+ # SNS 메시지 본문은 문자열 JSON이므로 다시 파싱해야 함
65+ message = json .loads (data ["Message" ])
66+ print (f"📦 Received S3 event: { json .dumps (message , indent = 2 )} " )
67+
68+ # Kafka로 S3 이벤트 메시지 전송
69+ producer .send (KAFKA_TOPIC , message )
70+ producer .flush ()
71+
72+ return jsonify ({"status" : "sent to kafka" }), 200
73+ except Exception as e :
74+ print (f"❌ ERROR processing notification: { e } " )
75+ return jsonify ({"status" : "processing failed" }), 500
3376
77+ # =========================================================
78+ # 3. 기타 메시지 타입 (UnsubscribeConfirmation 등)
79+ # =========================================================
80+ print (f"🤷 Ignored SNS message type: { data .get ('Type' )} " )
3481 return jsonify ({"status" : "ignored" }), 200
3582
3683
3784if __name__ == "__main__" :
3885 port = int (os .getenv ("SNS_PORT" , 5000 ))
39- app .run (host = "0.0.0.0" , port = port )
86+ # Flask 서버 실행
87+ app .run (host = "0.0.0.0" , port = port )
0 commit comments