@@ -28,26 +28,12 @@ public class ProteusZipkinHttpBridge implements ProteusTracingService {
2828 private final int port ;
2929
3030 private final String zipkinUrl ;
31-
3231 private HttpClient httpClient ;
3332
3433 public ProteusZipkinHttpBridge (String host , int port , String zipkinUrl ) {
3534 this .zipkinUrl = zipkinUrl ;
3635 this .host = host ;
3736 this .port = port ;
38- this .httpClient =
39- HttpClient .builder ()
40- .options (
41- builder ->
42- builder
43- .compression (true )
44- .poolResources (PoolResources .fixed ("proteusZipkinBridge" ))
45- .option (ChannelOption .SO_KEEPALIVE , true )
46- .option (ChannelOption .SO_TIMEOUT , 60_000 )
47- .option (ChannelOption .CONNECT_TIMEOUT_MILLIS , 30_000 )
48- .host (host )
49- .port (port ))
50- .build ();
5137 }
5238
5339 public ProteusZipkinHttpBridge (String host , int port ) {
@@ -94,6 +80,28 @@ public static void main(String... args) {
9480 proteus .onClose ().block ();
9581 }
9682
83+ private synchronized HttpClient getClient () {
84+ if (httpClient == null ) {
85+ this .httpClient =
86+ HttpClient .builder ()
87+ .options (
88+ builder ->
89+ builder
90+ .compression (true )
91+ .poolResources (PoolResources .fixed ("proteusZipkinBridge" ))
92+ .option (ChannelOption .SO_KEEPALIVE , true )
93+ .option (ChannelOption .CONNECT_TIMEOUT_MILLIS , 30_000 )
94+ .host (host )
95+ .port (port ))
96+ .build ();
97+ }
98+ return httpClient ;
99+ }
100+
101+ private synchronized void resetHttpClient () {
102+ this .httpClient = null ;
103+ }
104+
97105 @ Override
98106 public Mono <Ack > streamSpans (Publisher <Span > messages , ByteBuf metadata ) {
99107 return Flux .from (messages )
@@ -119,22 +127,19 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
119127 .flatMap (stringMono -> stringMono )
120128 .concatMap (
121129 spans ->
122- httpClient
130+ getClient ()
123131 .post (
124132 zipkinUrl ,
125133 request -> {
126134 request .addHeader ("Content-Type" , "application/json" );
127135 return request .sendString (Mono .just (spans ));
128136 })
129- .doOnError (
130- throwable ->
131- logger .error (
132- "error sending data to tracing data to url "
133- + zipkinUrl
134- + " and payload [\n " + spans +"\n ]" ,
135- throwable ))
136- .timeout (Duration .ofSeconds (30 )),
137- 256 )
137+ .timeout (Duration .ofSeconds (30 ))
138+ .doOnError (throwable -> resetHttpClient ()),
139+ 8 )
140+ .doOnError (
141+ throwable ->
142+ logger .error ("error sending data to tracing data to url " + zipkinUrl , throwable ))
138143 .then (Mono .never ());
139144 }
140145}
0 commit comments