diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index efaba6cf1..9a1898e1e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -36,13 +36,17 @@ import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.httpjson.HttpJsonCallContext; +import com.google.api.gax.httpjson.InstantiatingHttpJsonChannelProvider; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; +import com.google.cloud.pubsub.v1.stub.HttpJsonPublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStub; import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.common.base.Preconditions; @@ -120,9 +124,10 @@ public class Publisher implements PublisherInterface { private final boolean enableCompression; private final long compressionBytesThreshold; + private final boolean enableRESTJsonTransport; - private final GrpcCallContext publishContext; - private final GrpcCallContext publishContextWithCompression; + private final ApiCallContext publishContext; + private final ApiCallContext publishContextWithCompression; /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -152,6 +157,8 @@ private Publisher(Builder builder) throws IOException { this.messageTransform = builder.messageTransform; this.enableCompression = builder.enableCompression; this.compressionBytesThreshold = builder.compressionBytesThreshold; + this.enableRESTJsonTransport = + builder.channelProvider instanceof InstantiatingHttpJsonChannelProvider; messagesBatches = new HashMap<>(); messagesBatchLock = new ReentrantLock(); @@ -199,15 +206,24 @@ private Publisher(Builder builder) throws IOException { StatusCode.Code.UNAVAILABLE) .setRetrySettings(retrySettingsBuilder.build()) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); - this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); + this.publisherStub = + this.enableRESTJsonTransport + ? HttpJsonPublisherStub.create(stubSettings.build()) + : GrpcPublisherStub.create(stubSettings.build()); backgroundResourceList.add(publisherStub); backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); messagesWaiter = new Waiter(); - this.publishContext = GrpcCallContext.createDefault(); + this.publishContext = + this.enableRESTJsonTransport + ? HttpJsonCallContext.createDefault() + : GrpcCallContext.createDefault(); this.publishContextWithCompression = - GrpcCallContext.createDefault() - .withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION)); + this.enableRESTJsonTransport + ? this.publishContext + : // TODO + GrpcCallContext.createDefault() + .withCallOptions(CallOptions.DEFAULT.withCompression(GZIP_COMPRESSION)); } /** Topic which the publisher publishes to. */ @@ -448,7 +464,7 @@ private void publishAllWithoutInflightForKey(final String orderingKey) { } private ApiFuture publishCall(OutstandingBatch outstandingBatch) { - GrpcCallContext context = publishContext; + ApiCallContext context = publishContext; if (enableCompression && outstandingBatch.batchSizeBytes >= compressionBytesThreshold) { context = publishContextWithCompression; }