Merged
Changes from 4 commits
176 changes: 59 additions & 117 deletions src/logcollector/collector.py
@@ -66,22 +66,19 @@ async def start(self) -> None:
         )
 
         task_fetch = asyncio.Task(self.fetch())
-        task_send = asyncio.Task(self.send())
 
         try:
             await asyncio.gather(
                 task_fetch,
-                task_send,
             )
         except KeyboardInterrupt:
             task_fetch.cancel()
-            task_send.cancel()
 
         logger.info("LogCollector stopped.")
 
     async def fetch(self) -> None:
         """Starts a loop to continuously listen on the configured Kafka topic. If a message is consumed, it is
-        decoded and stored."""
+        decoded and sent."""
         loop = asyncio.get_running_loop()
 
         while True:
@@ -90,134 +87,79 @@ async def fetch(self) -> None:
             )
             logger.debug(f"From Kafka: '{value}'")
 
-            await self.store(datetime.datetime.now(), value)
+            self.send(datetime.datetime.now(), value)
 
-    async def send(self) -> None:
-        """Continuously sends the next logline in JSON format to the BatchSender, where it is stored in
+    def send(self, timestamp_in: datetime.datetime, message: str) -> None:
+        """Sends the logline in JSON format to the BatchSender, where it is stored in
         a temporary batch before being sent to the :class:`Prefilter`. Adds the subnet ID to the message.
 
+        Args:
+            timestamp_in (datetime.datetime): Timestamp of entering the pipeline
+            message (str): Message to be stored
         """
-        try:
-            while True:
-                if not self.loglines.empty():
-                    timestamp_in, logline = await self.loglines.get()
-
-                    self.fill_levels.insert(
-                        dict(
-                            timestamp=datetime.datetime.now(),
-                            stage=module_name,
-                            entry_type="total_loglines",
-                            entry_count=self.loglines.qsize(),
-                        )
-                    )
-
-                    try:
-                        fields = self.logline_handler.validate_logline_and_get_fields_as_json(
-                            logline
-                        )
-                    except ValueError:
-                        self.failed_dns_loglines.insert(
-                            dict(
-                                message_text=logline,
-                                timestamp_in=timestamp_in,
-                                timestamp_failed=datetime.datetime.now(),
-                                reason_for_failure=None,  # TODO: Add actual reason
-                            )
-                        )
-                        continue
-
-                    subnet_id = self._get_subnet_id(
-                        ipaddress.ip_address(fields.get("client_ip"))
-                    )
-
-                    additional_fields = fields.copy()
-                    for field in REQUIRED_FIELDS:
-                        additional_fields.pop(field)
-
-                    logline_id = uuid.uuid4()
-
-                    self.dns_loglines.insert(
-                        dict(
-                            logline_id=logline_id,
-                            subnet_id=subnet_id,
-                            timestamp=datetime.datetime.strptime(
-                                fields.get("timestamp"), TIMESTAMP_FORMAT
-                            ),
-                            status_code=fields.get("status_code"),
-                            client_ip=fields.get("client_ip"),
-                            record_type=fields.get("record_type"),
-                            additional_fields=json.dumps(additional_fields),
-                        )
-                    )
-
-                    self.logline_timestamps.insert(
-                        dict(
-                            logline_id=logline_id,
-                            stage=module_name,
-                            status="in_process",
-                            timestamp=timestamp_in,
-                            is_active=True,
-                        )
-                    )
-
-                    message_fields = fields.copy()
-                    message_fields["logline_id"] = str(logline_id)
-
-                    self.logline_timestamps.insert(
-                        dict(
-                            logline_id=logline_id,
-                            stage=module_name,
-                            status="finished",
-                            timestamp=datetime.datetime.now(),
-                            is_active=True,
-                        )
-                    )
-
-                    self.batch_handler.add_message(
-                        subnet_id, json.dumps(message_fields)
-                    )
-                    logger.debug(f"Sent: '{logline}'")
-                else:
-                    await asyncio.sleep(0.1)
-        except KeyboardInterrupt:
-            while not self.loglines.empty():
-                logline = await self.loglines.get()
-
-                self.fill_levels.insert(
-                    dict(
-                        timestamp=datetime.datetime.now(),
-                        stage=module_name,
-                        entry_type="total_loglines",
-                        entry_count=self.loglines.qsize(),
-                    )
-                )
-
-                fields = self.logline_handler.validate_logline_and_get_fields_as_json(
-                    logline
-                )
-                subnet_id = self._get_subnet_id(
-                    ipaddress.ip_address(fields.get("client_ip"))
-                )
-
-                self.batch_handler.add_message(subnet_id, json.dumps(fields))
-
-    async def store(self, timestamp_in: datetime.datetime, message: str):
-        """Stores the message temporarily.
-
-        Args:
-            timestamp_in (datetime.datetime): Timestamp of entering the pipeline
-            message (str): Message to be stored
-        """
-        await self.loglines.put((timestamp_in, message))
-
-        self.fill_levels.insert(
-            dict(
-                timestamp=datetime.datetime.now(),
-                stage=module_name,
-                entry_type="total_loglines",
-                entry_count=self.loglines.qsize(),
-            )
-        )
+        try:
+            fields = self.logline_handler.validate_logline_and_get_fields_as_json(
+                message
+            )
+        except ValueError:
+            self.failed_dns_loglines.insert(
+                dict(
+                    message_text=message,
+                    timestamp_in=timestamp_in,
+                    timestamp_failed=datetime.datetime.now(),
+                    reason_for_failure=None,  # TODO: Add actual reason
+                )
+            )
+            return
+
+        additional_fields = fields.copy()
+        for field in REQUIRED_FIELDS:
+            additional_fields.pop(field)
+
+        subnet_id = self._get_subnet_id(ipaddress.ip_address(fields.get("client_ip")))
+        logline_id = uuid.uuid4()
+
+        self.dns_loglines.insert(
+            dict(
+                logline_id=logline_id,
+                subnet_id=subnet_id,
+                timestamp=datetime.datetime.strptime(
+                    fields.get("timestamp"), TIMESTAMP_FORMAT
+                ),
+                status_code=fields.get("status_code"),
+                client_ip=fields.get("client_ip"),
+                record_type=fields.get("record_type"),
+                additional_fields=json.dumps(additional_fields),
+            )
+        )
+
+        self.logline_timestamps.insert(
+            dict(
+                logline_id=logline_id,
+                stage=module_name,
+                status="in_process",
+                timestamp=timestamp_in,
+                is_active=True,
+            )
+        )
+
+        message_fields = fields.copy()
+        message_fields["logline_id"] = str(logline_id)
+
+        self.logline_timestamps.insert(
+            dict(
+                logline_id=logline_id,
+                stage=module_name,
+                status="finished",
+                timestamp=datetime.datetime.now(),
+                is_active=True,
+            )
+        )
+
+        self.batch_handler.add_message(subnet_id, json.dumps(message_fields))
+        logger.debug(f"Sent: '{message}'")
 
     @staticmethod
     def _get_subnet_id(address: ipaddress.IPv4Address | ipaddress.IPv6Address) -> str:
         """
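The failure path also changes shape: where the old loop wrote an invalid logline to `failed_dns_loglines` and hit `continue`, the new `send` records the failure and returns early. A self-contained sketch of that behavior under stated assumptions — `FakeLoglineHandler` and the `failed_rows` list are hypothetical stand-ins for the real `logline_handler` and the `failed_dns_loglines` table:

```python
import datetime

class FakeLoglineHandler:
    """Hypothetical stand-in for the collector's logline_handler."""
    def validate_logline_and_get_fields_as_json(self, logline: str) -> dict:
        if "client_ip=" not in logline:
            raise ValueError("missing required field: client_ip")
        return {"client_ip": logline.split("client_ip=")[1]}

failed_rows: list[dict] = []  # stand-in for the failed_dns_loglines table

def send(timestamp_in: datetime.datetime, message: str) -> None:
    try:
        fields = FakeLoglineHandler().validate_logline_and_get_fields_as_json(message)
    except ValueError:
        failed_rows.append(
            dict(
                message_text=message,
                timestamp_in=timestamp_in,
                timestamp_failed=datetime.datetime.now(),
                reason_for_failure=None,  # the PR leaves the actual reason as a TODO
            )
        )
        return  # early return replaces the old loop's `continue`
    print("would forward:", fields)

send(datetime.datetime.now(), "no ip in this line")  # exercises the failure path
assert failed_rows[0]["message_text"] == "no ip in this line"
```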