56 commits
f62fb38
feat: publish document id to kafka
Tuckerle Nov 25, 2025
b34ec24
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Nov 26, 2025
a4e2026
fix: avoid infinite loop
Tuckerle Nov 26, 2025
76c2cad
fix: debugging
Tuckerle Nov 26, 2025
025b7f2
fix: debug
Tuckerle Nov 26, 2025
dd29f48
exper: remove filehandling
Tuckerle Nov 26, 2025
62048bf
experi:: renable filehandller
Tuckerle Nov 26, 2025
e8dabad
experi: simplified handler
Tuckerle Nov 26, 2025
24e0867
fix: messagetype
Tuckerle Nov 26, 2025
983bb33
fix: fix message
Tuckerle Nov 26, 2025
e81e983
fix: reenable file downloads
Tuckerle Nov 26, 2025
f349f9e
experi: always publish id
Tuckerle Nov 26, 2025
106cebf
experi: try blocking call
Tuckerle Nov 26, 2025
aeca0c7
fix: message
Tuckerle Nov 26, 2025
7a61139
fix: broker
Tuckerle Nov 26, 2025
6cdc344
experi: more logging
Tuckerle Nov 27, 2025
8502079
fix: await publishing
Tuckerle Nov 27, 2025
2c6b49b
fix: logging
Tuckerle Nov 27, 2025
de7ade8
refactor: code quality
Tuckerle Nov 27, 2025
46725db
fix: logging
Tuckerle Nov 27, 2025
bae3214
experi: debug
Tuckerle Nov 27, 2025
5705302
experi: logging
Tuckerle Nov 27, 2025
49fcb9e
fix: kafka publishing
Tuckerle Nov 27, 2025
98fcebb
fix: publishing
Tuckerle Nov 27, 2025
d28bfdd
refactor: code rabbit
Tuckerle Nov 27, 2025
99b5164
refactor: code rabbit
Tuckerle Nov 27, 2025
4575e83
refactor: code rabbit
Tuckerle Nov 27, 2025
6ee0c47
fix: async code
Tuckerle Nov 27, 2025
ea21fb9
feat: add close
Tuckerle Nov 28, 2025
bbb78ff
refactor: broker async
Tuckerle Nov 28, 2025
343f156
refactor: code rabbit
Tuckerle Nov 28, 2025
a801b69
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Dec 11, 2025
4cf2078
ci: code rabbit
Tuckerle Dec 12, 2025
978112a
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Jan 9, 2026
3a6cdb2
chore: faststream[kafka] upgrade
Tuckerle Jan 12, 2026
dadbcde
fix: ensure closed broker
Tuckerle Jan 12, 2026
453cc9f
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Jan 14, 2026
938eed4
Merge branch '95-filehandler-async-support' into 70-write-documents-t…
Tuckerle Jan 14, 2026
59a5d73
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Jan 15, 2026
1e1d251
feat: kafka settings
Tuckerle Jan 15, 2026
8d1b489
feat: kafka in core package
Tuckerle Jan 19, 2026
da0b856
feat: kafka security integration
Tuckerle Jan 20, 2026
241d611
fix: avoid redundant downloads
Tuckerle Jan 23, 2026
e4f849e
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Jan 26, 2026
412178e
ci: required env vars
Tuckerle Jan 26, 2026
89e11c8
fix: remove method
Tuckerle Jan 26, 2026
a42979e
fix: async tests
Tuckerle Jan 27, 2026
6d75428
fix: assert publish
Tuckerle Jan 28, 2026
da81cac
fix: assert omit calling publish
Tuckerle Jan 28, 2026
1c77a0c
fix: logger as non Async Mock
Tuckerle Jan 28, 2026
af2227c
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Feb 10, 2026
265aecc
Merge branch 'main' into 70-write-documents-to-kafka
Tuckerle Mar 9, 2026
ba4fce0
chore: dependency up
Tuckerle Mar 9, 2026
44ae6c9
fix: ruff issues
Tuckerle Mar 9, 2026
59dc17f
fix: ruff issues
Tuckerle Mar 9, 2026
47d3525
fix: code rabbit
Tuckerle Mar 9, 2026
25 changes: 24 additions & 1 deletion .env.example
@@ -3,7 +3,30 @@
#################
DEBUG=

# Proxy

############
# Postgres #
############
RISKI__DB__HOSTNAME=db
RISKI__DB__NAME=example_db
RISKI__DB__PASSWORD=password
RISKI__DB__USER=postgres
RISKI__DB__PORT=5432
RISKI__DB__BATCH_SIZE=100

#########
# Kafka #
#########
RISKI__KAFKA__SERVER=<kafka-broker-url>
RISKI__KAFKA__SECURITY=true
RISKI__KAFKA__TOPIC=<kafka-topic>
RISKI__KAFKA__CA_B64=<ca_in_base64>
RISKI__KAFKA__PKCS12_DATA=<auth_pkcs12_in_base64>
RISKI__KAFKA__PKCS12_PW=<auth_pkcs12_pw_in_base64>

#########
# Proxy #
#########
HTTP_PROXY=
HTTPS_PROXY=
NO_PROXY=
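The three RISKI__KAFKA__* credential values above are expected as base64 strings (they are b64-decoded in riski-core/src/core/kafka/security.py below). A minimal sketch of producing them, assuming a PEM CA file and a PKCS#12 client bundle already exist locally; the file names and password are placeholders:

from base64 import b64encode
from pathlib import Path

# Placeholder input files; substitute the real CA and client bundle.
print("RISKI__KAFKA__CA_B64=" + b64encode(Path("ca.pem").read_bytes()).decode())
print("RISKI__KAFKA__PKCS12_DATA=" + b64encode(Path("client.p12").read_bytes()).decode())
# The PKCS#12 password is itself base64-encoded before it goes into the env file.
print("RISKI__KAFKA__PKCS12_PW=" + b64encode(b"changeit").decode())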
10 changes: 10 additions & 0 deletions .github/workflows/extractor-tests.yml
@@ -38,6 +38,16 @@ jobs:
OPENAI_API_BASE: https://example.de
OPENAI_API_VERSION: 2024-08-01-preview

TIKTOKEN_CACHE_DIR: tiktoken_cache
RISKI_OPENAI_EMBEDDING_MODEL: text-embedding-3-large
RISKI_EXTRACTOR__TEST__DB_NAME: test_db
RISKI_EXTRACTOR__TEST__DB_USER: postgres
RISKI_EXTRACTOR__TEST__DB_PASSWORD: password
RISKI_EXTRACTOR__TEST__DB_HOSTNAME: 127.0.0.1
RISKI_EXTRACTOR__TEST__DB_PORT: 5432
RISKI__KAFKA__SERVER: <no-server>
RISKI__KAFKA__SECURITY: false

defaults:
run:
working-directory: riski-extractor
Empty file.
81 changes: 81 additions & 0 deletions riski-core/src/core/kafka/security.py
@@ -0,0 +1,81 @@
"""Security helpers for configuring Kafka TLS/mTLS."""

from base64 import b64decode
from logging import Logger
from pathlib import Path
from ssl import SSLContext, create_default_context
from tempfile import TemporaryDirectory

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from faststream.security import BaseSecurity

from src.logtools import getLogger

logger: Logger = getLogger(__name__)


def setup_security(kafka_pkcs12_data: str, kafka_pkcs12_pw: str, kafka_ca_b64: str) -> BaseSecurity:
"""Set up Kafka security configuration based on application settings.

Returns:
BaseSecurity: The configured security object for Kafka.

Raises:
ValueError: If required environment variables are not set or if there are issues
loading the PKCS#12 file.
"""
with TemporaryDirectory() as tempdir:
tempdir_path: Path = Path(tempdir)
logger.debug("Setting up Kafka mTLS security using environment variables.")

# Unpack CA file
ca_data: str | None = kafka_ca_b64
ca_data = b64decode(s=ca_data).decode(encoding="utf-8")

# Unpack PKCS#12 file
pkcs12_data: str | None = kafka_pkcs12_data
pkcs12_bytes = b64decode(s=pkcs12_data)

# Unpack PKCS#12 password
pkcs12_pw: str | None = kafka_pkcs12_pw
pkcs12_pw_bytes: bytes = b64decode(s=pkcs12_pw)

# Extract the private key and certificate from the PKCS#12 file
try:
private_key, certificate, _ = pkcs12.load_key_and_certificates(
data=pkcs12_bytes,
password=pkcs12_pw_bytes,
)
except Exception as e:
raise ValueError(f"Failed to load PKCS#12 file: {e}")

if private_key is None or certificate is None:
raise ValueError("PKCS#12 file does not contain a private key and certificate.")

# Write cert and key to temporary files
cert_file: Path = tempdir_path / "kafka.cert"
key_file: Path = tempdir_path / "kafka.key"
with open(file=cert_file, mode="wb") as f:
f.write(certificate.public_bytes(encoding=serialization.Encoding.PEM))
with open(file=key_file, mode="wb") as f:
f.write(
private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
)

# Create SSL context
ssl_context: SSLContext = create_default_context(
cadata=ca_data,
)

# Load client cert and key
ssl_context.load_cert_chain(
certfile=cert_file,
keyfile=key_file,
)

return BaseSecurity(ssl_context=ssl_context, use_ssl=True)
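A sketch of exercising setup_security() without real broker credentials: it generates a throwaway self-signed certificate with the cryptography package, packs it into a PKCS#12 bundle, and feeds the base64 values in. This is an illustration of the expected inputs, not part of the PR or its test suite:

from base64 import b64encode
from datetime import datetime, timedelta, timezone

from core.kafka.security import setup_security
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509.oid import NameOID

# Throwaway self-signed key pair and certificate.
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "kafka-client")])
cert = (
    x509.CertificateBuilder()
    .subject_name(name)
    .issuer_name(name)
    .public_key(key.public_key())
    .serial_number(x509.random_serial_number())
    .not_valid_before(datetime.now(timezone.utc))
    .not_valid_after(datetime.now(timezone.utc) + timedelta(days=1))
    .sign(key, hashes.SHA256())
)

# Pack key and certificate into an encrypted PKCS#12 bundle, as the settings expect.
p12 = pkcs12.serialize_key_and_certificates(
    name=b"kafka",
    key=key,
    cert=cert,
    cas=None,
    encryption_algorithm=serialization.BestAvailableEncryption(b"secret"),
)
ca_pem = cert.public_bytes(serialization.Encoding.PEM)

security = setup_security(
    kafka_pkcs12_data=b64encode(p12).decode(),
    kafka_pkcs12_pw=b64encode(b"secret").decode(),
    kafka_ca_b64=b64encode(ca_pem).decode(),
)
print(type(security).__name__, security.use_ssl)  # BaseSecurity True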
3 changes: 3 additions & 0 deletions riski-core/src/core/settings/core.py
@@ -1,5 +1,6 @@
from core.settings.db import DatabaseSettings
from core.settings.genai import GenAISettings
from core.settings.kafka import KafkaSettings
from core.settings.testdb import TestDBSettings
from pydantic import BaseModel, Field

@@ -15,6 +16,8 @@ class CoreSettings(BaseModel):
default_factory=lambda: GenAISettings(),
)

kafka: KafkaSettings = Field(default_factory=lambda: KafkaSettings(), description="Kafka related settings")

testdb: TestDBSettings = Field(
description="Test DB related settings",
default_factory=lambda: TestDBSettings(),
31 changes: 31 additions & 0 deletions riski-core/src/core/settings/kafka.py
@@ -0,0 +1,31 @@
from pydantic import BaseModel, Field


class KafkaSettings(BaseModel):
"""
Kafka configuration settings.
"""

# === Kafka Settings ===
server: str = Field(
description="Kafka Server URL",
)
topic: str = Field(
default="lhm-riski-parse",
description="Kafka Topic Name",
)
security: bool = Field(
description="Enable mTLS security for accessing Kafka Server",
)
ca_b64: str | None = Field(
default=None,
description="Kafka Server CA (B64 Encoded)",
)
pkcs12_data: str | None = Field(
default=None,
description="Kafka P12 (B64 Encoded)",
)
pkcs12_pw: str | None = Field(
default=None,
description="Kafka P12 Password (B64 Encoded)",
)
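The RISKI__KAFKA__* entries in .env.example suggest these fields are filled from the environment with a RISKI__ prefix and __ as the nesting delimiter. The actual settings loader is not part of this diff, so the following is only an assumed illustration of that mapping using pydantic-settings:

import os

from core.settings.kafka import KafkaSettings
from pydantic_settings import BaseSettings, SettingsConfigDict


# Hypothetical loader mirroring the env-var naming used in .env.example.
class DemoSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="RISKI__", env_nested_delimiter="__")

    kafka: KafkaSettings


os.environ["RISKI__KAFKA__SERVER"] = "broker:9092"
os.environ["RISKI__KAFKA__SECURITY"] = "false"
settings = DemoSettings()
print(settings.kafka.server, settings.kafka.security, settings.kafka.topic)
# broker:9092 False lhm-riski-parse  (topic falls back to its default)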
30 changes: 27 additions & 3 deletions riski-extractor/app.py
@@ -4,6 +4,8 @@

from config.config import Config, get_config
from core.db.db import create_db_and_tables, init_db
from core.kafka.security import setup_security
from faststream.kafka import KafkaBroker
from src.extractor.city_council_faction_extractor import CityCouncilFactionExtractor
from src.extractor.city_council_meeting_extractor import CityCouncilMeetingExtractor
from src.extractor.city_council_meeting_template_extractor import CityCouncilMeetingTemplateExtractor
@@ -23,7 +25,7 @@
async def main():
config = get_config()
config.print_config()
logger = getLogger()
logger = getLogger(__name__)
version = get_version()

init_db(config.core.db.database_url)
@@ -63,8 +65,13 @@ async def main():
city_council_motion_extractor.run()
logger.info("Extracted City Council Motions")

async with Filehandler() as filehandler:
await filehandler.download_and_persist_files(batch_size=config.core.db.batch_size)
broker = await createKafkaBroker(config, logger)
try:
async with Filehandler(kafkaBroker=broker) as filehandler:
await filehandler.download_and_persist_files(batch_size=config.core.db.batch_size)
finally:
await broker.stop()
logger.info("Broker closed.")

confidential_file_deleter = ConfidentialFileDeleter()
confidential_file_deleter.delete_confidential_files()
@@ -75,5 +82,22 @@ async def main():
return 0


async def createKafkaBroker(config: Config, logger: Logger) -> KafkaBroker:
security = None
if config.core.kafka.security:
security = setup_security(config.core.kafka.pkcs12_data, config.core.kafka.pkcs12_pw, config.core.kafka.ca_b64)

# Kafka Broker and FastStream app setup
broker = KafkaBroker(bootstrap_servers=config.core.kafka.server, security=security)
logger.debug("Connecting to Broker...")
try:
await broker.connect()
logger.info("Broker connected.")
return broker
except Exception as e:
logger.exception(f"Failed to connect to broker. - {e}")
raise


if __name__ == "__main__":
sys.exit(asyncio.run(main()))
2 changes: 1 addition & 1 deletion riski-extractor/logconf.yaml
@@ -27,7 +27,7 @@ loggers:
handlers: [ console ]
propagate: no
riski-extractor:
level: INFO
level: DEBUG
handlers: [ console ]
propagate: no
httpx:
4 changes: 3 additions & 1 deletion riski-extractor/pyproject.toml
@@ -15,6 +15,8 @@ dependencies = [
"tokenizers==0.21.2",
"torch==2.8.0",
"torchvision==0.23.0",
"cryptography>=46.0.5",
"faststream[kafka]>=0.6.5",
"pytest-asyncio>=1.3.0",
"truststore>=0.10.4",
"gitpython>=3.1.45",
@@ -42,7 +44,7 @@ dev = [
"langchain-docling>=1.1.0",
"langchain-openai>=0.3.32",
"pytest==8.4.2",
"faststream[kafka]==0.6.1",
"faststream[kafka]==0.6.5",
"ruff>=0.14.11",
"python-dotenv>=1.2.1",
]
9 changes: 5 additions & 4 deletions riski-extractor/src/extractor/base_extractor.py
@@ -37,7 +37,7 @@ def __init__(
else:
self.client = Client(timeout=config.request_timeout)

self.logger = getLogger()
self.logger = getLogger(__name__)
self.base_url = base_url
self.base_path = base_path
self.parser = parser
@@ -78,7 +78,7 @@ def _get_object_html(self, link: str) -> str:
def _get_sanitized_url(self, unsanitized_path: str) -> str:
return f"{self.base_url}/{unsanitized_path.lstrip('./')}"

def run(self) -> list[T]:
def run(self):
try:
# Initial request for cookies, sessionID etc.
self._initial_request()
@@ -105,8 +105,9 @@ def run(self) -> list[T]:
break

self._get_next_page(path=results_per_page_redirect_path, next_page_link=nav_top_next_link)
except Exception:
self.logger.exception("Error extracting objects")

except Exception as e:
self.logger.exception(f"Error extracting objects. - {e}")

def _parse_objects_from_links(self, object_links: list[str]):
for link in object_links:
24 changes: 21 additions & 3 deletions riski-extractor/src/filehandler/filehandler.py
@@ -7,7 +7,9 @@
from config.config import Config, get_config
from core.db.db_access import request_batch, update_file_content
from core.model.data_models import File
from faststream.kafka import KafkaBroker
from httpx import AsyncClient
from src.kafka.message import Message

from src.logtools import getLogger

@@ -18,14 +20,17 @@ class Filehandler:
logger: Logger
client: AsyncClient

def __init__(self) -> None:
self.logger = getLogger()
def __init__(self, kafkaBroker: KafkaBroker) -> None:
self.logger = getLogger(__name__)
if config.https_proxy or config.http_proxy:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=250)
self.client = AsyncClient(proxy=config.https_proxy or config.http_proxy, timeout=config.request_timeout, limits=limits)
else:
self.client = AsyncClient(timeout=config.request_timeout)

self.broker = kafkaBroker
self.logger.info("Filehandler created.")

async def __aenter__(self):
return self

@@ -34,10 +39,11 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):

async def download_and_persist_files(self, batch_size: int = 100):
self.logger.info("Persisting content of all scraped files to database.")

tasks = []
offset = 0
while True:
files: list[File] = request_batch(File, offset=offset, limit=batch_size)

if not files or len(files) < 1:
break
semaphore = asyncio.Semaphore(batch_size)
@@ -64,6 +70,7 @@ async def download_and_persist_file(self, file: File):
response = await self.client.get(url=file.id)
response.raise_for_status()
content = response.content
self.logger.debug(f"Checking necessity of inserting/updating file {file.name} to database.")
if file.content is None or content != file.content:
content_disposition = response.headers.get("content-disposition")
if content_disposition:
Expand All @@ -83,3 +90,14 @@ async def download_and_persist_file(self, file: File):
fileName = file.name
self.logger.debug(f"Saving content of file {file.name} to database.")
update_file_content(file.db_id, content, fileName)
self.logger.debug(f"Saved content of file {file.name} to database.")
msg = Message(content=str(file.db_id))
self.logger.debug(f"Publishing: {msg}.")
try:
await self.broker.publish(msg, topic=config.core.kafka.topic)
self.logger.debug(f"Published: {msg}.")
except Exception as e:
# If Kafka Broker is unavailable rollback the file download to ensure
# All Documents that have content, are published to the Kafka Queue
update_file_content(file.db_id, b"", "")
self.logger.error(f"Publishing failed. Rolled file download back: {file.db_id}. - {e}")
6 changes: 6 additions & 0 deletions riski-extractor/src/kafka/message.py
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class Message(BaseModel):
content: str
republished: bool = False
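Two small illustrations, neither of which is part of this PR: the JSON shape a Message takes when published, and how a publish to the topic could be asserted with faststream's in-memory test broker (the exact testing helpers may differ between faststream versions):

import asyncio

from faststream.kafka import KafkaBroker, TestKafkaBroker
from src.kafka.message import Message

# Wire format of the published payload.
print(Message(content="42").model_dump_json())  # {"content":"42","republished":false}

broker = KafkaBroker("localhost:9092")


@broker.subscriber("lhm-riski-parse")  # dummy sink so the publish can be observed
async def sink(msg: Message) -> None:
    pass


async def demo() -> None:
    async with TestKafkaBroker(broker):  # no real Kafka cluster needed
        await broker.publish(Message(content="42"), topic="lhm-riski-parse")
        sink.mock.assert_called_once()


asyncio.run(demo())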
8 changes: 6 additions & 2 deletions riski-extractor/src/logtools.py
@@ -5,8 +5,10 @@

from yaml import safe_load

defautlName: str = "riski-extractor"

def getLogger(name: str = "riski-extractor") -> logging.Logger:

def getLogger(name: str = None) -> logging.Logger:
"""Configures logging and returns a logger with the specified name.

Parameters:
@@ -19,7 +21,9 @@ def getLogger(name: str = "riski-extractor") -> logging.Logger:
log_config = safe_load(file)

logging.config.dictConfig(log_config)
return logging.getLogger(name)

logger_name = f"{defautlName}.{name}" if name else defautlName
return logging.getLogger(logger_name)


class JsonFormatter(logging.Formatter):
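With this change getLogger(__name__) produces hierarchical child loggers under the riski-extractor root, so the single riski-extractor entry in logconf.yaml (now at DEBUG) governs all of them via propagation. A short sketch of the resulting names:

from src.logtools import getLogger

log = getLogger("src.filehandler.filehandler")  # roughly what __name__ resolves to
fallback = getLogger()                          # no name: bare default logger

print(log.name)       # riski-extractor.src.filehandler.filehandler
print(fallback.name)  # riski-extractor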
2 changes: 1 addition & 1 deletion riski-extractor/src/parser/base_parser.py
@@ -13,7 +13,7 @@

class BaseParser(ABC, Generic[T]):
def __init__(self) -> None:
self.logger = getLogger()
self.logger = getLogger(__name__)
if platform.system() == "Windows":
# For Windows, use the specific code page that works
locale.setlocale(locale.LC_TIME, "German_Germany.1252")