diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckHandler.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckHandler.java new file mode 100644 index 000000000..9e175725c --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.google.cloud.pubsub.v1; + +import io.opentelemetry.api.common.AttributesBuilder; + +public class AckHandler implements RpcOperationHandler{ + @Override + public String getCodeFunction() { + return "sendAckOperations"; + } + + @Override + public String getOperationName() { + return "ack"; + } + + @Override + public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) { + // No extra attributes for "ack" + } + + @Override + public void addEvent(PubsubMessageWrapper message) { + message.addAckStartEvent(); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModAckHandler.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModAckHandler.java new file mode 100644 index 000000000..d486d40b9 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/ModAckHandler.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.google.cloud.pubsub.v1; + +import io.opentelemetry.api.common.AttributesBuilder; + +public class ModAckHandler implements RpcOperationHandler{ + + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + + @Override + public String getCodeFunction() { + return "sendModAckOperations"; + } + + @Override + public String getOperationName() { + return "modack"; + } + + @Override + public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) { + builder.put("messaging.gcp_pubsub.message.ack_deadline", ackDeadline) + .put("messaging.gcp_pubsub.is_receipt_modack", isReceiptModack); + } + + @Override + public void addEvent(PubsubMessageWrapper message) { + message.addModAckStartEvent(); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/NackHandler.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/NackHandler.java new file mode 100644 index 000000000..e2265637e --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/NackHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.google.cloud.pubsub.v1; + +import io.opentelemetry.api.common.AttributesBuilder; + +public class NackHandler implements RpcOperationHandler{ + @Override + public String getCodeFunction() { + return "sendModAckOperations"; + } + + @Override + public String getOperationName() { + return "nack"; + } + + @Override + public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) { + // No extra attributes for "nack" + } + + @Override + public void addEvent(PubsubMessageWrapper message) { + message.addNackStartEvent(); + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index fce664ff2..69848ffa0 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -43,19 +43,25 @@ */ public class OpenCensusUtil { private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); - + private static final OpenCensusUtil INSTANCE = new OpenCensusUtil(); public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey"; public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey"; @VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver"; private static final String TRACEPARENT_KEY = "traceparent"; - private static final Tagger tagger = Tags.getTagger(); - private static final TagContextBinarySerializer serializer = - Tags.getTagPropagationComponent().getBinarySerializer(); - private static final Tracer tracer = Tracing.getTracer(); - private static final TextFormat traceContextTextFormat = - Tracing.getPropagationComponent().getTraceContextFormat(); + private final Tagger tagger; + private final TagContextBinarySerializer serializer ; + + private final Tracer tracer; + private final TextFormat traceContextTextFormat ; + + public OpenCensusUtil() { + this.tagger = Tags.getTagger(); + this.serializer = Tags.getTagPropagationComponent().getBinarySerializer(); + this.tracer = Tracing.getTracer(); + this.traceContextTextFormat = Tracing.getPropagationComponent().getTraceContextFormat(); + } /** * Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as @@ -66,8 +72,8 @@ public class OpenCensusUtil { @Override public PubsubMessage apply(PubsubMessage message) { PubsubMessage.Builder builder = PubsubMessage.newBuilder(message); - String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext()); - String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext()); + String encodedSpanContext = encodeSpanContext(INSTANCE.tracer.getCurrentSpan().getContext()); + String encodedTagContext = encodeTagContext(INSTANCE.tagger.getCurrentTagContext()); if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) { return message; } @@ -102,7 +108,7 @@ public String get(String carrier, String key) { @VisibleForTesting static String encodeSpanContext(SpanContext ctxt) { StringBuilder builder = new StringBuilder(); - traceContextTextFormat.inject(ctxt, builder, setter); + INSTANCE.traceContextTextFormat.inject(ctxt, builder, setter); return builder.toString(); } @@ -115,14 +121,14 @@ private static String encodeTagContext(TagContext tags) { // TODO: update this code once the text encoding of tags has been resolved // (https://github.com/census-instrumentation/opencensus-specs/issues/65). private static Scope createScopedTagContext(String encodedTags) { - return tagger.withTagContext(tagger.getCurrentTagContext()); + return INSTANCE.tagger.withTagContext(INSTANCE.tagger.getCurrentTagContext()); } @VisibleForTesting @MustBeClosed static Scope createScopedSpan(String name) { - return tracer - .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan()) + return INSTANCE.tracer + .spanBuilderWithExplicitParent(name, INSTANCE.tracer.getCurrentSpan()) .setRecordEvents(true) // Note: we preserve the sampling decision from the publisher. .setSampler(Samplers.alwaysSample()) @@ -131,8 +137,8 @@ static Scope createScopedSpan(String name) { private static void addParentLink(String encodedParentSpanContext) { try { - SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter); - tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN)); + SpanContext ctxt = INSTANCE.traceContextTextFormat.extract(encodedParentSpanContext, getter); + INSTANCE.tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN)); } catch (SpanContextParseException exn) { logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java index 9ee751135..7c59c1971 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -47,8 +47,6 @@ public class OpenTelemetryPubsubTracer { "messaging.gcp_pubsub.message.exactly_once_delivery"; private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = "messaging.gcp_pubsub.message.delivery_attempt"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; - private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; private static final String PROJECT_ATTR_KEY = "gcp.project_id"; private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; @@ -366,20 +364,21 @@ Span startSubscribeRpcSpan( if (!enabled) { return null; } - String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; + + // Get the appropriate handler for the operation + RpcOperationHandler handler = getHandler(rpcOperation); + AttributesBuilder attributesBuilder = createCommonSpanAttributesBuilder( subscriptionName.getSubscription(), subscriptionName.getProject(), - codeFunction, + handler.getCodeFunction(), rpcOperation) .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); // Ack deadline and receipt modack are specific to the modack operation if (rpcOperation == "modack") { - attributesBuilder - .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) - .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); + handler.addAttributes(attributesBuilder, ackDeadline, isReceiptModack); } SpanBuilder rpcSpanBuilder = @@ -399,22 +398,25 @@ Span startSubscribeRpcSpan( for (PubsubMessageWrapper message : messages) { if (rpcSpan.getSpanContext().isSampled()) { message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); - switch (rpcOperation) { - case "ack": - message.addAckStartEvent(); - break; - case "modack": - message.addModAckStartEvent(); - break; - case "nack": - message.addNackStartEvent(); - break; - } + handler.addEvent(message); } } return rpcSpan; } + private RpcOperationHandler getHandler(String rpcOperation) { + switch (rpcOperation) { + case "ack": + return new AckHandler(); + case "modack": + return new ModAckHandler(); + case "nack": + return new NackHandler(); + default: + throw new IllegalArgumentException("Unknown RPC operation: " + rpcOperation); + } + } + /** Ends the given subscribe RPC span if it exists. */ void endSubscribeRpcSpan(Span rpcSpan) { if (!enabled) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 113cbf932..e35dff45d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -120,7 +120,7 @@ public class Publisher implements PublisherInterface { private final AtomicBoolean shutdown; private final BackgroundResource backgroundResources; - private final Waiter messagesWaiter; + private final Waiter pendingPublishesTracker; private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; @@ -227,7 +227,7 @@ private Publisher(Builder builder) throws IOException { backgroundResourceList.add(publisherStub); backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); - messagesWaiter = new Waiter(); + pendingPublishesTracker = new Waiter(); this.publishContext = GrpcCallContext.createDefault(); this.publishContextWithCompression = GrpcCallContext.createDefault() @@ -338,7 +338,7 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.unlock(); } - messagesWaiter.incrementPendingCount(1); + pendingPublishesTracker.incrementPendingCount(1); // For messages without ordering keys, it is okay to send batches without holding // messagesBatchLock. @@ -524,14 +524,18 @@ public void onSuccess(PublishResponse result) { result.getMessageIdsCount(), outstandingBatch.size()))); } else { outstandingBatch.onSuccess(result.getMessageIdsList()); - if (!activeAlarm.get() - && outstandingBatch.orderingKey != null - && !outstandingBatch.orderingKey.isEmpty()) { + + // Break down the complex conditional into descriptive variables + boolean noActiveAlarm = !activeAlarm.get(); + boolean orderingKeyNotEmpty = outstandingBatch.orderingKey != null && !outstandingBatch.orderingKey.isEmpty(); + + // Simplified and readable condition + if (noActiveAlarm && orderingKeyNotEmpty) { publishAllWithoutInflightForKey(outstandingBatch.orderingKey); } } } finally { - messagesWaiter.incrementPendingCount(-outstandingBatch.size()); + pendingPublishesTracker.incrementPendingCount(-outstandingBatch.size()); } } @@ -555,7 +559,7 @@ public void onFailure(Throwable t) { } outstandingBatch.onFailure(t); } finally { - messagesWaiter.incrementPendingCount(-outstandingBatch.size()); + pendingPublishesTracker.incrementPendingCount(-outstandingBatch.size()); } } }; @@ -667,7 +671,7 @@ public void shutdown() { currentAlarmFuture.cancel(false); } publishAllOutstanding(); - messagesWaiter.waitComplete(); + pendingPublishesTracker.waitComplete(); backgroundResources.shutdown(); } @@ -990,92 +994,108 @@ private static class MessageFlowController { this.awaitingBytesAcquires = new LinkedList(); } + void acquire(long messageSize) throws FlowController.FlowControlException { + // Check if the message size exceeds the byte limit outright if (messageSize > byteLimit) { - logger.log( - Level.WARNING, - "Attempted to publish message with byte size > request byte flow control limit."); + logger.log(Level.WARNING, "Attempted to publish message with byte size > request byte flow control limit."); throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); } + + // Acquire the lock for thread safety lock.lock(); try { - if (outstandingMessages >= messageLimit - && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { - throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit); - } - if (outstandingBytes + messageSize >= byteLimit - && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { - throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); - } + // Validate limits and throw exceptions if necessary + checkMessageLimit(); + checkByteLimit(messageSize); - // We can acquire or we should wait until we can acquire. - // Start by acquiring a slot for a message. - CountDownLatch messageWaiter = null; - while (outstandingMessages >= messageLimit) { - if (messageWaiter == null) { - // This message gets added to the back of the line. - messageWaiter = new CountDownLatch(1); - awaitingMessageAcquires.addLast(messageWaiter); - } else { - // This message already in line stays at the head of the line. - messageWaiter = new CountDownLatch(1); - awaitingMessageAcquires.set(0, messageWaiter); - } - lock.unlock(); - try { - messageWaiter.await(); - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); - } - lock.lock(); - } - ++outstandingMessages; - if (messageWaiter != null) { - awaitingMessageAcquires.removeFirst(); - } + // Acquire resources, potentially waiting if limits are reached + acquireMessageSlot(); + acquireByteSpace(messageSize); + } finally { + // Always release the lock + lock.unlock(); + } + } - // There may be some surplus messages left; let the next message waiting for a token have - // one. - if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) { - awaitingMessageAcquires.getFirst().countDown(); - } + /** + * Checks if the outstanding message count has reached the limit and throws an exception + * if the limit behavior is set to ThrowException. + * + * @throws FlowController.FlowControlException if the message limit is exceeded + */ + private void checkMessageLimit() throws FlowController.FlowControlException { + if (outstandingMessages >= messageLimit && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit); + } + } - // Now acquire space for bytes. - CountDownLatch bytesWaiter = null; - Long bytesRemaining = messageSize; - while (outstandingBytes + bytesRemaining >= byteLimit) { - // Take what is available. - Long available = byteLimit - outstandingBytes; - bytesRemaining -= available; - outstandingBytes = byteLimit; - if (bytesWaiter == null) { - // This message gets added to the back of the line. - bytesWaiter = new CountDownLatch(1); - awaitingBytesAcquires.addLast(bytesWaiter); - } else { - // This message already in line stays at the head of the line. - bytesWaiter = new CountDownLatch(1); - awaitingBytesAcquires.set(0, bytesWaiter); - } - lock.unlock(); - try { - bytesWaiter.await(); - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); - } - lock.lock(); - } + /** + * Checks if adding the message size would exceed the byte limit and throws an exception + * if the limit behavior is set to ThrowException. + * + * @param messageSize the size of the message in bytes + * @throws FlowController.FlowControlException if the byte limit would be exceeded + */ + private void checkByteLimit(long messageSize) throws FlowController.FlowControlException { + if (outstandingBytes + messageSize >= byteLimit && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); + } + } - outstandingBytes += bytesRemaining; - if (bytesWaiter != null) { - awaitingBytesAcquires.removeFirst(); - } - // There may be some surplus bytes left; let the next message waiting for bytes have some. - if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) { - awaitingBytesAcquires.getFirst().countDown(); - } + /** + * Acquires a slot for a message, waiting if the message limit is reached until a slot becomes available. + */ + private void acquireMessageSlot() { + if (outstandingMessages < messageLimit) { + outstandingMessages++; + return; + } + + // Wait for a slot if the limit is reached + CountDownLatch messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.addLast(messageWaiter); + waitForResource(messageWaiter); + outstandingMessages++; + awaitingMessageAcquires.removeFirst(); + + // Notify the next waiter if there's still room + if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) { + awaitingMessageAcquires.getFirst().countDown(); + } + } + + private void acquireByteSpace(long messageSize) { + long bytesRemaining = messageSize; + while (outstandingBytes + bytesRemaining >= byteLimit) { + long available = byteLimit - outstandingBytes; + bytesRemaining -= available; + outstandingBytes = byteLimit; + + // Wait for enough byte space to become available + CountDownLatch bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.addLast(bytesWaiter); + waitForResource(bytesWaiter); + awaitingBytesAcquires.removeFirst(); + } + + // Add the remaining bytes + outstandingBytes += bytesRemaining; + + // Notify the next waiter if there's still space + if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) { + awaitingBytesAcquires.getFirst().countDown(); + } + } + + private void waitForResource(CountDownLatch waiter) { + lock.unlock(); + try { + waiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); } finally { - lock.unlock(); + lock.lock(); } } @@ -1102,7 +1122,7 @@ void release(long messageSize) { private class MessagesBatch { private List messages; private int initialBatchedBytes; - private int batchedBytes; + private int totalMessageBytesInBatch; private String orderingKey; private final BatchingSettings batchingSettings; @@ -1115,14 +1135,14 @@ private MessagesBatch( } private OutstandingBatch popOutstandingBatch() { - OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey); + OutstandingBatch batch = new OutstandingBatch(messages, totalMessageBytesInBatch, orderingKey); reset(); return batch; } private void reset() { messages = new LinkedList<>(); - batchedBytes = initialBatchedBytes; + totalMessageBytesInBatch = initialBatchedBytes; } private boolean isEmpty() { @@ -1130,7 +1150,7 @@ private boolean isEmpty() { } private int getBatchedBytes() { - return batchedBytes; + return totalMessageBytesInBatch; } private int getMessagesCount() { @@ -1155,7 +1175,7 @@ && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) { } messages.add(outstandingPublish); - batchedBytes += outstandingPublish.messageSize; + totalMessageBytesInBatch += outstandingPublish.messageSize; // Border case: If the message to send is greater or equals to the max batch size then send it // immediately. diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/RpcOperationHandler.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/RpcOperationHandler.java new file mode 100644 index 000000000..c7558d19f --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/RpcOperationHandler.java @@ -0,0 +1,27 @@ +/* + * Copyright 2019 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.google.cloud.pubsub.v1; + +import io.opentelemetry.api.common.AttributesBuilder; + +public interface RpcOperationHandler { + String getCodeFunction(); + String getOperationName(); + void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack); + void addEvent(PubsubMessageWrapper message); +} diff --git a/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/MethodDescriptorFactory.java b/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/MethodDescriptorFactory.java new file mode 100644 index 000000000..9d5ebf739 --- /dev/null +++ b/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/MethodDescriptorFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.pubsub.v1; + +import com.google.protobuf.Message; +import io.grpc.MethodDescriptor; +import io.grpc.protobuf.ProtoUtils; + +/** + * A utility class to create gRPC MethodDescriptors for SchemaService operations. + */ +public final class MethodDescriptorFactory { + + private MethodDescriptorFactory() {} // Prevent instantiation + + /** + * Creates a MethodDescriptor for a unary gRPC method. + * + * @param serviceName The name of the service (e.g., "google.pubsub.v1.SchemaService"). + * @param methodName The name of the method (e.g., "CreateSchema"). + * @param requestType The request message type. + * @param responseType The response message type. + * @return A configured MethodDescriptor. + */ + @SuppressWarnings("unchecked") + public static MethodDescriptor createUnaryMethodDescriptor( + String serviceName, String methodName, ReqT requestType, RespT responseType) { + return MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName)) + .setSampledToLocalTracing(true) + .setRequestMarshaller(ProtoUtils.marshaller(requestType)) + .setResponseMarshaller(ProtoUtils.marshaller(responseType)) + .setSchemaDescriptor(new SchemaServiceGrpc.SchemaServiceMethodDescriptorSupplier(methodName)) + .build(); + } +} \ No newline at end of file diff --git a/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SchemaServiceGrpc.java b/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SchemaServiceGrpc.java index 076e1f798..a4b5f9bc1 100644 --- a/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SchemaServiceGrpc.java +++ b/grpc-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SchemaServiceGrpc.java @@ -53,22 +53,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getCreateSchemaMethod = SchemaServiceGrpc.getCreateSchemaMethod) == null) { SchemaServiceGrpc.getCreateSchemaMethod = - getCreateSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "CreateSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.CreateSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.Schema.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("CreateSchema")) - .build(); + getCreateSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "CreateSchema", + com.google.pubsub.v1.CreateSchemaRequest.getDefaultInstance(), + com.google.pubsub.v1.Schema.getDefaultInstance()); } } } @@ -93,21 +83,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getGetSchemaMethod = SchemaServiceGrpc.getGetSchemaMethod) == null) { SchemaServiceGrpc.getGetSchemaMethod = - getGetSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.GetSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.Schema.getDefaultInstance())) - .setSchemaDescriptor(new SchemaServiceMethodDescriptorSupplier("GetSchema")) - .build(); + getGetSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "GetSchema", + com.google.pubsub.v1.GetSchemaRequest.getDefaultInstance(), + com.google.pubsub.v1.Schema.getDefaultInstance()); } } } @@ -133,22 +114,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getListSchemasMethod = SchemaServiceGrpc.getListSchemasMethod) == null) { SchemaServiceGrpc.getListSchemasMethod = - getListSchemasMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "ListSchemas")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ListSchemasRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ListSchemasResponse.getDefaultInstance())) - .setSchemaDescriptor(new SchemaServiceMethodDescriptorSupplier("ListSchemas")) - .build(); + getListSchemasMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "ListSchemas", + com.google.pubsub.v1.ListSchemasRequest.getDefaultInstance(), + com.google.pubsub.v1.ListSchemasResponse.getDefaultInstance()); } } } @@ -178,25 +149,12 @@ private SchemaServiceGrpc() {} if ((getListSchemaRevisionsMethod = SchemaServiceGrpc.getListSchemaRevisionsMethod) == null) { SchemaServiceGrpc.getListSchemaRevisionsMethod = - getListSchemaRevisionsMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName( - generateFullMethodName(SERVICE_NAME, "ListSchemaRevisions")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ListSchemaRevisionsRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ListSchemaRevisionsResponse - .getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("ListSchemaRevisions")) - .build(); + getListSchemaRevisionsMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "ListSchemaRevisions", + com.google.pubsub.v1.ListSchemaRevisionsRequest.getDefaultInstance(), + com.google.pubsub.v1.ListSchemaRevisionsResponse.getDefaultInstance()); } } } @@ -221,22 +179,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getCommitSchemaMethod = SchemaServiceGrpc.getCommitSchemaMethod) == null) { SchemaServiceGrpc.getCommitSchemaMethod = - getCommitSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "CommitSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.CommitSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.Schema.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("CommitSchema")) - .build(); + getCommitSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "CommitSchema", + com.google.pubsub.v1.CommitSchemaRequest.getDefaultInstance(), + com.google.pubsub.v1.Schema.getDefaultInstance()); } } } @@ -262,22 +210,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getRollbackSchemaMethod = SchemaServiceGrpc.getRollbackSchemaMethod) == null) { SchemaServiceGrpc.getRollbackSchemaMethod = - getRollbackSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "RollbackSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.RollbackSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.Schema.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("RollbackSchema")) - .build(); + getRollbackSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "RollbackSchema", + com.google.pubsub.v1.RollbackSchemaRequest.getDefaultInstance(), + com.google.pubsub.v1.Schema.getDefaultInstance()); } } } @@ -304,25 +242,12 @@ private SchemaServiceGrpc() {} if ((getDeleteSchemaRevisionMethod = SchemaServiceGrpc.getDeleteSchemaRevisionMethod) == null) { SchemaServiceGrpc.getDeleteSchemaRevisionMethod = - getDeleteSchemaRevisionMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName( - generateFullMethodName(SERVICE_NAME, "DeleteSchemaRevision")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.DeleteSchemaRevisionRequest - .getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.Schema.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("DeleteSchemaRevision")) - .build(); + getDeleteSchemaRevisionMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "DeleteSchemaRevision", + com.google.pubsub.v1.DeleteSchemaRevisionRequest.getDefaultInstance(), + com.google.pubsub.v1.Schema.getDefaultInstance()); } } } @@ -347,22 +272,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getDeleteSchemaMethod = SchemaServiceGrpc.getDeleteSchemaMethod) == null) { SchemaServiceGrpc.getDeleteSchemaMethod = - getDeleteSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "DeleteSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.DeleteSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.protobuf.Empty.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("DeleteSchema")) - .build(); + getDeleteSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "DeleteSchema", + com.google.pubsub.v1.DeleteSchemaRequest.getDefaultInstance(), + com.google.protobuf.Empty.getDefaultInstance()); } } } @@ -388,23 +303,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getValidateSchemaMethod = SchemaServiceGrpc.getValidateSchemaMethod) == null) { SchemaServiceGrpc.getValidateSchemaMethod = - getValidateSchemaMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "ValidateSchema")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ValidateSchemaRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ValidateSchemaResponse.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("ValidateSchema")) - .build(); + getValidateSchemaMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "ValidateSchema", + com.google.pubsub.v1.ValidateSchemaRequest.getDefaultInstance(), + com.google.pubsub.v1.ValidateSchemaResponse.getDefaultInstance()); } } } @@ -431,23 +335,12 @@ private SchemaServiceGrpc() {} synchronized (SchemaServiceGrpc.class) { if ((getValidateMessageMethod = SchemaServiceGrpc.getValidateMessageMethod) == null) { SchemaServiceGrpc.getValidateMessageMethod = - getValidateMessageMethod = - io.grpc.MethodDescriptor - . - newBuilder() - .setType(io.grpc.MethodDescriptor.MethodType.UNARY) - .setFullMethodName(generateFullMethodName(SERVICE_NAME, "ValidateMessage")) - .setSampledToLocalTracing(true) - .setRequestMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ValidateMessageRequest.getDefaultInstance())) - .setResponseMarshaller( - io.grpc.protobuf.ProtoUtils.marshaller( - com.google.pubsub.v1.ValidateMessageResponse.getDefaultInstance())) - .setSchemaDescriptor( - new SchemaServiceMethodDescriptorSupplier("ValidateMessage")) - .build(); + getValidateMessageMethod = + MethodDescriptorFactory.createUnaryMethodDescriptor( + SERVICE_NAME, + "ValidateMessage", + com.google.pubsub.v1.ValidateMessageRequest.getDefaultInstance(), + com.google.pubsub.v1.ValidateMessageResponse.getDefaultInstance()); } } } @@ -1331,7 +1224,7 @@ private static final class SchemaServiceFileDescriptorSupplier SchemaServiceFileDescriptorSupplier() {} } - private static final class SchemaServiceMethodDescriptorSupplier + static final class SchemaServiceMethodDescriptorSupplier extends SchemaServiceBaseDescriptorSupplier implements io.grpc.protobuf.ProtoMethodDescriptorSupplier { private final java.lang.String methodName;