Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

/**
* RocketMQ gRPC protocol implementation
*
* <ul>
* <li>implements gRPC protocol</li>
* <li>execute request in independent thread pool</li>
* <li>execute pipeline, ...</li>
* </ul>
*/
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

Expand Down Expand Up @@ -167,6 +176,16 @@ protected Status convertExceptionToStatus(Throwable t) {
return ResponseBuilder.getInstance().buildStatus(t);
}

/**
* submit grpc task to related thread pool.
*
* @param executor thread pool
* @param context context
* @param request grpc request
* @param runnable process task
* @param responseObserver grpc response observer
* @param statusResponseCreator error response creator
*/
protected <V, T> void addExecutor(ExecutorService executor, ProxyContext context, V request, Runnable runnable,
StreamObserver<T> responseObserver, Function<Status, T> statusResponseCreator) {
if (request instanceof GeneratedMessageV3) {
Expand Down Expand Up @@ -200,6 +219,12 @@ protected void validateContext(ProxyContext context) {
}
}

/**
* route query api, producer/consumer will call this api while starting.
*
* @param request request
* @param responseObserver gRPC response observer
*/
@Override
public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteResponse> responseObserver) {
Function<Status, QueryRouteResponse> statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build();
Expand All @@ -217,6 +242,12 @@ public void queryRoute(QueryRouteRequest request, StreamObserver<QueryRouteRespo
}
}

/**
* heartbeat api, register producer/consumer
*
* @param request request
* @param responseObserver responseObserver
*/
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
Function<Status, HeartbeatResponse> statusResponseCreator = status -> HeartbeatResponse.newBuilder().setStatus(status).build();
Expand Down Expand Up @@ -251,6 +282,12 @@ public void sendMessage(SendMessageRequest request, StreamObserver<SendMessageRe
}
}

/**
* assignment query api, consumer will call this api while starting.
*
* @param request request
* @param responseObserver gRPC response observer
*/
@Override
public void queryAssignment(QueryAssignmentRequest request,
StreamObserver<QueryAssignmentResponse> responseObserver) {
Expand Down Expand Up @@ -399,6 +436,15 @@ public void recallMessage(RecallMessageRequest request, StreamObserver<RecallMes
}
}

/**
* telemetry API
*
* <ul>
* <li>register producer/consumer</li>
* <li>process trace</li>
* <li>verify message result</li>
* </ul>
*/
@Override
public StreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
Function<Status, TelemetryCommand> statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build();
Expand Down