diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 9ee3f4fddd4..7446dc5378d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -71,6 +71,15 @@ import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +/** + * RocketMQ gRPC protocol implementation + * + * + */ public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -167,6 +176,16 @@ protected Status convertExceptionToStatus(Throwable t) { return ResponseBuilder.getInstance().buildStatus(t); } + /** + * submit grpc task to related thread pool. + * + * @param executor thread pool + * @param context context + * @param request grpc request + * @param runnable process task + * @param responseObserver grpc response observer + * @param statusResponseCreator error response creator + */ protected void addExecutor(ExecutorService executor, ProxyContext context, V request, Runnable runnable, StreamObserver responseObserver, Function statusResponseCreator) { if (request instanceof GeneratedMessageV3) { @@ -200,6 +219,12 @@ protected void validateContext(ProxyContext context) { } } + /** + * route query api, producer/consumer will call this api while starting. + * + * @param request request + * @param responseObserver gRPC response observer + */ @Override public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build(); @@ -217,6 +242,12 @@ public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> HeartbeatResponse.newBuilder().setStatus(status).build(); @@ -251,6 +282,12 @@ public void sendMessage(SendMessageRequest request, StreamObserver responseObserver) { @@ -399,6 +436,15 @@ public void recallMessage(RecallMessageRequest request, StreamObserver + *
  • register producer/consumer
  • + *
  • process trace
  • + *
  • verify message result
  • + * + */ @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();