Fix: Inconsistent table names in Flink Kafka Sinks. #287

Merged
merged 2 commits on Aug 12, 2025

The fix for the inconsistent table names in the Flink Kafka sinks is concise and correctly addresses the issue: the same table name was being used for different Kafka sinks and sources, which could cause data to be written to or read from the wrong place.

Code Quality

  • The change is clean and adheres to the current coding style, making it easy to understand and maintain.
  • Kafka credentials (key and secret) are read from environment variables rather than hard-coded, so sensitive information is handled properly.

Review of Changes

  • Change: The sink's table name was previously set to process_events_kafka and has been updated to raw_events_kafka. This appears to correct a naming inconsistency, making it clear which table carries raw event data and which carries processed event data.

Creator Fairness

  • The change made by PrinceSajjadHussain seems well thought out, improving the robustness and organization of the project's data flow.

Recommendation

  • I recommend merging this pull request. The change resolves the described issue and follows the common practice of giving Kafka sources and sinks distinct table names, which helps prevent accidental data overlaps or misrouting in Flink jobs; a short sketch of that separation follows.
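
To make the naming point concrete, here is a minimal sketch of the separation the rename supports. Everything in it (the processed_events_kafka table name, the topic names, broker address, and columns) is a hypothetical placeholder rather than code from this repository:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Two tables, two unmistakable names: one for raw events, one for processed
# events. Reusing a single name for both would either fail on the second
# CREATE TABLE or leave a job reading from / writing to the wrong topic.
for name, topic in [("raw_events_kafka", "raw-events"),
                    ("processed_events_kafka", "processed-events")]:
    t_env.execute_sql(f"""
        CREATE TABLE {name} (
            ip STRING,
            event_timestamp TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = '{topic}',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'web-traffic-demo',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """)

# With distinct names the wiring is unambiguous: a job can read raw_events_kafka
# and write processed_events_kafka without ever confusing the two. (Actually
# submitting the query requires the flink-sql-connector-kafka jar and a broker.)
t_env.execute_sql(
    "INSERT INTO processed_events_kafka "
    "SELECT ip, event_timestamp FROM raw_events_kafka"
)
```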

@@ -4,9 +4,8 @@
 import json
 import requests
 from pyflink.table import EnvironmentSettings, DataTypes, TableEnvironment, StreamTableEnvironment

 def create_processed_events_sink_kafka(t_env):
-    table_name = "process_events_kafka"
+    table_name = "raw_events_kafka"
     kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
     kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
     sasl_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_key}" password="{kafka_secret}";'
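The diff viewer cuts the function off here. For orientation only, a sketch of how the rest of create_processed_events_sink_kafka plausibly continues; the columns, topic name, and broker address below are illustrative placeholders, not taken from this repository:

```python
import os

def create_processed_events_sink_kafka(t_env):
    # Sketch only: the schema, topic, and broker are hypothetical placeholders.
    table_name = "raw_events_kafka"
    kafka_key = os.environ.get("KAFKA_WEB_TRAFFIC_KEY", "")
    kafka_secret = os.environ.get("KAFKA_WEB_TRAFFIC_SECRET", "")
    sasl_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{kafka_key}" password="{kafka_secret}";'
    )
    sink_ddl = f"""
        CREATE TABLE {table_name} (
            ip STRING,
            url STRING,
            event_timestamp TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'raw-events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.security.protocol' = 'SASL_SSL',
            'properties.sasl.mechanism' = 'PLAIN',
            'properties.sasl.jaas.config' = '{sasl_config}',
            'format' = 'json'
        )
    """
    # Registering the table makes it addressable by name in INSERT INTO statements.
    t_env.execute_sql(sink_ddl)
    return table_name
```

Returning the table name lets the calling job refer to the sink without duplicating the string, which is a common way to keep these identifiers consistent across a pipeline.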