@@ -34,17 +34,18 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
3434 private static final Duration RETRIES_DELAY = Duration .ofMillis (200 );
3535
3636 public RetryingKafkaConnectClient (ClustersProperties .ConnectCluster config ,
37- @ Nullable ClustersProperties .TruststoreConfig truststoreConfig ,
38- DataSize maxBuffSize ) {
37+ @ Nullable ClustersProperties .TruststoreConfig truststoreConfig ,
38+ DataSize maxBuffSize ) {
3939 super (new RetryingApiClient (config , truststoreConfig , maxBuffSize ));
4040 }
4141
4242 private static Retry conflictCodeRetry () {
4343 return Retry
4444 .fixedDelay (MAX_RETRIES , RETRIES_DELAY )
4545 .filter (e -> e instanceof WebClientResponseException .Conflict )
46- .onRetryExhaustedThrow ((spec , signal ) -> new KafkaConnectConflictReponseException (
47- (WebClientResponseException .Conflict ) signal .failure ()));
46+ .onRetryExhaustedThrow ((spec , signal ) ->
47+ new KafkaConnectConflictReponseException (
48+ (WebClientResponseException .Conflict ) signal .failure ()));
4849 }
4950
5051 private static <T > Mono <T > withRetryOnConflict (Mono <T > publisher ) {
@@ -57,23 +58,25 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
5758
5859 private static <T > Mono <T > withBadRequestErrorHandling (Mono <T > publisher ) {
5960 return publisher
60- .onErrorResume (WebClientResponseException .BadRequest .class ,
61- e -> Mono .error (new ValidationException ("Invalid configuration" )))
62- .onErrorResume (WebClientResponseException .InternalServerError .class ,
63- e -> Mono .error (new ValidationException ("Invalid configuration" )));
61+ .onErrorResume (WebClientResponseException .BadRequest .class , e ->
62+ Mono .error (new ValidationException ("Invalid configuration" )))
63+ .onErrorResume (WebClientResponseException .InternalServerError .class , e ->
64+ Mono .error (new ValidationException ("Invalid configuration" )));
6465 }
6566
6667 @ Override
6768 public Mono <Connector > createConnector (NewConnector newConnector ) throws RestClientException {
6869 return withBadRequestErrorHandling (
69- super .createConnector (newConnector ));
70+ super .createConnector (newConnector )
71+ );
7072 }
7173
7274 @ Override
7375 public Mono <Connector > setConnectorConfig (String connectorName , Map <String , Object > requestBody )
7476 throws RestClientException {
7577 return withBadRequestErrorHandling (
76- super .setConnectorConfig (connectorName , requestBody ));
78+ super .setConnectorConfig (connectorName , requestBody )
79+ );
7780 }
7881
7982 @ Override
@@ -93,6 +96,7 @@ public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorNa
9396 return withRetryOnConflict (super .deleteConnectorWithHttpInfo (connectorName ));
9497 }
9598
99+
96100 @ Override
97101 public Mono <Connector > getConnector (String connectorName ) throws WebClientResponseException {
98102 return withRetryOnConflict (super .getConnector (connectorName ));
@@ -204,7 +208,7 @@ public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, B
204208
205209 @ Override
206210 public Mono <ResponseEntity <Void >> restartConnectorWithHttpInfo (String connectorName , Boolean includeTasks ,
207- Boolean onlyFailed ) throws WebClientResponseException {
211+ Boolean onlyFailed ) throws WebClientResponseException {
208212 return withRetryOnConflict (super .restartConnectorWithHttpInfo (connectorName , includeTasks , onlyFailed ));
209213 }
210214
@@ -232,14 +236,14 @@ public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorNa
232236
233237 @ Override
234238 public Mono <ResponseEntity <Connector >> setConnectorConfigWithHttpInfo (String connectorName ,
235- Map <String , Object > requestBody )
239+ Map <String , Object > requestBody )
236240 throws WebClientResponseException {
237241 return withRetryOnConflict (super .setConnectorConfigWithHttpInfo (connectorName , requestBody ));
238242 }
239243
240244 @ Override
241245 public Mono <ConnectorPluginConfigValidationResponse > validateConnectorPluginConfig (String pluginName ,
242- Map <String , Object > requestBody )
246+ Map <String , Object > requestBody )
243247 throws WebClientResponseException {
244248 return withRetryOnConflict (super .validateConnectorPluginConfig (pluginName , requestBody ));
245249 }
@@ -253,26 +257,29 @@ public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateCon
253257 private static class RetryingApiClient extends ApiClient {
254258
255259 public RetryingApiClient (ClustersProperties .ConnectCluster config ,
256- ClustersProperties .TruststoreConfig truststoreConfig ,
257- DataSize maxBuffSize ) {
260+ ClustersProperties .TruststoreConfig truststoreConfig ,
261+ DataSize maxBuffSize ) {
258262 super (buildWebClient (maxBuffSize , config , truststoreConfig ), null , null );
259263 setBasePath (config .getAddress ());
260264 setUsername (config .getUsername ());
261265 setPassword (config .getPassword ());
262266 }
263267
264268 public static WebClient buildWebClient (DataSize maxBuffSize ,
265- ClustersProperties .ConnectCluster config ,
266- ClustersProperties .TruststoreConfig truststoreConfig ) {
269+ ClustersProperties .ConnectCluster config ,
270+ ClustersProperties .TruststoreConfig truststoreConfig ) {
267271 return new WebClientConfigurator ()
268272 .configureSsl (
269273 truststoreConfig ,
270274 new ClustersProperties .KeystoreConfig (
271275 config .getKeystoreLocation (),
272- config .getKeystorePassword ()))
276+ config .getKeystorePassword ()
277+ )
278+ )
273279 .configureBasicAuth (
274280 config .getUsername (),
275- config .getPassword ())
281+ config .getPassword ()
282+ )
276283 .configureBufferSize (maxBuffSize )
277284 .build ();
278285 }
0 commit comments