Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[submodule "3rdparty/_kiopcgenerator"]
path = 3rdparty/_kiopcgenerator
url = https://github.com/iotconnectivity/kiopcgenerator.git
[submodule "3rdparty/kiopcgenerator"]
path = 3rdparty/kiopcgenerator
url = https://github.com/iotconnectivity/kiopcgenerator.git
287 changes: 287 additions & 0 deletions gsm_data_generator/executor/script_gpt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import pandas as pd


from ..algorithm import CryptoUtils, DependentDataGenerator
from ..processor import DataProcessing, DataFrameProcessor
from ..globals import DataFrames, Parameters
from ..generator import DataGenerator
from ..utils import copy_function, list_2_dict, DEFAULT_HEADER


class DataGenerationScript:

def __init__(self, config_holder):
self.config_holder = config_holder
self.params = Parameters.get_instance()
self.dataframes = DataFrames.get_instance()
self.crypto_utils = CryptoUtils()
self.data_generator = DataGenerator()
self.data_processor = DataProcessing()
self.df_processor = DataFrameProcessor()
self.dep_data_generator = DependentDataGenerator()

def json_to_global_params(self):
self.params.set_SERVER_SEP(self.config_holder.DISP.server_data_sep)
self.params.set_ELECT_SEP(self.config_holder.DISP.elect_data_sep)
self.params.set_GRAPH_SEP(self.config_holder.DISP.graph_data_sep)
self.params.set_K4(self.config_holder.DISP.K4)
self.params.set_OP(self.config_holder.DISP.op)
self.params.set_IMSI(self.config_holder.DISP.imsi)
self.params.set_ICCID(self.config_holder.DISP.iccid)
self.params.set_PIN1(self.config_holder.DISP.pin1)
self.params.set_PUK1(self.config_holder.DISP.puk1)
self.params.set_PIN2(self.config_holder.DISP.pin2)
self.params.set_PUK2(self.config_holder.DISP.puk2)
self.params.set_ADM1(self.config_holder.DISP.adm1)
self.params.set_ADM6(self.config_holder.DISP.adm6)
self.params.set_DATA_SIZE(self.config_holder.DISP.size)

self.params.set_PRODUCTION_CHECK(False)

self.params.set_ELECT_CHECK(self.config_holder.DISP.elect_check)
self.params.set_GRAPH_CHECK(self.config_holder.DISP.graph_check)
self.params.set_SERVER_CHECK(self.config_holder.DISP.server_check)

self.params.set_ELECT_DICT(
list_2_dict(self.config_holder.PARAMETERS.data_variables)
)
self.params.set_GRAPH_DICT(self.config_holder.PARAMETERS.laser_variables)
self.params.set_SERVER_DICT(
list_2_dict(self.config_holder.PARAMETERS.server_variables)
)

self.params.set_PIN1_RAND(self.config_holder.DISP.pin1_fix)
self.params.set_PUK1_RAND(self.config_holder.DISP.puk1_fix)
self.params.set_PIN2_RAND(self.config_holder.DISP.pin2_fix)
self.params.set_PUK2_RAND(self.config_holder.DISP.puk2_fix)
self.params.set_ADM1_RAND(self.config_holder.DISP.adm1_fix)
self.params.set_ADM6_RAND(self.config_holder.DISP.adm6_fix)

def generate_eki(self, ki):
return self.dep_data_generator.calculate_eki(self.params.get_K4(), ki)

def generate_opc(self, ki):
return self.dep_data_generator.calculate_opc(self.params.get_OP(), ki)

def generate_pin(self, pin_type):
if getattr(self.params, f"get_{pin_type}_RAND")():
return getattr(self.params, f"get_{pin_type}")()
return self.data_generator.generate_4_digit()

def generate_puk(self, puk_type):
if getattr(self.params, f"get_{puk_type}_RAND")():
return getattr(self.params, f"get_{puk_type}")()
return self.data_generator.generate_8_digit()

