Skip to content

Commit ebb5d3b

Browse files
feat: Convert internal interfaces to use protos (#1335)
* feat: Convert internal interfaces to use protos * feat: Convert internal interfaces to use protos
1 parent 4daf3ab commit ebb5d3b

38 files changed

+382
-232
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/ResettableSubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package com.google.cloud.pubsublite.cloudpubsub.internal;
1818

1919
import com.google.api.gax.rpc.ApiException;
20-
import com.google.cloud.pubsublite.SequencedMessage;
2120
import com.google.cloud.pubsublite.internal.wire.Subscriber;
2221
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
22+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2323
import java.io.Serializable;
2424
import java.util.List;
2525
import java.util.function.Consumer;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ private boolean terminated() {
8383
}
8484

8585
@VisibleForTesting
86-
void onMessages(List<SequencedMessage> sequencedMessages) {
86+
void onMessages(List<com.google.cloud.pubsublite.proto.SequencedMessage> sequencedMessages) {
8787
try {
88-
for (SequencedMessage message : sequencedMessages) {
88+
for (com.google.cloud.pubsublite.proto.SequencedMessage proto : sequencedMessages) {
89+
SequencedMessage message = SequencedMessage.fromProto(proto);
8990
PubsubMessage userMessage = transformer.transform(message);
9091
long bytes = message.byteSize();
9192
Runnable trackerConsumer = ackSetTracker.track(message);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
7373
return ApiFutures.immediateFailedFuture(e.underlying);
7474
}
7575
return ApiFutures.transform(
76-
wirePublisher.publish(wireMessage),
76+
wirePublisher.publish(wireMessage.toProto()),
7777
MessageMetadata::encode,
7878
MoreExecutors.directExecutor());
7979
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.google.cloud.pubsublite.internal;
1818

1919
import com.google.api.core.ApiFuture;
20-
import com.google.cloud.pubsublite.SequencedMessage;
20+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2121
import java.util.Optional;
2222
import javax.annotation.concurrent.ThreadSafe;
2323

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424
import com.google.api.core.ApiService.State;
2525
import com.google.api.core.SettableApiFuture;
2626
import com.google.api.gax.rpc.StatusCode;
27-
import com.google.cloud.pubsublite.SequencedMessage;
2827
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2928
import com.google.cloud.pubsublite.internal.wire.Subscriber;
3029
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
3130
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
3231
import com.google.cloud.pubsublite.proto.FlowControlRequest;
32+
import com.google.cloud.pubsublite.proto.SequencedMessage;
3333
import com.google.errorprone.annotations.concurrent.GuardedBy;
3434
import java.util.ArrayDeque;
3535
import java.util.Collection;
@@ -110,7 +110,7 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
110110
underlying.allowFlow(
111111
FlowControlRequest.newBuilder()
112112
.setAllowedMessages(1)
113-
.setAllowedBytes(msg.byteSize())
113+
.setAllowedBytes(msg.getSizeBytes())
114114
.build());
115115
return Optional.of(msg);
116116
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BufferingPullSubscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import com.google.api.core.ApiService.Listener;
2222
import com.google.api.core.ApiService.State;
2323
import com.google.cloud.pubsublite.Offset;
24-
import com.google.cloud.pubsublite.SequencedMessage;
2524
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2625
import com.google.cloud.pubsublite.internal.wire.Subscriber;
2726
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
2827
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2928
import com.google.cloud.pubsublite.proto.FlowControlRequest;
29+
import com.google.cloud.pubsublite.proto.SequencedMessage;
3030
import com.google.common.collect.ImmutableList;
3131
import com.google.common.collect.Iterables;
3232
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -85,13 +85,13 @@ public synchronized List<SequencedMessage> pull() throws CheckedApiException {
8585
}
8686
Deque<SequencedMessage> collection = messages;
8787
messages = new ArrayDeque<>();
88-
long bytes = collection.stream().mapToLong(SequencedMessage::byteSize).sum();
88+
long bytes = collection.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
8989
underlying.allowFlow(
9090
FlowControlRequest.newBuilder()
9191
.setAllowedBytes(bytes)
9292
.setAllowedMessages(collection.size())
9393
.build());
94-
lastDelivered = Optional.of(Iterables.getLast(collection).offset());
94+
lastDelivered = Optional.of(Offset.of(Iterables.getLast(collection).getCursor().getOffset()));
9595
return ImmutableList.copyOf(collection);
9696
}
9797

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/Publisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiService;
21-
import com.google.cloud.pubsublite.Message;
21+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2222
import java.io.Flushable;
2323

2424
/** A generic PubSub Lite publisher. Errors are handled out of band. Thread safe. */
@@ -28,7 +28,7 @@ public interface Publisher<ResponseT> extends ApiService, Flushable {
2828
//
2929
// Guarantees that if a single publish future has an exception set, all publish calls made after
3030
// that will also have an exception set.
31-
ApiFuture<ResponseT> publish(Message message);
31+
ApiFuture<ResponseT> publish(PubSubMessage message);
3232

3333
// Attempts to cancel all outstanding publishes.
3434
void cancelOutstandingPublishes();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiService;
21-
import com.google.cloud.pubsublite.Message;
21+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2222
import java.io.Flushable;
2323

2424
/**
@@ -35,7 +35,7 @@ public interface SequencedPublisher<ResponseT> extends ApiService, Flushable {
3535
* <p>Guarantees that if a single publish future has an exception set, all publish calls made
3636
* after that will also have an exception set.
3737
*/
38-
ApiFuture<ResponseT> publish(Message message, PublishSequenceNumber sequenceNumber);
38+
ApiFuture<ResponseT> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber);
3939

4040
/** Attempts to cancel all outstanding publishes. */
4141
void cancelOutstandingPublishes();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.gax.rpc.ApiException;
23-
import com.google.cloud.pubsublite.Message;
2423
import com.google.cloud.pubsublite.internal.ProxyService;
2524
import com.google.cloud.pubsublite.internal.Publisher;
25+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2626
import java.io.IOException;
2727

2828
public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
@@ -34,7 +34,7 @@ public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<
3434
}
3535

3636
@Override
37-
public ApiFuture<T> publish(Message message) {
37+
public ApiFuture<T> publish(PubSubMessage message) {
3838
return toClientFuture(publisher.publish(message));
3939
}
4040

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import com.google.cloud.pubsublite.SequencedMessage;
19+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2020
import com.google.cloud.pubsublite.proto.SubscribeRequest;
2121
import com.google.cloud.pubsublite.proto.SubscribeResponse;
2222
import java.util.List;

0 commit comments

Comments
 (0)