Skip to content

Commit 9ea7fc7

Browse files
committed
Default timeout for http requests to prevent hanging - ability to configurate in properties
1 parent 8f73f48 commit 9ea7fc7

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
3939

4040
public RetryingKafkaConnectClient(ClustersProperties.ConnectCluster config,
4141
@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
42-
DataSize maxBuffSize) {
43-
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize));
42+
DataSize maxBuffSize,
43+
Duration responseTimeout) {
44+
super(new RetryingApiClient(config, truststoreConfig, maxBuffSize, responseTimeout));
4445
}
4546

4647
private static Retry conflictCodeRetry() {
@@ -318,14 +319,16 @@ private static class RetryingApiClient extends ApiClient {
318319

319320
public RetryingApiClient(ClustersProperties.ConnectCluster config,
320321
ClustersProperties.TruststoreConfig truststoreConfig,
321-
DataSize maxBuffSize) {
322-
super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null);
322+
DataSize maxBuffSize,
323+
Duration responseTimeout) {
324+
super(buildWebClient(maxBuffSize, responseTimeout, config, truststoreConfig), null, null);
323325
setBasePath(config.getAddress());
324326
setUsername(config.getUsername());
325327
setPassword(config.getPassword());
326328
}
327329

328330
public static WebClient buildWebClient(DataSize maxBuffSize,
331+
Duration responseTimeout,
329332
ClustersProperties.ConnectCluster config,
330333
ClustersProperties.TruststoreConfig truststoreConfig) {
331334
return new WebClientConfigurator()
@@ -341,6 +344,7 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
341344
config.getPassword()
342345
)
343346
.configureBufferSize(maxBuffSize)
347+
.configureResponseTimeout(responseTimeout)
344348
.build();
345349
}
346350
}

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static class PollingProperties {
6868
Integer pollTimeoutMs;
6969
Integer maxPageSize;
7070
Integer defaultPageSize;
71+
Integer responseTimeoutMs;
7172
}
7273

7374
@Data

api/src/main/java/io/kafbat/ui/config/WebclientProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
public class WebclientProperties {
1414

1515
String maxInMemoryBufferSize;
16+
Integer responseTimeout;
1617

1718
@PostConstruct
1819
public void validate() {

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.kafbat.ui.util.KafkaServicesValidation;
1717
import io.kafbat.ui.util.ReactiveFailover;
1818
import io.kafbat.ui.util.WebClientConfigurator;
19+
import java.time.Duration;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
@@ -37,13 +38,18 @@
3738
public class KafkaClusterFactory {
3839

3940
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
41+
private static final Duration DEFAULT_RESPONSE_TIMEOUT = Duration.ofSeconds(20);
4042

4143
private final DataSize webClientMaxBuffSize;
44+
private final Duration responseTimeout;
4245

4346
public KafkaClusterFactory(WebclientProperties webclientProperties) {
4447
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
4548
.map(DataSize::parse)
4649
.orElse(DEFAULT_WEBCLIENT_BUFFER);
50+
this.responseTimeout = Optional.ofNullable(webclientProperties.getResponseTimeout())
51+
.map(Duration::ofMillis)
52+
.orElse(DEFAULT_RESPONSE_TIMEOUT);
4753
}
4854

4955
public KafkaCluster create(ClustersProperties properties,
@@ -145,7 +151,8 @@ private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties
145151
url -> new RetryingKafkaConnectClient(
146152
connectCluster.toBuilder().address(url).build(),
147153
cluster.getSsl(),
148-
webClientMaxBuffSize
154+
webClientMaxBuffSize,
155+
responseTimeout
149156
),
150157
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
151158
"No alive connect instances available",

api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ public class WebClientConfigurator {
3232
private final WebClient.Builder builder = WebClient.builder();
3333
private HttpClient httpClient = HttpClient
3434
.create()
35-
.proxyWithSystemProperties()
36-
.responseTimeout(Duration.ofSeconds(10));
35+
.proxyWithSystemProperties();
3736

3837
public WebClientConfigurator() {
3938
configureObjectMapper(defaultOM());
@@ -146,6 +145,11 @@ public WebClientConfigurator configureCodecs(Consumer<ClientCodecConfigurer> con
146145
return this;
147146
}
148147

148+
public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) {
149+
httpClient = httpClient.responseTimeout(responseTimeout);
150+
return this;
151+
}
152+
149153
public WebClient build() {
150154
return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
151155
}

0 commit comments

Comments
 (0)