Commit 68de1d7

Merge pull request #59 from stefanDeveloper/add-grafana
Add monitoring solution using Grafana
2 parents 1940225 + 8d643ae

76 files changed (+11602, -859 lines)

.github/workflows/build_publish_docker.yml

Lines changed: 2 additions & 1 deletion
@@ -25,7 +25,8 @@ jobs:
           "inspector",
           "logcollector",
           "logserver",
-          "prefilter"
+          "prefilter",
+          "monitoring"
         ]
     permissions:
       contents: read

config.yaml

Lines changed: 12 additions & 12 deletions
@@ -32,8 +32,8 @@ pipeline:
       - [ "response_ip", IpAddress ]
       - [ "size", RegEx, '^\d+b$' ]
   batch_handler:
-    batch_size: 1000
-    batch_timeout: 20.0
+    batch_size: 10000
+    batch_timeout: 30.0
   subnet_id:
     ipv4_prefix_length: 24
     ipv6_prefix_length: 64
@@ -65,25 +65,25 @@ pipeline:
 
   monitoring:
     clickhouse_connector:
-      batch_size: 10000
+      batch_size: 50 # do not set higher
       batch_timeout: 2.0
 
 environment:
   timestamp_format: "%Y-%m-%dT%H:%M:%S.%fZ"
   kafka_brokers:
-    - hostname: 172.27.0.3
+    - hostname: kafka1
       port: 8097
-    - hostname: 172.27.0.4
+    - hostname: kafka2
       port: 8098
-    - hostname: 172.27.0.5
+    - hostname: kafka3
       port: 8099
   kafka_topics:
     pipeline:
-      logserver_in: "pipeline.logserver_in"
-      logserver_to_collector: "pipeline.logserver_to_collector"
-      batch_sender_to_prefilter: "pipeline.batch_sender_to_prefilter"
-      prefilter_to_inspector: "pipeline.prefilter_to_inspector"
-      inspector_to_detector: "pipeline.inspector_to_detector"
+      logserver_in: "pipeline-logserver_in"
+      logserver_to_collector: "pipeline-logserver_to_collector"
+      batch_sender_to_prefilter: "pipeline-batch_sender_to_prefilter"
+      prefilter_to_inspector: "pipeline-prefilter_to_inspector"
+      inspector_to_detector: "pipeline-inspector_to_detector"
   monitoring:
     clickhouse_server:
-      hostname: 172.27.0.11
+      hostname: clickhouse-server
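
The renamed topics (dots replaced by hyphens) and the switch from fixed container IPs to Docker service names propagate to every component that reads config.yaml. A minimal sketch of how the new values resolve once the file is loaded, assuming plain PyYAML rather than the project's own setup_config helper, which this diff does not show:

import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

# After this change the topic is "pipeline-logserver_in" (dot -> hyphen).
topic = config["environment"]["kafka_topics"]["pipeline"]["logserver_in"]

# Brokers are now addressed by Docker service name instead of fixed IPs,
# so the stack no longer depends on a specific subnet layout.
brokers = ",".join(
    f"{broker['hostname']}:{broker['port']}"
    for broker in config["environment"]["kafka_brokers"]
)
print(topic)    # pipeline-logserver_in
print(brokers)  # kafka1:8097,kafka2:8098,kafka3:8099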

docker/.env

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-MOUNT_PATH=./default.txt
+MOUNT_PATH=../../default.txt
docker/benchmark_tests/Dockerfile.run_test

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+FROM python:3.11-slim-bookworm
+
+ENV PYTHONDONTWRITEBYTECODE=1
+
+WORKDIR /usr/src/app
+
+RUN pip --disable-pip-version-check install --no-cache-dir --no-compile marshmallow_dataclass colorlog pyYAML confluent_kafka numpy polars scikit-learn torch
+
+COPY src/base ./src/base
+COPY src/train ./src/train
+COPY config.yaml .
+COPY docker/benchmark_tests .
+COPY data ./data
+
+RUN rm -rf /root/.cache
+
+CMD [ "python", "run_test.py"]
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+services:
+  benchmark_test_run:
+    build:
+      context: ../..
+      dockerfile: docker/benchmark_tests/Dockerfile.run_test
+      network: host
+    networks:
+      docker_heidgaf:
+    deploy:
+      resources:
+        limits:
+          cpus: '2'
+          memory: 512m
+        reservations:
+          cpus: '1'
+          memory: 256m
+
+networks:
+  docker_heidgaf:
+    external: true

docker/benchmark_tests/run_test.py

Lines changed: 248 additions & 0 deletions
@@ -0,0 +1,248 @@
+import datetime
+import ipaddress
+import os
+import random
+import sys
+import time
+
+import polars as pl
+from confluent_kafka import KafkaError
+
+sys.path.append(os.getcwd())
+from src.base.kafka_handler import SimpleKafkaProduceHandler
+from src.train.dataset import Dataset, DatasetLoader
+from src.base.log_config import get_logger
+from src.base.utils import setup_config
+
+logger = get_logger()
+config = setup_config()
+
+PRODUCE_TO_TOPIC = config["environment"]["kafka_topics"]["pipeline"]["logserver_in"]
+
+
+class DatasetGenerator:
+    """Generates log lines and datasets."""
+
+    def __init__(self, data_base_path: str = "./data"):
+        datasets = DatasetLoader(base_path=data_base_path, max_rows=10000)
+
+        dataset = Dataset(
+            data_path="",
+            data=pl.concat(
+                [
+                    datasets.dgta_dataset.data,
+                    # datasets.cic_dataset.data,
+                    # datasets.bambenek_dataset.data,
+                    # datasets.dga_dataset.data,
+                    # datasets.dgarchive_dataset.data,
+                ]
+            ),
+            max_rows=1000,
+        )
+
+        self.domains = dataset.data
+
+    def generate_random_logline(
+        self, statuses: list[str] = None, record_types: list[str] = None
+    ):
+        """Generates a (mostly) random logline."""
+        if record_types is None:
+            record_types = 6 * ["AAAA"] + 10 * ["A"] + ["PR", "CNAME"]
+
+        if statuses is None:
+            statuses = ["NOERROR", "NXDOMAIN"]
+
+        # choose timestamp
+        timestamp = (
+            datetime.datetime.now() + datetime.timedelta(0, 0, random.randint(0, 900))
+        ).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
+
+        # choose status code
+        status = random.choice(statuses)
+
+        # choose client IP address
+        number_of_subnets = 50
+        client_ip = (
+            f"192.168.{random.randint(0, number_of_subnets)}.{random.randint(1, 255)}"
+        )
+
+        # choose server IP address
+        server_ip = f"10.10.0.{random.randint(1, 100)}"
+
+        # choose random domain (can be malicious or benign)
+        domain = self.get_random_domain()
+
+        # choose random record type
+        record_type = random.choice(record_types)
+
+        # choose random response IP address
+        def _get_random_ipv4():
+            max_ipv4 = ipaddress.IPv4Address._ALL_ONES  # 2 ** 32 - 1
+            return ipaddress.IPv4Address._string_from_ip_int(
+                random.randint(0, max_ipv4)
+            )
+
+        def _get_random_ipv6():
+            max_ipv6 = ipaddress.IPv6Address._ALL_ONES  # 2 ** 128 - 1
+            return ipaddress.IPv6Address._string_from_ip_int(
+                random.randint(0, max_ipv6)
+            )
+
+        ip_address_choices = [_get_random_ipv4(), _get_random_ipv6()]
+        response_ip_address = random.choice(ip_address_choices)
+
+        # choose random size
+        size = f"{random.randint(50, 255)}b"
+
+        return f"{timestamp} {status} {client_ip} {server_ip} {domain} {record_type} {response_ip_address} {size}"
+
+    def get_random_domain(self) -> str:
+        random_domain = self.domains.sample(n=1)
+        return random_domain["query"].item()
+
+    def generate_dataset(self, number_of_elements: int) -> list[str]:
+        dataset = []
+
+        for _ in range(number_of_elements):
+            logline = self.generate_random_logline()
+            dataset.append(logline)
+
+        return dataset
+
+
+class ScalabilityTest:
+    """Base class for tests that focus on the scalability of the software."""
+
+    def __init__(self):
+        self.dataset_generator = DatasetGenerator()
+        self.kafka_producer = SimpleKafkaProduceHandler()
+
+        self.interval_lengths = None
+        self.msg_per_sec_in_intervals = None
+
+    def execute(self):
+        """Executes the test with the configured parameters."""
+        logger.warning(f"Start at: {datetime.datetime.now()}")
+
+        cur_index = 0
+        for i in range(len(self.msg_per_sec_in_intervals)):
+            cur_index = self._execute_one_interval(
+                cur_index=cur_index,
+                msg_per_sec=self.msg_per_sec_in_intervals[i],
+                length_in_sec=self.interval_lengths[i],
+            )
+
+        logger.warning(f"Stop at: {datetime.datetime.now()}")
+
+    def _execute_one_interval(
+        self, cur_index: int, msg_per_sec: float | int, length_in_sec: float | int
+    ) -> int:
+        start_of_interval_timestamp = datetime.datetime.now()
+        logger.warning(
+            f"Start interval with {msg_per_sec} msg/s at {start_of_interval_timestamp}"
+        )
+
+        while (
+            datetime.datetime.now() - start_of_interval_timestamp
+            < datetime.timedelta(seconds=length_in_sec)
+        ):
+            try:
+                self.kafka_producer.produce(
+                    PRODUCE_TO_TOPIC,
+                    self.dataset_generator.generate_random_logline(),
+                )
+                logger.info(
+                    f"Sent message {cur_index + 1} at: {datetime.datetime.now()}"
+                )
+                cur_index += 1
+            except KafkaError as err:  # log the raised error, not the class itself
+                logger.warning(err)
+            time.sleep(1.0 / msg_per_sec)
+
+        logger.warning(f"Finish interval with {msg_per_sec} msg/s")
+        return cur_index
+
+
+class RampUpTest(ScalabilityTest):
+    """Starts with a low rate and increases the rate in fixed intervals."""
+
+    def __init__(
+        self,
+        msg_per_sec_in_intervals: list[float | int],
+        interval_length_in_sec: int | float | list[int | float],
+    ):
+        super().__init__()
+        self.msg_per_sec_in_intervals = msg_per_sec_in_intervals
+
+        if isinstance(interval_length_in_sec, list):
+            # the length check only applies when a list of lengths is passed
+            if len(interval_length_in_sec) != len(msg_per_sec_in_intervals):
+                raise ValueError("Different lengths of interval lists. Must be equal.")
+            self.interval_lengths = interval_length_in_sec
+        else:
+            self.interval_lengths = [
+                interval_length_in_sec for _ in range(len(msg_per_sec_in_intervals))
+            ]
+
+
+class BurstTest(ScalabilityTest):
+    """Starts with a normal rate, sends a high rate for a short period, then returns
+    to the normal rate. Repeats the process for a defined number of times."""
+
+    def __init__(
+        self,
+        normal_rate_msg_per_sec: float | int,
+        burst_rate_msg_per_sec: float | int,
+        normal_rate_interval_length: float | int,
+        burst_rate_interval_length: float | int,
+        number_of_intervals: int = 1,
+    ):
+        super().__init__()
+
+        self.msg_per_sec_in_intervals = [normal_rate_msg_per_sec]
+        self.interval_lengths = [normal_rate_interval_length]
+
+        for _ in range(number_of_intervals):
+            self.msg_per_sec_in_intervals.append(burst_rate_msg_per_sec)
+            self.msg_per_sec_in_intervals.append(normal_rate_msg_per_sec)
+
+            self.interval_lengths.append(burst_rate_interval_length)
+            self.interval_lengths.append(normal_rate_interval_length)
+
+
+class LongTermTest(ScalabilityTest):
+    """Sends messages at a single constant rate for the full test duration."""
+
+    def __init__(self, full_length: float | int, msg_per_sec: float | int):
+        super().__init__()
+
+        self.msg_per_sec_in_intervals = [msg_per_sec]
+        self.interval_lengths = [full_length]
+
+
+def main():
+    """Creates the test instance and executes the test."""
+    # ramp_up_test = RampUpTest(
+    #     msg_per_sec_in_intervals=[1, 10, 50, 100, 150],
+    #     interval_length_in_sec=[10, 5, 4, 4, 2],
+    # )
+    # ramp_up_test.execute()
+
+    burst_test = BurstTest(
+        normal_rate_msg_per_sec=20,
+        burst_rate_msg_per_sec=10000,
+        normal_rate_interval_length=10,
+        burst_rate_interval_length=2,
+        number_of_intervals=3,
+    )
+    burst_test.execute()
+
+    # long_term_test = LongTermTest(
+    #     full_length=10.4,
+    #     msg_per_sec=15,
+    # )
+    # long_term_test.execute()
+
+
+if __name__ == "__main__":
+    main()
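
To make the burst schedule concrete: the BurstTest constructor expands its arguments into alternating normal/burst intervals, starting and ending at the normal rate. The configuration in main() above therefore produces the following plan (a worked example, not code from this commit):

# BurstTest(normal_rate_msg_per_sec=20, burst_rate_msg_per_sec=10000,
#           normal_rate_interval_length=10, burst_rate_interval_length=2,
#           number_of_intervals=3) yields:
msg_per_sec_in_intervals = [20, 10000, 20, 10000, 20, 10000, 20]
interval_lengths = [10, 2, 10, 2, 10, 2, 10]

# Note on accuracy: each loop iteration sleeps 1.0 / msg_per_sec *and* pays
# the cost of a synchronous produce() call, so at 10000 msg/s the sleep
# (0.1 ms) stops being the limiting factor and the real rate is lower.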
Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+CREATE TABLE IF NOT EXISTS dgta_dataset (
+    query String,
+    class Int32,
+    labels Array(String),
+    tld String,
+    fqdn String,
+    secondleveldomain String,
+    thirdleveldomain String
+)
+ENGINE = MergeTree
+PRIMARY KEY(query);
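
The commit only ships the DDL for this table. For context, a hypothetical loader that fills it from the same DatasetLoader used by run_test.py above; the clickhouse-driver package and the column layout of dgta_dataset.data are assumptions here, since the project's own ClickHouse connector is not part of this diff:

from clickhouse_driver import Client

from src.train.dataset import DatasetLoader

datasets = DatasetLoader(base_path="./data", max_rows=10000)
client = Client(host="clickhouse-server")  # hostname from config.yaml above

# Column order matches the CREATE TABLE statement; polars .rows() returns
# a list of tuples in that order.
rows = datasets.dgta_dataset.data.select(
    ["query", "class", "labels", "tld", "fqdn", "secondleveldomain", "thirdleveldomain"]
).rows()

client.execute(
    "INSERT INTO dgta_dataset (query, class, labels, tld, fqdn, "
    "secondleveldomain, thirdleveldomain) VALUES",
    rows,
)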
Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS alerts (
     alert_timestamp DateTime64(6) NOT NULL,
     suspicious_batch_id UUID NOT NULL,
     overall_score Float32 NOT NULL,
+    domain_names String NOT NULL,
     result String,
 )
 ENGINE = MergeTree
File renamed without changes.

src/monitoring/create_tables/dns_loglines.sql renamed to docker/create_tables/dns_loglines.sql

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS dns_loglines (
     status_code String NOT NULL,
     client_ip String NOT NULL,
     record_type String NOT NULL,
-    additional_fields Nullable(String)
+    additional_fields String
 )
 ENGINE = MergeTree
 PRIMARY KEY (logline_id);
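
Dropping Nullable(String) has a practical consequence for writers: ClickHouse rejects NULL for a plain String column, so an absent value must now be sent as an empty string. A hypothetical insert illustrating the difference, again assuming the clickhouse-driver package rather than the project's own connector:

import uuid

from clickhouse_driver import Client

client = Client(host="clickhouse-server")

# With additional_fields now a plain String, pass "" where the old schema
# would have accepted None; omitted columns fall back to type defaults.
# logline_id is assumed to be a UUID column (its definition is outside
# the visible hunk).
client.execute(
    "INSERT INTO dns_loglines "
    "(logline_id, status_code, client_ip, record_type, additional_fields) VALUES",
    [(uuid.uuid4(), "NOERROR", "192.168.0.1", "A", "")],
)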
