22import datetime
33import re
44import time
5- from concurrent .futures import ThreadPoolExecutor
65from io import StringIO
76
87import botocore
98import eventlet
109from boto3 import Session
1110from flask import current_app
1211
12+ from app import job_cache , job_cache_lock
1313from app .clients import AWS_CLIENT_CONFIG
1414from notifications_utils import aware_utcnow
1515
2525s3_resource = None
2626
2727
def get_service_id_from_key(key):
    """Derive the service id embedded in an S3 object key.

    Strips every occurrence of "service-", keeps only the portion before
    the first "/", then drops any "-notify" text from that segment.
    E.g. "service-1234-notify/5678.csv" -> "1234".
    """
    prefix_stripped = key.replace("service-", "")
    first_segment = prefix_stripped.split("/")[0]
    return first_segment.replace("-notify", "")
33+
34+
def set_job_cache(key, value):
    """Store *value* in the shared job_cache under *key*.

    Entries are stored as a ``(value, absolute_expiry_timestamp)`` tuple
    so that clean_cache() can later evict stale ones.  Writes are guarded
    by job_cache_lock because the cache is shared module-level state.
    """
    # 8 days: presumably the 7-day job retention window plus a day of
    # slack -- TODO confirm against the retention policy.
    ttl_seconds = 8 * 24 * 60 * 60
    with job_cache_lock:
        job_cache[key] = (value, time.time() + ttl_seconds)
3240
3341
def get_job_cache(key):
    """Return the cached ``(value, expiry_timestamp)`` tuple for *key*.

    Returns None when the key is absent.  Reads are not locked; dict
    lookups are atomic in CPython, and callers tolerate a None result.
    """
    return job_cache.get(key)
4246
4347
def len_job_cache():
    """Log and return the number of entries currently in the job_cache."""
    entry_count = len(job_cache)
    current_app.logger.debug(f"Length of job_cache is {entry_count}")
    return entry_count
