1313import com .clickhouse .client .api .data_formats .internal .SerializerUtils ;
1414import com .clickhouse .client .api .enums .ProxyType ;
1515import com .clickhouse .client .api .http .ClickHouseHttpProto ;
16+ import io .micrometer .core .annotation .Timed ;
17+ import org .apache .hc .client5 .http .AuthenticationStrategy ;
1618import org .apache .hc .client5 .http .ConnectTimeoutException ;
19+ import org .apache .hc .client5 .http .classic .ExecChain ;
20+ import org .apache .hc .client5 .http .classic .ExecChainHandler ;
1721import org .apache .hc .client5 .http .classic .methods .HttpPost ;
1822import org .apache .hc .client5 .http .config .ConnectionConfig ;
1923import org .apache .hc .client5 .http .config .RequestConfig ;
24+ import org .apache .hc .client5 .http .impl .ChainElement ;
25+ import org .apache .hc .client5 .http .impl .DefaultAuthenticationStrategy ;
26+ import org .apache .hc .client5 .http .impl .DefaultClientConnectionReuseStrategy ;
27+ import org .apache .hc .client5 .http .impl .DefaultSchemePortResolver ;
2028import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
29+ import org .apache .hc .client5 .http .impl .classic .ConnectExec ;
2130import org .apache .hc .client5 .http .impl .classic .HttpClientBuilder ;
2231import org .apache .hc .client5 .http .impl .io .BasicHttpClientConnectionManager ;
2332import org .apache .hc .client5 .http .impl .io .ManagedHttpClientConnectionFactory ;
2433import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManager ;
2534import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManagerBuilder ;
2635import org .apache .hc .client5 .http .io .HttpClientConnectionManager ;
36+ import org .apache .hc .client5 .http .io .ManagedHttpClientConnection ;
2737import org .apache .hc .client5 .http .protocol .HttpClientContext ;
2838import org .apache .hc .client5 .http .socket .ConnectionSocketFactory ;
2939import org .apache .hc .client5 .http .socket .LayeredConnectionSocketFactory ;
3040import org .apache .hc .client5 .http .socket .PlainConnectionSocketFactory ;
3141import org .apache .hc .client5 .http .ssl .SSLConnectionSocketFactory ;
42+ import org .apache .hc .core5 .http .ClassicHttpRequest ;
3243import org .apache .hc .core5 .http .ClassicHttpResponse ;
3344import org .apache .hc .core5 .http .ConnectionRequestTimeoutException ;
3445import org .apache .hc .core5 .http .ContentType ;
3546import org .apache .hc .core5 .http .Header ;
3647import org .apache .hc .core5 .http .HttpEntity ;
48+ import org .apache .hc .core5 .http .HttpException ;
3749import org .apache .hc .core5 .http .HttpHeaders ;
3850import org .apache .hc .core5 .http .HttpHost ;
3951import org .apache .hc .core5 .http .HttpRequest ;
4557import org .apache .hc .core5 .http .impl .io .DefaultHttpResponseParserFactory ;
4658import org .apache .hc .core5 .http .io .SocketConfig ;
4759import org .apache .hc .core5 .http .io .entity .EntityTemplate ;
60+ import org .apache .hc .core5 .http .protocol .DefaultHttpProcessor ;
4861import org .apache .hc .core5 .http .protocol .HttpContext ;
62+ import org .apache .hc .core5 .http .protocol .RequestTargetHost ;
63+ import org .apache .hc .core5 .http .protocol .RequestUserAgent ;
4964import org .apache .hc .core5 .io .CloseMode ;
5065import org .apache .hc .core5 .io .IOCallback ;
5166import org .apache .hc .core5 .net .URIBuilder ;
8196import java .util .Objects ;
8297import java .util .Properties ;
8398import java .util .Set ;
99+ import java .util .concurrent .ConcurrentLinkedQueue ;
84100import java .util .concurrent .TimeUnit ;
101+ import java .util .concurrent .atomic .AtomicLong ;
85102import java .util .function .Function ;
86103
87104public class HttpAPIClientHelper {
@@ -219,7 +236,7 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
219236
220237
221238 int networkBufferSize = MapUtils .getInt (chConfiguration , "client_network_buffer_size" );
222- ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory (
239+ MeteredManagedHttpClientConnectionFactory connectionFactory = new MeteredManagedHttpClientConnectionFactory (
223240 Http1Config .custom ()
224241 .setBufferSize (networkBufferSize )
225242 .build (),
@@ -238,6 +255,9 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
238255 Class <?> micrometerLoader = getClass ().getClassLoader ().loadClass ("com.clickhouse.client.api.metrics.MicrometerLoader" );
239256 Method applyMethod = micrometerLoader .getDeclaredMethod ("applyPoolingMetricsBinder" , Object .class , String .class , PoolingHttpClientConnectionManager .class );
240257 applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , phccm );
258+
259+ applyMethod = micrometerLoader .getDeclaredMethod ("applyConnectionMetricsBinder" , Object .class , String .class , MeteredManagedHttpClientConnectionFactory .class );
260+ applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , connectionFactory );
241261 } catch (Exception e ) {
242262 LOG .error ("Failed to register metrics" , e );
243263 }
@@ -758,7 +778,6 @@ public void close() {
758778 httpClient .close (CloseMode .IMMEDIATE );
759779 }
760780
761-
762781 /**
763782 * This factory is used only when no ssl connections are required (no https endpoints).
764783 * Internally http client would create factory and spend time if no supplied.
@@ -779,4 +798,34 @@ public Socket connectSocket(TimeValue connectTimeout, Socket socket, HttpHost ho
779798 return null ;
780799 }
781800 }
801+
802+ public class MeteredManagedHttpClientConnectionFactory extends ManagedHttpClientConnectionFactory {
803+ public MeteredManagedHttpClientConnectionFactory (Http1Config http1Config , CharCodingConfig charCodingConfig , DefaultHttpResponseParserFactory defaultHttpResponseParserFactory ) {
804+ super (http1Config , charCodingConfig , defaultHttpResponseParserFactory );
805+ }
806+
807+ ConcurrentLinkedQueue <Long > times = new ConcurrentLinkedQueue <>();
808+
809+
810+ @ Override
811+ public ManagedHttpClientConnection createConnection (Socket socket ) throws IOException {
812+ long startT = System .currentTimeMillis ();
813+ try {
814+ return super .createConnection (socket );
815+ } finally {
816+ long endT = System .currentTimeMillis ();
817+ times .add (endT - startT );
818+ }
819+ }
820+
821+ public long getTime () {
822+ int count = times .size ();
823+ long runningAverage = 0 ;
824+ for (int i = 0 ; i < count ; i ++) {
825+ runningAverage += times .poll ();
826+ }
827+
828+ return count > 0 ? runningAverage / count : 0 ;
829+ }
830+ }
782831}
0 commit comments