Skip to content

Commit 97abc46

Browse files
Merge pull request #78 from stefanDeveloper/Remove-timestamp-format-parameter
Remove timestamp format parameter
2 parents a4cf83c + 5d91989 commit 97abc46

File tree

13 files changed

+175
-60
lines changed

13 files changed

+175
-60
lines changed

config.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pipeline:
2323
log_collection:
2424
collector:
2525
logline_format:
26-
- [ "timestamp", RegEx, '^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$' ]
26+
- [ "timestamp", Timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ]
2727
- [ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ]
2828
- [ "client_ip", IpAddress ]
2929
- [ "dns_server_ip", IpAddress ]
@@ -69,7 +69,6 @@ pipeline:
6969
batch_timeout: 2.0
7070

7171
environment:
72-
timestamp_format: "%Y-%m-%dT%H:%M:%S.%fZ"
7372
kafka_brokers:
7473
- hostname: kafka1
7574
port: 8097

src/base/data_classes/batch.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,9 @@ class Batch:
1616
metadata={"marshmallow_field": marshmallow.fields.UUID()}
1717
)
1818
begin_timestamp: datetime.datetime = field(
19-
metadata={
20-
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
21-
}
19+
metadata={"marshmallow_field": marshmallow.fields.DateTime()} # uses ISO format
2220
)
2321
end_timestamp: datetime.datetime = field(
24-
metadata={
25-
"marshmallow_field": marshmallow.fields.DateTime("%Y-%m-%dT%H:%M:%S.%fZ")
26-
}
22+
metadata={"marshmallow_field": marshmallow.fields.DateTime()} # uses ISO format
2723
)
2824
data: List = field(default_factory=list)

src/base/logline_handler.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import re
23

34
from src.base.log_config import get_logger
@@ -63,6 +64,45 @@ def validate(self, value) -> bool:
6364
return True if re.match(self.pattern, value) else False
6465

6566