def generate_adm(self, adm_type):
if getattr(self.params, f"get_{adm_type}_RAND")():
return getattr(self.params, f"get_{adm_type}")()
return self.data_generator.generate_8_digit()

def apply_function(self, df, dest: str, src: str, function):
if dest in df.columns:
df[dest] = df[src].apply(function).copy(deep=False)

def apply_functions(self, df):
df["ICCID"] = df["ICCID"].apply(lambda x: copy_function(x))
df["IMSI"] = df["IMSI"].apply(lambda x: copy_function(x))
df["PIN1"] = df["PIN1"].apply(lambda x: self.generate_pin("PIN1"))
df["PIN2"] = df["PIN2"].apply(lambda x: self.generate_pin("PIN2"))
df["PUK1"] = df["PUK1"].apply(lambda x: self.generate_puk("PUK1"))
df["PUK2"] = df["PUK2"].apply(lambda x: self.generate_puk("PUK2"))

df["ADM1"] = df["ADM1"].apply(lambda x: self.generate_adm("ADM1"))
df["ADM6"] = df["ADM6"].apply(lambda x: self.generate_adm("ADM6"))

df["KI"] = df["KI"].apply(lambda x: self.data_generator.generate_ki())
df["ACC"] = df["IMSI"].apply(
lambda imsi: self.dep_data_generator.calculate_acc(imsi=str(imsi))
)

self.apply_function(df, "EKI", "KI", self.generate_eki)
self.apply_function(df, "OPC", "KI", self.generate_opc)

for i in range(1, 4):
for key in ["KIC", "KID", "KIK"]:
col = f"{key}{i}"
if col in df.columns:
df[col] = df["KI"].apply(
lambda x: self.data_generator.generate_otas()
)
return df

def generate_demo_data(self):
df = self.df_processor.generate_empty_dataframe(
DEFAULT_HEADER, self.params.get_DATA_SIZE() # type: ignore
)
self.df_processor.initialize_column(df, "ICCID", self.params.get_ICCID())
self.df_processor.initialize_column(df, "IMSI", self.params.get_IMSI())
self.df_processor.initialize_column(
df, "OP", self.params.get_OP(), increment=False
)
self.df_processor.initialize_column(
df, "K4", self.params.get_K4(), increment=False
)
return self.apply_functions(df)

def generate_non_demo_data(self):
input_df = self.dataframes.get_input_df()
df = self.df_processor.generate_empty_dataframe(DEFAULT_HEADER, len(input_df)) # type: ignore
self.df_processor.initialize_column(
df, "OP", self.params.get_OP(), increment=False
)
self.df_processor.initialize_column(
df, "K4", self.params.get_K4(), increment=False
)
df["ICCID"] = input_df["ICCID"]
df["IMSI"] = input_df["IMSI"]
return self.apply_functions(df)

def generate_initial_data(self, is_demo: bool):
try:
if is_demo:
demo_data = self.generate_demo_data()

k4 = self.params.get_K4()
op = self.params.get_OP()

# Validate required parameters
if not k4 or not isinstance(k4, str):
raise ValueError(
"Invalid value for K4: must be a non-empty string."
)
if not op or not isinstance(op, str):
raise ValueError(
"Invalid value for OP: must be a non-empty string."
)

return demo_data, {"k4": k4, "op": op}

else:
raise NotImplementedError(
"Non-demo data generation is not yet implemented."
)

except Exception as e:
raise RuntimeError(f"Error in generate_initial_data: {e}") from e

# def generate_initial_data(self, is_demo: bool):
# if is_demo:
# return self.generate_demo_data(), {
# "k4": self.params.get_K4(),
# "op": self.params.get_OP(),
# }
# return self.generate_non_demo_data(), {
# "k4": self.params.get_K4(),
# "op": self.params.get_OP(),
# }