4952
5053
5154def clean_cache ():
52- job_cache = current_app .config ["job_cache" ]
5355 current_time = time .time ()
5456 keys_to_delete = []
5557 for key , (_ , expiry_time ) in job_cache .items ():
@@ -59,8 +61,9 @@ def clean_cache():
5961 current_app .logger .debug (
6062 f"Deleting the following keys from the job_cache: { keys_to_delete } "
6163 )
62- for key in keys_to_delete :
63- del job_cache [key ]
64+ with job_cache_lock :
65+ for key in keys_to_delete :
66+ del job_cache [key ]
6467
6568
6669def get_s3_client ():
@@ -74,7 +77,7 @@ def get_s3_client():
7477 aws_secret_access_key = secret_key ,
7578 region_name = region ,
7679 )
77- s3_client = session .client ("s3" )
80+ s3_client = session .client ("s3" , config = AWS_CLIENT_CONFIG )
7881 return s3_client
7982
8083
@@ -185,23 +188,24 @@ def read_s3_file(bucket_name, object_key, s3res):
185188 """
186189 try :
187190 job_id = get_job_id_from_s3_object_key (object_key )
191+ service_id = get_service_id_from_key (object_key )
192+
188193 if get_job_cache (job_id ) is None :
189- object = (
194+ job = (
190195 s3res .Object (bucket_name , object_key )
191196 .get ()["Body" ]
192197 .read ()
193198 .decode ("utf-8" )
194199 )
195- set_job_cache (job_id , object )
196- set_job_cache (f"{ job_id } _phones" , extract_phones (object ))
200+ set_job_cache (job_id , job )
201+ set_job_cache (f"{ job_id } _phones" , extract_phones (job , service_id , job_id ))
197202 set_job_cache (
198203 f"{ job_id } _personalisation" ,
199- extract_personalisation (object ),
204+ extract_personalisation (job ),
200205 )
201206
202- except LookupError :
203- # perhaps our key is not formatted as we expected. If so skip it.
204- current_app .logger .exception ("LookupError #notify-debug-admin-1200" )
207+ except Exception as e :
208+ current_app .logger .exception (str (e ))
205209
206210
207211def get_s3_files ():
@@ -216,11 +220,21 @@ def get_s3_files():
216220 current_app .logger .info (
217221 f"job_cache length before regen: { len_job_cache ()} #notify-debug-admin-1200"
218222 )
223+ count = 0
219224 try :
220- with ThreadPoolExecutor () as executor :
221- executor .map (lambda key : read_s3_file (bucket_name , key , s3res ), object_keys )
225+ for object_key in object_keys :
226+ read_s3_file (bucket_name , object_key , s3res )
227+ count = count + 1
228+ eventlet .sleep (0.2 )
222229 except Exception :
223- current_app .logger .exception ("Connection pool issue" )
230+ current_app .logger .exception (
231+ f"Trouble reading { object_key } which is # { count } during cache regeneration"
232+ )
233+ except OSError as e :
234+ current_app .logger .exception (
235+ f"Egress proxy issue reading { object_key } which is # { count } "
236+ )
237+ raise e
224238
225239 current_app .logger .info (
226240 f"job_cache length after regen: { len_job_cache ()} #notify-debug-admin-1200"
@@ -290,9 +304,7 @@ def file_exists(file_location):
290304
291305
def get_job_location(service_id, job_id):
    """Return the ``(bucket, object_key)`` pair for a job's CSV upload,
    using the new partitioned key layout (NEW_FILE_LOCATION_STRUCTURE).
    """
    bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    object_key = NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)
    return bucket, object_key
@@ -308,9 +320,7 @@ def get_old_job_location(service_id, job_id):
308320 but it will take a few days where we have to support both formats.
309321 Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE.
310322 """
311- current_app .logger .debug (
312- f"#notify-debug-s3-partitioning OLD JOB LOCATION: { FILE_LOCATION_STRUCTURE .format (service_id , job_id )} "
313- )
323+
314324 return (
315325 current_app .config ["CSV_UPLOAD_BUCKET" ]["bucket" ],
316326 FILE_LOCATION_STRUCTURE .format (service_id , job_id ),
@@ -449,7 +459,6 @@ def extract_personalisation(job):
449459def get_phone_number_from_s3 (service_id , job_id , job_row_number ):
450460 job = get_job_cache (job_id )
451461 if job is None :
452- current_app .logger .debug (f"job { job_id } was not in the cache" )
453462 job = get_job_from_s3 (service_id , job_id )
454463 # Even if it is None, put it here to avoid KeyErrors
455464 set_job_cache (job_id , job )
@@ -463,8 +472,16 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
463472 )
464473 return "Unavailable"
465474
466- phones = extract_phones (job , service_id , job_id )
467- set_job_cache (f"{ job_id } _phones" , phones )
475+ phones = get_job_cache (f"{ job_id } _phones" )
476+ if phones is None :
477+ current_app .logger .debug ("HAVE TO REEXTRACT PHONES!" )
478+ phones = extract_phones (job , service_id , job_id )
479+ set_job_cache (f"{ job_id } _phones" , phones )
480+ current_app .logger .debug (f"SETTING PHONES TO { phones } " )
481+ else :
482+ phones = phones [
483+ 0
484+ ] # we only want the phone numbers not the cache expiration time
468485
469486 # If we can find the quick dictionary, use it
470487 phone_to_return = phones [job_row_number ]
@@ -483,7 +500,6 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
483500 # So this is a little recycling mechanism to reduce the number of downloads.
484501 job = get_job_cache (job_id )
485502 if job is None :
486- current_app .logger .debug (f"job { job_id } was not in the cache" )
487503 job = get_job_from_s3 (service_id , job_id )
488504 # Even if it is None, put it here to avoid KeyErrors
489505 set_job_cache (job_id , job )
@@ -501,7 +517,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
501517 )
502518 return {}
503519
504- set_job_cache (f"{ job_id } _personalisation" , extract_personalisation (job ))
520+ personalisation = get_job_cache (f"{ job_id } _personalisation" )
521+ if personalisation is None :
522+ set_job_cache (f"{ job_id } _personalisation" , extract_personalisation (job ))
505523
506524 return get_job_cache (f"{ job_id } _personalisation" )[0 ].get (job_row_number )
507525
0 commit comments