99from boto3 import Session
1010from flask import current_app
1111
12+ from app import job_cache , job_cache_lock
1213from app .clients import AWS_CLIENT_CONFIG
1314from notifications_utils import aware_utcnow
1415
@@ -32,30 +33,25 @@ def get_service_id_from_key(key):
3233
3334
def set_job_cache(key, value):
    """Store *value* in the shared in-memory job cache under *key*.

    Each entry is stored as a ``(value, expiry_time)`` tuple so that
    ``clean_cache()`` can later evict stale entries.  Writes are guarded by
    ``job_cache_lock`` because the cache dict is shared across workers.
    """
    # Entries live for 8 days, expressed in seconds.  NOTE(review): the
    # original hard-coded 8 * 24 * 60 * 60 inline with no explanation —
    # presumably one day longer than a 7-day retention window; confirm.
    ttl_seconds = 8 * 24 * 60 * 60
    with job_cache_lock:
        job_cache[key] = (value, time.time() + ttl_seconds)
3840
3941
def get_job_cache(key):
    """Look up *key* in the shared job cache.

    Returns the stored ``(value, expiry_time)`` tuple, or ``None`` when the
    key is absent — callers are expected to handle the miss themselves.
    """
    return job_cache.get(key)
4846
4947
def len_job_cache():
    """Return the number of entries currently held in the job cache.

    The size is also emitted at debug level for cache-regeneration
    diagnostics.
    """
    size = len(job_cache)
    current_app.logger.debug(f"Length of job_cache is {size}")
    return size
5552
5653
5754def clean_cache ():
58- job_cache = current_app .config ["job_cache" ]
5955 current_time = time .time ()
6056 keys_to_delete = []
6157 for key , (_ , expiry_time ) in job_cache .items ():
@@ -65,8 +61,9 @@ def clean_cache():
6561 current_app .logger .debug (
6662 f"Deleting the following keys from the job_cache: { keys_to_delete } "
6763 )
68- for key in keys_to_delete :
69- del job_cache [key ]
64+ with job_cache_lock :
65+ for key in keys_to_delete :
66+ del job_cache [key ]
7067
7168
7269def get_s3_client ():
@@ -80,7 +77,7 @@ def get_s3_client():
8077 aws_secret_access_key = secret_key ,
8178 region_name = region ,
8279 )
83- s3_client = session .client ("s3" )
80+ s3_client = session .client ("s3" , config = AWS_CLIENT_CONFIG )
8481 return s3_client
8582
8683
@@ -207,9 +204,8 @@ def read_s3_file(bucket_name, object_key, s3res):
207204 extract_personalisation (job ),
208205 )
209206
210- except LookupError :
211- # perhaps our key is not formatted as we expected. If so skip it.
212- current_app .logger .exception ("LookupError #notify-debug-admin-1200" )
207+ except Exception as e :
208+ current_app .logger .exception (str (e ))
213209
214210
215211def get_s3_files ():
@@ -224,11 +220,21 @@ def get_s3_files():
224220 current_app .logger .info (
225221 f"job_cache length before regen: { len_job_cache ()} #notify-debug-admin-1200"
226222 )
223+ count = 0
227224 try :
228225 for object_key in object_keys :
229226 read_s3_file (bucket_name , object_key , s3res )
227+ count = count + 1
228+ eventlet .sleep (0.2 )
230229 except Exception :
231- current_app .logger .exception ("Connection pool issue" )
230+ current_app .logger .exception (
231+ f"Trouble reading { object_key } which is # { count } during cache regeneration"
232+ )
233+ except OSError as e :
234+ current_app .logger .exception (
235+ f"Egress proxy issue reading { object_key } which is # { count } "
236+ )
237+ raise e
232238
233239 current_app .logger .info (
234240 f"job_cache length after regen: { len_job_cache ()} #notify-debug-admin-1200"
@@ -298,9 +304,7 @@ def file_exists(file_location):
298304
299305
def get_job_location(service_id, job_id):
    """Return the ``(bucket_name, object_key)`` pair for a job's CSV.

    Uses the new partitioned key layout (``NEW_FILE_LOCATION_STRUCTURE``);
    see ``get_old_job_location`` for the legacy layout kept during the
    migration window.
    """
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    object_key = NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)
    return (bucket_name, object_key)
@@ -316,9 +320,7 @@ def get_old_job_location(service_id, job_id):
316320 but it will take a few days where we have to support both formats.
317321 Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE.
318322 """
319- current_app .logger .debug (
320- f"#notify-debug-s3-partitioning OLD JOB LOCATION: { FILE_LOCATION_STRUCTURE .format (service_id , job_id )} "
321- )
323+
322324 return (
323325 current_app .config ["CSV_UPLOAD_BUCKET" ]["bucket" ],
324326 FILE_LOCATION_STRUCTURE .format (service_id , job_id ),
@@ -457,7 +459,6 @@ def extract_personalisation(job):
457459def get_phone_number_from_s3 (service_id , job_id , job_row_number ):
458460 job = get_job_cache (job_id )
459461 if job is None :
460- current_app .logger .debug (f"job { job_id } was not in the cache" )
461462 job = get_job_from_s3 (service_id , job_id )
462463 # Even if it is None, put it here to avoid KeyErrors
463464 set_job_cache (job_id , job )
@@ -471,8 +472,16 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
471472 )
472473 return "Unavailable"
473474
474- phones = extract_phones (job , service_id , job_id )
475- set_job_cache (f"{ job_id } _phones" , phones )
475+ phones = get_job_cache (f"{ job_id } _phones" )
476+ if phones is None :
477+ current_app .logger .debug ("HAVE TO REEXTRACT PHONES!" )
478+ phones = extract_phones (job , service_id , job_id )
479+ set_job_cache (f"{ job_id } _phones" , phones )
480+ current_app .logger .debug (f"SETTING PHONES TO { phones } " )
481+ else :
482+ phones = phones [
483+ 0
484+ ] # we only want the phone numbers not the cache expiration time
476485
477486 # If we can find the quick dictionary, use it
478487 phone_to_return = phones [job_row_number ]
@@ -491,7 +500,6 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
491500 # So this is a little recycling mechanism to reduce the number of downloads.
492501 job = get_job_cache (job_id )
493502 if job is None :
494- current_app .logger .debug (f"job { job_id } was not in the cache" )
495503 job = get_job_from_s3 (service_id , job_id )
496504 # Even if it is None, put it here to avoid KeyErrors
497505 set_job_cache (job_id , job )
@@ -509,7 +517,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
509517 )
510518 return {}
511519
512- set_job_cache (f"{ job_id } _personalisation" , extract_personalisation (job ))
520+ personalisation = get_job_cache (f"{ job_id } _personalisation" )
521+ if personalisation is None :
522+ set_job_cache (f"{ job_id } _personalisation" , extract_personalisation (job ))
513523
514524 return get_job_cache (f"{ job_id } _personalisation" )[0 ].get (job_row_number )
515525
0 commit comments