Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pipeline:
log_collection:
collector:
logline_format:
- [ "timestamp", RegEx, '^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$' ]
- [ "timestamp", Timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ]
- [ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ]
- [ "client_ip", IpAddress ]
- [ "dns_server_ip", IpAddress ]
Expand Down Expand Up @@ -69,7 +69,6 @@ pipeline:
batch_timeout: 2.0

environment:
timestamp_format: "%Y-%m-%dT%H:%M:%S.%fZ"
kafka_brokers:
- hostname: kafka1
port: 8097
Expand Down
8 changes: 2 additions & 6 deletions src/base/data_classes/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ class Batch:
metadata={"marshmallow_field": marshmallow.fields.UUID()}
)
begin_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
metadata={"marshmallow_field": marshmallow.fields.DateTime()} # uses ISO format
)
end_timestamp: datetime.datetime = field(
metadata={
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
}
metadata={"marshmallow_field": marshmallow.fields.DateTime()} # uses ISO format
)
data: List = field(default_factory=list)
52 changes: 51 additions & 1 deletion src/base/logline_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import re

from src.base.log_config import get_logger
Expand Down Expand Up @@ -63,6 +64,45 @@
return True if re.match(self.pattern, value) else False


class Timestamp(FieldType):
    """
    A :cls:`Timestamp` field takes a name and a timestamp format. A value is
    valid if it can be parsed with that format via ``datetime.strptime``.
    """

    def __init__(self, name: str, timestamp_format: str):
        """
        Args:
            name: Name of the field
            timestamp_format: ``strptime``-style format string the value must match
        """
        super().__init__(name)
        self.timestamp_format = timestamp_format

    def validate(self, value) -> bool:
        """
        Validates the input value against ``self.timestamp_format``.

        Args:
            value: The value to be validated

        Returns:
            True if the value is valid, False otherwise
        """
        try:
            datetime.datetime.strptime(value, self.timestamp_format)
        except (ValueError, TypeError):
            # ValueError: string does not match the format;
            # TypeError: value is not a string at all — treat both as invalid
            return False

        return True

    def get_timestamp_as_str(self, value) -> str:
        """
        Returns the timestamp as a string for a given timestamp with valid format.

        Args:
            value: Correctly formatted timestamp according to self.timestamp_format

        Raises:
            ValueError: If ``value`` does not match ``self.timestamp_format``

        Returns:
            ISO 8601 string of the given timestamp
        """
        # isoformat() already returns a str, so no extra str() wrapper is needed
        return datetime.datetime.strptime(value, self.timestamp_format).isoformat()


class IpAddress(FieldType):
"""
An :cls:`IpAddress` object takes only a name. It is used for IP addresses, and checks in the :meth:`validate` method
Expand Down Expand Up @@ -227,7 +267,12 @@
return_dict = {}

for i in range(self.number_of_fields):
return_dict[self.instances_by_position[i].name] = parts[i]
if not isinstance(self.instances_by_position[i], Timestamp):
return_dict[self.instances_by_position[i].name] = parts[i]
else:
return_dict[self.instances_by_position[i].name] = (
self.instances_by_position[i].get_timestamp_as_str(parts[i])
)

return return_dict.copy()

Expand Down Expand Up @@ -297,6 +342,11 @@
raise ValueError("Invalid RegEx parameters")
instance = cls(name=name, pattern=field_list[2])

elif cls_name == "Timestamp":
if len_of_field_list != 3 or not isinstance(field_list[2], str):
raise ValueError("Invalid Timestamp parameters")

# Codecov / codecov/patch warning (src/base/logline_handler.py#L347):
# added line 347 (the Timestamp instance creation below) was not covered by tests.
instance = cls(name=name, timestamp_format=field_list[2])

elif cls_name == "ListItem":
if len_of_field_list not in [3, 4] or not isinstance(field_list[2], list):
raise ValueError("Invalid ListItem parameters")
Expand Down
5 changes: 2 additions & 3 deletions src/inspector/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
SCORE_THRESHOLD = config["pipeline"]["data_inspection"]["inspector"]["score_threshold"]
TIME_TYPE = config["pipeline"]["data_inspection"]["inspector"]["time_type"]
TIME_RANGE = config["pipeline"]["data_inspection"]["inspector"]["time_range"]
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
CONSUME_TOPIC = config["environment"]["kafka_topics"]["pipeline"][
"prefilter_to_inspector"
]
Expand Down Expand Up @@ -187,7 +186,7 @@ def _mean_packet_size(self, messages: list, begin_timestamp, end_timestamp):
logger.debug("Convert timestamps to numpy datetime64")
timestamps = np.array(
[
np.datetime64(datetime.strptime(item["timestamp"], TIMESTAMP_FORMAT))
np.datetime64(datetime.fromisoformat(item["timestamp"]))
for item in messages
]
)
Expand Down Expand Up @@ -266,7 +265,7 @@ def _count_errors(self, messages: list, begin_timestamp, end_timestamp):
logger.debug("Convert timestamps to numpy datetime64")
timestamps = np.array(
[
np.datetime64(datetime.strptime(item["timestamp"], TIMESTAMP_FORMAT))
np.datetime64(datetime.fromisoformat(item["timestamp"]))
for item in messages
]
)
Expand Down
20 changes: 6 additions & 14 deletions src/logcollector/batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
config = setup_config()
BATCH_SIZE = config["pipeline"]["log_collection"]["batch_handler"]["batch_size"]
BATCH_TIMEOUT = config["pipeline"]["log_collection"]["batch_handler"]["batch_timeout"]
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
PRODUCE_TOPIC = config["environment"]["kafka_topics"]["pipeline"][
"batch_sender_to_prefilter"
]
Expand Down Expand Up @@ -217,13 +216,9 @@ def _get_last_timestamp_of_batch() -> str | None:

data = {
"batch_id": batch_id,
"begin_timestamp": datetime.datetime.strptime(
begin_timestamp,
"%Y-%m-%dT%H:%M:%S.%fZ",
),
"end_timestamp": datetime.datetime.strptime(
_get_last_timestamp_of_batch(),
"%Y-%m-%dT%H:%M:%S.%fZ",
"begin_timestamp": datetime.datetime.fromisoformat(begin_timestamp),
"end_timestamp": datetime.datetime.fromisoformat(
_get_last_timestamp_of_batch()
),
"data": buffer_data + self.batch[key],
}
Expand Down Expand Up @@ -322,11 +317,8 @@ def _extract_tuples_from_json_formatted_strings(
@staticmethod
def _sort_by_timestamp(
data: list[tuple[str, str]],
timestamp_format: str = TIMESTAMP_FORMAT,
) -> list[str]:
sorted_data = sorted(
data, key=lambda x: datetime.datetime.strptime(x[0], timestamp_format)
)
sorted_data = sorted(data, key=lambda x: x[0])
loglines = [message for _, message in sorted_data]

return loglines
Expand Down Expand Up @@ -410,7 +402,7 @@ def _send_all_batches(self, reset_timer: bool = True) -> None:
Dispatch all batches for the Kafka queue

Args:
reset_timer (bool): whether or not the timer should be reset
reset_timer (bool): whether the timer should be reset
"""
number_of_keys = 0
total_number_of_batch_messages = self.batch.get_message_count_for_batch()
Expand Down Expand Up @@ -460,7 +452,7 @@ def _send_batch_for_key(self, key: str) -> None:

