Skip to content

Commit 908d70f

Browse files
authored
fix: Change types for Cloud Bigtable Changestream methods (#1639)
Use org.threeten.bp.Instant for below fields: - startTime - endTime - commitTimestamp - estimatedLowWatermark Use bigtable.common.Status for CloseStream::getStatus() Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
1 parent 612854f commit 908d70f

18 files changed

+193
-64
lines changed

google-cloud-bigtable/clirr-ignored-differences.xml

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,41 @@
9191
<differenceType>7006</differenceType>
9292
<className>com/google/cloud/bigtable/data/v2/models/Heartbeat</className>
9393
<method>*getEstimatedLowWatermark*</method>
94-
<to>long</to>
94+
<to>org.threeten.bp.Instant</to>
95+
</difference>
96+
<!-- change method return type is ok because CloseStream is InternalApi -->
97+
<difference>
98+
<differenceType>7006</differenceType>
99+
<className>com/google/cloud/bigtable/data/v2/models/CloseStream</className>
100+
<method>*getStatus*</method>
101+
<to>com.google.cloud.bigtable.common.Status</to>
102+
</difference>
103+
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
104+
<difference>
105+
<differenceType>7006</differenceType>
106+
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
107+
<method>*getCommitTimestamp*</method>
108+
<to>org.threeten.bp.Instant</to>
109+
</difference>
110+
<!-- change method return type is ok because ChangeStreamMutation is InternalApi -->
111+
<difference>
112+
<differenceType>7006</differenceType>
113+
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation</className>
114+
<method>*getEstimatedLowWatermark*</method>
115+
<to>org.threeten.bp.Instant</to>
116+
</difference>
117+
<!-- change method argument type is ok because ChangeStreamRecordAdapter is InternalApi -->
118+
<difference>
119+
<differenceType>7005</differenceType>
120+
<className>com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter$ChangeStreamRecordBuilder</className>
121+
<method>*</method>
122+
<to>*</to>
123+
</difference>
124+
<!-- change method argument type is ok because ReadChangeStreamQuery is InternalApi -->
125+
<difference>
126+
<differenceType>7005</differenceType>
127+
<className>com/google/cloud/bigtable/data/v2/models/ReadChangeStreamQuery</className>
128+
<method>*</method>
129+
<to>*</to>
95130
</difference>
96131
</differences>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/common/Status.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616
package com.google.cloud.bigtable.common;
1717

1818
import com.google.common.base.Objects;
19+
import java.io.Serializable;
1920

2021
/**
2122
* The `Status` type defines a logical error model. Each `Status` message contains an error code and
2223
* a error message.
2324
*
2425
* <p>This primarily wraps the protobuf {@link com.google.rpc.Status}.
2526
*/
26-
public final class Status {
27+
public final class Status implements Serializable {
28+
private static final long serialVersionUID = -5512896228725308380L;
29+
2730
public enum Code {
2831
OK(com.google.rpc.Code.OK),
2932
CANCELLED(com.google.rpc.Code.CANCELLED),

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.protobuf.ByteString;
2424
import java.io.Serializable;
2525
import javax.annotation.Nonnull;
26+
import org.threeten.bp.Instant;
2627

2728
/**
2829
* A ChangeStreamMutation represents a list of mods(represented by List<{@link Entry}>) targeted at
@@ -72,7 +73,7 @@ public enum MutationType {
7273
static Builder createUserMutation(
7374
@Nonnull ByteString rowKey,
7475
@Nonnull String sourceClusterId,
75-
long commitTimestamp,
76+
Instant commitTimestamp,
7677
int tieBreaker) {
7778
return builder()
7879
.setRowKey(rowKey)
@@ -88,7 +89,7 @@ static Builder createUserMutation(
8889
* mutation.
8990
*/
9091
static Builder createGcMutation(
91-
@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
92+
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
9293
return builder()
9394
.setRowKey(rowKey)
9495
.setType(MutationType.GARBAGE_COLLECTION)
@@ -110,7 +111,7 @@ static Builder createGcMutation(
110111
public abstract String getSourceClusterId();
111112

112113
/** Get the commit timestamp of the current mutation. */
113-
public abstract long getCommitTimestamp();
114+
public abstract Instant getCommitTimestamp();
114115

115116
/**
116117
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
@@ -123,7 +124,7 @@ static Builder createGcMutation(
123124
public abstract String getToken();
124125

125126
/** Get the low watermark of the current mutation. */
126-
public abstract long getEstimatedLowWatermark();
127+
public abstract Instant getEstimatedLowWatermark();
127128

128129
/** Get the list of mods of the current mutation. */
129130
@Nonnull
@@ -144,15 +145,15 @@ abstract static class Builder {
144145

145146
abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);
146147

147-
abstract Builder setCommitTimestamp(long commitTimestamp);
148+
abstract Builder setCommitTimestamp(Instant commitTimestamp);
148149

149150
abstract Builder setTieBreaker(int tieBreaker);
150151

151152
abstract ImmutableList.Builder<Entry> entriesBuilder();
152153

153154
abstract Builder setToken(@Nonnull String token);
154155

155-
abstract Builder setEstimatedLowWatermark(long estimatedLowWatermark);
156+
abstract Builder setEstimatedLowWatermark(Instant estimatedLowWatermark);
156157

157158
Builder setCell(
158159
@Nonnull String familyName,

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
2121
import com.google.protobuf.ByteString;
2222
import javax.annotation.Nonnull;
23+
import org.threeten.bp.Instant;
2324

2425
/**
2526
* An extension point that allows end users to plug in a custom implementation of logical change
@@ -115,15 +116,15 @@ interface ChangeStreamRecordBuilder<ChangeStreamRecordT> {
115116
void startUserMutation(
116117
@Nonnull ByteString rowKey,
117118
@Nonnull String sourceClusterId,
118-
long commitTimestamp,
119+
Instant commitTimestamp,
119120
int tieBreaker);
120121

121122
/**
122123
* Called to start a new Garbage Collection ChangeStreamMutation. This will be called at most
123124
* once. If called, the current change stream record must not include any close stream message
124125
* or heartbeat.
125126
*/
126-
void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker);
127+
void startGcMutation(@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker);
127128

128129
/** Called to add a DeleteFamily mod. */
129130
void deleteFamily(@Nonnull String familyName);
@@ -164,7 +165,7 @@ void deleteCells(
164165

165166
/** Called once per stream record to signal that all mods have been processed (unless reset). */
166167
ChangeStreamRecordT finishChangeStreamMutation(
167-
@Nonnull String token, long estimatedLowWatermark);
168+
@Nonnull String token, Instant estimatedLowWatermark);
168169

169170
/** Called when the current in progress change stream record should be dropped */
170171
void reset();

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import com.google.api.core.InternalApi;
1919
import com.google.auto.value.AutoValue;
2020
import com.google.bigtable.v2.ReadChangeStreamResponse;
21+
import com.google.cloud.bigtable.common.Status;
2122
import com.google.common.collect.ImmutableList;
22-
import com.google.rpc.Status;
2323
import java.io.Serializable;
2424
import java.util.List;
2525
import javax.annotation.Nonnull;
@@ -34,8 +34,9 @@ public abstract class CloseStream implements ChangeStreamRecord, Serializable {
3434
private static final long serialVersionUID = 7316215828353608505L;
3535

3636
private static CloseStream create(
37-
Status status, List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
38-
return new AutoValue_CloseStream(status, changeStreamContinuationTokens);
37+
com.google.rpc.Status status,
38+
List<ChangeStreamContinuationToken> changeStreamContinuationTokens) {
39+
return new AutoValue_CloseStream(Status.fromProto(status), changeStreamContinuationTokens);
3940
}
4041

4142
/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.protobuf.ByteString;
2323
import javax.annotation.Nonnull;
2424
import javax.annotation.Nullable;
25+
import org.threeten.bp.Instant;
2526

2627
/**
2728
* Default implementation of a {@link ChangeStreamRecordAdapter} that uses {@link
@@ -102,7 +103,7 @@ public ChangeStreamRecord onCloseStream(ReadChangeStreamResponse.CloseStream clo
102103
public void startUserMutation(
103104
@Nonnull ByteString rowKey,
104105
@Nonnull String sourceClusterId,
105-
long commitTimestamp,
106+
Instant commitTimestamp,
106107
int tieBreaker) {
107108
this.changeStreamMutationBuilder =
108109
ChangeStreamMutation.createUserMutation(
@@ -111,7 +112,8 @@ public void startUserMutation(
111112

112113
/** {@inheritDoc} */
113114
@Override
114-
public void startGcMutation(@Nonnull ByteString rowKey, long commitTimestamp, int tieBreaker) {
115+
public void startGcMutation(
116+
@Nonnull ByteString rowKey, Instant commitTimestamp, int tieBreaker) {
115117
this.changeStreamMutationBuilder =
116118
ChangeStreamMutation.createGcMutation(rowKey, commitTimestamp, tieBreaker);
117119
}
@@ -156,7 +158,7 @@ public void finishCell() {
156158
/** {@inheritDoc} */
157159
@Override
158160
public ChangeStreamRecord finishChangeStreamMutation(
159-
@Nonnull String token, long estimatedLowWatermark) {
161+
@Nonnull String token, Instant estimatedLowWatermark) {
160162
this.changeStreamMutationBuilder.setToken(token);
161163
this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark);
162164
return this.changeStreamMutationBuilder.build();

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import com.google.api.core.InternalApi;
1919
import com.google.auto.value.AutoValue;
2020
import com.google.bigtable.v2.ReadChangeStreamResponse;
21-
import com.google.protobuf.util.Timestamps;
2221
import java.io.Serializable;
2322
import javax.annotation.Nonnull;
23+
import org.threeten.bp.Instant;
2424

2525
/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
2626
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@@ -29,20 +29,22 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
2929
private static final long serialVersionUID = 7316215828353608504L;
3030

3131
private static Heartbeat create(
32-
ChangeStreamContinuationToken changeStreamContinuationToken, long estimatedLowWatermark) {
32+
ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) {
3333
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
3434
}
3535

3636
/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
3737
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
3838
return create(
3939
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
40-
Timestamps.toNanos(heartbeat.getEstimatedLowWatermark()));
40+
Instant.ofEpochSecond(
41+
heartbeat.getEstimatedLowWatermark().getSeconds(),
42+
heartbeat.getEstimatedLowWatermark().getNanos()));
4143
}
4244

4345
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
4446
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();
4547

4648
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
47-
public abstract long getEstimatedLowWatermark();
49+
public abstract Instant getEstimatedLowWatermark();
4850
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ReadChangeStreamQuery.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,15 @@
2828
import com.google.common.base.Preconditions;
2929
import com.google.protobuf.ByteString;
3030
import com.google.protobuf.Duration;
31-
import com.google.protobuf.util.Timestamps;
31+
import com.google.protobuf.Timestamp;
3232
import java.io.IOException;
3333
import java.io.ObjectInputStream;
3434
import java.io.ObjectOutputStream;
3535
import java.io.Serializable;
3636
import java.util.List;
3737
import javax.annotation.Nonnull;
3838
import javax.annotation.Nullable;
39+
import org.threeten.bp.Instant;
3940

4041
/** A simple wrapper to construct a query for the ReadChangeStream RPC. */
4142
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
@@ -142,18 +143,26 @@ public ReadChangeStreamQuery streamPartition(ByteStringRange range) {
142143
return streamPartition(rangeBuilder.build());
143144
}
144145

145-
/** Sets the startTime(Nanosecond) to read the change stream. */
146-
public ReadChangeStreamQuery startTime(long value) {
146+
/** Sets the startTime to read the change stream. */
147+
public ReadChangeStreamQuery startTime(Instant value) {
147148
Preconditions.checkState(
148149
!builder.hasContinuationTokens(),
149150
"startTime and continuationTokens can't be specified together");
150-
builder.setStartTime(Timestamps.fromNanos(value));
151+
builder.setStartTime(
152+
Timestamp.newBuilder()
153+
.setSeconds(value.getEpochSecond())
154+
.setNanos(value.getNano())
155+
.build());
151156
return this;
152157
}
153158

154-
/** Sets the endTime(Nanosecond) to read the change stream. */
155-
public ReadChangeStreamQuery endTime(long value) {
156-
builder.setEndTime(Timestamps.fromNanos(value));
159+
/** Sets the endTime to read the change stream. */
160+
public ReadChangeStreamQuery endTime(Instant value) {
161+
builder.setEndTime(
162+
Timestamp.newBuilder()
163+
.setSeconds(value.getEpochSecond())
164+
.setNanos(value.getNano())
165+
.build());
157166
return this;
158167
}
159168

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamStateMachine.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter.ChangeStreamRecordBuilder;
2222
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
2323
import com.google.common.base.Preconditions;
24-
import com.google.protobuf.util.Timestamps;
24+
import org.threeten.bp.Instant;
2525

2626
/**
2727
* A state machine to produce change stream records from a stream of {@link
@@ -338,7 +338,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
338338
"AWAITING_NEW_STREAM_RECORD: GC mutation shouldn't have source cluster id.");
339339
builder.startGcMutation(
340340
dataChange.getRowKey(),
341-
Timestamps.toNanos(dataChange.getCommitTimestamp()),
341+
Instant.ofEpochSecond(
342+
dataChange.getCommitTimestamp().getSeconds(),
343+
dataChange.getCommitTimestamp().getNanos()),
342344
dataChange.getTiebreaker());
343345
} else if (dataChange.getType() == Type.USER) {
344346
validate(
@@ -347,7 +349,9 @@ State handleMod(ReadChangeStreamResponse.DataChange dataChange, int index) {
347349
builder.startUserMutation(
348350
dataChange.getRowKey(),
349351
dataChange.getSourceClusterId(),
350-
Timestamps.toNanos(dataChange.getCommitTimestamp()),
352+
Instant.ofEpochSecond(
353+
dataChange.getCommitTimestamp().getSeconds(),
354+
dataChange.getCommitTimestamp().getNanos()),
351355
dataChange.getTiebreaker());
352356
} else {
353357
validate(false, "AWAITING_NEW_STREAM_RECORD: Unexpected type: " + dataChange.getType());
@@ -591,7 +595,10 @@ private State checkAndFinishMutationIfNeeded(
591595
validate(dataChange.hasEstimatedLowWatermark(), "Last data change missing lowWatermark");
592596
completeChangeStreamRecord =
593597
builder.finishChangeStreamMutation(
594-
dataChange.getToken(), Timestamps.toNanos(dataChange.getEstimatedLowWatermark()));
598+
dataChange.getToken(),
599+
Instant.ofEpochSecond(
600+
dataChange.getEstimatedLowWatermark().getSeconds(),
601+
dataChange.getEstimatedLowWatermark().getNanos()));
595602
return AWAITING_STREAM_RECORD_CONSUME;
596603
}
597604
// Case 2_2): The current DataChange itself is chunked, so wait for the next

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/common/StatusTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
import static com.google.common.truth.Truth.assertWithMessage;
2020

2121
import com.google.rpc.Code;
22+
import java.io.ByteArrayInputStream;
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.IOException;
25+
import java.io.ObjectInputStream;
26+
import java.io.ObjectOutputStream;
2227
import org.junit.Test;
2328
import org.junit.runner.RunWith;
2429
import org.junit.runners.JUnit4;
@@ -89,4 +94,23 @@ public void testToProto() {
8994

9095
assertThat(model.toString()).isEqualTo(proto.toString());
9196
}
97+
98+
@Test
99+
public void testSerialization() throws IOException, ClassNotFoundException {
100+
com.google.rpc.Status proto =
101+
com.google.rpc.Status.newBuilder()
102+
.setCode(Code.UNAVAILABLE.getNumber())
103+
.setMessage("some message")
104+
.build();
105+
106+
Status model = Status.fromProto(proto);
107+
108+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
109+
ObjectOutputStream oos = new ObjectOutputStream(bos);
110+
oos.writeObject(model);
111+
oos.close();
112+
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
113+
Status actual = (Status) ois.readObject();
114+
assertThat(actual).isEqualTo(model);
115+
}
92116
}

0 commit comments

Comments
 (0)