1313import com .clickhouse .client .api .data_formats .internal .SerializerUtils ;
1414import com .clickhouse .client .api .enums .ProxyType ;
1515import com .clickhouse .client .api .http .ClickHouseHttpProto ;
16+ import io .micrometer .core .annotation .Timed ;
1617import org .apache .hc .client5 .http .AuthenticationStrategy ;
1718import org .apache .hc .client5 .http .ConnectTimeoutException ;
1819import org .apache .hc .client5 .http .classic .ExecChain ;
9495import java .util .Objects ;
9596import java .util .Properties ;
9697import java .util .Set ;
98+ import java .util .concurrent .ConcurrentLinkedQueue ;
9799import java .util .concurrent .TimeUnit ;
100+ import java .util .concurrent .atomic .AtomicLong ;
98101import java .util .function .Function ;
99102
100103public class HttpAPIClientHelper {
@@ -112,6 +115,9 @@ public class HttpAPIClientHelper {
112115
113116 private final Set <ClientFaultCause > defaultRetryCauses ;
114117
118+ private AtomicLong requestCount = new AtomicLong (0 );
119+ private AtomicLong failureCount = new AtomicLong (0 );
120+
115121 private String defaultUserAgent ;
116122 private Object metricsRegistry ;
117123 public HttpAPIClientHelper (Map <String , String > configuration , Object metricsRegistry , boolean initSslContext ) {
@@ -229,22 +235,12 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
229235
230236
231237 int networkBufferSize = MapUtils .getInt (chConfiguration , "client_network_buffer_size" );
232- ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory (
238+ MeteredManagedHttpClientConnectionFactory connectionFactory = new MeteredManagedHttpClientConnectionFactory (
233239 Http1Config .custom ()
234240 .setBufferSize (networkBufferSize )
235241 .build (),
236242 CharCodingConfig .DEFAULT ,
237- DefaultHttpResponseParserFactory .INSTANCE ) {
238- @ Override
239- public ManagedHttpClientConnection createConnection (Socket socket ) throws IOException {
240- long startT = System .nanoTime ();
241- try {
242- return super .createConnection (socket );
243- } finally {
244- LOG .info ("connection created in " + (System .nanoTime () - startT ) + "ns" );
245- }
246- }
247- };
243+ DefaultHttpResponseParserFactory .INSTANCE );
248244
249245 connMgrBuilder .setConnectionFactory (connectionFactory );
250246 connMgrBuilder .setSSLSocketFactory (sslConnectionSocketFactory );
@@ -258,6 +254,12 @@ public ManagedHttpClientConnection createConnection(Socket socket) throws IOExce
258254 Class <?> micrometerLoader = getClass ().getClassLoader ().loadClass ("com.clickhouse.client.api.metrics.MicrometerLoader" );
259255 Method applyMethod = micrometerLoader .getDeclaredMethod ("applyPoolingMetricsBinder" , Object .class , String .class , PoolingHttpClientConnectionManager .class );
260256 applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , phccm );
257+
258+ applyMethod = micrometerLoader .getDeclaredMethod ("applyConnectionMetricsBinder" , Object .class , String .class , MeteredManagedHttpClientConnectionFactory .class );
259+ applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , connectionFactory );
260+
261+ applyMethod = micrometerLoader .getDeclaredMethod ("applyFailureRatioMetricsBinder" , Object .class , String .class , HttpAPIClientHelper .class );
262+ applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , this );
261263 } catch (Exception e ) {
262264 LOG .error ("Failed to register metrics" , e );
263265 }
@@ -418,9 +420,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
418420 HttpClientContext context = HttpClientContext .create ();
419421
420422 try {
421- {
422- // increment request count
423- }
423+ requestCount .incrementAndGet ();
424424 ClassicHttpResponse httpResponse = httpClient .executeOpen (null , req , context );
425425 boolean serverCompression = MapUtils .getFlag (requestConfig , chConfiguration , ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getKey ());
426426 httpResponse .setEntity (wrapResponseEntity (httpResponse .getEntity (), httpResponse .getCode (), serverCompression , useHttpCompression ));
@@ -446,9 +446,9 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
446446 LOG .warn ("Failed to connect to '{}': {}" , server .getHost (), e .getMessage ());
447447 throw new ClientException ("Failed to connect" , e );
448448 } catch (ConnectionRequestTimeoutException | ServerException | NoHttpResponseException | ClientException | SocketTimeoutException e ) {
449- if ( e instanceof ConnectionRequestTimeoutException ) {
450- // add failed request because of connection request timeout
451- req .getConfig ().getConnectionRequestTimeout (); // record timeout value
449+ failureCount . incrementAndGet ();
450+ if ( e instanceof ConnectionRequestTimeoutException || e instanceof SocketTimeoutException ) {
451+ LOG . warn ( "Request failed with timeout: {}" , req .getConfig ().getConnectionRequestTimeout () ); // record timeout value
452452 }
453453 throw e ;
454454 } catch (Exception e ) {
@@ -784,6 +784,10 @@ public void close() {
784784 }
785785
786786
787+ public long getRequestRatio () {//Metrics
788+ return requestCount .get () == 0 ? 0 : Math .round (((float ) failureCount .get () / requestCount .get ()) * 100 );
789+ }
790+
787791 /**
788792 * This factory is used only when no ssl connections are required (no https endpoints).
789793 * Internally http client would create factory and spend time if no supplied.
@@ -804,4 +808,34 @@ public Socket connectSocket(TimeValue connectTimeout, Socket socket, HttpHost ho
804808 return null ;
805809 }
806810 }
811+
812+ public class MeteredManagedHttpClientConnectionFactory extends ManagedHttpClientConnectionFactory {
813+ public MeteredManagedHttpClientConnectionFactory (Http1Config http1Config , CharCodingConfig charCodingConfig , DefaultHttpResponseParserFactory defaultHttpResponseParserFactory ) {
814+ super (http1Config , charCodingConfig , defaultHttpResponseParserFactory );
815+ }
816+
817+ ConcurrentLinkedQueue <Long > times = new ConcurrentLinkedQueue <>();
818+
819+
820+ @ Override
821+ public ManagedHttpClientConnection createConnection (Socket socket ) throws IOException {
822+ long startT = System .currentTimeMillis ();
823+ try {
824+ return super .createConnection (socket );
825+ } finally {
826+ long endT = System .currentTimeMillis ();
827+ times .add (endT - startT );
828+ }
829+ }
830+
831+ public long getTime () {
832+ int count = times .size ();
833+ long runningAverage = 0 ;
834+ for (int i = 0 ; i < count ; i ++) {
835+ runningAverage += times .poll ();
836+ }
837+
838+ return count > 0 ? runningAverage / count : 0 ;
839+ }
840+ }
807841}
0 commit comments