def _send_data_packet(self, key: str, data: dict) -> None:
"""
Sends a packet of a batch to the defined Kafka topic
Sends a packet of a Batch to the defined Kafka topic

Args:
key (str): key to identify the batch
Expand Down
5 changes: 1 addition & 4 deletions src/logcollector/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
IPV6_PREFIX_LENGTH = config["pipeline"]["log_collection"]["batch_handler"]["subnet_id"][
"ipv6_prefix_length"
]
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
REQUIRED_FIELDS = [
"timestamp",
"status_code",
Expand Down Expand Up @@ -129,9 +128,7 @@ def send(self, timestamp_in: datetime.datetime, message: str) -> None:
dict(
logline_id=logline_id,
subnet_id=subnet_id,
timestamp=datetime.datetime.strptime(
fields.get("timestamp"), TIMESTAMP_FORMAT
),
timestamp=datetime.datetime.fromisoformat(fields.get("timestamp")),
status_code=fields.get("status_code"),
client_ip=fields.get("client_ip"),
record_type=fields.get("record_type"),
Expand Down
2 changes: 1 addition & 1 deletion tests/logcollector/test_batch_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def test_send_data_packet(self, mock_clickhouse, mock_produce_handler):
mock_produce_handler_instance.produce.assert_called_once_with(
topic="test_topic",
data='{"batch_id": "b4b6f13e-d064-4ab7-94ed-d02b46063308", "begin_timestamp": '
'"2024-12-06T13:12:30.324015Z", "end_timestamp": "2024-12-06T13:12:31.832173Z", '
'"2024-12-06T13:12:30.324015", "end_timestamp": "2024-12-06T13:12:31.832173", '
'"data": ["test_data"]}',
key=key,
)
Expand Down
30 changes: 24 additions & 6 deletions tests/logcollector/test_buffered_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,16 @@ def test_complete_batch_variant_1(self, mock_clickhouse):

# Assert
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data["begin_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
),
data["begin_timestamp"],
)
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 249000), data["end_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 249000, tzinfo=datetime.timezone.utc
),
data["end_timestamp"],
)
self.assertEqual(expected_messages, data["data"])

Expand Down Expand Up @@ -682,16 +688,28 @@ def test_complete_batch_variant_2(self, mock_clickhouse):

# Assert
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data_1["begin_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
),
data_1["begin_timestamp"],
)
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 249000), data_1["end_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 249000, tzinfo=datetime.timezone.utc
),
data_1["end_timestamp"],
)
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data_2["begin_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
),
data_2["begin_timestamp"],
)
self.assertEqual(
datetime.datetime(2024, 5, 21, 8, 31, 28, 749000), data_2["end_timestamp"]
datetime.datetime(
2024, 5, 21, 8, 31, 28, 749000, tzinfo=datetime.timezone.utc
),
data_2["end_timestamp"],
)
self.assertEqual({key: [message_3, message_4]}, sut.buffer)
self.assertEqual({}, sut.batch)
Expand Down
3 changes: 0 additions & 3 deletions tests/logcollector/test_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ def test_invalid_logline(self):

# Act
with (
patch(
"src.logcollector.collector.TIMESTAMP_FORMAT", "%Y-%m-%d %H:%M:%S.%f"
),
patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24),
patch(
"src.logcollector.collector.uuid.uuid4",
Expand Down
Loading
Loading