
feat: 70 write documents to kafka #88

Open
Tuckerle wants to merge 56 commits into main from 70-write-documents-to-kafka

Conversation

@Tuckerle
Contributor

@Tuckerle Tuckerle commented Nov 27, 2025

Description

Short description or comments

Reference

Issues #70

Summary by CodeRabbit

  • New Features

    • Kafka messaging integrated into file processing with broker-managed lifecycle and publish-with-rollback behavior
    • Async public API to initialize and connect the broker
  • Improvements

    • File processing converted to async with concurrent batch handling for faster downloads
    • More detailed, module-scoped debug logging and clearer error messages
  • Documentation

    • Kafka and Postgres configuration variables added to env example
  • Chores

    • Added cryptography and Kafka client dependencies; logging verbosity set to DEBUG
  • Tests

    • Tests updated for async behavior and to verify broker interactions

@coderabbitai
Contributor

coderabbitai bot commented Nov 27, 2025

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 924db2f9-c0a9-43c8-9f4d-703c7068b60e

📥 Commits

Reviewing files that changed from the base of the PR and between 44ae6c9 and 59dc17f.

📒 Files selected for processing (1)
  • riski-extractor/src/extractor/base_extractor.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • riski-extractor/src/extractor/base_extractor.py

📝 Walkthrough

Walkthrough

Adds Kafka integration and TLS/mTLS setup, introduces a broker-backed file processing flow with publish-and-rollback behavior, tightens logger naming, raises extractor logging to DEBUG, adds Kafka settings and a Message model, and updates tests, CI, env vars, and dependencies.

Changes

Cohort / File(s) Summary
App & Broker
riski-extractor/app.py
Adds createKafkaBroker(config, logger) and integrates broker lifecycle into main flow (connect before processing, stop in finally).
Filehandler & Tests
riski-extractor/src/filehandler/filehandler.py, riski-extractor/test/test_filehandler.py
Filehandler now accepts a KafkaBroker, processes files concurrently with semaphore, extracts filenames from Content-Disposition, saves only on content change, publishes Message to Kafka, and rolls back DB content on publish failure. Tests updated to inject and assert broker.publish.
Kafka Model & Security
riski-extractor/src/kafka/message.py, riski-core/src/core/kafka/security.py
Adds Message Pydantic model. Adds setup_security(...) to decode PKCS#12 and CA, build SSLContext, and return a BaseSecurity for Kafka TLS/mTLS.
Settings & Core
riski-core/src/core/settings/kafka.py, riski-core/src/core/settings/core.py
Adds KafkaSettings model and adds CoreSettings.kafka: KafkaSettings field.
Logging changes
riski-extractor/logconf.yaml, riski-extractor/src/logtools.py, riski-extractor/src/extractor/base_extractor.py, riski-extractor/src/parser/base_parser.py, riski-extractor/src/version.py
Sets riski-extractor logger level to DEBUG; changes getLogger to return prefixed logger names and modules now use getLogger(__name__).
Parser error logging
riski-extractor/src/parser/city_council_meeting_parser.py
Exception logs when DB save fails now include the exception object.
Config, CI & Packaging
.env.example, riski-extractor/pyproject.toml, .github/workflows/extractor-tests.yml
Adds Kafka and top-level Postgres env vars to .env.example; adds cryptography>=46.0.5 and faststream[kafka]>=0.6.5 dependencies and updates CI env vars.
Misc utilities
riski-extractor/src/logtools.py
Introduces module-level defautlName and changes getLogger signature/behavior to return prefixed loggers (defautlName[.name]).

Sequence Diagram

sequenceDiagram
    participant App as App
    participant Broker as KafkaBroker
    participant FH as FileHandler
    participant HTTP as AsyncClient
    participant DB as Database
    participant Topic as Kafka Topic

    App->>Broker: createKafkaBroker(config, logger)
    Note over Broker: setup_security(pkcs12, pw, ca) -> SSLContext
    Broker->>Broker: connect()

    App->>FH: instantiate FileHandler(broker)
    App->>FH: await download_and_persist_files()

    loop per-file (concurrent, semaphore)
        FH->>HTTP: fetch file
        HTTP-->>FH: content + headers
        FH->>DB: compare existing content
        alt content differs
            FH->>DB: save new content
            FH->>Topic: publish Message(file_id)
            Note right of Topic: ack or error
            alt publish fails
                FH->>DB: rollback (clear content)
            end
        end
    end

    FH->>HTTP: aclose()
    App->>Broker: stop()
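The publish-with-rollback step in the diagram can be sketched as follows. The broker, the dict-backed "DB", and the message shape are simplified stand-ins inferred from the walkthrough, not the actual riski-extractor code:

```python
import asyncio


class FakeBroker:
    """Stand-in broker whose publish can be forced to fail."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.published = []

    async def publish(self, message, topic: str):
        if self.fail:
            raise RuntimeError("Kafka unavailable")
        self.published.append((message, topic))


async def persist_and_publish(broker, db: dict, file_id, content: bytes, filename: str) -> bool:
    old = db.get(file_id)
    if old == content:
        # Content unchanged: skip both the save and the publish.
        return False
    db[file_id] = content  # save new content
    try:
        await broker.publish({"content": filename, "republished": False}, topic="files")
    except Exception:
        # Rollback: clear content so only successfully published
        # files retain their content.
        db[file_id] = None
        raise
    return True
```

The key property the tests later in this PR are asked to verify is exactly this pairing: a successful save is followed by a publish, and a failed publish undoes the save.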

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~65 minutes

Poem

🐇 I hopped through certs and async streams,

Fetched files, guarded by semaphores' beams,
I nudged a broker, sent a Message bright,
If publish failed I fixed it overnight,
A rabbit cheers: secure, concurrent, right.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 18.42%, which is below the required 80.00% threshold. Resolution: write docstrings for the functions that are missing them.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title 'feat: 70 write documents to kafka' accurately reflects the main change: introducing Kafka integration to publish documents/files to Kafka topics throughout the codebase.




@Tuckerle Tuckerle linked an issue Nov 27, 2025 that may be closed by this pull request
@Tuckerle Tuckerle marked this pull request as ready for review November 27, 2025 14:33
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (9)
riski-extractor/logconf.yaml (1)

29-32: Confirm DEBUG level for riski-extractor in non-dev environments

Bumping the riski-extractor logger to DEBUG will emit a lot more data (including potentially sensitive payloads) everywhere this config is used. Consider keeping INFO in production and overriding to DEBUG only in dev/staging via an env-specific config or override.

riski-extractor/pyproject.toml (1)

32-33: Align faststream[kafka] version requirements across dependency groups

Runtime deps require faststream[kafka]>=0.6.1 while the dev group still specifies >=0.5.48. The resolver will effectively enforce the higher bound, but having two different mins is a bit misleading; consider aligning the dev entry to >=0.6.1 or dropping it if it’s redundant.

Also applies to: 43-50

riski-extractor/src/logtools.py (1)

8-9: Logger naming helper works; consider fixing constant name typo

The namespacing scheme ("riski-extractor" or "riski-extractor.<module>") looks good and matches logconf.yaml. The defautlName identifier is clearly meant to be defaultName (or similar); renaming it would avoid future confusion without changing behavior.

Possible tweak:

-defautlName: str = "riski-extractor"
+default_logger_name: str = "riski-extractor"
@@
-    logger_name = f"{defautlName}.{name}" if name else defautlName
-    return logging.getLogger(logger_name)
+    logger_name = f"{default_logger_name}.{name}" if name else default_logger_name
+    return logging.getLogger(logger_name)

Also applies to: 11-26

riski-extractor/src/kafka/security.py (3)

4-4: Inconsistent logger import.

This file imports getLogger from the standard library's logging module, while other files in this PR use getLogger from src.logtools. For consistency with the project's logging infrastructure (which applies configuration and prefixes), consider using the project's custom logger.

-from logging import Logger, getLogger
+from logging import Logger
+from src.logtools import getLogger

28-80: Temporary files are deleted before returning, but this is correct.

The TemporaryDirectory context manager exits at line 79, deleting the temp files before the BaseSecurity is returned. This works because ssl_context.load_cert_chain() reads and parses the certificate/key into memory rather than keeping file handles open.

However, this pattern is subtle and could confuse future maintainers. Consider adding a brief comment explaining why this is safe, or restructuring to make the intent clearer.
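A minimal stand-in illustrates why the pattern is safe: the bytes are fully materialized in memory before the TemporaryDirectory context exits, which is the same property setup_security relies on (ssl.SSLContext.load_cert_chain parses the PEM files during the call and keeps no open handles). The file contents here are dummies, not real certificates:

```python
import os
import tempfile


def read_then_discard() -> bytes:
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "cert.pem")
        with open(path, "wb") as fh:
            fh.write(b"-----BEGIN CERTIFICATE-----\ndummy\n-----END CERTIFICATE-----\n")
        with open(path, "rb") as fh:
            data = fh.read()  # contents now live in memory
    # The directory and file are gone here, but `data` is still valid,
    # just as the parsed cert/key survive inside the SSLContext.
    assert not os.path.exists(path)
    return data
```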


32-41: Misleading type hints.

The type hints str | None suggest these values could be None, but according to the config definition, kafka_ca_b64, kafka_pkcs12_data, and kafka_pkcs12_pw are required fields without defaults. If they were None, the subsequent b64decode calls would fail. Consider removing the | None annotations for clarity:

-        ca_data: str | None = config.kafka_ca_b64
+        ca_data: str = config.kafka_ca_b64
         ca_data = b64decode(s=ca_data).decode(encoding="utf-8")
 
         # Unpack PKCS#12 file
-        pkcs12_data: str | None = config.kafka_pkcs12_data
+        pkcs12_data: str = config.kafka_pkcs12_data
         pkcs12_bytes = b64decode(s=pkcs12_data)
 
         # Unpack PKCS#12 password
-        pkcs12_pw: str | None = config.kafka_pkcs12_pw
+        pkcs12_pw: str = config.kafka_pkcs12_pw
riski-extractor/src/filehandler/filehandler.py (1)

35-36: Redundant condition.

len(files) < 1 is redundant when not files already handles the empty list case.

-            if not files or len(files) < 1:
+            if not files:
riski-extractor/src/db/db_access.py (2)

59-59: Standardize logger usage across the module.

There's an inconsistency in logger usage:

  • request_person_by_familyName, request_paper_by_reference, and request_person_by_full_name accept a logger parameter
  • Other functions use the module-level logger

This mixed approach creates an inconsistent API and makes the codebase harder to maintain.

Consider standardizing on the module-level logger (simpler and more common):

-def request_person_by_familyName(familyName: str, logger: Logger, session: Session | None = None) -> Person | None:
+def request_person_by_familyName(familyName: str, session: Session | None = None) -> Person | None:
     logger.debug(f"Request person by familyName {familyName}.")

Apply similar changes to request_paper_by_reference and request_person_by_full_name.

Also applies to: 121-121, 137-140


15-15: Consider lazy log formatting for better performance.

All debug logs use f-strings, which are evaluated even when debug logging is disabled. Python's logging supports lazy formatting that only evaluates when the log level is active.

Replace f-strings with lazy formatting:

-    logger.debug(f"Request object of type {object_type} by risid {risid}.")
+    logger.debug("Request object of type %s by risid %s.", object_type, risid)

This avoids unnecessary string formatting overhead in production when debug logging is typically disabled.

Also applies to: 23-23, 39-39, 47-47, 60-60, 70-70, 81-81, 93-93, 127-127, 130-130, 133-133, 148-148, 151-151, 154-154
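The difference is observable with stdlib logging alone; Expensive is a hypothetical class whose __str__ counts how often it is rendered:

```python
import logging


class Expensive:
    calls = 0

    def __str__(self):
        Expensive.calls += 1
        return "expensive"


logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)  # DEBUG records are filtered out

# Lazy %s formatting: the level check happens first, so __str__ never runs.
logger.debug("value: %s", Expensive())
assert Expensive.calls == 0

# f-string: formatted eagerly at the call site, even though the record is dropped.
logger.debug(f"value: {Expensive()}")
assert Expensive.calls == 1
```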

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 178fbef and 49fcb9e.

⛔ Files ignored due to path filters (1)
  • riski-extractor/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • riski-extractor/app.py (1 hunks)
  • riski-extractor/config/config.py (2 hunks)
  • riski-extractor/logconf.yaml (1 hunks)
  • riski-extractor/pyproject.toml (1 hunks)
  • riski-extractor/src/db/db.py (2 hunks)
  • riski-extractor/src/db/db_access.py (9 hunks)
  • riski-extractor/src/extractor/base_extractor.py (1 hunks)
  • riski-extractor/src/filehandler/filehandler.py (4 hunks)
  • riski-extractor/src/kafka/broker.py (1 hunks)
  • riski-extractor/src/kafka/message.py (1 hunks)
  • riski-extractor/src/kafka/security.py (1 hunks)
  • riski-extractor/src/logtools.py (2 hunks)
  • riski-extractor/src/parser/base_parser.py (1 hunks)
  • riski-extractor/src/version.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
riski-extractor/app.py (1)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/extractor/base_extractor.py (1)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/db/db_access.py (2)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/db/db.py (1)
  • get_session (24-29)
riski-extractor/src/version.py (1)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/db/db.py (2)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/config/config.py (2)
  • Config (21-261)
  • get_config (265-267)
riski-extractor/config/config.py (1)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/parser/base_parser.py (1)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
🔇 Additional comments (9)
riski-extractor/src/kafka/message.py (1)

1-6: Message model looks good for current usage

The minimal Message model (content + republished flag) is straightforward and appropriate for a Kafka envelope at this stage.

riski-extractor/app.py (1)

23-27: Module-qualified logger initialization is appropriate

Switching to getLogger(__name__) here aligns app logging with the new "riski-extractor.<module>" naming convention and keeps startup logs clearly scoped to this module.

riski-extractor/src/extractor/base_extractor.py (1)

29-40: Consistent per-module logger for extractors

Using getLogger(__name__) for BaseExtractor instances keeps extractor logs under a stable, module-qualified namespace and matches the rest of the logging refactor.

riski-extractor/src/version.py (1)

8-8: LGTM!

The logger initialization now uses module-qualified naming via __name__, which aligns with the project-wide logging namespace updates and improves log traceability.

riski-extractor/src/parser/base_parser.py (1)

16-16: LGTM!

Consistent with the project-wide shift to module-qualified loggers.

riski-extractor/src/filehandler/filehandler.py (1)

19-26: LGTM on logging and broker initialization.

The module-qualified logger and broker setup look correct.

riski-extractor/src/kafka/broker.py (1)

12-21: Initialization looks reasonable, pending async fixes.

The security setup and broker configuration are correctly wired. Once the async/sync design is clarified, the initialization logic should work properly.

riski-extractor/src/db/db_access.py (2)

1-1: LGTM! Good logging setup.

Using a module-level logger with __name__ follows Python best practices and integrates well with the custom getLogger function from src.logtools.

Also applies to: 11-12


46-56: LGTM! Proper exception handling and refresh logic.

The additions are solid improvements:

  • session.refresh(obj) ensures the object is updated with any database-generated values (e.g., auto-incremented IDs, timestamps)
  • Exception handling with explicit rollback follows database transaction best practices

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
riski-extractor/config/config.py (1)

117-127: Kafka TLS secrets are not excluded from debug logging.

The exclude set is missing kafka_ca_b64, kafka_pkcs12_data, and kafka_pkcs12_pw. These secrets will be emitted to logs at DEBUG level, which is a security/compliance risk.

🔒 Proposed fix
     def print_config(self):
         logger = getLogger()
         logger.debug(
             self.model_dump(
                 exclude={
                     "openai_api_key": True,
                     "core": {"db": {"password", "database_url"}},
                     "test": {"db_password", "database_url"},
+                    "kafka_ca_b64": True,
+                    "kafka_pkcs12_data": True,
+                    "kafka_pkcs12_pw": True,
                 }
             )
         )
riski-extractor/src/filehandler/filehandler.py (1)

79-90: cgi module is removed in Python 3.13 and will cause ImportError at runtime.

The project requires Python 3.13.9, but the cgi module was removed in Python 3.13 (deprecated since 3.11 per PEP 594). The code will crash when attempting to import cgi at line 81. Replace with email.message.Message to parse the Content-Disposition header:

from email.message import Message

# Then in the code:
if content_disposition:
    msg = Message()
    msg['Content-Disposition'] = content_disposition
    fileName = msg.get_param('filename', header='Content-Disposition')
    if fileName:
        fileName = urllib.parse.unquote(fileName)
        self.logger.debug(f"Extracted fileName: {fileName}")
    else:
        self.logger.warning(f"No filename found in Content-Disposition header for {file.id}")
        fileName = file.name
🤖 Fix all issues with AI agents
In `@riski-extractor/src/filehandler/filehandler.py`:
- Around line 38-69: The batch loop in download_and_persist_files is running
each batch immediately and then re-processing/inspecting a non-existent global
list: remove the dead/duplicate code by deleting the unused all_files
initialization and the post-loop gather/results/for-loop that re-gathers tasks
and checks all_files; keep per-batch semaphore, sem_task, and the await
asyncio.gather(*tasks, return_exceptions=True) inside the loop (so each batch is
awaited once) and do not attempt to gather tasks after the loop finishes.
🧹 Nitpick comments (2)
riski-extractor/src/parser/city_council_meeting_parser.py (1)

85-86: Minor redundancy: logger.exception() already includes exception details.

Since logger.exception() automatically appends the full traceback (which includes the exception message), explicitly adding {e} to the format string is redundant. You could simplify to:

self.logger.exception(f"Could not save File: {doc_url}")

That said, having the exception message inline can help with quick log scanning, so this is purely a style preference.

riski-extractor/app.py (1)

113-124: Consider renaming to snake_case for PEP8 consistency.

The function name createKafkaBroker uses camelCase. Python convention (PEP8) recommends snake_case for function names: create_kafka_broker.

♻️ Proposed rename
-async def createKafkaBroker(config: Config, logger: Logger) -> KafkaBroker:
+async def create_kafka_broker(config: Config, logger: Logger) -> KafkaBroker:

Update the call site on line 82:

-    broker = await createKafkaBroker(config, logger)
+    broker = await create_kafka_broker(config, logger)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dadbcde and 938eed4.

⛔ Files ignored due to path filters (1)
  • riski-extractor/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • riski-extractor/app.py
  • riski-extractor/config/config.py
  • riski-extractor/pyproject.toml
  • riski-extractor/src/filehandler/filehandler.py
  • riski-extractor/src/parser/base_parser.py
  • riski-extractor/src/parser/city_council_meeting_parser.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • riski-extractor/src/parser/base_parser.py
  • riski-extractor/pyproject.toml
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2026-01-09T11:17:24.897Z
Learnt from: Tuckerle
Repo: it-at-m/riski PR: 119
File: riski-extractor/src/filehandler/filehandler.py:34-54
Timestamp: 2026-01-09T11:17:24.897Z
Learning: In the riski-extractor project, apply this guideline during code reviews: prioritize incremental async adoption. First focus on parallelizing HTTP requests where applicable, and only after that convert database I/O to async. Favor small, observable async improvements with proper error handling and compatibility considerations; avoid sweeping rewrites that change behavior.

Applied to files:

  • riski-extractor/src/parser/city_council_meeting_parser.py
  • riski-extractor/src/filehandler/filehandler.py
📚 Learning: 2026-01-14T15:11:52.943Z
Learnt from: shteenft
Repo: it-at-m/riski PR: 130
File: riski-document-pipeline/main.py:7-10
Timestamp: 2026-01-14T15:11:52.943Z
Learning: Do not rely on module-level logging configuration that runs on import. Loading log configuration (e.g., dictConfig via a loaded logconf.yaml) at module import time causes side effects and hidden initialization. Prefer configuring logging explicitly in a centralized entry point (e.g., the program's main module) and have modules obtain a configured logger without triggering config loading themselves. Ensure each entry point loads/merges the logging config deterministically, and modules simply call getLogger(__name__) or getLogger() for contextual loggers.

Applied to files:

  • riski-extractor/src/parser/city_council_meeting_parser.py
  • riski-extractor/config/config.py
  • riski-extractor/src/filehandler/filehandler.py
  • riski-extractor/app.py
🧬 Code graph analysis (1)
riski-extractor/app.py (5)
riski-extractor/src/kafka/security.py (1)
  • setup_security (19-81)
riski-extractor/src/logtools.py (1)
  • getLogger (11-26)
riski-extractor/src/version.py (1)
  • get_version (11-23)
riski-extractor/config/config.py (2)
  • Config (16-127)
  • get_config (131-133)
riski-extractor/src/filehandler/filehandler.py (2)
  • Filehandler (18-106)
  • download_and_persist_files (38-69)
🔇 Additional comments (7)
riski-extractor/config/config.py (1)

75-100: LGTM - Kafka configuration fields are well-structured.

The required fields without defaults appropriately enforce that Kafka credentials must be provided via environment variables. The validation_alias usage correctly maps environment variable names to field names.

riski-extractor/src/filehandler/filehandler.py (3)

22-30: LGTM - Initialization with broker injection is well-structured.

The KafkaBroker dependency injection and conditional proxy configuration are appropriate. Consider applying the same Limits to the non-proxy case for consistency if high concurrency is expected.


32-36: LGTM - Proper async context manager for resource cleanup.


97-106: LGTM - Rollback on publish failure ensures consistency.

The approach of clearing content on Kafka publish failure ensures that only successfully published files retain their content. Note that this also clears the filename to an empty string, which may differ from the original filename if the file existed before.

riski-extractor/app.py (3)

1-31: LGTM - Async main setup with proper imports.

The conversion to async def main() and scoped logging via getLogger(__name__) align with best practices. Based on learnings, this follows the pattern of configuring logging at the entry point.


82-88: LGTM - Proper broker lifecycle management with try/finally.

The broker is properly stopped in the finally block, and Filehandler is used as an async context manager to ensure the HTTP client is closed. This addresses the resource leak concern from previous reviews.


127-128: LGTM - Standard async entry point.


@Tuckerle Tuckerle marked this pull request as draft January 15, 2026 09:06
@Tuckerle Tuckerle self-assigned this Jan 22, 2026
@Tuckerle Tuckerle marked this pull request as ready for review January 23, 2026 15:42
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
riski-extractor/src/filehandler/filehandler.py (1)

76-79: Use email.message.Message to replace deprecated cgi.parse_header().

The cgi module is deprecated since Python 3.11 and was removed in Python 3.13. Use email.message.Message with get_filename() to extract the filename from Content-Disposition headers.

♻️ Proposed fix

Replace the import and parsing logic:

+from email.message import Message
 import urllib.parse
 from logging import Logger

Then update the parsing:

             if content_disposition:
-                # Parse using cgi module for robust header parsing
-                import cgi
-
-                _, params = cgi.parse_header(content_disposition)
-                fileName = params.get("filename")
+                # Parse Content-Disposition header
+                msg = Message()
+                msg["Content-Disposition"] = content_disposition
+                fileName = msg.get_filename()
🤖 Fix all issues with AI agents
In `@riski-core/src/core/settings/kafka.py`:
- Around line 33-46: Remove the erroneous database_url property from the
KafkaSettings class and delete the now-unused PostgresDsn import; specifically,
remove the database_url `@property` (which references non-existent attributes like
self.user, self.password, self.hostname, self.port, self.name) and the
PostgresDsn import at the top so KafkaSettings contains only Kafka-relevant
fields and no dead Postgres DSN code.

In `@riski-extractor/app.py`:
- Around line 114-128: createKafkaBroker currently calls setup_security when
config.core.kafka.security is True but doesn't validate that
config.core.kafka.pkcs12_data, config.core.kafka.pkcs12_pw, and
config.core.kafka.ca_b64 are present; add a pre-check in createKafkaBroker to
verify these three fields are non-empty (or raise/log a clear error) before
calling setup_security, and surface a helpful exception or logger.error
indicating the missing credential(s) so setup_security is never invoked with
None values.
🧹 Nitpick comments (2)
riski-core/src/core/kafka/security.py (1)

33-42: Minor: Redundant intermediate variable assignments.

The pattern of assigning parameters to local variables with the same name before immediately reassigning them is unnecessary:

ca_data: str | None = kafka_ca_b64  # Redundant
ca_data = b64decode(s=ca_data).decode(encoding="utf-8")

Could be simplified to:

ca_data = b64decode(s=kafka_ca_b64).decode(encoding="utf-8")

This applies to pkcs12_data and pkcs12_pw as well. Not blocking, just a readability nit.

riski-extractor/src/filehandler/filehandler.py (1)

39-49: Redundant tasks initialization on line 41.

The tasks = [] on line 41 is immediately overwritten by tasks = [] on line 49 in the first loop iteration. The outer initialization can be removed.

♻️ Proposed fix
     async def download_and_persist_files(self, batch_size: int = 100):
         self.logger.info("Persisting content of all scraped files to database.")
-        tasks = []
         offset = 0
         while True:
             files: list[File] = request_batch(File, offset=offset, limit=batch_size)

             if not files or len(files) < 1:
                 break
             semaphore = asyncio.Semaphore(batch_size)
             tasks = []

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @.github/workflows/extractor-tests.yml:
- Around line 46-47: The RISKI__KAFKA__SERVER env var in the workflow is set to
the invalid placeholder "<no-server>"; update the workflow to either remove
RISKI__KAFKA__SERVER entirely (if tests don't require Kafka) or set it to a
valid test broker address (e.g., "localhost:9092"), and ensure the test
environment respects RISKI__KAFKA__SECURITY=false; alternatively, implement
explicit disable logic in your configuration code that checks a flag (e.g.,
RISKI__KAFKA__ENABLED or the existing RISKI__KAFKA__SERVER value) and prevents
Kafka initialization when tests run so CI/local workflows can omit a real
bootstrap server.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@riski-extractor/test/test_filehandler.py`:
- Around line 22-34: The test test_download_and_persist_file_updates_filename
should also assert that the Kafka broker publish is invoked: after calling
filehandler_instance.download_and_persist_file(mock_file) assert that
filehandler_instance.broker.publish was called once with the expected message
and topic (use ANY for the message payload if constructing it is complex, and
assert the topic equals the broker topic used by the production code or
filehandler_instance.broker_topic); locate the call in the production flow
around download_and_persist_file and update the test to patch or spy on
filehandler_instance.broker.publish and add
mock_publish.assert_called_once_with(ANY, topic=<expected-topic>) (or the exact
msg if you can build it).
- Around line 36-46: The test
test_download_and_persist_file_updates_filename_urlencoding should also assert
that the broker publish method was invoked after update_file_content; after
calling await filehandler_instance.download_and_persist_file(mock_file) add a
mock or patch reference to filehandler_instance.broker.publish (or patch
"src.filehandler.filehandler.broker.publish") and assert it was called once
(e.g., mock_broker_publish.assert_called_once_with(ANY) or with the expected
message payload), ensuring the assertion comes after
mock_update.assert_called_once_with(...) to verify publish occurs post-update in
download_and_persist_file.
🧹 Nitpick comments (2)
riski-extractor/test/test_filehandler.py (2)

13-19: Consider using patch to avoid creating a real AsyncClient.

The current fixture instantiates Filehandler, which creates a real AsyncClient in __init__, then immediately replaces it with an AsyncMock. The real client is never closed, which may trigger resource warnings.

♻️ Optional: Patch the client during instantiation
 `@pytest.fixture`
 def filehandler_instance():
     kafkaBroker = AsyncMock()
-    instance = Filehandler(kafkaBroker)
-    instance.client = AsyncMock()
-    instance.logger = AsyncMock()
+    with patch("src.filehandler.filehandler.AsyncClient") as mock_client_class:
+        mock_client_class.return_value = AsyncMock()
+        instance = Filehandler(kafkaBroker)
+        instance.logger = AsyncMock()
     return instance

60-62: Consider adding a test for the Kafka publish failure rollback scenario.

Per the production code, when broker.publish fails, the method rolls back by calling update_file_content(file.db_id, None, ""). This error handling path should be tested to ensure data consistency when Kafka is unavailable.

🧪 Suggested test for rollback scenario
`@pytest.mark.asyncio`
async def test_download_and_persist_file_rollback_on_publish_failure(filehandler_instance, mock_file):
    mock_response = AsyncMock()
    mock_response.content = b"test content"
    mock_response.headers = {"content-disposition": 'inline; filename="test_file.txt"'}

    filehandler_instance.client.get = AsyncMock(return_value=mock_response)
    filehandler_instance.broker.publish = AsyncMock(side_effect=Exception("Kafka unavailable"))

    with patch("src.filehandler.filehandler.update_file_content") as mock_update:
        await filehandler_instance.download_and_persist_file(mock_file)

        # First call: save content
        # Second call: rollback (content=None, fileName="")
        assert mock_update.call_count == 2
        mock_update.assert_any_call(ANY, b"test content", "test_file.txt")
        mock_update.assert_any_call(ANY, None, "")


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@riski-extractor/test/test_filehandler.py`:
- Around line 51-63: The test currently asserts update_file_content is not
called when file content is unchanged but misses asserting the broker wasn't
invoked; update the
test_download_and_persist_file_not_updates_filename_when_unchanged_file to also
patch or spy on the broker.publish used by download_and_persist_file (e.g.,
patch "src.filehandler.filehandler.broker.publish" or attach a mock to
filehandler_instance.broker.publish) and assert that
broker.publish.assert_not_called() after awaiting
filehandler_instance.download_and_persist_file(mock_file), ensuring both
update_file_content and broker.publish are verified as not called.
🧹 Nitpick comments (1)
riski-extractor/test/test_filehandler.py (1)

18-18: Consider using MagicMock for the logger.

Logger methods (info, debug, warning, error) are synchronous. Using AsyncMock works but is semantically incorrect. MagicMock is more appropriate for synchronous interfaces.

-    instance.logger = AsyncMock()
+    from unittest.mock import MagicMock
+    instance.logger = MagicMock()


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@riski-extractor/pyproject.toml`:
- Around line 18-20: Remove "pytest-asyncio>=1.3.0" from the main
[project].dependencies list and instead add it under the dev dependency group
(e.g., [project.optional-dependencies] or [project.dependency-groups] dev) so it
is only installed for development/testing; locate the pytest-asyncio entry in
pyproject.toml and move it into the dev group alongside other test/dev deps.
🧹 Nitpick comments (1)
.env.example (1)

12-20: Kafka configuration section looks good overall.

The new environment variables cover the necessary Kafka broker, TLS, and mTLS settings with appropriate placeholder values.

One minor note: dotenv-linter flags these keys as unordered. If your CI enforces dotenv-linter, consider alphabetizing:

♻️ Alphabetically ordered keys
-RISKI__KAFKA__SERVER=<kafka-broker-url>
-RISKI__KAFKA__SECURITY=true
-RISKI__KAFKA__TOPIC=<kafka-topic>
 RISKI__KAFKA__CA_B64=<ca_in_base64>
+RISKI__KAFKA__PKCS12_DATA=<auth_pkcs12_in_base64>
+RISKI__KAFKA__PKCS12_PW=<auth_pkcs12_pw_in_base64>
+RISKI__KAFKA__SECURITY=true
+RISKI__KAFKA__SERVER=<kafka-broker-url>
+RISKI__KAFKA__TOPIC=<kafka-topic>
-RISKI__KAFKA__PKCS12_DATA=<auth_pkcs12_in_base64>
-RISKI__KAFKA__PKCS12_PW=<auth_pkcs12_pw_in_base64>
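For reference, the base64 values these variables expect can be produced with a one-liner; the file names below are placeholders (stand-in files are created so the snippet runs anywhere), not the project's real CA certificate or client PKCS#12 bundle.

```shell
# Placeholder paths: point these at your real CA cert and client keystore.
printf 'demo-ca' > ca.pem        # stand-in files so the snippet is runnable
printf 'demo-p12' > client.p12
# tr -d '\n' strips line wrapping so the value fits on one .env line
echo "RISKI__KAFKA__CA_B64=$(base64 < ca.pem | tr -d '\n')"
echo "RISKI__KAFKA__PKCS12_DATA=$(base64 < client.p12 | tr -d '\n')"
```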

@github-actions

github-actions bot commented Mar 9, 2026

⚠️ Deprecation Warning: The deny-licenses option is deprecated for possible removal in the next major release. For more information, see issue 997.

Dependency Review

The following issues were found:
  • ❌ 1 vulnerable package(s)
  • ✅ 0 package(s) with incompatible licenses
  • ✅ 0 package(s) with invalid SPDX license definitions
  • ✅ 0 package(s) with unknown licenses.
See the Details below.

Vulnerabilities

riski-extractor/uv.lock

| Name | Version | Vulnerability | Severity |
| --- | --- | --- | --- |
| cryptography | 46.0.3 | cryptography Vulnerable to a Subgroup Attack Due to Missing Subgroup Validation for SECT Curves | high |

OpenSSF Scorecard

| Package | Version | Score | Details |
| --- | --- | --- | --- |
| pip/cryptography | 46.0.3 | Unknown | Unknown |
| pip/faststream | 0.6.5 | Unknown | Unknown |

Scanned Files

  • riski-extractor/uv.lock


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (1)
riski-core/src/core/settings/core.py (1)

19-20: Consider consistent multi-line formatting.

The kafka field is defined on a single line while other fields use multi-line formatting. Consider aligning with the existing style for consistency.

♻️ Proposed formatting
-    kafka: KafkaSettings = Field(default_factory=lambda: KafkaSettings(), description="Kafka related settings")
-      
+    kafka: KafkaSettings = Field(
+        description="Kafka related settings",
+        default_factory=lambda: KafkaSettings(),
+    )
+
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-core/src/core/settings/core.py` around lines 19 - 20, The kafka Field
definition in core.py is on one line while other settings use multi-line
formatting; update the kafka declaration to match the multi-line style (split
the attribute name, type, Field call and description onto separate lines) so it
aligns with other fields, referencing the kafka field and KafkaSettings type and
keeping the default_factory=lambda: KafkaSettings() and the description
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@riski-core/src/core/settings/core.py`:
- Around line 1-5: The imports from core.settings are not alphabetically sorted:
currently DatabaseSettings, KafkaSettings, GenAISettings, TestDBSettings—move
the core.settings.kafka import (KafkaSettings) so the group reads
DatabaseSettings, GenAISettings, KafkaSettings, TestDBSettings (or run your
project's import sorter) so the symbols DatabaseSettings, GenAISettings,
KafkaSettings, and TestDBSettings are alphabetically ordered among the
core.settings.* imports.

In `@riski-extractor/app.py`:
- Around line 74-80: The Filehandler is being instantiated without the required
kafkaBroker parameter which causes a TypeError; update the instantiation in the
async block to pass the created broker (the broker returned by
createKafkaBroker) into Filehandler (e.g., Filehandler(broker) or
Filehandler(kafkaBroker=broker)) so that Filehandler.__init__ receives its
required argument before calling filehandler.download_and_persist_files; keep
the existing broker lifecycle (create before the async with and await
broker.stop() in the finally) unchanged.
- Around line 3-14: Remove the three unused imports at the top of the module:
delete the import of os, remove the from core.db.db_access import
update_or_insert_objects_to_database, and remove the from core.model.data_models
import ExtractArtifact; keep the remaining imports (sys, Logger,
Config/get_config, create_db_and_tables/init_db, setup_security, KafkaBroker)
intact and run the linter or tests to ensure no other references exist to these
symbols.

In `@riski-extractor/src/extractor/base_extractor.py`:
- Around line 108-111: The method run() returns a variable extracted_objects
that is never defined, causing a NameError that is swallowed by the broad except
and turns successful runs into failures; fix by having run() accumulate and
return the list of parsed objects from the helper (e.g., call and return the
result of _parse_objects_from_links(object_links) or create extracted_objects in
run() and extend it with values returned by _parse_objects_from_links), ensure
_parse_objects_from_links actually returns list[T] (populate and return
extracted_objects there), and narrow or rethrow exceptions as appropriate so
successful parses return a list instead of None; reference run() and
_parse_objects_from_links to locate the changes.

In `@riski-extractor/src/filehandler/filehandler.py`:
- Around line 40-44: There are two identical stubs of the async method
download_and_persist_files defined; remove the duplicate/incomplete definition
and keep the complete implementation (ensure only one async def
download_and_persist_files(self, batch_size: int = 100): remains with its full
body and logger call). Locate both occurrences of download_and_persist_files in
the filehandler.FileHandler class, delete the extraneous stub, and run
tests/static analysis to confirm no other references rely on the removed
duplicate.
- Around line 102-106: The rollback call to update_file_content(file.db_id,
None, "") will raise TypeError because update_file_content assumes content
supports len(); change the rollback to pass empty bytes instead
(update_file_content(file.db_id, b"", "")) so len(content) works; update the
error logging line (self.logger.error(...)) remains but ensure the rollback call
uses b"" to avoid masking the original Kafka publish exception.

---

Nitpick comments:
In `@riski-core/src/core/settings/core.py`:
- Around line 19-20: The kafka Field definition in core.py is on one line while
other settings use multi-line formatting; update the kafka declaration to match
the multi-line style (split the attribute name, type, Field call and description
onto separate lines) so it aligns with other fields, referencing the kafka field
and KafkaSettings type and keeping the default_factory=lambda: KafkaSettings()
and the description unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 2fdcef46-beab-4e8c-b2fd-a6d843400626

📥 Commits

Reviewing files that changed from the base of the PR and between af2227c and ba4fce0.

⛔ Files ignored due to path filters (1)
  • riski-extractor/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • .env.example
  • .github/workflows/extractor-tests.yml
  • riski-core/src/core/settings/core.py
  • riski-extractor/app.py
  • riski-extractor/pyproject.toml
  • riski-extractor/src/extractor/base_extractor.py
  • riski-extractor/src/filehandler/filehandler.py
  • riski-extractor/test/test_filehandler.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • riski-extractor/pyproject.toml


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (3)
riski-extractor/src/filehandler/filehandler.py (1)

99-103: ⚠️ Potential issue | 🔴 Critical

Rollback will fail with TypeError: update_file_content cannot handle None content.

Per riski-core/src/core/db/db_access.py line 248, update_file_content executes file_db.size = len(content). Passing content=None raises TypeError: object of type 'NoneType' has no len(), causing the rollback itself to fail and masking the original Kafka publish error.


🐛 Proposed fix - use empty bytes instead of None
             except Exception as e:
                 # If Kafka Broker is unavailable rollback the file download to ensure
                 # All Documents that have content, are published to the Kafka Queue
-                update_file_content(file.db_id, None, "")
+                update_file_content(file.db_id, b"", "")
                 self.logger.error(f"Publishing failed. Rolled file download back: {file.db_id}. - {e}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-extractor/src/filehandler/filehandler.py` around lines 99 - 103, The
rollback is passing None to update_file_content which calls len(content) and
raises TypeError; change the exception handler to call
update_file_content(file.db_id, b"", "") (use an empty bytes object for content)
so len(content) works and the rollback succeeds, and keep the existing
self.logger.error call to log the original exception from the Kafka publish
attempt.
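The publish-and-rollback flow this comment targets can be sketched as follows. All collaborators (`update_file_content`, the broker, the `file` shape, and the topic name) are injected stand-ins for the real riski-extractor code, shown only to illustrate why the rollback must pass empty bytes rather than `None`:

```python
import asyncio

async def download_and_persist_file(file, broker, update_file_content, log_error):
    """Persist downloaded content, then publish; roll back if publishing fails.

    Hypothetical sketch: every dependency is passed in, and the download
    result is hard-coded where the real code would fetch over HTTP.
    """
    content, filename = b"test content", "test_file.txt"  # pretend download result
    update_file_content(file["db_id"], content, filename)
    try:
        await broker.publish({"file_id": file["db_id"]}, topic="documents")
    except Exception as e:
        # Roll back with empty bytes, not None, so a len(content) call
        # inside update_file_content cannot raise TypeError.
        update_file_content(file["db_id"], b"", "")
        log_error(f"Publishing failed. Rolled file download back: {file['db_id']}. - {e}")
```

With a broker stub that always raises, the function records exactly two updates: the initial save and the empty-bytes rollback.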
riski-extractor/app.py (2)

85-99: ⚠️ Potential issue | 🟠 Major

Validate security credentials when security=True.

If config.core.kafka.security is True but the optional credentials (pkcs12_data, pkcs12_pw, ca_b64) are None, setup_security will fail when attempting to base64-decode None.


🔒 Proposed fix - validate credentials before calling setup_security
 async def createKafkaBroker(config: Config, logger: Logger) -> KafkaBroker:
     security = None
     if config.core.kafka.security:
+        kafka_cfg = config.core.kafka
+        if not all([kafka_cfg.pkcs12_data, kafka_cfg.pkcs12_pw, kafka_cfg.ca_b64]):
+            raise ValueError("Kafka security is enabled but required credentials (pkcs12_data, pkcs12_pw, ca_b64) are missing")
         security = setup_security(config.core.kafka.pkcs12_data, config.core.kafka.pkcs12_pw, config.core.kafka.ca_b64)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-extractor/app.py` around lines 85 - 99, In createKafkaBroker, when
config.core.kafka.security is True you must validate that pkcs12_data, pkcs12_pw
and ca_b64 are present before calling setup_security; update createKafkaBroker
to check those fields on config.core.kafka, log a clear error (using
logger.error or logger.exception) and raise a descriptive exception if any are
missing, otherwise call setup_security with the validated values and proceed to
create/connect the KafkaBroker; reference createKafkaBroker,
config.core.kafka.pkcs12_data, config.core.kafka.pkcs12_pw,
config.core.kafka.ca_b64 and setup_security so the check is added just prior to
calling setup_security.
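The pre-flight check the comment asks for could look like the sketch below. The attribute names mirror `config.core.kafka`, but the config object here is a plain namespace rather than the real pydantic settings class:

```python
from types import SimpleNamespace

def validate_kafka_security(kafka_cfg):
    """Raise a descriptive error when security is on but credentials are missing.

    Hypothetical helper; the real code would run this just before setup_security.
    """
    if not kafka_cfg.security:
        return  # plaintext broker, nothing to validate
    missing = [name for name in ("pkcs12_data", "pkcs12_pw", "ca_b64")
               if not getattr(kafka_cfg, name, None)]
    if missing:
        raise ValueError(
            "Kafka security is enabled but required credentials are missing: "
            + ", ".join(missing)
        )
```

Naming the missing fields in the exception makes the failure actionable from logs instead of surfacing later as a base64 decode error on `None`.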

68-74: ⚠️ Potential issue | 🔴 Critical

Critical bug: Filehandler instantiated without required broker argument.

The Filehandler.__init__ at riski-extractor/src/filehandler/filehandler.py line 23 requires a kafkaBroker parameter, but line 70 creates Filehandler() without passing the broker. This will cause a TypeError at runtime.


🐛 Proposed fix
     broker = await createKafkaBroker(config, logger)
     try:
-        async with Filehandler() as filehandler:
+        async with Filehandler(broker) as filehandler:
             await filehandler.download_and_persist_files(batch_size=config.core.db.batch_size)
     finally:
         await broker.stop()
         logger.info("Broker closed.")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-extractor/app.py` around lines 68 - 74, Filehandler is being
instantiated without the required kafkaBroker parameter: after creating the
broker with createKafkaBroker (broker variable) pass that broker into the
Filehandler constructor (e.g., Filehandler(broker) or
Filehandler(kafkaBroker=broker)) so the Filehandler.__init__ receives the
required argument; ensure you use the same broker variable created by
createKafkaBroker and keep the existing async with and
download_and_persist_files flow.
🧹 Nitpick comments (2)
riski-core/src/core/settings/core.py (1)

19-19: Consider consistent formatting with other fields.

The kafka field uses a single-line format with different argument order (default_factory before description), while db, genai, and testdb use multi-line format with description first.

🧹 Proposed formatting for consistency
-    kafka: KafkaSettings = Field(default_factory=lambda: KafkaSettings(), description="Kafka related settings")
+    kafka: KafkaSettings = Field(
+        description="Kafka related settings",
+        default_factory=lambda: KafkaSettings(),
+    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-core/src/core/settings/core.py` at line 19, The kafka Field declaration
is formatted differently from the other fields (db, genai, testdb) and places
default_factory before description; update the kafka field to match the
multi-line style and argument order used elsewhere by putting description first
and default_factory second, e.g. declare kafka: KafkaSettings =
Field(description="Kafka related settings", default_factory=lambda:
KafkaSettings()) while keeping the same types and default behavior so it
visually matches the existing db/genai/testdb declarations.
riski-extractor/src/filehandler/filehandler.py (1)

40-50: Remove redundant tasks initialization.

tasks = [] on line 42 is immediately overwritten by tasks = [] on line 50 inside the loop, making the first initialization dead code.

♻️ Proposed fix
     async def download_and_persist_files(self, batch_size: int = 100):
         self.logger.info("Persisting content of all scraped files to database.")
-        tasks = []
         offset = 0
         while True:
             files: list[File] = request_batch(File, offset=offset, limit=batch_size)
-
             if not files or len(files) < 1:
                 break
             semaphore = asyncio.Semaphore(batch_size)
             tasks = []
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@riski-extractor/src/filehandler/filehandler.py` around lines 40 - 50, In
download_and_persist_files(), remove the redundant initial assignment to tasks
(the first tasks = [] before entering the loop) since it is immediately
reassigned inside the while loop; keep the inner tasks = [] that resets
per-batch and ensure only one tasks variable is declared/used in the function
(referencing the tasks variable and the download_and_persist_files method to
locate the change).
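The batching loop these comments describe, with a single per-batch `tasks` list, can be sketched like this. The in-memory list slice and the `download_one` callback are stubs standing in for the real `request_batch(File, ...)` DB query and HTTP download:

```python
import asyncio

async def download_and_persist_files(all_files, download_one, batch_size=100):
    """Process files in DB-sized batches, bounding concurrency within each batch.

    Hypothetical sketch: all_files stands in for the database, download_one
    for the per-file download-and-persist coroutine.
    """
    offset = 0
    while True:
        files = all_files[offset:offset + batch_size]  # stands in for request_batch
        if not files:
            break
        semaphore = asyncio.Semaphore(batch_size)
        tasks = []  # one list per batch; no outer initialization needed
        for f in files:
            async def bounded(f=f):
                async with semaphore:
                    await download_one(f)
            tasks.append(asyncio.create_task(bounded()))
        await asyncio.gather(*tasks)
        offset += batch_size
```

Because `tasks` is rebuilt at the top of each batch, the pre-loop `tasks = []` flagged in the review is dead code in this shape.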
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@riski-extractor/app.py`:
- Around line 85-99: In createKafkaBroker, when config.core.kafka.security is
True you must validate that pkcs12_data, pkcs12_pw and ca_b64 are present before
calling setup_security; update createKafkaBroker to check those fields on
config.core.kafka, log a clear error (using logger.error or logger.exception)
and raise a descriptive exception if any are missing, otherwise call
setup_security with the validated values and proceed to create/connect the
KafkaBroker; reference createKafkaBroker, config.core.kafka.pkcs12_data,
config.core.kafka.pkcs12_pw, config.core.kafka.ca_b64 and setup_security so the
check is added just prior to calling setup_security.
- Around line 68-74: Filehandler is being instantiated without the required
kafkaBroker parameter: after creating the broker with createKafkaBroker (broker
variable) pass that broker into the Filehandler constructor (e.g.,
Filehandler(broker) or Filehandler(kafkaBroker=broker)) so the
Filehandler.__init__ receives the required argument; ensure you use the same
broker variable created by createKafkaBroker and keep the existing async with
and download_and_persist_files flow.

In `@riski-extractor/src/filehandler/filehandler.py`:
- Around line 99-103: The rollback is passing None to update_file_content which
calls len(content) and raises TypeError; change the exception handler to call
update_file_content(file.db_id, b"", "") (use an empty bytes object for content)
so len(content) works and the rollback succeeds, and keep the existing
self.logger.error call to log the original exception from the Kafka publish
attempt.

---

Nitpick comments:
In `@riski-core/src/core/settings/core.py`:
- Line 19: The kafka Field declaration is formatted differently from the other
fields (db, genai, testdb) and places default_factory before description; update
the kafka field to match the multi-line style and argument order used elsewhere
by putting description first and default_factory second, e.g. declare kafka:
KafkaSettings = Field(description="Kafka related settings",
default_factory=lambda: KafkaSettings()) while keeping the same types and
default behavior so it visually matches the existing db/genai/testdb
declarations.

In `@riski-extractor/src/filehandler/filehandler.py`:
- Around line 40-50: In download_and_persist_files(), remove the redundant
initial assignment to tasks (the first tasks = [] before entering the loop)
since it is immediately reassigned inside the while loop; keep the inner tasks =
[] that resets per-batch and ensure only one tasks variable is declared/used in
the function (referencing the tasks variable and the download_and_persist_files
method to locate the change).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: f7570e60-f959-4740-bf55-0a39c7984a51

📥 Commits

Reviewing files that changed from the base of the PR and between ba4fce0 and 44ae6c9.

📒 Files selected for processing (3)
  • riski-core/src/core/settings/core.py
  • riski-extractor/app.py
  • riski-extractor/src/filehandler/filehandler.py

Development

Successfully merging this pull request may close these issues.

Write scraped document ids to kafka
