1717
1818package im .turms .gateway .access .client .common ;
1919
20+ import java .util .concurrent .atomic .AtomicInteger ;
21+
2022import io .netty .buffer .ByteBuf ;
2123import io .netty .buffer .Unpooled ;
2224import org .springframework .stereotype .Component ;
2325import reactor .core .publisher .Mono ;
2426
27+ import im .turms .gateway .domain .observation .service .MetricsService ;
2528import im .turms .gateway .domain .servicerequest .service .ServiceRequestService ;
2629import im .turms .gateway .domain .session .access .client .controller .SessionClientController ;
2730import im .turms .gateway .domain .session .service .SessionService ;
4346import im .turms .server .common .infra .healthcheck .ServiceAvailability ;
4447import im .turms .server .common .infra .logging .core .logger .Logger ;
4548import im .turms .server .common .infra .logging .core .logger .LoggerFactory ;
49+ import im .turms .server .common .infra .metrics .CommonMetricNameConst ;
4650import im .turms .server .common .infra .property .TurmsPropertiesManager ;
4751import im .turms .server .common .infra .proto .ProtoDecoder ;
4852import im .turms .server .common .infra .proto .ProtoEncoder ;
@@ -84,6 +88,8 @@ public class ClientRequestDispatcher {
8488 private final ServiceRequestService serviceRequestService ;
8589 private final ServerStatusManager serverStatusManager ;
8690
91+ private final AtomicInteger pendingRequestCount ;
92+
8793 public ClientRequestDispatcher (
8894 ApiLoggingContext apiLoggingContext ,
8995 BlocklistService blocklistService ,
@@ -92,6 +98,7 @@ public ClientRequestDispatcher(
9298 SessionService sessionService ,
9399 ServiceRequestService serviceRequestService ,
94100 ServerStatusManager serverStatusManager ,
101+ MetricsService metricsService ,
95102 TurmsPropertiesManager propertiesManager ) {
96103 this .apiLoggingContext = apiLoggingContext ;
97104 this .blocklistService = blocklistService ;
@@ -100,6 +107,8 @@ public ClientRequestDispatcher(
100107 this .sessionService = sessionService ;
101108 this .serviceRequestService = serviceRequestService ;
102109 this .serverStatusManager = serverStatusManager ;
110+ pendingRequestCount = metricsService .getRegistry ()
111+ .gauge (CommonMetricNameConst .TURMS_CLIENT_REQUEST_PENDING , new AtomicInteger ());
103112 NotificationFactory .init (propertiesManager );
104113 }
105114
@@ -113,6 +122,19 @@ public ClientRequestDispatcher(
113122 public Mono <ByteBuf > handleRequest (
114123 UserSessionWrapper sessionWrapper ,
115124 ByteBuf serviceRequestBuffer ) {
125+ pendingRequestCount .incrementAndGet ();
126+ try {
127+ return handleRequest0 (sessionWrapper , serviceRequestBuffer )
128+ .doFinally (signalType -> pendingRequestCount .decrementAndGet ());
129+ } catch (Exception e ) {
130+ pendingRequestCount .decrementAndGet ();
131+ return Mono .error (e );
132+ }
133+ }
134+
135+ public Mono <ByteBuf > handleRequest0 (
136+ UserSessionWrapper sessionWrapper ,
137+ ByteBuf serviceRequestBuffer ) {
116138 // Check if it is a heartbeat request
117139 if (!serviceRequestBuffer .isReadable ()) {
118140 serviceRequestBuffer .release ();
0 commit comments