Skip to content

Commit ecbc599

Browse files
[feat] Remove custom deduplication logic in favor of beam Deduplicate. (#120)
1 parent bad81a5 commit ecbc599

File tree

7 files changed

+20
-192
lines changed

7 files changed

+20
-192
lines changed

google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterImplTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,12 @@ public void multipleSentCompletedInOrder() {
185185
public void stopInCommitCallback() throws Exception {
186186
ApiFuture<Void> future = committer.commitOffset(Offset.of(10));
187187
CountDownLatch latch = new CountDownLatch(1);
188-
ExtractStatus.addFailureHandler(future, (error) -> {
189-
committer.stopAsync();
190-
latch.countDown();
191-
});
188+
ExtractStatus.addFailureHandler(
189+
future,
190+
(error) -> {
191+
committer.stopAsync();
192+
latch.countDown();
193+
});
192194
leakedResponseObserver.onError(Status.FAILED_PRECONDITION.asException());
193195
latch.await();
194196
assertFutureThrowsCode(future, Code.FAILED_PRECONDITION);

pubsublite-beam-io/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>org.apache.beam</groupId>
3030
<artifactId>beam-sdks-java-core</artifactId>
31-
<version>2.20.0</version>
31+
<version>2.21.0</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>com.google.code.findbugs</groupId>
@@ -104,7 +104,7 @@
104104
<dependency>
105105
<groupId>org.apache.beam</groupId>
106106
<artifactId>beam-runners-direct-java</artifactId>
107-
<version>2.20.0</version>
107+
<version>2.21.0</version>
108108
<scope>test</scope>
109109
</dependency>
110110
<dependency>

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/DeduplicationFn.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/DeduplicationFnOptions.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationOptions.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.common.collect.ImmutableList;
2424
import com.google.protobuf.ByteString;
2525
import java.io.Serializable;
26+
import org.apache.beam.sdk.state.TimeDomain;
27+
import org.apache.beam.sdk.transforms.Deduplicate;
2628

2729
@AutoValue
2830
public abstract class UuidDeduplicationOptions implements Serializable {
@@ -44,7 +46,7 @@ public abstract class UuidDeduplicationOptions implements Serializable {
4446
// All parameters are optional.
4547
public abstract SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor();
4648

47-
public abstract DeduplicationFnOptions<Uuid> deduplicationFnOptions();
49+
public abstract Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate();
4850

4951
// The number of partitions to hash values into.
5052
public abstract int hashPartitions();
@@ -53,8 +55,8 @@ public abstract class UuidDeduplicationOptions implements Serializable {
5355
public static Builder newBuilder() {
5456
Builder builder = new AutoValue_UuidDeduplicationOptions.Builder();
5557
builder.setUuidExtractor(DEFAULT_UUID_EXTRACTOR);
56-
builder.setDeduplicationFnOptions(
57-
DeduplicationFnOptions.<Uuid>newBuilder().setKeyCoder(Uuid.getCoder()).build());
58+
builder.setDeduplicate(
59+
Deduplicate.<Uuid, SequencedMessage>keyedValues().withTimeDomain(TimeDomain.EVENT_TIME));
5860
builder.setHashPartitions(DEFAULT_HASH_PARTITIONS);
5961
return builder;
6062
}
@@ -64,7 +66,8 @@ public abstract static class Builder {
6466
public abstract Builder setUuidExtractor(
6567
SerializableStatusFunction<SequencedMessage, Uuid> uuidExtractor);
6668

67-
public abstract Builder setDeduplicationFnOptions(DeduplicationFnOptions<Uuid> options);
69+
public abstract Builder setDeduplicate(
70+
Deduplicate.KeyedValues<Uuid, SequencedMessage> deduplicate);
6871

6972
public abstract Builder setHashPartitions(int partitions);
7073

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransform.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
package com.google.cloud.pubsublite.beam;
1818

1919
import com.google.cloud.pubsublite.SequencedMessage;
20-
import java.math.BigInteger;
2120
import org.apache.beam.sdk.transforms.MapElements;
2221
import org.apache.beam.sdk.transforms.PTransform;
23-
import org.apache.beam.sdk.transforms.ParDo;
2422
import org.apache.beam.sdk.transforms.ProcessFunction;
2523
import org.apache.beam.sdk.transforms.Values;
2624
import org.apache.beam.sdk.values.KV;
@@ -48,22 +46,8 @@ public PCollection<SequencedMessage> expand(PCollection<SequencedMessage> input)
4846
input.apply(
4947
"MapUuids",
5048
MapElements.into(new TypeDescriptor<KV<Uuid, SequencedMessage>>() {}).via(mapWithKeys));
51-
ProcessFunction<KV<Uuid, SequencedMessage>, KV<Integer, KV<Uuid, SequencedMessage>>>
52-
mapWithHash =
53-
kv ->
54-
KV.of(
55-
new BigInteger(/*signum=*/ 1, kv.getKey().value().toByteArray())
56-
.mod(BigInteger.valueOf(options.hashPartitions()))
57-
.intValue(),
58-
kv);
59-
PCollection<KV<Integer, KV<Uuid, SequencedMessage>>> hashedUuids =
60-
uuidMapped.apply(
61-
"HashUuids",
62-
MapElements.into(new TypeDescriptor<KV<Integer, KV<Uuid, SequencedMessage>>>() {})
63-
.via(mapWithHash));
6449
PCollection<KV<Uuid, SequencedMessage>> unique =
65-
hashedUuids.apply(
66-
"Deduplicate", ParDo.of(new DeduplicationFn<>(options.deduplicationFnOptions())));
50+
uuidMapped.apply("Deduplicate", options.deduplicate());
6751
return unique.apply("StripUuids", Values.create());
6852
}
6953
}

pubsublite-beam-io/src/test/java/com/google/cloud/pubsublite/beam/UuidDeduplicationTransformTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.beam.sdk.testing.PAssert;
2828
import org.apache.beam.sdk.testing.TestPipeline;
2929
import org.apache.beam.sdk.testing.TestStream;
30+
import org.apache.beam.sdk.transforms.Deduplicate;
3031
import org.apache.beam.sdk.values.PCollection;
3132
import org.joda.time.Duration;
3233
import org.joda.time.Instant;
@@ -60,7 +61,7 @@ public void unrelatedUuidsProxied() {
6061
TestStream.create(new SequencedMessageCoder())
6162
.advanceWatermarkTo(START)
6263
.addElements(message1)
63-
.advanceWatermarkTo(START.plus(DeduplicationFnOptions.DEFAULT_GC_DELAY.dividedBy(2)))
64+
.advanceWatermarkTo(START.plus(Deduplicate.DEFAULT_DURATION.dividedBy(2)))
6465
.addElements(message2)
6566
.advanceWatermarkToInfinity();
6667
PCollection<SequencedMessage> results =
@@ -81,7 +82,7 @@ public void sameUuidsWithinWindowOnlyOne() {
8182
TestStream.create(new SequencedMessageCoder())
8283
.advanceWatermarkTo(START)
8384
.addElements(message)
84-
.advanceWatermarkTo(START.plus(DeduplicationFnOptions.DEFAULT_GC_DELAY.dividedBy(2)))
85+
.advanceWatermarkTo(START.plus(Deduplicate.DEFAULT_DURATION.dividedBy(2)))
8586
.advanceWatermarkToInfinity();
8687
PCollection<SequencedMessage> results =
8788
pipeline
@@ -101,8 +102,7 @@ public void sameUuidsAfterGcOutsideWindowHasBoth() {
101102
TestStream.create(new SequencedMessageCoder())
102103
.advanceWatermarkTo(START)
103104
.addElements(message1)
104-
.advanceWatermarkTo(
105-
START.plus(DeduplicationFnOptions.DEFAULT_GC_DELAY.plus(Duration.millis(1))))
105+
.advanceWatermarkTo(START.plus(Deduplicate.DEFAULT_DURATION.plus(Duration.millis(1))))
106106
.addElements(message1)
107107
.advanceWatermarkToInfinity();
108108
PCollection<SequencedMessage> results =

0 commit comments

Comments
 (0)