177 changes: 177 additions & 0 deletions NIDS/columns.py
@@ -0,0 +1,177 @@
Aggr_conn = ['duration',
'local_orig',
'local_resp',
'missed_bytes',
'orig_pkts',
'orig_ip_bytes',
'resp_pkts',
'resp_ip_bytes',
'orig_bytes',
'resp_bytes',
'has_service',
'history_has_S',
'history_has_h',
'history_has_A',
'history_has_D',
'history_has_a',
'history_has_d',
'history_has_F',
'history_has_f',
'history_has_N',
'is_destination_broadcast',
'conn_state_OTH',
'conn_state_RSTR',
'conn_state_RSTRH',
'conn_state_S0',
'conn_state_S1',
'conn_state_SF',
'proto_icmp',
'proto_tcp',
'proto_udp',
'traffic_direction_IPv6',
'traffic_direction_internal',
'traffic_direction_outgoing',
'service_dns',
'service_ntp',
'service_other',
'service_quic',
'service_quic,ssl',
'service_ssl',
'duration_mean_60',
'duration_min_60',
'duration_max_60',
'duration_std_60',
'duration_var_60',
'duration_cnt_60',
'duration_sum_60',
'missed_bytes_mean_60',
'missed_bytes_min_60',
'missed_bytes_max_60',
'missed_bytes_std_60',
'missed_bytes_var_60',
'missed_bytes_cnt_60',
'missed_bytes_sum_60',
'orig_pkts_mean_60',
'orig_pkts_min_60',
'orig_pkts_max_60',
'orig_pkts_std_60',
'orig_pkts_var_60',
'orig_pkts_cnt_60',
'orig_pkts_sum_60',
'orig_ip_bytes_mean_60',
'orig_ip_bytes_min_60',
'orig_ip_bytes_max_60',
'orig_ip_bytes_std_60',
'orig_ip_bytes_var_60',
'orig_ip_bytes_cnt_60',
'orig_ip_bytes_sum_60',
'resp_pkts_mean_60',
'resp_pkts_min_60',
'resp_pkts_max_60',
'resp_pkts_std_60',
'resp_pkts_var_60',
'resp_pkts_cnt_60',
'resp_pkts_sum_60',
'resp_ip_bytes_mean_60',
'resp_ip_bytes_min_60',
'resp_ip_bytes_max_60',
'resp_ip_bytes_std_60',
'resp_ip_bytes_var_60',
'resp_ip_bytes_cnt_60',
'resp_ip_bytes_sum_60',
'local_orig_nunique_60',
'local_orig_entropy_60',
'local_resp_nunique_60',
'local_resp_entropy_60',
'duration_mean_3600',
'duration_min_3600',
'duration_max_3600',
'duration_std_3600',
'duration_var_3600',
'duration_cnt_3600',
'duration_sum_3600',
'missed_bytes_mean_3600',
'missed_bytes_min_3600',
'missed_bytes_max_3600',
'missed_bytes_std_3600',
'missed_bytes_var_3600',
'missed_bytes_cnt_3600',
'missed_bytes_sum_3600',
'orig_pkts_mean_3600',
'orig_pkts_min_3600',
'orig_pkts_max_3600',
'orig_pkts_std_3600',
'orig_pkts_var_3600',
'orig_pkts_cnt_3600',
'orig_pkts_sum_3600',
'orig_ip_bytes_mean_3600',
'orig_ip_bytes_min_3600',
'orig_ip_bytes_max_3600',
'orig_ip_bytes_std_3600',
'orig_ip_bytes_var_3600',
'orig_ip_bytes_cnt_3600',
'orig_ip_bytes_sum_3600',
'resp_pkts_mean_3600',
'resp_pkts_min_3600',
'resp_pkts_max_3600',
'resp_pkts_std_3600',
'resp_pkts_var_3600',
'resp_pkts_cnt_3600',
'resp_pkts_sum_3600',
'resp_ip_bytes_mean_3600',
'resp_ip_bytes_min_3600',
'resp_ip_bytes_max_3600',
'resp_ip_bytes_std_3600',
'resp_ip_bytes_var_3600',
'resp_ip_bytes_cnt_3600',
'resp_ip_bytes_sum_3600',
'local_orig_nunique_3600',
'local_orig_entropy_3600',
'local_resp_nunique_3600',
'local_resp_entropy_3600',
'duration_mean_7200',
'duration_min_7200',
'duration_max_7200',
'duration_std_7200',
'duration_var_7200',
'duration_cnt_7200',
'duration_sum_7200',
'missed_bytes_mean_7200',
'missed_bytes_min_7200',
'missed_bytes_max_7200',
'missed_bytes_std_7200',
'missed_bytes_var_7200',
'missed_bytes_cnt_7200',
'missed_bytes_sum_7200',
'orig_pkts_mean_7200',
'orig_pkts_min_7200',
'orig_pkts_max_7200',
'orig_pkts_std_7200',
'orig_pkts_var_7200',
'orig_pkts_cnt_7200',
'orig_pkts_sum_7200',
'orig_ip_bytes_mean_7200',
'orig_ip_bytes_min_7200',
'orig_ip_bytes_max_7200',
'orig_ip_bytes_std_7200',
'orig_ip_bytes_var_7200',
'orig_ip_bytes_cnt_7200',
'orig_ip_bytes_sum_7200',
'resp_pkts_mean_7200',
'resp_pkts_min_7200',
'resp_pkts_max_7200',
'resp_pkts_std_7200',
'resp_pkts_var_7200',
'resp_pkts_cnt_7200',
'resp_pkts_sum_7200',
'resp_ip_bytes_mean_7200',
'resp_ip_bytes_min_7200',
'resp_ip_bytes_max_7200',
'resp_ip_bytes_std_7200',
'resp_ip_bytes_var_7200',
'resp_ip_bytes_cnt_7200',
'resp_ip_bytes_sum_7200',
'local_orig_nunique_7200',
'local_orig_entropy_7200',
'local_resp_nunique_7200',
'local_resp_entropy_7200']
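
For reference, the list begins with 39 per-flow columns and ends with 138 windowed aggregate columns whose names follow the pattern {feature}_{stat}_{window}. A minimal sketch that would regenerate those aggregate names:

# Illustrative only: rebuild the windowed aggregate column names listed above.
num_feats = ['duration', 'missed_bytes', 'orig_pkts', 'orig_ip_bytes', 'resp_pkts', 'resp_ip_bytes']
cat_feats = ['local_orig', 'local_resp']
num_stats = ['mean', 'min', 'max', 'std', 'var', 'cnt', 'sum']
cat_stats = ['nunique', 'entropy']

agg_cols = []
for window in (60, 3600, 7200):
    for feat in num_feats:
        agg_cols += [f'{feat}_{stat}_{window}' for stat in num_stats]
    for feat in cat_feats:
        agg_cols += [f'{feat}_{stat}_{window}' for stat in cat_stats]

# 3 windows * (6 numeric features * 7 stats + 2 categorical features * 2 stats) = 138 names
assert agg_cols == Aggr_conn[-138:]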
122 changes: 115 additions & 7 deletions NIDS/utils.py
@@ -19,6 +19,7 @@
import json
import logging
import ipaddress
from scipy.stats import entropy

# TODO: is there a better way to handle multi-file logging aside from spamming these everywhere?
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s)')
@@ -48,8 +49,6 @@ def preprocess_json_conn(json_batch):

Note: the input is only one unzipped json file.
"""
# features = ['id.orig_h', "id.resp_h", "proto", "conn_state", "missed_bytes",
# "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes"]
features = ["id.orig_h", "id.resp_h", "proto", "service", "duration", "conn_state",
"local_orig","local_resp","missed_bytes","history",
"orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes"]
@@ -67,8 +66,7 @@ def preprocess_json_conn(json_batch):
new_df = pd.DataFrame(data_list, columns=features)
# Fill NaNs with 0s for duration, orig_bytes, resp_bytes; if a column is missing, create it and fill with 0s
new_df = fill_na(new_df)
# # Drop unnecessary columns
# new_df = drop_columns(new_df, ['ts','uid','local_orig', 'local_resp'])

# create history, broadcast, traffic_direction variables
new_df = create_history_variable(new_df)
new_df = create_broadcast_variable(new_df)
@@ -88,12 +86,68 @@ def preprocess_json_conn(json_batch):
'service_other', 'service_ssh','service_ssl',
'traffic_direction_external','traffic_direction_incoming',
'traffic_direction_internal','traffic_direction_outgoing',
"local_orig","local_resp","missed_bytes","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes"]
"duration","local_orig","local_resp","missed_bytes","orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes"]
new_df = makedf_samecol(cols, new_df)
# Convert DataFrame to NumPy array
np_arr = new_df.to_numpy(dtype=np.float32)
return np_arr

from columns import Aggr_conn
def preprocess_json_conn_agg(json_batch):
"""
This function receives a json batch from the main control flow of the train
functions. It converts the conn.log entries of the json_batch to a numpy 2D array, applies the same
per-flow transformations as preprocess_json_conn, appends the windowed aggregate features, then returns it.

Note: the input is only one unzipped json file.
"""
features = ["ts","uid", "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p",
"proto", "service", "duration", "conn_state", "local_orig","local_resp",
"missed_bytes","history", "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes"]
data_list = []
for line in json_batch.splitlines():
# log_entry is now a single json log from the file
log_entry = json.loads(line.strip())
# Use .get so features missing from the log_entry become None instead of raising KeyError
feature_values = [log_entry.get(feature, None) for feature in features]
data_list.append(feature_values)

#TODO: optimize the code by removing the pandas dependency
df = pd.DataFrame(data_list, columns=features)

# Fill NaNs with 0s: duration, orig_bytes, resp_bytes
df = fill_na(df)
# create history, broadcast, traffic_direction variables
df = create_history_variable(df)
df = create_broadcast_variable(df)
df = create_direction_variable(df)

# one hot encode categorical variables
column_name = ['conn_state', "proto", "traffic_direction" , "service"]
df = one_hot_encode(df, column_name)

# Convert the boolean values in columns "local_orig" and "local_resp" to 1s and 0s
df['local_orig'] = df['local_orig'].astype(int)
df['local_resp'] = df['local_resp'].astype(int)

#Compute Aggregated Features
windows = [60,3600,7200] #seconds
grp = ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p']
aggr_feature_num = ['duration', 'missed_bytes', 'orig_pkts', 'orig_ip_bytes', 'resp_pkts', 'resp_ip_bytes']
aggr_feature_cat = ['local_orig', 'local_resp']
for window in windows:
for feature in aggr_feature_num:
df = calculate_agg_feature_num(df, feature, window)
for feature in aggr_feature_cat:
df = calculate_agg_feature_cat(df, feature, window)
cols = Aggr_conn
# make sure the columns are the same
df = makedf_samecol(cols, df)
# Convert DataFrame to NumPy array
np_arr = df.to_numpy(dtype=np.float32)
return np_arr
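
A usage sketch, assuming json_batch holds the decompressed text of one Zeek conn.log JSON file (one JSON object per line); the file path is hypothetical:

# Hypothetical usage of preprocess_json_conn_agg
with open('conn.log', 'r') as f:
    json_batch = f.read()
X = preprocess_json_conn_agg(json_batch)
# X is a 2D float32 array whose columns are ordered exactly as in columns.Aggr_conn
print(X.shape)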

def preprocess_json_dns(json_batch):
"""
@@ -215,8 +269,8 @@ def preprocess_json_ssh(json_batch):

Note: the input is only one unzipped json file.
"""
features = ['id.orig_h', 'id.resp_h','trans_depth','method','host','version',
'request_body_len','response_body_len','status_code']
features = ['id.orig_h', 'id.resp_h','version','auth_success','auth_attempts',
'direction','traffic_direction']

data_list = []
for line in json_batch.splitlines():
@@ -523,6 +577,60 @@ def get_raw_conn(json_data_file):

return df

def calculate_agg_feature_num(df, agg_feature, window_size):
"""
This function adds the columns "{agg_feature}_{mean|min|max|std|var|cnt|sum}_{window_size}" to the DataFrame.
Each column contains the corresponding aggregate of the feature over the network flows within the past {window_size} seconds
for each group with the same ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p'].

Args:
df: The pandas DataFrame containing network flow data.
agg_feature: Name of the numeric column to aggregate.
window_size: Size of the rolling time window in seconds.

Returns:
A new DataFrame with the added aggregated feature columns.
"""
# Convert timestamp to datetime
# df['ts'] = datetime.fromtimestamp(df['ts']) #assumes timestamps are in the local machine's timezone. not suggested
df['ts'] = pd.to_datetime(df['ts'], unit='s')
df = df.set_index('ts')
# Calculate the aggregated feature for each group
# to avoid NaN values, calculate the population standard deviation, specified with std(ddof=0)
grp = ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p']
df[f'{agg_feature}_mean_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).mean())
df[f'{agg_feature}_min_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).min())
df[f'{agg_feature}_max_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).max())
df[f'{agg_feature}_std_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).std(ddof=0))
df[f'{agg_feature}_var_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).var(ddof=0))
df[f'{agg_feature}_cnt_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).count())
df[f'{agg_feature}_sum_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).sum())

return df.reset_index()
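
As a toy illustration of the rolling behaviour above (hypothetical flow values, 60-second window): the window is keyed on the timestamp index, so a flow arriving more than 60 seconds after the previous ones starts a fresh window.

import pandas as pd

# Hypothetical: three flows from the same 4-tuple at t = 0 s, 30 s and 120 s.
toy = pd.DataFrame({
    'ts': [0, 30, 120],
    'id.orig_h': ['10.0.0.1'] * 3,
    'id.orig_p': [1234] * 3,
    'id.resp_h': ['10.0.0.2'] * 3,
    'id.resp_p': [80] * 3,
    'orig_pkts': [1, 3, 5],
})
toy = calculate_agg_feature_num(toy, 'orig_pkts', 60)
# The 60 s window at t = 120 s no longer contains the first two flows,
# so orig_pkts_mean_60 comes out as [1.0, 2.0, 5.0].
print(toy['orig_pkts_mean_60'].tolist())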

# For features such as local_orig, ports, etc. that are stored as numbers but can be treated as categorical
def calculate_agg_feature_cat(df, agg_feature, window_size):
"""
This function adds the columns "{agg_feature}_{nunique|entropy}_{window_size}" to the DataFrame.
Each column contains the number of unique values / the Shannon entropy of the feature over the network flows within the past {window_size} seconds
for each group with the same ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p'].

Args:
df: The pandas DataFrame containing network flow data.
agg_feature: Name of the categorical-like column to aggregate.
window_size: Size of the rolling time window in seconds.

Returns:
A new DataFrame with the added aggregated feature columns.
"""
# Convert timestamp to datetime
# df['ts'] = datetime.fromtimestamp(df['ts']) #assumes timestamps are in the local machine's timezone. not suggested
df['ts'] = pd.to_datetime(df['ts'], unit='s')
df = df.set_index('ts')
grp = ['id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p']
df[f'{agg_feature}_nunique_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).apply(lambda w: w.nunique()))
df[f'{agg_feature}_entropy_{window_size}'] = df.groupby(grp)[f'{agg_feature}'].transform(lambda x: x.rolling(f'{window_size}s', min_periods=1).apply(lambda w: entropy(w.value_counts())))
return df.reset_index()
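
For the entropy column, scipy.stats.entropy is applied to the window's value counts, i.e. the Shannon entropy (in nats) of the value distribution inside the window; a small worked example with hypothetical values:

import pandas as pd
from scipy.stats import entropy

# Hypothetical rolling window of a categorical-like feature: three 1s and one 0.
window = pd.Series([1, 1, 1, 0])
# value_counts() -> [3, 1]; entropy() normalizes to [0.75, 0.25] and returns
# -(0.75*ln(0.75) + 0.25*ln(0.25)) ≈ 0.562 nats
print(entropy(window.value_counts()))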


#------------------Online Normalization------------------#
#TODO: def online_normalization(new_df):
# can be skipped for now, since kitnet has its own normalization.