diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..493caeb --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "3rdparty/_kiopcgenerator"] + path = 3rdparty/_kiopcgenerator + url = https://github.com/iotconnectivity/kiopcgenerator.git +[submodule "3rdparty/kiopcgenerator"] + path = 3rdparty/kiopcgenerator + url = https://github.com/iotconnectivity/kiopcgenerator.git diff --git a/gsm_data_generator/executor/script_gpt.py b/gsm_data_generator/executor/script_gpt.py new file mode 100644 index 0000000..e106f81 --- /dev/null +++ b/gsm_data_generator/executor/script_gpt.py @@ -0,0 +1,287 @@ +import pandas as pd + + +from ..algorithm import CryptoUtils, DependentDataGenerator +from ..processor import DataProcessing, DataFrameProcessor +from ..globals import DataFrames, Parameters +from ..generator import DataGenerator +from ..utils import copy_function, list_2_dict, DEFAULT_HEADER + + +class DataGenerationScript: + + def __init__(self, config_holder): + self.config_holder = config_holder + self.params = Parameters.get_instance() + self.dataframes = DataFrames.get_instance() + self.crypto_utils = CryptoUtils() + self.data_generator = DataGenerator() + self.data_processor = DataProcessing() + self.df_processor = DataFrameProcessor() + self.dep_data_generator = DependentDataGenerator() + + def json_to_global_params(self): + self.params.set_SERVER_SEP(self.config_holder.DISP.server_data_sep) + self.params.set_ELECT_SEP(self.config_holder.DISP.elect_data_sep) + self.params.set_GRAPH_SEP(self.config_holder.DISP.graph_data_sep) + self.params.set_K4(self.config_holder.DISP.K4) + self.params.set_OP(self.config_holder.DISP.op) + self.params.set_IMSI(self.config_holder.DISP.imsi) + self.params.set_ICCID(self.config_holder.DISP.iccid) + self.params.set_PIN1(self.config_holder.DISP.pin1) + self.params.set_PUK1(self.config_holder.DISP.puk1) + self.params.set_PIN2(self.config_holder.DISP.pin2) + self.params.set_PUK2(self.config_holder.DISP.puk2) + self.params.set_ADM1(self.config_holder.DISP.adm1) + self.params.set_ADM6(self.config_holder.DISP.adm6) + self.params.set_DATA_SIZE(self.config_holder.DISP.size) + + self.params.set_PRODUCTION_CHECK(False) + + self.params.set_ELECT_CHECK(self.config_holder.DISP.elect_check) + self.params.set_GRAPH_CHECK(self.config_holder.DISP.graph_check) + self.params.set_SERVER_CHECK(self.config_holder.DISP.server_check) + + self.params.set_ELECT_DICT( + list_2_dict(self.config_holder.PARAMETERS.data_variables) + ) + self.params.set_GRAPH_DICT(self.config_holder.PARAMETERS.laser_variables) + self.params.set_SERVER_DICT( + list_2_dict(self.config_holder.PARAMETERS.server_variables) + ) + + self.params.set_PIN1_RAND(self.config_holder.DISP.pin1_fix) + self.params.set_PUK1_RAND(self.config_holder.DISP.puk1_fix) + self.params.set_PIN2_RAND(self.config_holder.DISP.pin2_fix) + self.params.set_PUK2_RAND(self.config_holder.DISP.puk2_fix) + self.params.set_ADM1_RAND(self.config_holder.DISP.adm1_fix) + self.params.set_ADM6_RAND(self.config_holder.DISP.adm6_fix) + + def generate_eki(self, ki): + return self.dep_data_generator.calculate_eki(self.params.get_K4(), ki) + + def generate_opc(self, ki): + return self.dep_data_generator.calculate_opc(self.params.get_OP(), ki) + + def generate_pin(self, pin_type): + if getattr(self.params, f"get_{pin_type}_RAND")(): + return getattr(self.params, f"get_{pin_type}")() + return self.data_generator.generate_4_digit() + + def generate_puk(self, puk_type): + if getattr(self.params, f"get_{puk_type}_RAND")(): + return getattr(self.params, f"get_{puk_type}")() + return self.data_generator.generate_8_digit() + + def generate_adm(self, adm_type): + if getattr(self.params, f"get_{adm_type}_RAND")(): + return getattr(self.params, f"get_{adm_type}")() + return self.data_generator.generate_8_digit() + + def apply_function(self, df, dest: str, src: str, function): + if dest in df.columns: + df[dest] = df[src].apply(function).copy(deep=False) + + def apply_functions(self, df): + df["ICCID"] = df["ICCID"].apply(lambda x: copy_function(x)) + df["IMSI"] = df["IMSI"].apply(lambda x: copy_function(x)) + df["PIN1"] = df["PIN1"].apply(lambda x: self.generate_pin("PIN1")) + df["PIN2"] = df["PIN2"].apply(lambda x: self.generate_pin("PIN2")) + df["PUK1"] = df["PUK1"].apply(lambda x: self.generate_puk("PUK1")) + df["PUK2"] = df["PUK2"].apply(lambda x: self.generate_puk("PUK2")) + + df["ADM1"] = df["ADM1"].apply(lambda x: self.generate_adm("ADM1")) + df["ADM6"] = df["ADM6"].apply(lambda x: self.generate_adm("ADM6")) + + df["KI"] = df["KI"].apply(lambda x: self.data_generator.generate_ki()) + df["ACC"] = df["IMSI"].apply( + lambda imsi: self.dep_data_generator.calculate_acc(imsi=str(imsi)) + ) + + self.apply_function(df, "EKI", "KI", self.generate_eki) + self.apply_function(df, "OPC", "KI", self.generate_opc) + + for i in range(1, 4): + for key in ["KIC", "KID", "KIK"]: + col = f"{key}{i}" + if col in df.columns: + df[col] = df["KI"].apply( + lambda x: self.data_generator.generate_otas() + ) + return df + + def generate_demo_data(self): + df = self.df_processor.generate_empty_dataframe( + DEFAULT_HEADER, self.params.get_DATA_SIZE() # type: ignore + ) + self.df_processor.initialize_column(df, "ICCID", self.params.get_ICCID()) + self.df_processor.initialize_column(df, "IMSI", self.params.get_IMSI()) + self.df_processor.initialize_column( + df, "OP", self.params.get_OP(), increment=False + ) + self.df_processor.initialize_column( + df, "K4", self.params.get_K4(), increment=False + ) + return self.apply_functions(df) + + def generate_non_demo_data(self): + input_df = self.dataframes.get_input_df() + df = self.df_processor.generate_empty_dataframe(DEFAULT_HEADER, len(input_df)) # type: ignore + self.df_processor.initialize_column( + df, "OP", self.params.get_OP(), increment=False + ) + self.df_processor.initialize_column( + df, "K4", self.params.get_K4(), increment=False + ) + df["ICCID"] = input_df["ICCID"] + df["IMSI"] = input_df["IMSI"] + return self.apply_functions(df) + + def generate_initial_data(self, is_demo: bool): + try: + if is_demo: + demo_data = self.generate_demo_data() + + k4 = self.params.get_K4() + op = self.params.get_OP() + + # Validate required parameters + if not k4 or not isinstance(k4, str): + raise ValueError( + "Invalid value for K4: must be a non-empty string." + ) + if not op or not isinstance(op, str): + raise ValueError( + "Invalid value for OP: must be a non-empty string." + ) + + return demo_data, {"k4": k4, "op": op} + + else: + raise NotImplementedError( + "Non-demo data generation is not yet implemented." + ) + + except Exception as e: + raise RuntimeError(f"Error in generate_initial_data: {e}") from e + + # def generate_initial_data(self, is_demo: bool): + # if is_demo: + # return self.generate_demo_data(), { + # "k4": self.params.get_K4(), + # "op": self.params.get_OP(), + # } + # return self.generate_non_demo_data(), { + # "k4": self.params.get_K4(), + # "op": self.params.get_OP(), + # } + + def process_final_data( + self, input_dict: dict, df_input: pd.DataFrame, clip: bool, encoding: bool + ): + # print("In process_final_data()", clip, encoding) + df = df_input.copy(deep=True) + if encoding: + df = self.df_processor.encode_dataframe(df) + + headers, _, _, _, left_ranges, right_ranges = ( + self.data_processor.extract_parameter_info(input_dict) + ) + df = self.df_processor.add_duplicate_columns(df, 10, headers) + + if clip: + df = self.df_processor.clip_columns(df, left_ranges, right_ranges) + print(df) + return df + + # def generate_all_data(self): + # # initial_df, keys_dict = self.generate_initial_data( + # # self.params.get_PRODUCTION_CHECK() + # # ) + # initial_df, keys_dict = self.generate_initial_data(True) + # print(initial_df.head()) + # # data_types = {bool, dict, bool, bool} + # data_types = {} + # data_types = { + # "SERVER": ( + # self.params.get_SERVER_CHECK(), + # self.params.get_SERVER_DICT(), + # False, + # False, + # ), + # "GRAPH": ( + # self.params.get_GRAPH_CHECK(), + # self.params.get_GRAPH_DICT(), + # True, + # False, + # ), + # "ELECT": ( + # self.params.get_ELECT_CHECK(), + # self.params.get_ELECT_DICT(), + # False, + # True, + # ), + # } + # result_dfs = {} + # for data_type, (check, dict_func, clip, encoding) in data_types.items(): + # print(data_type, (check, dict_func, clip, encoding)) + # if check: + # result_dfs[data_type] = self.process_final_data( + # dict_func, initial_df, clip, encoding + # ) + # return result_dfs, keys_dict + + def generate_all_data(self): + # initial_df, keys_dict = self.generate_initial_data( + # self.params.get_PRODUCTION_CHECK() + # ) + initial_df, keys_dict = self.generate_initial_data(True) + print(initial_df.head()) + + data_types = { + "SERVER": ( + self.params.get_SERVER_CHECK(), + self.params.get_SERVER_DICT(), + False, + False, + ), + "GRAPH": ( + self.params.get_GRAPH_CHECK(), + self.params.get_GRAPH_DICT(), + True, + False, + ), + "ELECT": ( + self.params.get_ELECT_CHECK(), + self.params.get_ELECT_DICT(), + False, + True, + ), + } + + result_dfs = {} + + for data_type, (check, dict_func, clip, encoding) in data_types.items(): + print(f"Checking {data_type}: {check}, {dict_func}") + + if check: + # 🔹 Raise if dict is missing or invalid + if not dict_func or not isinstance(dict_func, dict): + raise ValueError( + f"{data_type} is enabled (check=True) " + f"but the dictionary is missing or invalid." + ) + + try: + result_dfs[data_type] = self.process_final_data( + dict_func, initial_df, clip, encoding + ) + except Exception as e: + raise RuntimeError( + f"Failed processing {data_type} data: {str(e)}" + ) from e + + return result_dfs, keys_dict + + +__all__ = ["DataGenerationScript"] diff --git a/tests/python/algorithm/test_encrypt.py b/tests/python/algorithm/test_encrypt.py index 781c5c1..292ca3a 100644 --- a/tests/python/algorithm/test_encrypt.py +++ b/tests/python/algorithm/test_encrypt.py @@ -102,6 +102,79 @@ def test_calculate_acc(imsi): ), ] +test_vectors_64 = [ + ( + "202DE24E913748FEBD685ED15CA69DD6", + "9DAC5DCB273557F21B46B9EE8802E92C", + "E6E267A75793A69C60011A23A46FB5F3132F48672122CAF415FF97F12C2CDDAC", + "FFAB94CB0C27197E8407DB03B7794A44", + "7DA243D3ACDE3E6EB97085E027A72F0B", + ), + ( + "4850EA05B4EB4D868DE36F1D48D7837B", + "9A88A8F36386FC0B6221EA976D3DEFAD", + "3C3B2D407857DC68F65F859F255B00B313B76ECE84FA2747B75944063701CE19", + "EC47CEA31DD79843B243E99B46AF3F40", + "90CFBA1D5C0CF125E883996DC4495D53", + ), + ( + "B51DB69F30FC47B981B42ADE2004171D", + "CC4B13C7C98D0A1C2832E39AA7F97941", + "811706F8802E115A22863DEC3808783AC01BC8D7F92F8D200D267A72490A8F1C", + "51C5153700FA68881A24DD1BAF303BF9", + "7DAC6BBC7C4FF94A7C3EFBC752D96CE5", + ), + ( + "1D40F7DD19894563AD6FB50CD4A0CC05", + "7E21DE7CB6FD1E28D3BF1A5F67791E59", + "B5FBE43EF54267B52D2B834FED8A3B4DE36FC9BADAE2BBC7F81A0A6FDF5659EF", + "7C13E3C97256B4D4B60519EAB490059E", + "3652600DBAF9AE368B9395CE3F5F94F0", + ), + ( + "AD9655CEFD3A4A64A681224B7F4FAD73", + "E0793B4809F28BFA40F2EC6F6B0C0B17", + "F6D02AEC76D2E57AAE5B75B01A3DD5BB7913E78EB7509561C2FC9355FD761C61", + "C209377264A6CBDB872CD5CA7036100C", + "04C8E1AA965CD0570CFCFC7BE2D654E8", + ), + ( + "9DEAEF950E8E46A09A118B7B65D2E185", + "76AF0A6EA983470CBD5518DD93B21841", + "EE24EA029D81D1EEC1113E98838CE6AC4931A8C7000906FB9039CAFE917C6ED1", + "57CC39DF3A5CF9F6B66AEBBA845944C3", + "4F4A6CC501EAD4349BFE672360142934", + ), + ( + "5A484C2B08284554B5A8A6DDD99F2F88", + "9CBA735C5559D223BB62097CD1538E4C", + "25C5DE200C5B622A7A4F7B3BC4BA9AD513340D22FD580AE246DCB2893B32C1A2", + "689A0D2867F69A1F00F98A675D13527F", + "9BB5D62FA0525A37D245798044CD98FD", + ), + ( + "468B486B0F9144FF8656B7E5FFD74B19", + "566BF33A69BC9EF39229EDB02F3E0370", + "940F5ABF80BD350B05CDD827ED4A3D865584CB7356E18B2D0E6889771DA02FF3", + "2570FC9FF0E2CC75826DBEBDD4D2EFF2", + "A861B0FDCBF7EBF30EF9EB8389F9E606", + ), + ( + "D7B6CA09BD3E410684F25006E17D117F", + "0854E7B2414D77ABE972F065CF157A4D", + "1DDC863D9BE16F105A5FF2AA77F9968783536FDA2EBB272FBEC43055BAC8A2A4", + "4FFB8CA95C06FC3EBAA911AFCBCB9CE9", + "FCDA3F55C79B01854C4BD2D783B61611", + ), + ( + "5362203230344C06B017E964396AAB42", + "40342DE81299957BB488EAF0FFA0D54E", + "C3EC2FB1B88B3350BBB10F51F6288864B40E2F437FA51BD86FCF8C566556CA7F", + "7E0E396F0839AE6A8EEE720E4FE8CCEF", + "64DF2024639DCA1226C81B444B0BF731", + ), +] + @pytest.mark.parametrize("ki, op, transport, expected_opc, expected_eki", test_vectors) def test_opc_and_eki_generation(ki, op, transport, expected_opc, expected_eki): @@ -112,3 +185,16 @@ def test_opc_and_eki_generation(ki, op, transport, expected_opc, expected_eki): assert opc.upper() == expected_opc.upper() assert eki.upper() == expected_eki.upper() + + +@pytest.mark.parametrize( + "ki, op, transport, expected_opc, expected_eki", test_vectors_64 +) +def test_opc_and_eki_generation_64(ki, op, transport, expected_opc, expected_eki): + """Validate OPC and eKI calculation with real known-good vectors.""" + + opc = DependentDataGenerator.calculate_opc(op, ki) + eki = DependentDataGenerator.calculate_eki(transport, ki) + + assert opc.upper() == expected_opc.upper() + assert eki.upper() == expected_eki.upper() diff --git a/tests/python/executor/test_script.py b/tests/python/executor/test_script.py index e69de29..a8ee0d6 100644 --- a/tests/python/executor/test_script.py +++ b/tests/python/executor/test_script.py @@ -0,0 +1,555 @@ +from gsm_data_generator.executor import DataGenerationScript +from gsm_data_generator.parser import json_loader_2_ConfigHolder, json_loader +from gsm_data_generator.parser.utils import ConfigHolder +from gsm_data_generator.utils import list_2_dict, dict_2_list +import json +from gsm_data_generator.globals import Parameters + +import pytest +import json +from gsm_data_generator.executor import DataGenerationScript +from gsm_data_generator.parser import json_loader_2_ConfigHolder +from gsm_data_generator.globals import Parameters + +p = Parameters.get_instance() + + +p.set_ELECT_SEP(",") +p.set_GRAPH_SEP(",") +p.set_SERVER_SEP(",") + +p.set_K4("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") +p.set_OP("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") +p.set_IMSI("111111111121111") +p.set_ICCID("111111111121221111") +p.set_PIN1("1111") +p.set_PUK1("11111111") +p.set_PIN2("1111") +p.set_PUK2("11111111") +p.set_ADM1("11111111") +p.set_ADM6("11111111") +p.set_ACC("1111") +p.set_DATA_SIZE("5") +p.set_PRODUCTION_CHECK(True) +p.set_GRAPH_CHECK(True) +p.set_ELECT_CHECK(True) +p.set_SERVER_CHECK(True) +p.set_PIN1_RAND(False) +p.set_PIN2_RAND(False) + +p.set_PUK1_RAND(False) +p.set_PUK2_RAND(False) +p.set_ADM1_RAND(False) +p.set_ADM6_RAND(False) + +p.set_ELECT_DF( + list_2_dict( + [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ] + ) +) + +p.set_SERVER_DICT( + list_2_dict( + [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ] + ) +) +p.set_GRAPH_DICT( + { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + } +) + + +def global_params_to_json(): + param_dict = { + "DISP": { + "elect_data_sep": ".", + "server_data_sep": ".", + "graph_data_sep": ".", + "K4": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "op": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "imsi": "111111111121111", + "iccid": "111111111121221111", + "pin1": "1111", + "puk1": "11111111", + "pin2": "1111", + "puk2": "11111111", + "adm1": "11111111", + "adm6": "11111111", + "acc": "1111", + "size": "11", + "prod_check": 1, + "elect_check": 1, + "graph_check": 1, + "server_check": 1, + "pin1_fix": 1, + "puk1_fix": 1, + "pin2_fix": 1, + "puk2_fix": 1, + "adm1_fix": 1, + "adm6_fix": 1, + }, + "PATHS": { + "FILE_NAME": "get_file_name", + "OUTPUT_FILES_DIR": "output_files1", + "OUTPUT_FILES_LASER_EXT": "laser_extracted", + }, + "PARAMETERS": { + "server_variables": [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "data_variables": [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "laser_variables": { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + }, + }, + } + + return param_dict + + +import pandas as pd + +# param_dict = { +# "DISP": { +# "elect_data_sep": ".", +# "server_data_sep": ".", +# "graph_data_sep": ".", +# "K4": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", +# "op": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", +# "imsi": "111111111121111", +# "iccid": "111111111121221111", +# "pin1": "1111", +# "puk1": "11111111", +# "pin2": "1111", +# "puk2": "11111111", +# "adm1": "11111111", +# "adm6": "11111111", +# "acc": "1111", +# "size": "11", +# "prod_check": 1, +# "elect_check": 1, +# "graph_check": 1, +# "server_check": 1, +# "pin1_fix": 1, +# "puk1_fix": 1, +# "pin2_fix": 1, +# "puk2_fix": 1, +# "adm1_fix": 1, +# "adm6_fix": 1, +# }, +# "PATHS": { +# "FILE_NAME": "get_file_name", +# "OUTPUT_FILES_DIR": "output_files1", +# "OUTPUT_FILES_LASER_EXT": "laser_extracted", +# }, +# "PARAMETERS": { +# "server_variables": [ +# "IMSI", +# "EKI", +# "ICCID", +# "PIN1", +# "PUK1", +# "PIN2", +# "PUK2", +# "ADM1", +# "ADM6", +# "ACC", +# "KIC1", +# "KID1", +# "KIK1", +# "KIC2", +# "KID2", +# "KIK2", +# "KIC3", +# "KID3", +# "KIK3", +# ], +# "data_variables": [ +# "IMSI", +# "ICCID", +# "PIN1", +# "PUK1", +# "PIN2", +# "PUK2", +# "ADM1", +# "ADM6", +# "KI", +# "OPC", +# "ACC", +# "KIC1", +# "KID1", +# "KIK1", +# "KIC2", +# "KID2", +# "KIK2", +# "KIC3", +# "KID3", +# "KIK3", +# ], +# "laser_variables": { +# "0": ["ICCID", "Normal", "0-20"], +# "1": ["ICCID", "Normal", "0-20"], +# "2": ["ICCID", "Normal", "0-3"], +# "3": ["ICCID", "Normal", "4-7"], +# "4": ["ICCID", "Normal", "8-11"], +# "5": ["ICCID", "Normal", "12-15"], +# "6": ["ICCID", "Normal", "16-20"], +# "7": ["PIN1", "Normal", "0-3"], +# "8": ["PUK1", "Normal", "0-7"], +# "9": ["PIN2", "Normal", "0-3"], +# "10": ["PUK2", "Normal", "0-7"], +# "11": ["IMSI", "Normal", "0-5"], +# "12": ["IMSI", "Normal", "6-15"], +# }, +# }, +# } + + +# def test_data_generation_script_runs(params_dict): +# js = json_loader_2_ConfigHolder(params_dict) + + +# script = DataGenerationScript(js) + +# # Step 1: apply JSON params to global Parameters singleton +# script.json_to_global_params() +# p = Parameters.get_instance() + +# # sanity checks +# assert p.get_IMSI() == "111111111121111" +# assert p.get_PIN1() == "1111" + +# # Step 2: run data generation (should not raise) +# t1, t2 =script.generate_all_data() + +# print(t1) + + +# def test_script(): +# t = global_params_to_json() +# js = json_loader1(t) +# #js = json_loader("settings.json") + +# s = DataGenerationScript(js) +# s.json_to_global_params() +# s.generate_all_data() + + +import pytest +import itertools +import copy + +# import your actual modules here +# from your_module import DataGenerationScript, Parameters, json_loader_2_ConfigHolder + +param_dict_template = { + "DISP": { + "elect_data_sep": ".", + "server_data_sep": ".", + "graph_data_sep": ".", + "K4": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "op": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "imsi": "111111111121111", + "iccid": "111111111121221111", + "pin1": "1111", + "puk1": "11111111", + "pin2": "1111", + "puk2": "11111111", + "adm1": "11111111", + "adm6": "11111111", + "acc": "1111", + "size": "11", + "prod_check": 1, + "elect_check": 1, + "graph_check": 1, + "server_check": 1, + "pin1_fix": 1, + "puk1_fix": 1, + "pin2_fix": 1, + "puk2_fix": 1, + "adm1_fix": 1, + "adm6_fix": 1, + }, + "PATHS": { + "FILE_NAME": "get_file_name", + "OUTPUT_FILES_DIR": "output_files1", + "OUTPUT_FILES_LASER_EXT": "laser_extracted", + }, + "PARAMETERS": { + "server_variables": [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "data_variables": [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "laser_variables": { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + }, + }, +} + +# Fields we want to randomize between 0 and 1 +boolean_fields = [ + # "prod_check", + # "elect_check", + # "graph_check", + # "server_check", + "pin1_fix", + "puk1_fix", + "pin2_fix", + "puk2_fix", + "adm1_fix", + "adm6_fix", +] + + +# Generate all 2^N combinations (might be large) +def generate_param_dicts(): + for combo in itertools.product([0, 1], repeat=len(boolean_fields)): + pdict = copy.deepcopy(param_dict_template) + for field, val in zip(boolean_fields, combo): + pdict["DISP"][field] = val + yield pdict, dict(zip(boolean_fields, combo)) + + +test_cases = list(generate_param_dicts()) + +# @pytest.mark.parametrize("params_dict,expected_bools", test_cases) +# def test_data_generation_script_runs(params_dict, expected_bools): +# js = json_loader_2_ConfigHolder(params_dict) + +# script = DataGenerationScript(js) + +# # Step 1: apply JSON params to global Parameters singleton +# script.json_to_global_params() +# p = Parameters.get_instance() + +# # sanity checks (fixed values) +# assert p.get_IMSI() == "111111111121111" +# assert p.get_PIN1() == "1111" + +# # Step 2: run data generation (should not raise) +# t1, t2 = script.generate_all_data() + +# print("Generated:", t1) +# print("Bools:", expected_bools) + + +#################################################################### + + +import pytest +import itertools +import copy + +# Example data sizes to test +data_sizes = [5, 10, 20] # different number of rows for your data + +# Boolean fields to vary +boolean_fields = [ + "pin1_fix", + "puk1_fix", + "pin2_fix", + "puk2_fix", + "adm1_fix", + "adm6_fix", +] + + +# Generate all combinations of booleans + sizes +def generate_param_dicts_with_size(): + for combo in itertools.product([0, 1], repeat=len(boolean_fields)): + for size in data_sizes: + pdict = copy.deepcopy(param_dict_template) + # Set boolean fields + for field, val in zip(boolean_fields, combo): + pdict["DISP"][field] = val + # Set data size + pdict["DISP"]["size"] = str(size) + yield pdict, dict(zip(boolean_fields, combo)), size + + +test_cases = list(generate_param_dicts_with_size()) + + +@pytest.mark.parametrize("params_dict,expected_bools,size", test_cases) +def test_data_generation_script_runs(params_dict, expected_bools, size): + js = json_loader_2_ConfigHolder(params_dict) + script = DataGenerationScript(js) + + # Step 1: apply JSON params to global Parameters singleton + script.json_to_global_params() + p = Parameters.get_instance() + + # sanity checks (fixed values) + assert p.get_IMSI() == "111111111121111" + assert p.get_PIN1() == "1111" + + # Step 2: run data generation (should not raise) + t1, t2 = script.generate_all_data() + ds = t1["ELECT"].shape[0] + # Step 3: verify DataFrame shape + # if isinstance(t1, pd.DataFrame): + assert ds == size, f"Expected {size} rows, got {ds}" + # else: + # print("FAAIIIIIILLLLLL!") + # if isinstance(t2, pd.DataFrame): + # assert t2.shape[0] == size, f"Expected {size} rows, got {t2.shape[0]}" + + print("Generated:", t1.shape if isinstance(t1, pd.DataFrame) else type(t1)) + print("Bools:", expected_bools) + print("Size:", size) diff --git a/tests/python/executor/test_script_gpt.py b/tests/python/executor/test_script_gpt.py index e69de29..3e89842 100644 --- a/tests/python/executor/test_script_gpt.py +++ b/tests/python/executor/test_script_gpt.py @@ -0,0 +1,362 @@ +from gsm_data_generator.executor import DataGenerationScript +from gsm_data_generator.parser import json_loader_2_ConfigHolder, json_loader +from gsm_data_generator.parser.utils import ConfigHolder +from gsm_data_generator.utils import list_2_dict, dict_2_list +import json +from gsm_data_generator.globals import Parameters + +import pytest +import json +from gsm_data_generator.executor import DataGenerationScript +from gsm_data_generator.parser import json_loader_2_ConfigHolder +from gsm_data_generator.globals import Parameters + +p = Parameters.get_instance() + + +p.set_ELECT_SEP(",") +p.set_GRAPH_SEP(",") +p.set_SERVER_SEP(",") + +p.set_K4("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") +p.set_OP("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") +p.set_IMSI("111111111121111") +p.set_ICCID("111111111121221111") +p.set_PIN1("1111") +p.set_PUK1("11111111") +p.set_PIN2("1111") +p.set_PUK2("11111111") +p.set_ADM1("11111111") +p.set_ADM6("11111111") +p.set_ACC("1111") +p.set_DATA_SIZE("5") +p.set_PRODUCTION_CHECK(True) +p.set_GRAPH_CHECK(True) +p.set_ELECT_CHECK(True) +p.set_SERVER_CHECK(True) +p.set_PIN1_RAND(False) +p.set_PIN2_RAND(False) + +p.set_PUK1_RAND(False) +p.set_PUK2_RAND(False) +p.set_ADM1_RAND(False) +p.set_ADM6_RAND(False) + +p.set_ELECT_DF( + list_2_dict( + [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ] + ) +) + +p.set_SERVER_DICT( + list_2_dict( + [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ] + ) +) +p.set_GRAPH_DICT( + { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + } +) + + +def global_params_to_json(): + param_dict = { + "DISP": { + "elect_data_sep": ".", + "server_data_sep": ".", + "graph_data_sep": ".", + "K4": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "op": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "imsi": "111111111121111", + "iccid": "111111111121221111", + "pin1": "1111", + "puk1": "11111111", + "pin2": "1111", + "puk2": "11111111", + "adm1": "11111111", + "adm6": "11111111", + "acc": "1111", + "size": "11", + "prod_check": 1, + "elect_check": 1, + "graph_check": 1, + "server_check": 1, + "pin1_fix": 1, + "puk1_fix": 1, + "pin2_fix": 1, + "puk2_fix": 1, + "adm1_fix": 1, + "adm6_fix": 1, + }, + "PATHS": { + "FILE_NAME": "get_file_name", + "OUTPUT_FILES_DIR": "output_files1", + "OUTPUT_FILES_LASER_EXT": "laser_extracted", + }, + "PARAMETERS": { + "server_variables": [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "data_variables": [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "laser_variables": { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + }, + }, + } + + return param_dict + + +@pytest.fixture +def params_dict(): + """Fixture to provide global params dict.""" + return global_params_to_json() + + +def test_json_loader1_returns_dict(params_dict): + js = json_loader_2_ConfigHolder(params_dict) + assert isinstance(js, ConfigHolder), "json_loader1 should return a ConfigHolder" + # assert "DISP" in js.DISP.adm1 + # assert "PARAMETERS" in js.PARAMETERS.data_variables + # assert "PATHS" in js.PATHS.OUTPUT_FILES_DIR + + +def global_params_to_json(): + param_dict = { + "DISP": { + "elect_data_sep": ".", + "server_data_sep": ".", + "graph_data_sep": ".", + "K4": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "op": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + "imsi": "111111111121111", + "iccid": "111111111121221111", + "pin1": "1111", + "puk1": "11111111", + "pin2": "1111", + "puk2": "11111111", + "adm1": "11111111", + "adm6": "11111111", + "acc": "1111", + "size": "11", + "prod_check": 1, + "elect_check": 1, + "graph_check": 1, + "server_check": 1, + "pin1_fix": 1, + "puk1_fix": 1, + "pin2_fix": 1, + "puk2_fix": 1, + "adm1_fix": 1, + "adm6_fix": 1, + }, + "PATHS": { + "FILE_NAME": "get_file_name", + "OUTPUT_FILES_DIR": "output_files1", + "OUTPUT_FILES_LASER_EXT": "laser_extracted", + }, + "PARAMETERS": { + "server_variables": [ + "IMSI", + "EKI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "data_variables": [ + "IMSI", + "ICCID", + "PIN1", + "PUK1", + "PIN2", + "PUK2", + "ADM1", + "ADM6", + "KI", + "OPC", + "ACC", + "KIC1", + "KID1", + "KIK1", + "KIC2", + "KID2", + "KIK2", + "KIC3", + "KID3", + "KIK3", + ], + "laser_variables": { + "0": ["ICCID", "Normal", "0-20"], + "1": ["ICCID", "Normal", "0-20"], + "2": ["ICCID", "Normal", "0-3"], + "3": ["ICCID", "Normal", "4-7"], + "4": ["ICCID", "Normal", "8-11"], + "5": ["ICCID", "Normal", "12-15"], + "6": ["ICCID", "Normal", "16-20"], + "7": ["PIN1", "Normal", "0-3"], + "8": ["PUK1", "Normal", "0-7"], + "9": ["PIN2", "Normal", "0-3"], + "10": ["PUK2", "Normal", "0-7"], + "11": ["IMSI", "Normal", "0-5"], + "12": ["IMSI", "Normal", "6-15"], + }, + }, + } + + return param_dict + + +def test_data_generation_script_runs(params_dict, tmp_path): + js = json_loader_2_ConfigHolder(params_dict) + + # Override output dir to temp folder so tests don't pollute real dirs + # js["PATHS"]["OUTPUT_FILES_DIR"] = str(tmp_path / "output_files") + # js["PATHS"]["OUTPUT_FILES_LASER_EXT"] = str(tmp_path / "laser_extracted") + + script = DataGenerationScript(js) + + # Step 1: apply JSON params to global Parameters singleton + script.json_to_global_params() + p = Parameters.get_instance() + + # sanity checks + assert p.get_IMSI() == "111111111121111" + assert p.get_PIN1() == "1111" + + # Step 2: run data generation (should not raise) + script.generate_all_data() + + # Verify expected outputs + # out_dir = tmp_path / "output_files" + # assert out_dir.exists(), "Output directory should be created" + # You can also check specific files if you know what gets generated: + # assert any(out_dir.iterdir()), "Expected at least one file in output dir" + + +# def test_script(): +# t = global_params_to_json() +# js = json_loader1(t) +# #js = json_loader("settings.json") + +# s = DataGenerationScript(js) +# s.json_to_global_params() +# s.generate_all_data()