def process_final_data(
self, input_dict: dict, df_input: pd.DataFrame, clip: bool, encoding: bool
):
# print("In process_final_data()", clip, encoding)
df = df_input.copy(deep=True)
if encoding:
df = self.df_processor.encode_dataframe(df)

headers, _, _, _, left_ranges, right_ranges = (
self.data_processor.extract_parameter_info(input_dict)
)
df = self.df_processor.add_duplicate_columns(df, 10, headers)

if clip:
df = self.df_processor.clip_columns(df, left_ranges, right_ranges)
print(df)
return df

# def generate_all_data(self):
# # initial_df, keys_dict = self.generate_initial_data(
# # self.params.get_PRODUCTION_CHECK()
# # )
# initial_df, keys_dict = self.generate_initial_data(True)
# print(initial_df.head())
# # data_types = {bool, dict, bool, bool}
# data_types = {}
# data_types = {
# "SERVER": (
# self.params.get_SERVER_CHECK(),
# self.params.get_SERVER_DICT(),
# False,
# False,
# ),
# "GRAPH": (
# self.params.get_GRAPH_CHECK(),
# self.params.get_GRAPH_DICT(),
# True,
# False,
# ),
# "ELECT": (
# self.params.get_ELECT_CHECK(),
# self.params.get_ELECT_DICT(),
# False,
# True,
# ),
# }
# result_dfs = {}
# for data_type, (check, dict_func, clip, encoding) in data_types.items():
# print(data_type, (check, dict_func, clip, encoding))
# if check:
# result_dfs[data_type] = self.process_final_data(
# dict_func, initial_df, clip, encoding
# )
# return result_dfs, keys_dict

def generate_all_data(self):
# initial_df, keys_dict = self.generate_initial_data(
# self.params.get_PRODUCTION_CHECK()
# )
initial_df, keys_dict = self.generate_initial_data(True)
print(initial_df.head())

data_types = {
"SERVER": (
self.params.get_SERVER_CHECK(),
self.params.get_SERVER_DICT(),
False,
False,
),
"GRAPH": (
self.params.get_GRAPH_CHECK(),
self.params.get_GRAPH_DICT(),
True,
False,
),
"ELECT": (
self.params.get_ELECT_CHECK(),
self.params.get_ELECT_DICT(),
False,
True,
),
}

result_dfs = {}

for data_type, (check, dict_func, clip, encoding) in data_types.items():
print(f"Checking {data_type}: {check}, {dict_func}")

if check:
# 🔹 Raise if dict is missing or invalid
if not dict_func or not isinstance(dict_func, dict):
raise ValueError(
f"{data_type} is enabled (check=True) "
f"but the dictionary is missing or invalid."
)

try:
result_dfs[data_type] = self.process_final_data(
dict_func, initial_df, clip, encoding
)
except Exception as e:
raise RuntimeError(
f"Failed processing {data_type} data: {str(e)}"
) from e

return result_dfs, keys_dict


__all__ = ["DataGenerationScript"]
86 changes: 86 additions & 0 deletions tests/python/algorithm/test_encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,79 @@ def test_calculate_acc(imsi):
),
]

