Allow defining multiple source kafka topics #2060

Closed
Fadelis wants to merge 1 commit into GoogleCloudPlatform:main from Fadelis:main

Conversation

@Fadelis

@Fadelis Fadelis commented Dec 12, 2024

Support for listening to multiple Kafka topics in dataflows would be very valuable. It was supported in the non-Flex templates, and the base options example hints at it as well, but somehow it was never implemented (and there's a bug for it: #2038).

With the ability to listen to multiple topics, it would also be very beneficial to enrich the stored record with the topic of the message, so I added such functionality in a similar fashion to the persistKafkaKey option.
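[Editor's illustration, not code from this PR: a minimal sketch of how a comma-separated multi-topic option could be parsed into a list, in the spirit of "parsing the input topic as a list" discussed below. The class and method names are hypothetical.]

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of parsing a multi-topic input parameter;
// names are illustrative, not the template's actual API.
public class TopicListParser {

    /** Splits a comma-separated topic option into a trimmed, non-empty list. */
    public static List<String> parseTopics(String inputTopics) {
        return Arrays.stream(inputTopics.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A single topic keeps working unchanged; multiple topics are split.
        System.out.println(parseTopics("topic-a"));
        System.out.println(parseTopics("topic-a, topic-b ,topic-c"));
    }
}
```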

@github-actions
Contributor

This pull request has been marked as stale due to 180 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jun 11, 2025
@Fadelis
Author

Fadelis commented Jun 13, 2025

Bump @GoogleCloudPlatform/dataflow-templates-wg

@stale stale bot removed the stale label Jun 13, 2025
@liferoad liferoad added the improvement Making existing code better label Jul 12, 2025
@liferoad liferoad requested review from an2x, fozzie15 and johnjcasey July 12, 2025 19:30
@codecov

codecov bot commented Jul 12, 2025

Codecov Report

Attention: Patch coverage is 36.36364% with 14 lines in your changes missing coverage. Please review.

Project coverage is 49.58%. Comparing base (8902c7d) to head (900b2e5).
Report is 79 commits behind head on main.

Files with missing lines | Patch % | Lines
...oud/teleport/v2/transforms/BigQueryWriteUtils.java | 0.00% | 8 Missing ⚠️
...oud/teleport/v2/templates/KafkaToBigQueryFlex.java | 0.00% | 5 Missing ⚠️
...le/cloud/teleport/v2/templates/KafkaToGcsFlex.java | 0.00% | 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2060      +/-   ##
============================================
+ Coverage     49.57%   49.58%   +0.01%     
- Complexity     4792     5142     +350     
============================================
  Files           941      941              
  Lines         57618    57628      +10     
  Branches       6235     6236       +1     
============================================
+ Hits          28565    28577      +12     
+ Misses        27010    27006       -4     
- Partials       2043     2045       +2     
Components | Coverage Δ
spanner-templates | 69.85% <ø> (-0.01%) ⬇️
spanner-import-export | 68.61% <ø> (-0.03%) ⬇️
spanner-live-forward-migration | 78.77% <ø> (ø)
spanner-live-reverse-replication | 77.30% <ø> (ø)
spanner-bulk-migration | 87.80% <ø> (ø)

Files with missing lines | Coverage Δ
...cloud/teleport/v2/kafka/utils/KafkaTopicUtils.java | 93.33% <100.00%> (+93.33%) ⬆️
...le/cloud/teleport/v2/templates/KafkaToGcsFlex.java | 0.00% <0.00%> (ø)
...oud/teleport/v2/templates/KafkaToBigQueryFlex.java | 0.00% <0.00%> (ø)
...oud/teleport/v2/transforms/BigQueryWriteUtils.java | 0.00% <0.00%> (ø)

... and 1 file with indirect coverage changes


@an2x
Member

an2x commented Jul 14, 2025

The new Flex templates support reading from a single topic only (the example is incorrect; I'll fix the example), because they have a lot of extra functionality compared to the legacy templates (e.g. reading messages with different schemas from the same topic, writing to different BigQuery tables based on schema, etc.). Supporting all of this while also reading from multiple topics is not straightforward and would require more changes than just parsing the input topic as a list.

At a minimum, we would need better support for the schemaFormat = SINGLE_SCHEMA_FILE case (because different topics typically carry messages with different schemas, so the user would somehow have to provide a schema file for every topic, which is hard to configure through the current UI), and better support for the DYNAMIC_TABLE_NAMES write strategy in the Kafka to BigQuery template (because right now the target table names are auto-generated and we don't account for messages potentially coming from different topics, so there could be conflicts).
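[Editor's illustration of the table-name conflict described above, not code from the template: if auto-generated table names are derived from the message schema alone, the same schema name on two topics maps both streams to one table, while qualifying the name with the topic keeps them apart. All names here are hypothetical.]

```java
// Hypothetical sketch of topic-agnostic vs. topic-qualified table naming;
// not the template's actual name-generation logic.
public class TableNames {

    /** Topic-agnostic naming: collides when two topics share a schema name. */
    static String fromSchemaOnly(String schemaName) {
        return schemaName.replaceAll("[^A-Za-z0-9_]", "_");
    }

    /** Topic-qualified naming: keeps per-topic streams in separate tables. */
    static String fromTopicAndSchema(String topic, String schemaName) {
        return (topic + "_" + schemaName).replaceAll("[^A-Za-z0-9_]", "_");
    }

    public static void main(String[] args) {
        // Both topics carry a schema named "order.event": one shared table.
        System.out.println(fromSchemaOnly("order.event"));
        // Qualifying with the topic disambiguates the two streams.
        System.out.println(fromTopicAndSchema("topic-a", "order.event"));
        System.out.println(fromTopicAndSchema("topic-b", "order.event"));
    }
}
```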

All in all, IMO all this would make the template way too complicated. It's supposed to cover only some basic use cases. If someone has a complex pipeline that requires all this functionality while also reading from multiple topics, I'd recommend writing a custom pipeline for this use case instead (or a custom template).

@an2x an2x closed this Jul 14, 2025

Labels

improvement (Making existing code better), size/L

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants