Skip to content

Commit a7e57e0

Browse files
wonwoobclozel
authored andcommitted
Configure codec buffer size in ES Reactive Rest client
This commit adds a new configuration property `"spring.data.elasticsearch.client.reactive.max-in-memory-size"` which configures the maximum amount of memory buffered by the `WebClient` used by the Reactive ElasticSearch client. See gh-20205
1 parent 2815e6e commit a7e57e0

File tree

3 files changed

+32
-0
lines changed

3 files changed

+32
-0
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
3030
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
3131
import org.springframework.http.HttpHeaders;
32+
import org.springframework.util.unit.DataSize;
33+
import org.springframework.web.reactive.function.client.ExchangeStrategies;
3234
import org.springframework.web.reactive.function.client.WebClient;
3335

3436
/**
@@ -52,6 +54,7 @@ public ClientConfiguration clientConfiguration(ReactiveRestClientProperties prop
5254
builder.usingSsl();
5355
}
5456
configureTimeouts(builder, properties);
57+
configureWebClient(builder, properties);
5558
return builder.build();
5659
}
5760

@@ -67,6 +70,19 @@ private void configureTimeouts(ClientConfiguration.TerminalClientConfigurationBu
6770
});
6871
}
6972

73+
private void configureWebClient(ClientConfiguration.TerminalClientConfigurationBuilder builder,
74+
ReactiveRestClientProperties properties) {
75+
PropertyMapper map = PropertyMapper.get();
76+
builder.withWebClientConfigurer((webClient) -> {
77+
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
78+
.codecs((configurer) -> map.from(properties.getMaxInMemorySize()).whenNonNull()
79+
.asInt(DataSize::toBytes)
80+
.to((maxInMemorySize) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize)))
81+
.build();
82+
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
83+
});
84+
}
85+
7086
@Bean
7187
@ConditionalOnMissingBean
7288
public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientProperties.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323

2424
import org.springframework.boot.context.properties.ConfigurationProperties;
25+
import org.springframework.util.unit.DataSize;
2526

2627
/**
2728
* Configuration properties for Elasticsearch Reactive REST clients.
@@ -62,6 +63,12 @@ public class ReactiveRestClientProperties {
6263
*/
6364
private Duration socketTimeout;
6465

66+
/**
67+
* Limit on the number of bytes that can be buffered whenever the input stream needs
68+
* to be aggregated.
69+
*/
70+
private DataSize maxInMemorySize;
71+
6572
public List<String> getEndpoints() {
6673
return this.endpoints;
6774
}
@@ -110,4 +117,12 @@ public void setSocketTimeout(Duration socketTimeout) {
110117
this.socketTimeout = socketTimeout;
111118
}
112119

120+
public DataSize getMaxInMemorySize() {
121+
return this.maxInMemorySize;
122+
}
123+
124+
public void setMaxInMemorySize(DataSize maxInMemorySize) {
125+
this.maxInMemorySize = maxInMemorySize;
126+
}
127+
113128
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/data/elasticsearch/ReactiveRestClientAutoConfigurationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ void restClientCanQueryElasticsearchNode() {
7777
this.contextRunner.withPropertyValues(
7878
"spring.data.elasticsearch.client.reactive.endpoints=" + elasticsearch.getContainerIpAddress() + ":"
7979
+ elasticsearch.getFirstMappedPort(),
80+
"spring.data.elasticsearch.client.reactive.max-in-memory-size=-1",
8081
"spring.data.elasticsearch.client.reactive.connection-timeout=120s",
8182
"spring.data.elasticsearch.client.reactive.socket-timeout=120s").run((context) -> {
8283
ReactiveElasticsearchClient client = context.getBean(ReactiveElasticsearchClient.class);

0 commit comments

Comments
 (0)