Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@

public class ProxyStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
/**
* proxy components container, manager components with method start/shutdown/...
* - gRPC thread pool executor
* - message processor (wrap broker controller)
* - grpc server
* - remoting protocol server
* - ...
*/
private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown();

private static class ProxyStartAndShutdown extends AbstractStartAndShutdown {
Expand All @@ -73,8 +81,10 @@ public static void main(String[] args) {
// init thread pool monitor for proxy.
initThreadPoolMonitor();

// init business thread pool for grpc server
ThreadPoolExecutor executor = createServerExecutor();

// create message processor, wrap broker controller in local mode
MessagingProcessor messagingProcessor = createMessagingProcessor();

// tls cert update
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,17 +243,46 @@ public class ProxyConfig implements ConfigFile {
private String remotingAccessAddr = "";
private int remotingListenPort = 8080;

// related to proxy's send strategy in cluster mode.
// related to proxy's sending strategy in cluster mode.
private boolean sendLatencyEnable = false;
private boolean startDetectorEnable = false;
private int detectTimeout = 200;
private int detectInterval = 2 * 1000;

private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
/**
* thread pool number for
* 1. send message(and send message v2)
* 2. send batch message
* 3. consume send message back
* 4. end transaction
* 5. recall message
*/
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
/**
* thread pool number for
* 1. pull message
* 2. lite pull message
* 3. pop message
*/
private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
/**
* thread pool number for
* 1. update consumer offset
* 2. ack message
* 3. change message invisible time
* 4. get consumer connection list
*/
private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
/**
* thread pool number for
* 1. unregister client
* 2. check client config
* 3. get consumer list by group
* 4. get min/max offset, query consume offset, search offset by timestamp
* 5. lock/unlock batch mq
*/
private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;

private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

/**
* RocketMQ gRPC protocol implementation
*
* <ul>
* <li>implements gRPC protocol</li>
* <li>execute request in independent thread pool</li>
* <li>execute pipeline, ...</li>
* </ul>
*/
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

Expand Down Expand Up @@ -167,6 +176,16 @@ protected Status convertExceptionToStatus(Throwable t) {
return ResponseBuilder.getInstance().buildStatus(t);
}

/**
* submit grpc task to related thread pool.
*
* @param executor thread pool
* @param context context
* @param request grpc request
* @param runnable process task
* @param responseObserver grpc response observer
* @param statusResponseCreator error response creator
*/
protected <V, T> void addExecutor(ExecutorService executor, ProxyContext context, V request, Runnable runnable,
StreamObserver<T> responseObserver, Function<Status, T> statusResponseCreator) {
if (request instanceof GeneratedMessageV3) {
Expand Down Expand Up @@ -200,6 +219,12 @@ protected void validateContext(ProxyContext context) {
}
}

/**
* route query api, producer/consumer will call this api while starting.
*
* @param request request
* @param responseObserver gRPC response observer
*/
@Override
public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteResponse> responseObserver) {
Function<Status, QueryRouteResponse> statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build();
Expand All @@ -217,6 +242,12 @@ public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteRespo
}
}

/**
* heartbeat api, register producer/consumer
*
* @param request request
* @param responseObserver responseObserver
*/
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
Function<Status, HeartbeatResponse> statusResponseCreator = status -> HeartbeatResponse.newBuilder().setStatus(status).build();
Expand Down Expand Up @@ -251,6 +282,12 @@ public void sendMessage(SendMessageRequest request, StreamObserver<SendMessageRe
}
}

/**
* assignment query api, consumer will call this api while starting.
*
* @param request request
* @param responseObserver gRPC response observer
*/
@Override
public void queryAssignment(QueryAssignmentRequest request,
StreamObserver<QueryAssignmentResponse> responseObserver) {
Expand Down Expand Up @@ -399,6 +436,15 @@ public void recallMessage(RecallMessageRequest request, StreamObserver<RecallMes
}
}

/**
* telemetry API
*
* <ul>
* <li>register producer/consumer</li>
* <li>process trace</li>
* <li>verify message result</li>
* </ul>
*/
@Override
public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();
Expand Down