test_vectors_64 = [
(
"202DE24E913748FEBD685ED15CA69DD6",
"9DAC5DCB273557F21B46B9EE8802E92C",
"E6E267A75793A69C60011A23A46FB5F3132F48672122CAF415FF97F12C2CDDAC",
"FFAB94CB0C27197E8407DB03B7794A44",
"7DA243D3ACDE3E6EB97085E027A72F0B",
),
(
"4850EA05B4EB4D868DE36F1D48D7837B",
"9A88A8F36386FC0B6221EA976D3DEFAD",
"3C3B2D407857DC68F65F859F255B00B313B76ECE84FA2747B75944063701CE19",
"EC47CEA31DD79843B243E99B46AF3F40",
"90CFBA1D5C0CF125E883996DC4495D53",
),
(
"B51DB69F30FC47B981B42ADE2004171D",
"CC4B13C7C98D0A1C2832E39AA7F97941",
"811706F8802E115A22863DEC3808783AC01BC8D7F92F8D200D267A72490A8F1C",
"51C5153700FA68881A24DD1BAF303BF9",
"7DAC6BBC7C4FF94A7C3EFBC752D96CE5",
),
(
"1D40F7DD19894563AD6FB50CD4A0CC05",
"7E21DE7CB6FD1E28D3BF1A5F67791E59",
"B5FBE43EF54267B52D2B834FED8A3B4DE36FC9BADAE2BBC7F81A0A6FDF5659EF",
"7C13E3C97256B4D4B60519EAB490059E",
"3652600DBAF9AE368B9395CE3F5F94F0",
),
(
"AD9655CEFD3A4A64A681224B7F4FAD73",
"E0793B4809F28BFA40F2EC6F6B0C0B17",
"F6D02AEC76D2E57AAE5B75B01A3DD5BB7913E78EB7509561C2FC9355FD761C61",
"C209377264A6CBDB872CD5CA7036100C",
"04C8E1AA965CD0570CFCFC7BE2D654E8",
),
(
"9DEAEF950E8E46A09A118B7B65D2E185",
"76AF0A6EA983470CBD5518DD93B21841",
"EE24EA029D81D1EEC1113E98838CE6AC4931A8C7000906FB9039CAFE917C6ED1",
"57CC39DF3A5CF9F6B66AEBBA845944C3",
"4F4A6CC501EAD4349BFE672360142934",
),
(
"5A484C2B08284554B5A8A6DDD99F2F88",
"9CBA735C5559D223BB62097CD1538E4C",
"25C5DE200C5B622A7A4F7B3BC4BA9AD513340D22FD580AE246DCB2893B32C1A2",
"689A0D2867F69A1F00F98A675D13527F",
"9BB5D62FA0525A37D245798044CD98FD",
),
(
"468B486B0F9144FF8656B7E5FFD74B19",
"566BF33A69BC9EF39229EDB02F3E0370",
"940F5ABF80BD350B05CDD827ED4A3D865584CB7356E18B2D0E6889771DA02FF3",
"2570FC9FF0E2CC75826DBEBDD4D2EFF2",
"A861B0FDCBF7EBF30EF9EB8389F9E606",
),
(
"D7B6CA09BD3E410684F25006E17D117F",
"0854E7B2414D77ABE972F065CF157A4D",
"1DDC863D9BE16F105A5FF2AA77F9968783536FDA2EBB272FBEC43055BAC8A2A4",
"4FFB8CA95C06FC3EBAA911AFCBCB9CE9",
"FCDA3F55C79B01854C4BD2D783B61611",
),
(
"5362203230344C06B017E964396AAB42",
"40342DE81299957BB488EAF0FFA0D54E",
"C3EC2FB1B88B3350BBB10F51F6288864B40E2F437FA51BD86FCF8C566556CA7F",
"7E0E396F0839AE6A8EEE720E4FE8CCEF",
"64DF2024639DCA1226C81B444B0BF731",
),
]


@pytest.mark.parametrize("ki, op, transport, expected_opc, expected_eki", test_vectors)
def test_opc_and_eki_generation(ki, op, transport, expected_opc, expected_eki):
Expand All @@ -112,3 +185,16 @@ def test_opc_and_eki_generation(ki, op, transport, expected_opc, expected_eki):

assert opc.upper() == expected_opc.upper()
assert eki.upper() == expected_eki.upper()


@pytest.mark.parametrize(
"ki, op, transport, expected_opc, expected_eki", test_vectors_64
)
def test_opc_and_eki_generation_64(ki, op, transport, expected_opc, expected_eki):
"""Validate OPC and eKI calculation with real known-good vectors."""

opc = DependentDataGenerator.calculate_opc(op, ki)
eki = DependentDataGenerator.calculate_eki(transport, ki)

assert opc.upper() == expected_opc.upper()
assert eki.upper() == expected_eki.upper()
Loading