Skip to content

Commit 130bd43

Browse files
author
Ulada Butsenka
committed
Update RedisHashIO to use AutoValue and move UDF comment in PubSubToRedis
1 parent 5cef334 commit 130bd43

File tree

2 files changed

+29
-19
lines changed

2 files changed

+29
-19
lines changed

v2/pubsub-to-redis/src/main/java/com/google/cloud/teleport/v2/templates/PubSubToRedis.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,12 +390,11 @@ public static PipelineResult run(PubSubToRedisOptions options) {
390390
*
391391
* <p>If no UDF is configured, returns the input messages unchanged.
392392
*
393-
* <p>This follows the same pattern as {@code PubsubProtoToBigQuery.runUdf}.
394-
*
395393
* @param messages the input PubSub messages
396394
* @param options the pipeline options
397395
* @return the (possibly transformed) PubSub messages
398396
*/
397+
// This follows the same pattern as PubsubProtoToBigQuery.runUdf
399398
static PCollection<PubsubMessage> applyUdf(
400399
PCollection<PubsubMessage> messages, PubSubToRedisOptions options) {
401400

v2/pubsub-to-redis/src/main/java/com/google/cloud/teleport/v2/templates/io/RedisHashIO.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.google.cloud.teleport.v2.templates.io;
1717

18+
import com.google.auto.value.AutoValue;
1819
import com.google.common.base.Preconditions;
1920
import java.util.HashMap;
2021
import java.util.Map;
@@ -27,43 +28,41 @@
2728
import org.apache.beam.sdk.values.PCollection;
2829
import org.apache.beam.sdk.values.PDone;
2930
import org.checkerframework.checker.nullness.qual.NonNull;
31+
import org.checkerframework.checker.nullness.qual.Nullable;
3032
import redis.clients.jedis.Jedis;
3133
import redis.clients.jedis.Pipeline;
3234
import redis.clients.jedis.Transaction;
3335

3436
public abstract class RedisHashIO {
3537

3638
public static WriteHash write() {
37-
return new WriteHash(RedisConnectionConfiguration.create(), null);
39+
40+
return (new AutoValue_RedisHashIO_WriteHash.Builder())
41+
.setConnectionConfiguration(RedisConnectionConfiguration.create())
42+
.build();
3843
}
3944

40-
public static class WriteHash
45+
@AutoValue
46+
public abstract static class WriteHash
4147
extends PTransform<@NonNull PCollection<KV<String, KV<String, String>>>, @NonNull PDone> {
48+
public WriteHash() {}
4249

43-
private final RedisConnectionConfiguration connectionConfiguration;
44-
private final Long expireTime;
50+
@Nullable
51+
abstract RedisConnectionConfiguration connectionConfiguration();
4552

46-
private WriteHash(RedisConnectionConfiguration connectionConfiguration, Long expireTime) {
47-
this.connectionConfiguration = connectionConfiguration;
48-
this.expireTime = expireTime;
49-
}
53+
@Nullable
54+
abstract Long expireTime();
5055

51-
public RedisConnectionConfiguration connectionConfiguration() {
52-
return connectionConfiguration;
53-
}
54-
55-
public Long expireTime() {
56-
return expireTime;
57-
}
56+
abstract RedisHashIO.WriteHash.Builder builder();
5857

5958
public RedisHashIO.WriteHash withConnectionConfiguration(
6059
RedisConnectionConfiguration connectionConfiguration) {
6160
Preconditions.checkArgument(connectionConfiguration != null, "connection cannot be null");
62-
return new WriteHash(connectionConfiguration, this.expireTime);
61+
return this.builder().setConnectionConfiguration(connectionConfiguration).build();
6362
}
6463

6564
public RedisHashIO.WriteHash withTtl(Long expireTimeMillis) {
66-
return new WriteHash(this.connectionConfiguration, expireTimeMillis);
65+
return this.builder().setExpireTime(expireTimeMillis).build();
6766
}
6867

6968
@NonNull
@@ -149,5 +148,17 @@ public void teardown() {
149148
this.jedis.close();
150149
}
151150
}
151+
152+
@AutoValue.Builder
153+
abstract static class Builder {
154+
Builder() {}
155+
156+
abstract WriteHash.Builder setExpireTime(Long expireTimeMillis);
157+
158+
abstract WriteHash.Builder setConnectionConfiguration(
159+
RedisConnectionConfiguration connectionConfiguration);
160+
161+
abstract WriteHash build();
162+
}
152163
}
153164
}

0 commit comments

Comments
 (0)