Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ public static List<String> getRegexes(Annotation parameterAnnotation) {
return List.of("^(true|false)$");
case "Integer":
TemplateParameter.Integer integerParam = (TemplateParameter.Integer) parameterAnnotation;
return List.of("^[0-9]+$");
return List.of("^-?[0-9]+$");
case "Long":
TemplateParameter.Long longParam = (TemplateParameter.Long) parameterAnnotation;
return List.of("^[0-9]+$");
return List.of("^-?[0-9]+$");
case "Float":
TemplateParameter.Float floatParam = (TemplateParameter.Float) parameterAnnotation;
return List.of("^-?(0|[1-9][0-9]*)(\\.[0-9]+)?([eE][-+]?[0-9]+)?$");
Expand Down
34 changes: 32 additions & 2 deletions v2/pubsub-to-redis/README_Cloud_PubSub_to_Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,22 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **redisSinkType**: The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`. For example, `STRING_SINK`. Defaults to: STRING_SINK.
* **connectionTimeout**: The Redis connection timeout in milliseconds. For example, `2000`. Defaults to: 2000.
* **ttl**: The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.
* **deadletterTopic**: The Pub/Sub topic to publish failed messages to. Messages that fail UDF transformation will be sent here. For example, `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.
* **javascriptTextTransformGcsPath**: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, `gs://my-bucket/my-udfs/my_file.js`.
* **javascriptTextTransformFunctionName**: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
* **javascriptTextTransformReloadIntervalMinutes**: Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is `0`, UDF reloading is disabled. The default value is `0`.


## User-Defined functions (UDFs)

The Pub/Sub to Redis Template supports User-Defined functions (UDFs).
UDFs allow you to customize functionality by providing a JavaScript function
without having to maintain or build the entire template code.

Check [Create user-defined functions for Dataflow templates](https://cloud.google.com/dataflow/docs/guides/templates/create-template-udf)
and [Using UDFs](https://github.com/GoogleCloudPlatform/DataflowTemplates#using-udfs)
for more information about how to create and test those functions.


## Getting Started

Expand Down Expand Up @@ -135,6 +149,10 @@ export SSL_ENABLED=false
export REDIS_SINK_TYPE=STRING_SINK
export CONNECTION_TIMEOUT=2000
export TTL=-1
export DEADLETTER_TOPIC=<deadletterTopic>
export JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH=<javascriptTextTransformGcsPath>
export JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME=<javascriptTextTransformFunctionName>
export JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES=0

gcloud dataflow flex-template run "cloud-pubsub-to-redis-job" \
--project "$PROJECT" \
Expand All @@ -147,7 +165,11 @@ gcloud dataflow flex-template run "cloud-pubsub-to-redis-job" \
--parameters "sslEnabled=$SSL_ENABLED" \
--parameters "redisSinkType=$REDIS_SINK_TYPE" \
--parameters "connectionTimeout=$CONNECTION_TIMEOUT" \
--parameters "ttl=$TTL"
--parameters "ttl=$TTL" \
--parameters "deadletterTopic=$DEADLETTER_TOPIC" \
--parameters "javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH" \
--parameters "javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME" \
--parameters "javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES"
```

For more information about the command, please check:
Expand Down Expand Up @@ -176,6 +198,10 @@ export SSL_ENABLED=false
export REDIS_SINK_TYPE=STRING_SINK
export CONNECTION_TIMEOUT=2000
export TTL=-1
export DEADLETTER_TOPIC=<deadletterTopic>
export JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH=<javascriptTextTransformGcsPath>
export JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME=<javascriptTextTransformFunctionName>
export JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES=0

mvn clean package -PtemplatesRun \
-DskipTests \
Expand All @@ -184,7 +210,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="cloud-pubsub-to-redis-job" \
-DtemplateName="Cloud_PubSub_to_Redis" \
-Dparameters="inputSubscription=$INPUT_SUBSCRIPTION,redisHost=$REDIS_HOST,redisPort=$REDIS_PORT,redisPassword=$REDIS_PASSWORD,sslEnabled=$SSL_ENABLED,redisSinkType=$REDIS_SINK_TYPE,connectionTimeout=$CONNECTION_TIMEOUT,ttl=$TTL" \
-Dparameters="inputSubscription=$INPUT_SUBSCRIPTION,redisHost=$REDIS_HOST,redisPort=$REDIS_PORT,redisPassword=$REDIS_PASSWORD,sslEnabled=$SSL_ENABLED,redisSinkType=$REDIS_SINK_TYPE,connectionTimeout=$CONNECTION_TIMEOUT,ttl=$TTL,deadletterTopic=$DEADLETTER_TOPIC,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
-f v2/pubsub-to-redis
```

Expand Down Expand Up @@ -237,6 +263,10 @@ resource "google_dataflow_flex_template_job" "cloud_pubsub_to_redis" {
# redisSinkType = "STRING_SINK"
# connectionTimeout = "2000"
# ttl = "-1"
# deadletterTopic = "<deadletterTopic>"
# javascriptTextTransformGcsPath = "<javascriptTextTransformGcsPath>"
# javascriptTextTransformFunctionName = "<javascriptTextTransformFunctionName>"
# javascriptTextTransformReloadIntervalMinutes = "0"
}
}
```
12 changes: 12 additions & 0 deletions v2/pubsub-to-redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@
<version>${ptr.jedis-mock.version}</version>
<scope>test</scope>
</dependency>
<!-- Integration Test Dependencies -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>it-google-cloud-platform</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.teleport.metadata</groupId>
<artifactId>metadata</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Loading
Loading