|
| 1 | +from oracle.oracle_specific_functions import get_kit_id_logged_from_db |
| 2 | +from utils.oracle.oracle_specific_functions import get_kit_id_from_db |
| 3 | +from pages.base_page import BasePage |
| 4 | +from datetime import datetime |
| 5 | +import logging |
| 6 | +import pandas as pd |
| 7 | +import pytest |
| 8 | + |
| 9 | + |
| 10 | +class FitKitGeneration: |
| 11 | + """This class is responsible for generating FIT Device IDs from test kit data.""" |
| 12 | + |
| 13 | + def create_fit_id_df( |
| 14 | + self, |
| 15 | + tk_type_id: int, |
| 16 | + hub_id: int, |
| 17 | + no_of_kits_to_retrieve: int, |
| 18 | + ) -> pd.DataFrame: |
| 19 | + """ |
| 20 | + This function retrieves test kit data from the database for the specified compartment (using the 'get_kit_id_from_db' function from 'oracle_specific_functions.py'). |
| 21 | + It then calculates a check digit for each retrieved kit ID and appends it to the kit ID. |
| 22 | + Finally, it generates a FIT Device ID by appending an expiry date and a fixed suffix to the kit ID. |
| 23 | +
|
| 24 | + For example: |
| 25 | + Given the following inputs: |
| 26 | + tk_type_id = 1, hub_id = 101, no_of_kits_to_retrieve = 2 |
| 27 | + The function retrieves two kit IDs from the database, e.g., ["ABC123", "DEF456"]. |
| 28 | + It calculates the check digit for each kit ID, resulting in ["ABC123-K", "DEF456-M"]. |
| 29 | + Then, it generates the FIT Device IDs, e.g., ["ABC123-K122512345/KD00001", "DEF456-M122512345/KD00001"]. |
| 30 | +
|
| 31 | + Args: |
| 32 | + tk_type_id (int): The type ID of the test kit. |
| 33 | + hub_id (int): The ID of the hub from which to retrieve the kits. |
| 34 | + no_of_kits_to_retrieve (int): The number of kits to retrieve from the database. |
| 35 | +
|
| 36 | + Returns: |
| 37 | + pd.DataFrame: A DataFrame containing the processed kit IDs, including the calculated check digit |
| 38 | + and the final formatted FIT Device ID. |
| 39 | + """ |
| 40 | + df = get_kit_id_from_db(tk_type_id, hub_id, no_of_kits_to_retrieve) |
| 41 | + df["fit_device_id"] = df["kitid"].apply(self.calculate_check_digit) |
| 42 | + df["fit_device_id"] = df["fit_device_id"].apply( |
| 43 | + self.convert_kit_id_to_fit_device_id |
| 44 | + ) |
| 45 | + return df |
| 46 | + |
| 47 | + def calculate_check_digit(self, kit_id: str) -> str: |
| 48 | + """ |
| 49 | + Calculates the check digit for a given kit ID. |
| 50 | +
|
| 51 | + The check digit is determined by summing the positions of each character in the kit ID |
| 52 | + within a predefined character set. The remainder of the sum divided by 43 is used to |
| 53 | + find the corresponding character in the character set, which becomes the check digit. |
| 54 | +
|
| 55 | + For example: |
| 56 | + Given the kit ID "ABC123", the positions of the characters in the predefined |
| 57 | + character set are summed. If the total is 123, the remainder when divided by 43 |
| 58 | + is 37. The character at position 37 in the character set is "K". The resulting |
| 59 | + kit ID with the check digit appended would be "ABC123-K". |
| 60 | +
|
| 61 | + Args: |
| 62 | + kit_id (str): The kit ID to calculate the check digit for. |
| 63 | +
|
| 64 | + Returns: |
| 65 | + str: The kit ID with the calculated check digit appended. |
| 66 | + """ |
| 67 | + logging.info(f"Calculating check digit for kit id: {kit_id}") |
| 68 | + total = 0 |
| 69 | + char_string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ-. $/+%" |
| 70 | + for i in range(len(kit_id)): |
| 71 | + total += char_string.index(kit_id[i - 1]) |
| 72 | + check_digit = char_string[total % 43] |
| 73 | + return f"{kit_id}-{check_digit}" |
| 74 | + |
| 75 | + def convert_kit_id_to_fit_device_id(self, kit_id: str) -> str: |
| 76 | + """ |
| 77 | + Converts a Kit ID into a FIT Device ID by appending an expiry date and a fixed suffix. |
| 78 | +
|
| 79 | + The expiry date is calculated by setting the month to December and the year to one year |
| 80 | + in the future based on the current date. For example, if the current date is June 2024, |
| 81 | + the expiry date will be set to December 2025. |
| 82 | +
|
| 83 | + Args: |
| 84 | + kit_id (str): The Kit ID to be converted. |
| 85 | +
|
| 86 | + Returns: |
| 87 | + str: The generated FIT Device ID in the format "{kit_id}12{next_year}12345/KD00001". |
| 88 | + """ |
| 89 | + logging.info(f"Generating FIT Device ID from: {kit_id}") |
| 90 | + today = datetime.now() |
| 91 | + year = today.strftime("%y") # Get the year from todays date in YY format |
| 92 | + return f"{kit_id}12{int(year) + 1}12345/KD00001" |
| 93 | + |
| 94 | + |
| 95 | +class FitKitLogged: |
| 96 | + """This class is responsible for processing FIT Device IDs and logging them as normal or abnormal.""" |
| 97 | + |
| 98 | + def process_kit_data(self, smokescreen_properties: dict) -> list: |
| 99 | + """ |
| 100 | + This method retrieved the test data needed for compartment 3 and then splits it into two data frames: |
| 101 | + - 1 normal |
| 102 | + - 1 abnormal |
| 103 | + Once the dataframe is split in two it then creates two lists, one for normal and one for abnormal |
| 104 | + Each list will either have true or false appended depending on if it is normal or abnormal |
| 105 | + """ |
| 106 | + # Get test data for compartment 3 |
| 107 | + kit_id_df = get_kit_id_logged_from_db(smokescreen_properties) |
| 108 | + |
| 109 | + # Split dataframe into two different dataframes, normal and abnormal |
| 110 | + normal_fit_kit_df, abnormal_fit_kit_df = self.split_fit_kits( |
| 111 | + kit_id_df, smokescreen_properties |
| 112 | + ) |
| 113 | + |
| 114 | + # Prepare a list to store device IDs and their respective flags |
| 115 | + device_ids = [] |
| 116 | + |
| 117 | + # Process normal kits (only 1) |
| 118 | + if not normal_fit_kit_df.empty: |
| 119 | + device_id = normal_fit_kit_df["device_id"].iloc[0] |
| 120 | + logging.info( |
| 121 | + f"Processing normal kit with Device ID: {device_id}" |
| 122 | + ) # Logging normal device_id |
| 123 | + device_ids.append((device_id, True)) # Add to the list with normal flag |
| 124 | + else: |
| 125 | + pytest.fail("No normal kits found for processing.") |
| 126 | + |
| 127 | + # Process abnormal kits (multiple, loop through) |
| 128 | + if not abnormal_fit_kit_df.empty: |
| 129 | + for index, row in abnormal_fit_kit_df.iterrows(): |
| 130 | + device_id = row["device_id"] |
| 131 | + logging.info( |
| 132 | + f"Processing abnormal kit with Device ID: {device_id}" |
| 133 | + ) # Logging abnormal device_id |
| 134 | + device_ids.append( |
| 135 | + (device_id, False) |
| 136 | + ) # Add to the list with abnormal flag |
| 137 | + else: |
| 138 | + pytest.fail("No abnormal kits found for processing.") |
| 139 | + |
| 140 | + return device_ids |
| 141 | + |
| 142 | + def split_fit_kits( |
| 143 | + self, kit_id_df: pd.DataFrame, smokescreen_properties: dict |
| 144 | + ) -> pd.DataFrame: |
| 145 | + """ |
| 146 | + This method splits the dataframe into two, 1 normal and 1 abnormal |
| 147 | + """ |
| 148 | + number_of_normal = int( |
| 149 | + smokescreen_properties["c3_eng_number_of_normal_fit_kits"] |
| 150 | + ) |
| 151 | + number_of_abnormal = int( |
| 152 | + smokescreen_properties["c3_eng_number_of_abnormal_fit_kits"] |
| 153 | + ) |
| 154 | + |
| 155 | + # Split dataframe into two dataframes |
| 156 | + normal_fit_kit_df = kit_id_df.iloc[:number_of_normal] |
| 157 | + abnormal_fit_kit_df = kit_id_df.iloc[ |
| 158 | + number_of_normal : number_of_normal + number_of_abnormal |
| 159 | + ] |
| 160 | + return normal_fit_kit_df, abnormal_fit_kit_df |
0 commit comments