
Fixes 25437: For kafka message consuming, switch to using poll() instead of consume()#25838

Open
LasseGravesenSaxo wants to merge 5 commits into open-metadata:main from LasseGravesenSaxo:fix/kafka-consuming-poll-consume

Conversation


@LasseGravesenSaxo LasseGravesenSaxo commented Feb 12, 2026

Describe your changes:

Fixes #25437: For kafka message consuming, switch to using poll() instead of consume().

DeserializingConsumer does not implement consume(); it raises a NotImplementedError. This change switches yield_topic_sample_data in CommonBrokerSource to use poll() instead. See the confluent-kafka documentation for DeserializingConsumer.consume.

Type of change:

  • Bug fix

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Bug fix

  • I have added a test that covers the exact scenario we are fixing. For complex issues, comment the issue number in the test for future reference.

I need help with this. I'm not too familiar with the codebase.


Summary by Gitar

  • Replaces single consume() API call with a polling loop that calls poll() up to 10 times, collecting messages individually while respecting a 10-second total timeout using time.monotonic() deadline tracking
  • Adds robust error handling for ConsumeError, KeyDeserializationError, and ValueDeserializationError exceptions, logging warnings but continuing to collect additional valid messages instead of failing completely
  • Fixes CommonBrokerSource.yield_topic_sample_data() in ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py:294 where confluent_kafka.DeserializingConsumer.consume() raises NotImplementedError because the method is not implemented in the library

@github-actions
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!


- messages = self.consumer_client.consume(num_messages=10, timeout=10)
+ # DeserializingConsumer does not implement consume(), use poll() in a loop instead.
+ messages = []
+ n_poll = 10

💡 Quality: Magic numbers for poll count and timeout should be named constants

The values n_poll = 10 and total_timeout = 10 are local variables, but they mirror the original consume(num_messages=10, timeout=10) behavior and are configuration-like values. Consider extracting them as class-level or module-level constants for better discoverability and consistency, e.g., _SAMPLE_DATA_MAX_MESSAGES = 10 and _SAMPLE_DATA_POLL_TIMEOUT_SECS = 10. This is a minor style point - the current implementation is functional and correct.



gitar-bot bot commented Feb 18, 2026

🔍 CI failure analysis for 8ba77f7: CI failures have two root causes: (1) missing 'safe to test' label blocking most jobs, and (2) infrastructure timeout in py-run-build-tests waiting for labeler check.

Issue

CI failures are now showing two distinct failure patterns affecting 20 jobs total.

Root Causes

Root Cause 1: Missing 'safe to test' Label (Primary)

Most CI jobs include a "Verify PR labels" step that validates the PR has the safe to test label before proceeding. This security gate prevents untrusted code from executing in CI.

Root Cause 2: Infrastructure Timeout (Secondary)

The py-run-build-tests job is failing at the "Wait for the labeler" step with a network timeout:

Failed to open TCP connection to api.github.com:443 (execution expired) (Net::OpenTimeout)
Process completed with exit code 1

This job waits for the label verification check to complete before proceeding. Due to network connectivity issues with GitHub API, it timed out after multiple retry attempts.

Details

PR Status:

  • Current commit: 8ba77f7fd49693bcd8052c1d68ee4aefad6b1a6d (merged with main)
  • File changed: ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py
  • Change type: Bug fix for Kafka consumer API (switching from consume() to poll())
  • Labels: None (empty)
  • Original fix: Preserved intact through all merges

Failed Jobs Breakdown (20 total failures):

Blocked at label verification (primary failures):

  • Python unit tests (py-run-tests 3.10, 3.11)
  • Unit Tests & Static Checks (3.10, 3.11)
  • Integration Tests (shard-1 and shard-2 for both 3.10, 3.11)
  • Python checkstyle
  • Java checkstyle
  • Playwright E2E tests (shards 1, 2, 3, 4, 5, 6)
  • Build and scan jobs (2 instances)

Infrastructure timeout (secondary failure):

  • py-run-build-tests (timeout waiting for labeler check)

Dependent job failures:

  • py-combine-coverage (no coverage artifacts to combine)

No actual code validation has occurred - all test execution, builds, linting, and security scans are blocked.

Context

This PR has been:

  • Code reviewed and approved by Gitar
  • Enhanced with technical summary
  • Documented with confluent-kafka API references
  • Kept up-to-date with main branch through multiple merges
  • Awaiting maintainer action for security label

The infrastructure timeout in py-run-build-tests is a transient issue that will resolve once the label is added and jobs can proceed normally.

Code Review 👍 Approved with suggestions (0 resolved / 1 finding)

Solid bug fix correctly replacing the unimplemented DeserializingConsumer.consume() with a poll() loop. Error handling properly covers all documented exception types from the confluent-kafka library. One minor naming suggestion.

💡 Quality: Magic numbers for poll count and timeout should be named constants

📄 ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py:299

The values n_poll = 10 and total_timeout = 10 are local variables, but they mirror the original consume(num_messages=10, timeout=10) behavior and are configuration-like values. Consider extracting them as class-level or module-level constants for better discoverability and consistency, e.g., _SAMPLE_DATA_MAX_MESSAGES = 10 and _SAMPLE_DATA_POLL_TIMEOUT_SECS = 10. This is a minor style point - the current implementation is functional and correct.




Development

Successfully merging this pull request may close these issues.

Kafka Connection cannot import topic sample data- using an unimplemented Confluent DeserializingConsumer method