67+
class Timestamp(FieldType):
68+
"""
69+
A :cls:`Timestamp` object takes a name, and a timestamp format, which a value needs to have.
70+
"""
71+
72+
def __init__(self, name: str, timestamp_format: str):
73+
super().__init__(name)
74+
self.timestamp_format = timestamp_format
75+
76+
def validate(self, value) -> bool:
77+
"""
78+
Validates the input value.
79+
80+
Args:
81+
value: The value to be validated
82+
83+
Returns:
84+
True if the value is valid, False otherwise
85+
"""
86+
try:
87+
datetime.datetime.strptime(value, self.timestamp_format)
88+
except ValueError:
89+
return False
90+
91+
return True
92+
93+
def get_timestamp_as_str(self, value) -> str:
94+
"""
95+
Returns the timestamp as string for a given timestamp with valid format.
96+
97+
Args:
98+
value: Correctly formatted timestamp according to self.timestamp_format
99+
100+
Returns:
101+
String of the given timestamp with standard format
102+
"""
103+
return str(datetime.datetime.strptime(value, self.timestamp_format).isoformat())
104+
105+
66106
class IpAddress(FieldType):
67107
"""
68108
An :cls:`IpAddress` object takes only a name. It is used for IP addresses, and checks in the :meth:`validate` method
@@ -227,7 +267,12 @@ def __get_fields_as_json(self, logline: str) -> dict:
227267
return_dict = {}
228268

229269
for i in range(self.number_of_fields):
230-
return_dict[self.instances_by_position[i].name] = parts[i]
270+
if not isinstance(self.instances_by_position[i], Timestamp):
271+
return_dict[self.instances_by_position[i].name] = parts[i]
272+
else:
273+
return_dict[self.instances_by_position[i].name] = (
274+
self.instances_by_position[i].get_timestamp_as_str(parts[i])
275+
)
231276

232277
return return_dict.copy()
233278

@@ -297,6 +342,11 @@ def _create_instance_from_list_entry(field_list: list):
297342
raise ValueError("Invalid RegEx parameters")
298343
instance = cls(name=name, pattern=field_list[2])
299344

345+
elif cls_name == "Timestamp":
346+
if len_of_field_list != 3 or not isinstance(field_list[2], str):
347+
raise ValueError("Invalid Timestamp parameters")
348+
instance = cls(name=name, timestamp_format=field_list[2])
349+
300350
elif cls_name == "ListItem":
301351
if len_of_field_list not in [3, 4] or not isinstance(field_list[2], list):
302352
raise ValueError("Invalid ListItem parameters")

src/inspector/inspector.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
SCORE_THRESHOLD = config["pipeline"]["data_inspection"]["inspector"]["score_threshold"]
3434
TIME_TYPE = config["pipeline"]["data_inspection"]["inspector"]["time_type"]
3535
TIME_RANGE = config["pipeline"]["data_inspection"]["inspector"]["time_range"]
36-
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
3736
CONSUME_TOPIC = config["environment"]["kafka_topics"]["pipeline"][
3837
"prefilter_to_inspector"
3938
]
@@ -187,7 +186,7 @@ def _mean_packet_size(self, messages: list, begin_timestamp, end_timestamp):
187186
logger.debug("Convert timestamps to numpy datetime64")
188187
timestamps = np.array(
189188
[
190-
np.datetime64(datetime.strptime(item["timestamp"], TIMESTAMP_FORMAT))
189+
np.datetime64(datetime.fromisoformat(item["timestamp"]))
191190
for item in messages
192191
]
193192
)
@@ -266,7 +265,7 @@ def _count_errors(self, messages: list, begin_timestamp, end_timestamp):
266265
logger.debug("Convert timestamps to numpy datetime64")
267266
timestamps = np.array(
268267
[
269-
np.datetime64(datetime.strptime(item["timestamp"], TIMESTAMP_FORMAT))
268+
np.datetime64(datetime.fromisoformat(item["timestamp"]))
270269
for item in messages
271270
]
272271
)

src/logcollector/batch_handler.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
config = setup_config()
2121
BATCH_SIZE = config["pipeline"]["log_collection"]["batch_handler"]["batch_size"]
2222
BATCH_TIMEOUT = config["pipeline"]["log_collection"]["batch_handler"]["batch_timeout"]
23-
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
2423
PRODUCE_TOPIC = config["environment"]["kafka_topics"]["pipeline"][
2524
"batch_sender_to_prefilter"
2625
]
@@ -217,13 +216,9 @@ def _get_last_timestamp_of_batch() -> str | None:
217216

218217
data = {
219218
"batch_id": batch_id,
220-
"begin_timestamp": datetime.datetime.strptime(
221-
begin_timestamp,
222-
"%Y-%m-%dT%H:%M:%S.%fZ",
223-
),
224-
"end_timestamp": datetime.datetime.strptime(
225-
_get_last_timestamp_of_batch(),
226-
"%Y-%m-%dT%H:%M:%S.%fZ",
219+
"begin_timestamp": datetime.datetime.fromisoformat(begin_timestamp),
220+
"end_timestamp": datetime.datetime.fromisoformat(
221+
_get_last_timestamp_of_batch()
227222
),
228223
"data": buffer_data + self.batch[key],
229224
}
@@ -322,11 +317,8 @@ def _extract_tuples_from_json_formatted_strings(
322317
@staticmethod
323318
def _sort_by_timestamp(
324319
data: list[tuple[str, str]],
325-
timestamp_format: str = TIMESTAMP_FORMAT,
326320
) -> list[str]:
327-
sorted_data = sorted(
328-
data, key=lambda x: datetime.datetime.strptime(x[0], timestamp_format)
329-
)
321+
sorted_data = sorted(data, key=lambda x: x[0])
330322
loglines = [message for _, message in sorted_data]
331323

332324
return loglines
@@ -410,7 +402,7 @@ def _send_all_batches(self, reset_timer: bool = True) -> None:
410402
Dispatch all batches for the Kafka queue
411403
412404
Args:
413-
reset_timer (bool): whether or not the timer should be reset
405+
reset_timer (bool): whether the timer should be reset
414406
"""
415407
number_of_keys = 0
416408
total_number_of_batch_messages = self.batch.get_message_count_for_batch()
@@ -460,7 +452,7 @@ def _send_batch_for_key(self, key: str) -> None:
460452

