33import os
44from flask import Flask , jsonify
55from sns import Sns
6- from sys import stdout
76from botocore .exceptions import ClientError
7+ from access_control import AccessControl
8+ from logger import logger
89
910AWS_REGION = os .getenv ("AWS_REGION" )
1011QUEUE_URL = os .getenv ("QUEUE_URL" )
1112DEAD_LETTER_QUEUE_URL = os .getenv ("DEAD_LETTER_QUEUE_URL" )
1213SUB_DEAD_LETTER_QUEUE_URL = os .getenv ("SUB_DEAD_LETTER_QUEUE_URL" )
13- LONG_POLL_TIME = os .getenv ("LONG_POLL_TIME" )
14+ LONG_POLL_TIME = os .getenv ("LONG_POLL_TIME" , "10" )
1415SNS_NAME = os .getenv ("SNS_NAME" )
1516
1617def receive_message (sqs_client , queue_url ):
@@ -21,8 +22,7 @@ def receive_message(sqs_client, queue_url):
2122 WaitTimeSeconds = (int (LONG_POLL_TIME )))
2223
2324 if len (response .get ('Messages' , [])) > 0 :
24- print (f"Number of messages received: { len (response .get ('Messages' , []))} " )
25- stdout .flush ()
25+ logger .info (f"Number of messages received: { len (response .get ('Messages' , []))} " )
2626 return response
2727
2828def delete_message (sqs_client , queue_url , receipt_handle ):
@@ -33,29 +33,45 @@ def delete_messages(sqs_client, queue_url, messages):
3333 receipt_handle = message ['ReceiptHandle' ]
3434 delete_message (sqs_client = sqs_client , queue_url = queue_url , receipt_handle = receipt_handle )
3535
36- def process_messages (topic , messages ):
36+ def process_messages (sns_client , topic , messages , access_control ):
3737 for message in messages .get ("Messages" , []):
38+
39+ # Get the permission for the collection from access-control
40+ # response = access_control.get_permissions(subscriber-id, collection-concept-id)
41+ # Return is either None (Null or Nil) (if check on response is false) or
42+ # {"C1200484253-CMR_ONLY":["read","update","delete","order"]}
43+ #if response and if array contains read:
44+ # publish message.
45+ #else:
46+ # log subscriber-id no longer has read access to collection-concept-id
47+
3848 sns_client .publish_message (topic , message )
3949
4050def poll_queue (running ):
4151 """ Poll the SQS queue and process messages. """
52+
53+ sqs_client = boto3 .client ("sqs" , region_name = AWS_REGION )
54+ sns_resource = boto3 .resource ("sns" , region_name = AWS_REGION )
55+ sns_client = Sns (sns_resource )
56+ topic = sns_client .create_topic (SNS_NAME )
57+
58+ access_control = AccessControl ()
4259 while running .value :
4360 try :
4461 # Poll the SQS
4562 messages = receive_message (sqs_client = sqs_client , queue_url = QUEUE_URL )
4663
4764 if messages :
48- process_messages (topic = topic , messages = messages )
65+ process_messages (sns_client = sns_client , topic = topic , messages = messages , access_control = access_control )
4966 delete_messages (sqs_client = sqs_client , queue_url = QUEUE_URL , messages = messages )
5067
5168 dl_messages = receive_message (sqs_client = sqs_client , queue_url = DEAD_LETTER_QUEUE_URL )
5269 if dl_messages :
53- process_messages (topic = topic , messages = dl_messages )
70+ process_messages (sns_client = sns_client , topic = topic , messages = dl_messages , access_control = access_control )
5471 delete_messages (sqs_client = sqs_client , queue_url = DEAD_LETTER_QUEUE_URL , messages = dl_messages )
5572
5673 except Exception as e :
57- print (f"An error occurred receiving or deleting messages: { e } " )
58- stdout .flush ()
74+ logger .warning (f"An error occurred receiving or deleting messages: { e } " )
5975
6076app = Flask (__name__ )
6177@app .route ('/shutdown' , methods = ['POST' ])
@@ -66,16 +82,11 @@ def shutdown():
6682 running .value = False
6783 return jsonify ({'status' : 'shutting down' })
6884
69- sqs_client = boto3 .client ("sqs" , region_name = AWS_REGION )
70- sns_resource = boto3 .resource ("sns" )
71- sns_client = Sns (sns_resource )
72- topic = sns_client .create_topic (SNS_NAME )
73-
7485#Shared boolean value for process communication
7586running = multiprocessing .Value ('b' ,True )
7687
7788if __name__ == "__main__" :
78- print ( "Starting to poll the SQS queue..." )
89+ logger . info ( "The subscription worker is starting to poll the SQS queue..." )
7990 # Start the polling process
8091 poll_process = multiprocessing .Process (target = poll_queue , args = (running ,))
8192 poll_process .start ()
@@ -86,4 +97,4 @@ def shutdown():
8697
8798 # Wait for the polling process to finish before exiting
8899 poll_process .join ()
89- print ( "Exited polling loop." )
100+ logger . info ( "The subscription worker exited the polling loop." )
0 commit comments