Skip to content

Fix: Inconsistent table names in Flink Kafka Sinks. #287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 12, 2025

Conversation

PrinceSajjadHussain
Copy link
Contributor

The start_job.py file creates a Kafka sink named process_events_kafka, while aggregation_job.py creates a Kafka source with the same name process_events_kafka. This can cause confusion and potentially lead to the aggregation job reading from the sink it is writing to, which would be incorrect.

Fix:
--- a/bootcamp\materials\4-apache-flink-training\src\job\start_job.py
+++ b/bootcamp\materials\4-apache-flink-training\src\job\start_job.py
@@ -5,7 +5,7 @@
from pyflink.table import EnvironmentSettings, DataTypes, TableEnvironment, StreamTableEnvironment

def create_processed_events_sink_kafka(t_env):

  • table_name = "process_events_kafka"
  • table_name = "raw_events_kafka"
    kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
    kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
    sasl_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_key}" password="{kafka_secret}";'

Copy link
Member

@EcZachly EcZachly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update resolves name conflicts in Flink Kafka sinks, essential for the correct reading/writing processes. Preventing such confusion is crucial for robust data processing pipelines.

Recommendation: Approve for Merge

Copy link
Member

@EcZachly EcZachly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revoke previous approval due to changes needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix made to resolve the inconsistent table names in Flink Kafka Sinks is concise and correctly addresses the issue where the same name was being used for different Kafka sinks and sources, potentially leading to data flow issues.

Code Quality

  • The change is clean and adheres to the current coding style, making it easy to understand and maintain.
  • Appropriate environment variable usage ensures that sensitive information such as Kafka keys and secrets are handled properly.

Review of Changes

  • Change: The table name was previously set to process_events_kafka and has been updated to raw_events_kafka. This change likely corrects a logical inconsistency to differentiate between processed and raw event data sink points.

Creator Fairness

  • The change made by PrinceSajjadHussain seems well thought-out, enhancing the robustness of data flow and organization within the project.

Recommendation

  • I recommend merging this pull request. The alteration resolves the described issue and aligns with best practices for managing Kafka topics, which aids in preventing any accidental data overlaps or misrouting in Flink jobs.

@isangwanrahul isangwanrahul merged commit d4d71b9 into DataExpert-io:main Aug 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants