Skip to content

Commit c8d9e25

Browse files
wladabutenkoUlada Butsenka
andauthored
Add support for UDF function transformations in PubSubToRedis template (#3338)
* Merge branch 'udb_redis' * Update RedisHashIO to use AutoValue and move UDF comment in PubSubToRedis * Specify standard charset as UTF-8 * Add assertions to RedisStreamsSink test * Update deadletter Pub/Sub topic description * Relax validation so DLQ is required when UDF is configured * Apply formatting with spotless --------- Co-authored-by: Ulada Butsenka <uladabutsenka@google.com>
1 parent 378a2af commit c8d9e25

File tree

9 files changed

+853
-141
lines changed

9 files changed

+853
-141
lines changed

metadata/src/main/java/com/google/cloud/teleport/metadata/util/MetadataUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,10 @@ public static List<String> getRegexes(Annotation parameterAnnotation) {
184184
return List.of("^(true|false)$");
185185
case "Integer":
186186
TemplateParameter.Integer integerParam = (TemplateParameter.Integer) parameterAnnotation;
187-
return List.of("^[0-9]+$");
187+
return List.of("^-?[0-9]+$");
188188
case "Long":
189189
TemplateParameter.Long longParam = (TemplateParameter.Long) parameterAnnotation;
190-
return List.of("^[0-9]+$");
190+
return List.of("^-?[0-9]+$");
191191
case "Float":
192192
TemplateParameter.Float floatParam = (TemplateParameter.Float) parameterAnnotation;
193193
return List.of("^-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][-+]?[0-9]+)?$");

plugins/templates-maven-plugin/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@
5050
<artifactId>core-plugin</artifactId>
5151
<version>${project.version}</version>
5252
</dependency>
53+
<dependency>
54+
<groupId>com.google.cloud.teleport.metadata</groupId>
55+
<artifactId>metadata</artifactId>
56+
<version>${project.version}</version>
57+
</dependency>
5358
<dependency>
5459
<groupId>com.google.cloud</groupId>
5560
<artifactId>google-cloud-dataflow</artifactId>

v2/pubsub-to-redis/README_Cloud_PubSub_to_Redis.md

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ Pub/Sub subscription and writes the message payload to Redis. The most common us
66
case of this template is to export logs to Redis Enterprise for advanced
77
search-based log analysis in real time.
88

9+
Before writing to Redis, you can apply a JavaScript user-defined function to the
10+
message payload. Any messages that experience processing failures are forwarded
11+
to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
12+
913
For added security, enable an SSL connection when setting up your database
1014
endpoint connection.
1115

@@ -33,7 +37,21 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
3337
* **redisSinkType**: The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`. For example, `STRING_SINK`. Defaults to: STRING_SINK.
3438
* **connectionTimeout**: The Redis connection timeout in milliseconds. For example, `2000`. Defaults to: 2000.
3539
* **ttl**: The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.
40+
* **outputDeadletterTopic**: The Pub/Sub topic to forward unprocessable messages to. Messages that fail UDF transformation are forwarded here, Required if using a JavaScript UDF. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.
41+
* **javascriptTextTransformGcsPath**: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, `gs://my-bucket/my-udfs/my_file.js`.
42+
* **javascriptTextTransformFunctionName**: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
43+
* **javascriptTextTransformReloadIntervalMinutes**: Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is `0`, UDF reloading is disabled. The default value is `0`.
44+
45+
46+
## User-Defined functions (UDFs)
47+
48+
The Pub/Sub to Redis Template supports User-Defined functions (UDFs).
49+
UDFs allow you to customize functionality by providing a JavaScript function
50+
without having to maintain or build the entire template code.
3651

52+
Check [Create user-defined functions for Dataflow templates](https://cloud.google.com/dataflow/docs/guides/templates/create-template-udf)
53+
and [Using UDFs](https://github.com/GoogleCloudPlatform/DataflowTemplates#using-udfs)
54+
for more information about how to create and test those functions.
3755

3856

3957
## Getting Started
@@ -135,6 +153,10 @@ export SSL_ENABLED=false
135153
export REDIS_SINK_TYPE=STRING_SINK
136154
export CONNECTION_TIMEOUT=2000
137155
export TTL=-1
156+
export OUTPUT_DEADLETTER_TOPIC=<outputDeadletterTopic>
157+
export JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH=<javascriptTextTransformGcsPath>
158+
export JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME=<javascriptTextTransformFunctionName>
159+
export JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES=0
138160

139161
gcloud dataflow flex-template run "cloud-pubsub-to-redis-job" \
140162
--project "$PROJECT" \
@@ -147,7 +169,11 @@ gcloud dataflow flex-template run "cloud-pubsub-to-redis-job" \
147169
--parameters "sslEnabled=$SSL_ENABLED" \
148170
--parameters "redisSinkType=$REDIS_SINK_TYPE" \
149171
--parameters "connectionTimeout=$CONNECTION_TIMEOUT" \
150-
--parameters "ttl=$TTL"
172+
--parameters "ttl=$TTL" \
173+
--parameters "outputDeadletterTopic=$OUTPUT_DEADLETTER_TOPIC" \
174+
--parameters "javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH" \
175+
--parameters "javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME" \
176+
--parameters "javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES"
151177
```
152178

153179
For more information about the command, please check:
@@ -176,6 +202,10 @@ export SSL_ENABLED=false
176202
export REDIS_SINK_TYPE=STRING_SINK
177203
export CONNECTION_TIMEOUT=2000
178204
export TTL=-1
205+
export OUTPUT_DEADLETTER_TOPIC=<outputDeadletterTopic>
206+
export JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH=<javascriptTextTransformGcsPath>
207+
export JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME=<javascriptTextTransformFunctionName>
208+
export JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES=0
179209

180210
mvn clean package -PtemplatesRun \
181211
-DskipTests \
@@ -184,7 +214,7 @@ mvn clean package -PtemplatesRun \
184214
-Dregion="$REGION" \
185215
-DjobName="cloud-pubsub-to-redis-job" \
186216
-DtemplateName="Cloud_PubSub_to_Redis" \
187-
-Dparameters="inputSubscription=$INPUT_SUBSCRIPTION,redisHost=$REDIS_HOST,redisPort=$REDIS_PORT,redisPassword=$REDIS_PASSWORD,sslEnabled=$SSL_ENABLED,redisSinkType=$REDIS_SINK_TYPE,connectionTimeout=$CONNECTION_TIMEOUT,ttl=$TTL" \
217+
-Dparameters="inputSubscription=$INPUT_SUBSCRIPTION,redisHost=$REDIS_HOST,redisPort=$REDIS_PORT,redisPassword=$REDIS_PASSWORD,sslEnabled=$SSL_ENABLED,redisSinkType=$REDIS_SINK_TYPE,connectionTimeout=$CONNECTION_TIMEOUT,ttl=$TTL,outputDeadletterTopic=$OUTPUT_DEADLETTER_TOPIC,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
188218
-f v2/pubsub-to-redis
189219
```
190220

@@ -237,6 +267,10 @@ resource "google_dataflow_flex_template_job" "cloud_pubsub_to_redis" {
237267
# redisSinkType = "STRING_SINK"
238268
# connectionTimeout = "2000"
239269
# ttl = "-1"
270+
# outputDeadletterTopic = "<outputDeadletterTopic>"
271+
# javascriptTextTransformGcsPath = "<javascriptTextTransformGcsPath>"
272+
# javascriptTextTransformFunctionName = "<javascriptTextTransformFunctionName>"
273+
# javascriptTextTransformReloadIntervalMinutes = "0"
240274
}
241275
}
242276
```

v2/pubsub-to-redis/pom.xml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,41 @@
6262
<version>${ptr.jedis-mock.version}</version>
6363
<scope>test</scope>
6464
</dependency>
65+
<dependency>
66+
<groupId>org.testcontainers</groupId>
67+
<artifactId>testcontainers</artifactId>
68+
<version>${testcontainers.version}</version>
69+
<scope>test</scope>
70+
</dependency>
71+
<!-- Integration Test Dependencies -->
72+
<dependency>
73+
<groupId>com.google.cloud.teleport</groupId>
74+
<artifactId>it-google-cloud-platform</artifactId>
75+
<version>${project.version}</version>
76+
<scope>test</scope>
77+
</dependency>
78+
<dependency>
79+
<groupId>com.google.cloud.teleport.metadata</groupId>
80+
<artifactId>metadata</artifactId>
81+
<version>${project.version}</version>
82+
</dependency>
6583
</dependencies>
6684

85+
<build>
86+
<plugins>
87+
<!-- Configure Surefire to run integration tests sequentially.
88+
This is required because all tests share the same fixed Redis port (6379). -->
89+
<plugin>
90+
<groupId>org.apache.maven.plugins</groupId>
91+
<artifactId>maven-surefire-plugin</artifactId>
92+
<version>${surefire.version}</version>
93+
<configuration>
94+
<!-- Run tests sequentially to avoid port conflicts on fixed port 6379 -->
95+
<parallel>none</parallel>
96+
<threadCount>1</threadCount>
97+
</configuration>
98+
</plugin>
99+
</plugins>
100+
</build>
101+
67102
</project>

0 commit comments

Comments
 (0)