From 2a1da1e2ac29b237c6927663dd209fd65cd2665a Mon Sep 17 00:00:00 2001 From: B-kash Date: Mon, 13 May 2024 22:25:33 +0200 Subject: [PATCH 1/2] =?UTF-8?q?=E2=9C=A8=20Add=20retry=20mechanism=20incas?= =?UTF-8?q?e=20of=20remaining=20students=20allocation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- school_center.py | 455 ++++++++++++++++++++++++++++++----------------- 1 file changed, 291 insertions(+), 164 deletions(-) diff --git a/school_center.py b/school_center.py index a03e859..8b34277 100644 --- a/school_center.py +++ b/school_center.py @@ -8,19 +8,23 @@ import math # Parameters -PREF_DISTANCE_THRESHOLD = 2 # Preferred threshold distance in km -ABS_DISTANCE_THRESHOLD = 7 # Absolute threshold distance in km -MIN_STUDENT_IN_CENTER = 10 # Min. no of students from a school to be assigned to a center in normal circumstances -STRETCH_CAPACITY_FACTOR = 0.02 # How much can center capacity be streched if need arises -PREF_CUTOFF = -4 # Do not allocate students with pref score less than cutoff -DEFAULT_OUTPUT_DIR = 'results' # Default directory to create output files if --output not provided -DEFAULT_OUTPUT_FILENAME = 'school-center.tsv' +PREF_DISTANCE_THRESHOLD = 2 # Preferred threshold distance in km +ABS_DISTANCE_THRESHOLD = 7 # Absolute threshold distance in km +MIN_STUDENT_IN_CENTER = 10 # Min. no of students from a school to be assigned to a center in normal circumstances +STRETCH_CAPACITY_FACTOR = ( + 0.02 # How much can center capacity be streched if need arises +) +PREF_CUTOFF = -4 # Do not allocate students with pref score less than cutoff +DEFAULT_OUTPUT_DIR = ( + "results" # Default directory to create output files if --output not provided +) +DEFAULT_OUTPUT_FILENAME = "school-center.tsv" +MAX_RETRIES = 10 # Maximum number of retries to allocate students configure_logging() logger = logging.getLogger(__name__) - def haversine_distance(lat1, lon1, lat2, lon2): """ Calculate the great circle distance between two points @@ -30,17 +34,25 @@ def haversine_distance(lat1, lon1, lat2, lon2): # Convert decimal degrees to radians lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) - # Haversine formula + # Haversine formula dlon = lon2 - lon1 dlat = lat2 - lat1 - a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2 - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) - radius_earth = 6371 # Average Radius of Earth in km + a = ( + math.sin(dlat / 2) ** 2 + + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2 + ) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + radius_earth = 6371 # Average Radius of Earth in km distance = radius_earth * c return distance -def centers_within_distance(school: Dict[str, str], centers: Dict[str, str], distance_threshold: float, relax_threshold: bool) -> List[Dict[str, any]]: +def centers_within_distance( + school: Dict[str, str], + centers: Dict[str, str], + distance_threshold: float, + relax_threshold: bool, +) -> List[Dict[str, any]]: """ Return List of centers that are within given distance from school. relax_threshold: If there are no centers within given distance return one that is closest @@ -48,22 +60,28 @@ def centers_within_distance(school: Dict[str, str], centers: Dict[str, str], dis {'cscode', 'name', 'address', 'capacity', 'lat', 'long', 'distance_km'} """ + def center_to_dict(c, distance): - return {'cscode': c['cscode'], - 'name': c['name'], - 'address': c['address'], - 'capacity': c['capacity'], - 'lat': c['lat'], - 'long': c['long'], - 'distance_km': distance} + return { + "cscode": c["cscode"], + "name": c["name"], + "address": c["address"], + "capacity": c["capacity"], + "lat": c["lat"], + "long": c["long"], + "distance_km": distance, + } def sort_key(c): # intent: sort by preference score DESC then by distance_km ASC # leaky abstraction - sorted requires a single numeric value for each element - return c['distance_km'] * random.uniform(1, 5) - get_pref(school['scode'], c['cscode']) * 100 - - school_lat = school.get('lat') - school_long = school.get('long') + return ( + c["distance_km"] * random.uniform(1, 5) + - get_pref(school["scode"], c["cscode"]) * 100 + ) + + school_lat = school.get("lat") + school_long = school.get("long") if len(school_lat) == 0 or len(school_long) == 0: return [] @@ -71,25 +89,36 @@ def sort_key(c): # nearest_distance = None # nearest_center = None for c in centers: - if school['scode'] == c['cscode'] \ - or is_allocated(c['cscode'], s['scode']) \ - or get_pref(school['scode'], c['cscode']) <= PREF_CUTOFF: + if ( + school["scode"] == c["cscode"] + or is_allocated(c["cscode"], s["scode"]) + or get_pref(school["scode"], c["cscode"]) <= PREF_CUTOFF + ): continue - distance = haversine_distance(float(school_lat), float( - school_long), float(c.get('lat')), float(c.get('long'))) + distance = haversine_distance( + float(school_lat), + float(school_long), + float(c.get("lat")), + float(c.get("long")), + ) # if nearest_center is None or distance < nearest_distance: # nearest_center = c # nearest_distance = distance qualifying_centers.append(center_to_dict(c, distance)) - within_distance = [ c for c in qualifying_centers if c['distance_km'] <= distance_threshold ] + within_distance = [ + c for c in qualifying_centers if c["distance_km"] <= distance_threshold + ] if len(within_distance) > 0: - return sorted(within_distance, key=sort_key) - elif relax_threshold: # if there are no centers within given threshold, return one that is closest - return sorted(qualifying_centers, key=sort_key) - else: + return sorted(within_distance, key=sort_key) + elif ( + relax_threshold + ): # if there are no centers within given threshold, return one that is closest + return sorted(qualifying_centers, key=sort_key) + else: return [] + def read_tsv(file_path: str) -> List[Dict[str, str]]: """ Function to read the tsv file for school.tsv and centers.tsv @@ -97,8 +126,8 @@ def read_tsv(file_path: str) -> List[Dict[str, str]]: """ data = [] try: - with open(file_path, 'r', newline='', encoding='utf-8') as file: - reader = csv.DictReader(file, delimiter='\t') + with open(file_path, "r", newline="", encoding="utf-8") as file: + reader = csv.DictReader(file, delimiter="\t") for row in reader: data.append(dict(row)) except FileNotFoundError as e: @@ -111,7 +140,9 @@ def read_tsv(file_path: str) -> List[Dict[str, str]]: logger.error(f"Error opening or reading file: '{file_path}' : {e}") sys.exit(1) except Exception as e: - logger.error(f"An unexpected error occurred while reading file '{file_path}' : {e}") + logger.error( + f"An unexpected error occurred while reading file '{file_path}' : {e}" + ) sys.exit(1) return data @@ -123,16 +154,16 @@ def read_prefs(file_path: str) -> Dict[str, Dict[str, int]]: """ prefs = {} try: - with open(file_path, 'r', newline='', encoding='utf-8') as file: - reader = csv.DictReader(file, delimiter='\t') + with open(file_path, "r", newline="", encoding="utf-8") as file: + reader = csv.DictReader(file, delimiter="\t") for row in reader: - if prefs.get(row['scode']): - if prefs[row['scode']].get(row['cscode']): - prefs[row['scode']][row['cscode']] += int(row['pref']) + if prefs.get(row["scode"]): + if prefs[row["scode"]].get(row["cscode"]): + prefs[row["scode"]][row["cscode"]] += int(row["pref"]) else: - prefs[row['scode']][row['cscode']] = int(row['pref']) + prefs[row["scode"]][row["cscode"]] = int(row["pref"]) else: - prefs[row['scode']] = {row['cscode']: int(row['pref'])} + prefs[row["scode"]] = {row["cscode"]: int(row["pref"])} except FileNotFoundError as e: logger.error(f"File '{file_path} :{e}' not found.") sys.exit(1) @@ -143,7 +174,9 @@ def read_prefs(file_path: str) -> Dict[str, Dict[str, int]]: logger.error(f"Error opening or reading file: {file_path} :{e}") sys.exit(1) except Exception as e: - logger.error(f"An unexpected error occurred while reading file '{file_path}': {e}") + logger.error( + f"An unexpected error occurred while reading file '{file_path}': {e}" + ) sys.exit(1) return prefs @@ -170,14 +203,14 @@ def calc_per_center(count: int) -> int: return 100 # elif count <= 900: # return 200 - else: + else: return 200 def school_sort_key(s): # intent: allocate students from schools with large students count first # to avoid excessive fragmentation - return (-1 if int(s['count']) > 500 else 1) * random.uniform(1, 100) + return (-1 if int(s["count"]) > 500 else 1) * random.uniform(1, 100) def allocate(scode: str, cscode: str, count: int): @@ -199,141 +232,235 @@ def is_allocated(scode1: str, scode2: str) -> bool: return allocations.get(scode1, {}).get(scode2) is not None -parser = argparse.ArgumentParser( - prog='center randomizer', - description='Assigns centers to exam centers to students') -parser.add_argument('schools_tsv', default='schools.tsv', - help="Tab separated (TSV) file containing school details") -parser.add_argument('centers_tsv', default='centers.tsv', - help="Tab separated (TSV) file containing center details") -parser.add_argument('prefs_tsv', default='prefs.tsv', - help="Tab separated (TSV) file containing preference scores") -parser.add_argument('-o', '--output', default = DEFAULT_OUTPUT_FILENAME, - help='Output file') -parser.add_argument('-s', '--seed', action='store', metavar='SEEDVALUE', - default=None, type=float, - help='Initialization seed for Random Number Generator') - +def get_parser(): + parser = argparse.ArgumentParser( + prog="center randomizer", + description="Assigns centers to exam centers to students", + ) + parser.add_argument( + "schools_tsv", + default="schools.tsv", + help="Tab separated (TSV) file containing school details", + ) + parser.add_argument( + "centers_tsv", + default="centers.tsv", + help="Tab separated (TSV) file containing center details", + ) + parser.add_argument( + "prefs_tsv", + default="prefs.tsv", + help="Tab separated (TSV) file containing preference scores", + ) + parser.add_argument( + "-o", "--output", default=DEFAULT_OUTPUT_FILENAME, help="Output file" + ) + parser.add_argument( + "-s", + "--seed", + action="store", + metavar="SEEDVALUE", + default=None, + type=float, + help="Initialization seed for Random Number Generator", + ) + parser.add_argument( + "-r", + "--max-retries", + action="store", + default=MAX_RETRIES, + help="Maximum number of retries to allocate students", + ) + return parser + + +parser = get_parser() args = parser.parse_args() - -random = random.Random(args.seed) #overwrites the random module to use seeded rng - -schools = sorted(read_tsv(args.schools_tsv), key= school_sort_key) -centers = read_tsv(args.centers_tsv) -centers_remaining_cap = {c['cscode']: int(c['capacity']) for c in centers} +random = random.Random(args.seed) # overwrites the random module to use seeded rng prefs = read_prefs(args.prefs_tsv) - -remaining = 0 # stores count of non allocated students -allocations = {} # to track mutual allocations +schools = sorted(read_tsv(args.schools_tsv), key=school_sort_key) +centers = read_tsv(args.centers_tsv) def get_output_dir(): dirname = path.dirname(args.output) - if(dirname): + if dirname: return dirname else: return DEFAULT_OUTPUT_DIR + def get_output_filename(): basename = path.basename(args.output) - if(basename): + if basename: return basename else: return DEFAULT_OUTPUT_FILENAME +def get_max_retries(): + if args.max_retries: + try: + max_retries = int(args.max_retries) + return max_retries + except ValueError: + return MAX_RETRIES + return args.max_retries + return MAX_RETRIES + + output_dirname = get_output_dir() output_filename = get_output_filename() -makedirs(output_dirname, exist_ok=True) # Create the output directory if not exists - -with open(path.join(output_dirname, "school-center-distance.tsv"), 'w', encoding='utf-8') as intermediate_file, \ -open(path.join(output_dirname, output_filename), 'w', encoding='utf-8') as a_file: - writer = csv.writer(intermediate_file, delimiter="\t") - writer.writerow(["scode", - "s_count", - "school_name", - "school_lat", - "school_long", - "cscode", - "center_name", - "center_address", - "center_capacity", - "distance_km"]) - - allocation_file = csv.writer(a_file, delimiter='\t') - allocation_file.writerow(["scode", - "school", - "cscode", - "center", - "center_address", - "center_lat", - "center_long", - "allocation", - "distance_km"]) - - for s in schools: - centers_for_school = centers_within_distance( - s, centers, PREF_DISTANCE_THRESHOLD, False) - to_allot = int(s['count']) - per_center = calc_per_center(to_allot) - - allocated_centers = {} - - # per_center = math.ceil(to_allot / min(calc_num_centers(to_allot), len(centers_for_school))) - for c in centers_for_school: - writer.writerow([s['scode'], - s['count'], - s['name-address'], - s['lat'], - s['long'], - c['cscode'], - c['name'], - c['address'], - c['capacity'], - c['distance_km']]) - next_allot = min(to_allot, per_center, max( - centers_remaining_cap[c['cscode']], MIN_STUDENT_IN_CENTER)) - if to_allot > 0 and next_allot > 0 and centers_remaining_cap[c['cscode']] >= next_allot: - allocated_centers[c['cscode']] = c - allocate(s['scode'], c['cscode'], next_allot) - # allocation.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], next_allot, c['distance_km']]) - to_allot -= next_allot - centers_remaining_cap[c['cscode']] -= next_allot - - if to_allot > 0: # try again with relaxed constraints and more capacity at centers - expanded_centers = centers_within_distance( - s, centers, ABS_DISTANCE_THRESHOLD, True) - for c in expanded_centers: - stretched_capacity = math.floor( - int(c['capacity']) * STRETCH_CAPACITY_FACTOR + centers_remaining_cap[c['cscode']]) - next_allot = min(to_allot, max( - stretched_capacity, MIN_STUDENT_IN_CENTER)) - if to_allot > 0 and next_allot > 0 and stretched_capacity >= next_allot: - allocated_centers[c['cscode']] = c - allocate(s['scode'], c['cscode'], next_allot) +makedirs(output_dirname, exist_ok=True) # Create the output directory if not exists + +# Try allocating center max of MAX_RETRIES times and break out if all students are allocated +for retries in range(get_max_retries()): + centers_remaining_cap = {c["cscode"]: int(c["capacity"]) for c in centers} + remaining = 0 # stores count of non allocated students + allocations = {} # to track mutual allocations + with ( + open( + path.join(output_dirname, "school-center-distance.tsv"), + "w", + encoding="utf-8", + ) as intermediate_file, + open( + path.join(output_dirname, output_filename), "w", encoding="utf-8" + ) as a_file, + ): + writer = csv.writer(intermediate_file, delimiter="\t") + writer.writerow( + [ + "scode", + "s_count", + "school_name", + "school_lat", + "school_long", + "cscode", + "center_name", + "center_address", + "center_capacity", + "distance_km", + ] + ) + + allocation_file = csv.writer(a_file, delimiter="\t") + allocation_file.writerow( + [ + "scode", + "school", + "cscode", + "center", + "center_address", + "center_lat", + "center_long", + "allocation", + "distance_km", + ] + ) + + for s in schools: + centers_for_school = centers_within_distance( + s, centers, PREF_DISTANCE_THRESHOLD, False + ) + to_allot = int(s["count"]) + per_center = calc_per_center(to_allot) + + allocated_centers = {} + + # per_center = math.ceil(to_allot / min(calc_num_centers(to_allot), len(centers_for_school))) + for c in centers_for_school: + writer.writerow( + [ + s["scode"], + s["count"], + s["name-address"], + s["lat"], + s["long"], + c["cscode"], + c["name"], + c["address"], + c["capacity"], + c["distance_km"], + ] + ) + next_allot = min( + to_allot, + per_center, + max(centers_remaining_cap[c["cscode"]], MIN_STUDENT_IN_CENTER), + ) + if ( + to_allot > 0 + and next_allot > 0 + and centers_remaining_cap[c["cscode"]] >= next_allot + ): + allocated_centers[c["cscode"]] = c + allocate(s["scode"], c["cscode"], next_allot) # allocation.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], next_allot, c['distance_km']]) to_allot -= next_allot - centers_remaining_cap[c['cscode']] -= next_allot - - for c in allocated_centers.values(): - allocation_file.writerow([s['scode'], - s['name-address'], - c['cscode'], - c['name'], - c['address'], - c['lat'], - c['long'], - allocations[s['scode']][c['cscode']], - c['distance_km']]) - - if to_allot > 0: - remaining += to_allot - logger.warning( - f"{to_allot}/{s['count']} left for {s['scode']} {s['name-address']} centers: {len(centers_for_school)}") - - logger.info("Remaining capacity at each center (remaining_capacity cscode):") - logger.info(sorted([(v, k) - for k, v in centers_remaining_cap.items() if v != 0])) - logger.info( - f"Total remaining capacity across all centers: {sum({k:v for k, v in centers_remaining_cap.items() if v != 0}.values())}") - logger.info(f"Students not assigned: {remaining}") + centers_remaining_cap[c["cscode"]] -= next_allot + + if ( + to_allot > 0 + ): # try again with relaxed constraints and more capacity at centers + expanded_centers = centers_within_distance( + s, centers, ABS_DISTANCE_THRESHOLD, True + ) + for c in expanded_centers: + stretched_capacity = math.floor( + int(c["capacity"]) * STRETCH_CAPACITY_FACTOR + + centers_remaining_cap[c["cscode"]] + ) + next_allot = min( + to_allot, max(stretched_capacity, MIN_STUDENT_IN_CENTER) + ) + if ( + to_allot > 0 + and next_allot > 0 + and stretched_capacity >= next_allot + ): + allocated_centers[c["cscode"]] = c + allocate(s["scode"], c["cscode"], next_allot) + # allocation.writerow([s['scode'], s['name-address'], c['cscode'], c['name'], c['address'], next_allot, c['distance_km']]) + to_allot -= next_allot + centers_remaining_cap[c["cscode"]] -= next_allot + + for c in allocated_centers.values(): + allocation_file.writerow( + [ + s["scode"], + s["name-address"], + c["cscode"], + c["name"], + c["address"], + c["lat"], + c["long"], + allocations[s["scode"]][c["cscode"]], + c["distance_km"], + ] + ) + + if to_allot > 0: + remaining += to_allot + logger.warning( + f"{to_allot}/{s['count']} left for {s['scode']} {s['name-address']} centers: {len(centers_for_school)}" + ) + logger.warning( + f"Retry {retries}/{get_max_retries()}, remaining students: {remaining}" + ) + if remaining <= 0: + break + + +if remaining > 0: + logger.error( + f"Failed to allocate {remaining} students after {get_max_retries()} retries. Exiting." + ) + sys.exit(1) + +logger.info("Remaining capacity at each center (remaining_capacity cscode):") +logger.info(sorted([(v, k) for k, v in centers_remaining_cap.items() if v != 0])) +logger.info( + f"Total remaining capacity across all centers: {sum({k:v for k, v in centers_remaining_cap.items() if v != 0}.values())}" +) +logger.info(f"Students not assigned: {remaining}") From 72da7a167578ae0db0a9f5055d43037336355676 Mon Sep 17 00:00:00 2001 From: B-kash Date: Mon, 13 May 2024 22:37:57 +0200 Subject: [PATCH 2/2] :fire: remove useless return --- school_center.py | 1 - 1 file changed, 1 deletion(-) diff --git a/school_center.py b/school_center.py index 8b34277..4f51e8e 100644 --- a/school_center.py +++ b/school_center.py @@ -305,7 +305,6 @@ def get_max_retries(): return max_retries except ValueError: return MAX_RETRIES - return args.max_retries return MAX_RETRIES