Skip to content

Commit ae0e91b

Browse files
committed
zeek.py: log the time each func takes inside process_line and the db calls in add_flow_to_profile
1 parent c81f5f1 commit ae0e91b

File tree

2 files changed

+65
-9
lines changed

2 files changed

+65
-9
lines changed

slips_files/core/input_profilers/zeek.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <[email protected]>
22
# SPDX-License-Identifier: GPL-2.0-only
3+
import csv
4+
import os
5+
import time
36
from re import split
47
from typing import Dict
58
from slips_files.common.abstracts.iinput_type import IInputType
@@ -148,6 +151,32 @@ def fill_empty_class_fields(self, flow_values: dict, slips_class):
148151
class ZeekJSON(IInputType, Zeek):
149152
def __init__(self):
150153
self.line_processor_cache = {}
154+
self.times = {}
155+
self.init_csv()
156+
157+
def init_csv(self):
158+
path = os.path.join("/tmp/", "zeek_json_times_each_func_took.csv")
159+
with open(path, "w", newline="") as f:
160+
writer = csv.writer(f)
161+
writer.writerow(
162+
[
163+
"get_file_type",
164+
"removing_empty_vals",
165+
"setting_conn_vals_to_0",
166+
"fill_empty_class_fields",
167+
"calling_slips_class",
168+
]
169+
)
170+
171+
def log_time(self, what, time):
172+
self.times[what] = f"{time:.2f}"
173+
last_metric = "calling_slips_class"
174+
if what == last_metric:
175+
path = os.path.join("/tmp/", "zeek_json_times_each_func_took.csv")
176+
with open(path, "a", newline="") as f:
177+
writer = csv.DictWriter(f, fieldnames=self.times.keys())
178+
writer.writerow(self.times)
179+
self.times = {}
151180

152181
def process_line(self, new_line: dict):
153182
line = new_line["data"]
@@ -156,7 +185,11 @@ def process_line(self, new_line: dict):
156185
if not isinstance(line, dict):
157186
return False
158187

188+
n = time.time()
159189
file_type = self.get_file_type(new_line)
190+
latency = time.time() - n
191+
self.log_time("get_file_type", latency)
192+
160193
line_map = LOG_MAP.get(file_type)
161194
if not line_map:
162195
return False
@@ -168,6 +201,7 @@ def process_line(self, new_line: dict):
168201

169202
flow_values = {"starttime": starttime, "interface": interface}
170203

204+
n = time.time()
171205
for zeek_field, slips_field in line_map.items():
172206
if not slips_field:
173207
continue
@@ -176,25 +210,37 @@ def process_line(self, new_line: dict):
176210
val = ""
177211
flow_values[slips_field] = val
178212

213+
latency = time.time() - n
214+
self.log_time("removing_empty_vals", latency)
215+
179216
if file_type in LINE_TYPE_TO_SLIPS_CLASS:
217+
n = time.time()
180218
slips_class = LINE_TYPE_TO_SLIPS_CLASS[file_type]
181-
182219
if file_type == "conn.log":
183220
flow_values["dur"] = float(flow_values.get("dur", 0) or 0)
184-
for fld in (
221+
for field in (
185222
"sbytes",
186223
"dbytes",
187224
"spkts",
188225
"dpkts",
189226
"sport",
190227
"dport",
191228
):
192-
flow_values[fld] = int(flow_values.get(fld, 0) or 0)
193-
229+
flow_values[field] = int(flow_values.get(field, 0) or 0)
230+
latency = time.time() - n
231+
self.log_time("setting_conn_vals_to_0", latency)
232+
n = time.time()
194233
flow_values = self.fill_empty_class_fields(
195234
flow_values, slips_class
196235
)
236+
latency = time.time() - n
237+
self.log_time("fill_empty_class_fields", latency)
238+
239+
n = time.time()
197240
self.flow = slips_class(**flow_values)
241+
latency = time.time() - n
242+
self.log_time("calling_slips_class", latency)
243+
198244
return self.flow
199245

200246
print(f"[Profiler] Invalid file_type: {file_type}, line: {line}")

slips_files/core/profiler_worker.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -531,22 +531,30 @@ def add_flow_to_profile(self, flow):
531531
time_it_Took = time.time() - n
532532
self.log_time("is_whitelisted_flow", time_it_Took)
533533

534-
n = time.time()
535-
536534
# 5th. Store the data according to the parameters
537535
# Now that we have the profileid and twid, add the data from the flow
538536
# in this tw for this profile
539537
self.print(f"Storing data in the profile: {profileid}", 3, 0)
538+
539+
n = time.time()
540540
flow.starttime = self.convert_starttime_to_epoch(flow.starttime)
541+
time_it_Took = time.time() - n
542+
543+
self.log_time("convert_starttime_to_epoch", time_it_Took)
541544
# For this 'forward' profile, find the id in the
542-
# database of the tw where the flow belongs.
545+
# database of the tw where the flow belongs.
546+
n = time.time()
543547
twid = self.db.get_timewindow(flow.starttime, profileid)
548+
time_it_Took = time.time() - n
549+
self.log_time("get_timewindow", time_it_Took)
550+
544551
flow_parser.twid = twid
545552

553+
n = time.time()
546554
# Create profiles for all ips we see
547555
self.db.add_profile(profileid, flow.starttime)
548556
time_it_Took = time.time() - n
549-
self.log_time("db_calls", time_it_Took)
557+
self.log_time("add_profile", time_it_Took)
550558

551559
n = time.time()
552560
self.store_features_going_out(flow, flow_parser)
@@ -570,7 +578,9 @@ def init_csv(self):
570578
[
571579
"get_gateway_info",
572580
"is_whitelisted_flow",
573-
"db_calls",
581+
"convert_starttime_to_epoch",
582+
"get_timewindow",
583+
"add_profile",
574584
"store_features_going_out",
575585
"process_line",
576586
"add_flow_to_profile",

0 commit comments

Comments
 (0)