Add support for UDF function transformations in PubSubToRedis template #3338

Open
wladabutenko wants to merge 3 commits into GoogleCloudPlatform:main from wladabutenko:copy_pr_3262

Conversation

@wladabutenko
Contributor

Adding optional UDF support to the PubSubToRedis pipeline, enabling user-defined transformations before writing to Redis

@gemini-code-assist

Summary of Changes

Hello @wladabutenko, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the PubSubToRedis Dataflow template by introducing optional User-Defined Function (UDF) capabilities. Users can now apply custom JavaScript transformations to their Pub/Sub messages before they are stored in Redis, providing greater flexibility in data processing. Additionally, robust error handling has been integrated through a dead-letter topic, ensuring that any messages failing UDF processing or Redis writes are captured for further analysis. The changes also include updated parameter validation, comprehensive documentation, and new integration tests to ensure the reliability and correctness of the new features.

Highlights

  • User-Defined Function (UDF) Support: Added optional JavaScript UDF support to the PubSubToRedis pipeline, allowing users to define custom transformations for messages before they are written to Redis.
  • Dead-Letter Topic for UDF Failures: Implemented a dead-letter topic mechanism to capture and forward messages that fail during UDF transformation or cannot be written to Redis, improving error handling and troubleshooting.
  • Updated Parameter Validation: Modified the regex validation for 'Integer' and 'Long' template parameters to correctly allow negative numbers, ensuring broader compatibility for numerical inputs.
  • Enhanced Documentation and Terraform Support: Updated the template's README and Terraform configuration to reflect the new UDF parameters (GCS path, function name, reload interval) and the dead-letter topic.
  • New Integration Tests: Introduced comprehensive integration tests for the PubSubToRedis template, covering various Redis sink types (String, Hash, Streams) and UDF functionality, utilizing Testcontainers for Redis.
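
The parameter-validation highlight above can be illustrated with a minimal sketch. The pattern below is an assumption made for illustration (an optional leading minus sign followed by digits); the actual regex in MetadataUtils.java is not shown in this conversation, and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

class IntegerParamRegexSketch {
  // Hypothetical pattern, not the one from MetadataUtils.java:
  // an optional leading '-' followed by one or more digits.
  static final Pattern SIGNED_INTEGER = Pattern.compile("^-?[0-9]+$");

  // Returns true when the whole value is a (possibly negative) integer literal.
  static boolean isValidInteger(String value) {
    return SIGNED_INTEGER.matcher(value).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidInteger("6379")); // true
    System.out.println(isValidInteger("-1"));   // true
    System.out.println(isValidInteger("1-2"));  // false
    System.out.println(isValidInteger("-"));    // false
  }
}
```

A pattern without the `-?` prefix would reject `-1`, which is the kind of input the updated validation is described as allowing.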

Changelog

  • metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java
    • Updated regex for Integer and Long template parameters to allow negative numbers.
  • plugins/templates-maven-plugin/pom.xml
    • Added a dependency on the metadata artifact.
  • v2/pubsub-to-redis/README_Cloud_PubSub_to_Redis.md
    • Updated the overview to mention UDF support and dead-lettering.
    • Added a new required parameter deadletterTopic.
    • Added new optional parameters for UDF: javascriptTextTransformGcsPath, javascriptTextTransformFunctionName, and javascriptTextTransformReloadIntervalMinutes.
    • Included a new section detailing User-Defined Functions (UDFs) and providing links to UDF documentation.
  • v2/pubsub-to-redis/pom.xml
    • Added testcontainers and it-google-cloud-platform dependencies for integration testing.
    • Added a dependency on the metadata artifact.
    • Configured the maven-surefire-plugin to run integration tests sequentially to prevent Redis port conflicts.
  • v2/pubsub-to-redis/src/main/java/com/google/cloud/teleport/v2/templates/PubSubToRedis.java
    • Added new imports for UDF-related classes and FailsafeElement handling.
    • Updated template description to include UDF functionality and dead-letter topic.
    • Added a new requirement for the Pub/Sub unprocessed topic.
    • Defined UDF_OUT, UDF_DEADLETTER_OUT tuple tags and FAILSAFE_ELEMENT_CODER for UDF processing.
    • Extended PubSubToRedisOptions to include JavascriptTextTransformerOptions for UDF configuration.
    • Added a new required parameter deadletterTopic to PubSubToRedisOptions.
    • Registered FAILSAFE_ELEMENT_CODER with the pipeline's CoderRegistry.
    • Modified the pipeline steps to include an optional UDF transformation stage.
    • Implemented the applyUdf method to conditionally apply JavaScript UDFs and handle failures by writing to a dead-letter topic.
    • Added PubsubMessageToFailsafeElementFn to wrap PubsubMessages in FailsafeElements for robust error handling.
  • v2/pubsub-to-redis/src/test/java/com/google/cloud/teleport/v2/templates/PubSubToRedisIT.java
    • Added a new integration test file for the PubSubToRedis template.
    • Implemented setup and teardown methods to manage Pub/Sub resources and a Redis Testcontainer.
    • Added testPubSubToRedisStringSink to verify string sink functionality.
    • Added testPubSubToRedisHashSink to verify hash sink functionality.
    • Added testPubSubToRedisStreamsSink to verify streams sink functionality.
  • v2/pubsub-to-redis/src/test/java/com/google/cloud/teleport/v2/templates/PubSubToRedisTest.java
    • Added new imports for Hamcrest matchers and PipelineOptionsFactory.
    • Added PubsubMessageOutputChecker for asserting PubsubMessage content in tests.
    • Configured the pipeline's CoderRegistry to register FAILSAFE_ELEMENT_CODER.
    • Added testPubSubToRedisWithUdf to verify UDF application.
    • Added testPubSubToRedisWithoutUdf to verify pipeline behavior when UDF is not configured.
    • Added testApplyUdfThrowsWhenFunctionNameMissing to test UDF configuration validation.
    • Added testRedisStringSink to verify string sink functionality.
    • Added testRedisHashSink to verify hash sink functionality.
    • Added testRedisStreamsSink to verify streams sink functionality.
  • v2/pubsub-to-redis/src/test/resources/PubSubToRedisTest/transform.js
    • Added a sample JavaScript UDF file with transform and transformBad functions for testing purposes.
  • v2/pubsub-to-redis/terraform/Cloud_PubSub_to_Redis/dataflow_job.tf
    • Updated descriptions for existing variables to be more precise.
    • Added a new variable deadletterTopic for the Pub/Sub dead-letter topic.
    • Updated the google_dataflow_flex_template_job resource to include deadletterTopic, javascriptTextTransformGcsPath, javascriptTextTransformFunctionName, and javascriptTextTransformReloadIntervalMinutes in the parameters.
@tvalentyn
Contributor

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds support for user-defined JavaScript functions (UDFs) and a dead-letter topic to the Pub/Sub to Redis template. The changes are well-structured and include updates to the pipeline code, documentation, and tests. My review focuses on improving documentation accuracy, ensuring code correctness, and enhancing test coverage. I've identified a few areas for improvement:

  • The descriptions for the new deadletterTopic parameter in the README, Javadoc, and Terraform configuration are slightly misleading and should be updated to reflect that only UDF failures are handled.
  • There's a potential bug in character encoding when creating a PubsubMessage from the transformed payload.
  • The new integration tests don't cover the UDF and dead-letter functionality, and one of the new unit tests is incomplete.

Overall, this is a great addition to the template. Addressing these points will make it more robust and easier to use.

FailsafeElement<PubsubMessage, String> element = c.element();
c.output(
new PubsubMessage(
element.getPayload().getBytes(),

high

Calling getBytes() on a String without specifying a charset can lead to unexpected behavior as it uses the platform's default charset. Since the payload is consistently handled as UTF-8 in other parts of the code (e.g., in PubsubMessageToFailsafeElementFn), it's best to explicitly use StandardCharsets.UTF_8 here for consistency and correctness.

Suggested change
- element.getPayload().getBytes(),
+ element.getPayload().getBytes(java.nio.charset.StandardCharsets.UTF_8),
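
To illustrate why the explicit charset matters, here is a minimal standalone sketch. It does not use the template's classes; the class name, helper, and sample payload are hypothetical. It encodes the same string with two charsets to show that the resulting bytes differ, which is what can happen silently when `getBytes()` falls back to a platform default that is not UTF-8 (JDK 18 and later default to UTF-8 unless overridden, while older runtimes depend on the platform).

```java
import java.nio.charset.StandardCharsets;

class ExplicitCharsetSketch {
  // Always encode with UTF-8 rather than relying on the platform default.
  static byte[] toUtf8(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Hypothetical payload containing one non-ASCII character (u with diaeresis).
    String payload = "{\"city\":\"Z\u00fcrich\"}";

    byte[] utf8 = toUtf8(payload);
    byte[] latin1 = payload.getBytes(StandardCharsets.ISO_8859_1);

    // The non-ASCII character takes two bytes in UTF-8 and one in ISO-8859-1,
    // so the two encodings produce arrays of different length.
    System.out.println(utf8.length != latin1.length);

    // Decoding with the charset used for encoding restores the payload.
    System.out.println(new String(utf8, StandardCharsets.UTF_8).equals(payload));

    // Decoding UTF-8 bytes with a different charset does not.
    System.out.println(new String(utf8, StandardCharsets.ISO_8859_1).equals(payload));
  }
}
```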

* **redisHost**: The Redis database host. For example, `your.cloud.db.redislabs.com`. Defaults to: 127.0.0.1.
* **redisPort**: The Redis database port. For example, `12345`. Defaults to: 6379.
* **redisPassword**: The Redis database password. Defaults to `empty`.
* **deadletterTopic**: The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.

medium

The description for deadletterTopic states that messages that "cannot be written to Redis" are forwarded to this topic. Based on the current implementation, only messages that fail the UDF transformation are sent to the dead-letter topic. Failures during the write to Redis are not handled this way. To avoid confusion, please update the description to accurately reflect that only UDF failures are sent to the dead-letter topic.

Suggested change
- * **deadletterTopic**: The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.
+ * **deadletterTopic**: The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.

Comment on lines +260 to +261
helpText =
"The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here.",

medium

The helpText for the deadletterTopic parameter states that messages that "cannot be written to Redis" are forwarded to this topic. However, the implementation only sends messages that fail the UDF transformation to the dead-letter topic. Please update the documentation to be consistent with the implementation.

Suggested change
- helpText =
- "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here.",
+ helpText =
+ "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation are forwarded here.",

// SkipDirectRunnerTest: PubsubIO doesn't trigger panes on the DirectRunner.
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@RunWith(JUnit4.class)
public final class PubSubToRedisIT extends TemplateTestBase {

medium

These integration tests are a good start for the Redis sinks. However, they don't cover the new UDF transformation and dead-lettering functionality, which is the main feature of this pull request. To ensure the end-to-end flow is working as expected, please consider adding integration tests for the following scenarios:

  • A successful UDF transformation.
  • A failing UDF transformation, verifying that the message is sent to the dead-letter topic.
  • The case where no UDF is specified (passthrough).

This will significantly improve the test coverage for the new features.
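
The three scenarios listed above can be sketched in plain Java, without Beam, Pub/Sub, or Redis. This is only an illustration of the expected routing, under the assumption (taken from the review) that only UDF failures are dead-lettered. The real template works with `FailsafeElement` and the `UDF_OUT` / `UDF_DEADLETTER_OUT` tuple tags; the class, `Result` holder, and `route` method here are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;

class UdfRoutingSketch {
  /** Hypothetical holder for the two outputs of the UDF stage. */
  static final class Result {
    final List<String> main = new ArrayList<>();       // payloads bound for Redis
    final List<String> deadLetter = new ArrayList<>(); // original payloads whose UDF call failed
  }

  // Applies the optional UDF to each payload. With no UDF the payload passes
  // through unchanged; if the UDF throws, the original payload is dead-lettered.
  static Result route(List<String> payloads, Optional<UnaryOperator<String>> udf) {
    Result result = new Result();
    for (String payload : payloads) {
      if (!udf.isPresent()) {
        result.main.add(payload);
        continue;
      }
      try {
        result.main.add(udf.get().apply(payload));
      } catch (RuntimeException e) {
        result.deadLetter.add(payload);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    UnaryOperator<String> upper = String::toUpperCase;
    UnaryOperator<String> bad = s -> { throw new IllegalStateException("boom"); };
    List<String> input = Arrays.asList("a", "b");

    System.out.println(route(input, Optional.of(upper)).main);     // [A, B]
    System.out.println(route(input, Optional.of(bad)).deadLetter); // [a, b]
    System.out.println(route(input, Optional.empty()).main);       // [a, b]
  }
}
```

An integration test for each scenario would assert on the equivalent of `main` (the Redis contents) and `deadLetter` (messages pulled from the dead-letter topic).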

Comment on lines +293 to +294
// Verify that the data was written to Redis Streams (additional verification logic may be
// needed)

medium

This test case for the Redis Streams sink appears to be incomplete. It runs the pipeline but does not include any assertions to verify that the data was actually written to the Redis stream. Please add assertions to confirm the expected outcome, for example by checking the stream length with client.xlen() or inspecting its content with client.xrange().


variable "deadletterTopic" {
type = string
description = "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`"

medium

The description for the deadletterTopic variable states that messages that "cannot be written to Redis" are forwarded to this topic. This is inconsistent with the implementation, which only handles UDF failures. Please update the description to accurately reflect the functionality.

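Suggested change
- description = "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation or cannot be written to Redis are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`"
+ description = "The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation are forwarded here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`"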

Contributor

thoughts on this one? what do other templates do?

Contributor Author

I think this is due to the nature of Redis: it treats the data as a binary-safe, opaque block of text. So for the STRING_SINK type it does not validate whether the string is malformed or correct; it takes the raw characters and stores them as a Redis string value.

Based on v2/pubsub-to-redis/src/main/java/com/google/cloud/teleport/v2/templates/transforms/MessageTransformation.java, the entire message payload is mapped into fieldKey.

Contributor

Is there a failure mode where a message is not written to Redis for whatever reason? If so, will such a message be forwarded to the DLQ?

Contributor

@tvalentyn tvalentyn Feb 17, 2026

The comment (which I agree with) is that if only messages for which the UDF failed to run are forwarded to the DLQ, then the docstring should not say "...Messages that fail UDF transformation or cannot be written to Redis are forwarded here...".

@codecov

codecov bot commented Feb 17, 2026

Codecov Report

❌ Patch coverage is 81.66667% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 51.08%. Comparing base (18949a0) to head (130bd43).
⚠️ Report is 8 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...gle/cloud/teleport/v2/templates/PubSubToRedis.java | 84.48% | 9 Missing ⚠️ |
| ...le/cloud/teleport/metadata/util/MetadataUtils.java | 0.00% | 2 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #3338      +/-   ##
============================================
+ Coverage     51.04%   51.08%   +0.03%     
- Complexity     5213     5598     +385     
============================================
  Files           994      994              
  Lines         60672    60725      +53     
  Branches       6651     6653       +2     
============================================
+ Hits          30972    31022      +50     
  Misses        27527    27527              
- Partials       2173     2176       +3     

| Components | Coverage Δ |
|---|---|
| spanner-templates | 70.85% <ø> (-0.03%) ⬇️ |
| spanner-import-export | 69.08% <ø> (ø) |
| spanner-live-forward-migration | 79.80% <ø> (-0.04%) ⬇️ |
| spanner-live-reverse-replication | 77.39% <ø> (-0.08%) ⬇️ |
| spanner-bulk-migration | 87.91% <ø> (-0.03%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...le/cloud/teleport/metadata/util/MetadataUtils.java | 50.00% <0.00%> (ø) |
| ...gle/cloud/teleport/v2/templates/PubSubToRedis.java | 47.00% <84.48%> (+47.00%) ⬆️ |

... and 2 files with indirect coverage changes