461453
def _send_data_packet(self, key: str, data: dict) -> None:
462454
"""
463-
Sends a packet of a batch to the defined Kafka topic
455+
Sends a packet of a Batch to the defined Kafka topic
464456
465457
Args:
466458
key (str): key to identify the batch

src/logcollector/collector.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
IPV6_PREFIX_LENGTH = config["pipeline"]["log_collection"]["batch_handler"]["subnet_id"][
2525
"ipv6_prefix_length"
2626
]
27-
TIMESTAMP_FORMAT = config["environment"]["timestamp_format"]
2827
REQUIRED_FIELDS = [
2928
"timestamp",
3029
"status_code",
@@ -129,9 +128,7 @@ def send(self, timestamp_in: datetime.datetime, message: str) -> None:
129128
dict(
130129
logline_id=logline_id,
131130
subnet_id=subnet_id,
132-
timestamp=datetime.datetime.strptime(
133-
fields.get("timestamp"), TIMESTAMP_FORMAT
134-
),
131+
timestamp=datetime.datetime.fromisoformat(fields.get("timestamp")),
135132
status_code=fields.get("status_code"),
136133
client_ip=fields.get("client_ip"),
137134
record_type=fields.get("record_type"),

tests/logcollector/test_batch_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ def test_send_data_packet(self, mock_clickhouse, mock_produce_handler):
383383
mock_produce_handler_instance.produce.assert_called_once_with(
384384
topic="test_topic",
385385
data='{"batch_id": "b4b6f13e-d064-4ab7-94ed-d02b46063308", "begin_timestamp": '
386-
'"2024-12-06T13:12:30.324015Z", "end_timestamp": "2024-12-06T13:12:31.832173Z", '
386+
'"2024-12-06T13:12:30.324015", "end_timestamp": "2024-12-06T13:12:31.832173", '
387387
'"data": ["test_data"]}',
388388
key=key,
389389
)

tests/logcollector/test_buffered_batch.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -637,10 +637,16 @@ def test_complete_batch_variant_1(self, mock_clickhouse):
637637

638638
# Assert
639639
self.assertEqual(
640-
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data["begin_timestamp"]
640+
datetime.datetime(
641+
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
642+
),
643+
data["begin_timestamp"],
641644
)
642645
self.assertEqual(
643-
datetime.datetime(2024, 5, 21, 8, 31, 28, 249000), data["end_timestamp"]
646+
datetime.datetime(
647+
2024, 5, 21, 8, 31, 28, 249000, tzinfo=datetime.timezone.utc
648+
),
649+
data["end_timestamp"],
644650
)
645651
self.assertEqual(expected_messages, data["data"])
646652

@@ -682,16 +688,28 @@ def test_complete_batch_variant_2(self, mock_clickhouse):
682688

683689
# Assert
684690
self.assertEqual(
685-
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data_1["begin_timestamp"]
691+
datetime.datetime(
692+
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
693+
),
694+
data_1["begin_timestamp"],
686695
)
687696
self.assertEqual(
688-
datetime.datetime(2024, 5, 21, 8, 31, 28, 249000), data_1["end_timestamp"]
697+
datetime.datetime(
698+
2024, 5, 21, 8, 31, 28, 249000, tzinfo=datetime.timezone.utc
699+
),
700+
data_1["end_timestamp"],
689701
)
690702
self.assertEqual(
691-
datetime.datetime(2024, 5, 21, 8, 31, 28, 119000), data_2["begin_timestamp"]
703+
datetime.datetime(
704+
2024, 5, 21, 8, 31, 28, 119000, tzinfo=datetime.timezone.utc
705+
),
706+
data_2["begin_timestamp"],
692707
)
693708
self.assertEqual(
694-
datetime.datetime(2024, 5, 21, 8, 31, 28, 749000), data_2["end_timestamp"]
709+
datetime.datetime(
710+
2024, 5, 21, 8, 31, 28, 749000, tzinfo=datetime.timezone.utc
711+
),
712+
data_2["end_timestamp"],
695713
)
696714
self.assertEqual({key: [message_3, message_4]}, sut.buffer)
697715
self.assertEqual({}, sut.batch)

tests/logcollector/test_collector.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,6 @@ def test_invalid_logline(self):
177177

178178
# Act
179179
with (
180-
patch(
181-
"src.logcollector.collector.TIMESTAMP_FORMAT", "%Y-%m-%d %H:%M:%S.%f"
182-
),
183180
patch("src.logcollector.collector.IPV4_PREFIX_LENGTH", 24),
184181
patch(
185182
"src.logcollector.collector.uuid.uuid4",

tests/miscellaneous/test_field_type.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import re
33
import unittest
44

5-
from src.base.logline_handler import FieldType, RegEx, IpAddress, ListItem
5+
from src.base.logline_handler import FieldType, RegEx, IpAddress, ListItem, Timestamp
66

77

88
class TestFieldType(unittest.TestCase):
@@ -23,6 +23,38 @@ def test_validate(self):
2323
sut.validate(value="test")
2424

2525

26+
class TestTimestamp(unittest.TestCase):
27+
def test_init(self):
28+
# Arrange
29+
name = "test_name"
30+
timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
31+
32+
# Act
33+
sut = Timestamp(name=name, timestamp_format=timestamp_format)
34+
35+
# Assert
36+
self.assertEqual(name, sut.name)
37+
self.assertEqual(timestamp_format, sut.timestamp_format)
38+
39+
def test_validate_successful(self):
40+
# Arrange
41+
name = "test_name"
42+
timestamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"
43+
sut = Timestamp(name=name, timestamp_format=timestamp_format)
44+
45+
# Act and Assert
46+
self.assertTrue(sut.validate(value="2024-07-28T14:45:30.123Z"))
47+
48+
def test_validate_unsuccessful(self):
49+
# Arrange
50+
name = "test_name"
51+
timestamp_format = "%Y-%m-%d %H:%M:%S.%f"
52+
sut = Timestamp(name=name, timestamp_format=timestamp_format)
53+
54+
# Act and Assert
55+
self.assertFalse(sut.validate(value="2024-07-28T14:45:30.123Z"))
56+
57+
2658
class TestRegEx(unittest.TestCase):
2759
def test_init(self):
2860
# Arrange

0 commit comments

Comments
 (0)