diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 131faffa38e..c34c3f4e5d3 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -55,6 +55,14 @@ public class ProxyStartup { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + /** + * proxy components container, manager components with method start/shutdown/... + * - gRPC thread pool executor + * - message processor (wrap broker controller) + * - grpc server + * - remoting protocol server + * - ... + */ private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown(); private static class ProxyStartAndShutdown extends AbstractStartAndShutdown { @@ -73,8 +81,10 @@ public static void main(String[] args) { // init thread pool monitor for proxy. initThreadPoolMonitor(); + // init business thread pool for grpc server ThreadPoolExecutor executor = createServerExecutor(); + // create message processor, wrap broker controller in local mode MessagingProcessor messagingProcessor = createMessagingProcessor(); // tls cert update diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index bc1919c07a1..ad8e1d5312c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -243,7 +243,7 @@ public class ProxyConfig implements ConfigFile { private String remotingAccessAddr = ""; private int remotingListenPort = 8080; - // related to proxy's send strategy in cluster mode. + // related to proxy's sending strategy in cluster mode. private boolean sendLatencyEnable = false; private boolean startDetectorEnable = false; private int detectTimeout = 200; @@ -251,9 +251,38 @@ public class ProxyConfig implements ConfigFile { private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER; private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. send message(and send message v2) + * 2. send batch message + * 3. consume send message back + * 4. end transaction + * 5. recall message + */ private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. pull message + * 2. lite pull message + * 3. pop message + */ private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. update consumer offset + * 2. ack message + * 3. change message invisible time + * 4. get consumer connection list + */ private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. unregister client + * 2. check client config + * 3. get consumer list by group + * 4. get min/max offset, query consume offset, search offset by timestamp + * 5. lock/unlock batch mq + */ private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER; private int remotingHeartbeatThreadPoolQueueCapacity = 50000; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 9ee3f4fddd4..7446dc5378d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -71,6 +71,15 @@ import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +/** + * RocketMQ gRPC protocol implementation + * + * + */ public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -167,6 +176,16 @@ protected Status convertExceptionToStatus(Throwable t) { return ResponseBuilder.getInstance().buildStatus(t); } + /** + * submit grpc task to related thread pool. + * + * @param executor thread pool + * @param context context + * @param request grpc request + * @param runnable process task + * @param responseObserver grpc response observer + * @param statusResponseCreator error response creator + */ protected void addExecutor(ExecutorService executor, ProxyContext context, V request, Runnable runnable, StreamObserver responseObserver, Function statusResponseCreator) { if (request instanceof GeneratedMessageV3) { @@ -200,6 +219,12 @@ protected void validateContext(ProxyContext context) { } } + /** + * route query api, producer/consumer will call this api while starting. + * + * @param request request + * @param responseObserver gRPC response observer + */ @Override public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build(); @@ -217,6 +242,12 @@ public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> HeartbeatResponse.newBuilder().setStatus(status).build(); @@ -251,6 +282,12 @@ public void sendMessage(SendMessageRequest request, StreamObserver responseObserver) { @@ -399,6 +436,15 @@ public void recallMessage(RecallMessageRequest request, StreamObserver + *
  • register producer/consumer
  • + *
  • process trace
  • + *
  • verify message result
  • + * + */ @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();