2222import java .util .Objects ;
2323import javax .annotation .Nullable ;
2424import lombok .extern .slf4j .Slf4j ;
25+ import org .apache .commons .lang3 .StringUtils ;
2526import org .springframework .http .ResponseEntity ;
2627import org .springframework .util .unit .DataSize ;
2728import org .springframework .web .client .RestClientException ;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
5152 (WebClientResponseException .Conflict ) signal .failure ()));
5253 }
5354
54- private static <T > Mono <T > withRetryOnConflict (Mono <T > publisher ) {
55- return publisher .retryWhen (conflictCodeRetry ());
55+ private static @ NotNull Retry retryOnRebalance () {
56+ return Retry .fixedDelay (MAX_RETRIES , RETRIES_DELAY ).filter (e -> {
57+
58+ if (e instanceof WebClientResponseException .InternalServerError exception ) {
59+ final var errorMessage = getMessage (exception );
60+ return StringUtils .equals (errorMessage ,
61+ // From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
62+ "Request cannot be completed because a rebalance is expected" );
63+ }
64+ return false ;
65+ });
66+ }
67+
68+ private static <T > Mono <T > withRetryOnConflictOrRebalance (Mono <T > publisher ) {
69+ return publisher
70+ .retryWhen (retryOnRebalance ())
71+ .retryWhen (conflictCodeRetry ());
72+ }
73+
74+ private static <T > Flux <T > withRetryOnConflictOrRebalance (Flux <T > publisher ) {
75+ return publisher
76+ .retryWhen (retryOnRebalance ())
77+ .retryWhen (conflictCodeRetry ());
5678 }
5779
58- private static <T > Flux <T > withRetryOnConflict ( Flux <T > publisher ) {
59- return publisher .retryWhen (conflictCodeRetry ());
80+ private static <T > Mono <T > withRetryOnRebalance ( Mono <T > publisher ) {
81+ return publisher .retryWhen (retryOnRebalance ());
6082 }
6183
84+
6285 private static <T > Mono <T > withBadRequestErrorHandling (Mono <T > publisher ) {
6386 return publisher
6487 .onErrorResume (WebClientResponseException .BadRequest .class ,
@@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
7396 }
7497
7598 private static <T > @ NotNull Mono <T > parseConnectErrorMessage (WebClientResponseException parseException ) {
99+ return Mono .error (new ValidationException (getMessage (parseException )));
100+ }
101+
102+ private static String getMessage (WebClientResponseException parseException ) {
76103 final var errorMessage = parseException .getResponseBodyAs (ErrorMessage .class );
77- return Mono .error (new ValidationException (
78- Objects .requireNonNull (errorMessage ,
79- // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80- "This should not happen according to the ConnectExceptionMapper" )
81- .message ()));
104+ return Objects .requireNonNull (errorMessage ,
105+ // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
106+ "This should not happen according to the ConnectExceptionMapper" )
107+ .message ();
82108 }
83109
84110 @ Override
85111 public Mono <Connector > createConnector (NewConnector newConnector ) throws RestClientException {
86112 return withBadRequestErrorHandling (
87- super .createConnector (newConnector )
113+ withRetryOnRebalance ( super .createConnector (newConnector ) )
88114 );
89115 }
90116
91117 @ Override
92118 public Mono <Connector > setConnectorConfig (String connectorName , Map <String , Object > requestBody )
93119 throws RestClientException {
94120 return withBadRequestErrorHandling (
95- super .setConnectorConfig (connectorName , requestBody )
121+ withRetryOnRebalance ( super .setConnectorConfig (connectorName , requestBody ) )
96122 );
97123 }
98124
99125 @ Override
100126 public Mono <ResponseEntity <Connector >> createConnectorWithHttpInfo (NewConnector newConnector )
101127 throws WebClientResponseException {
102- return withRetryOnConflict (super .createConnectorWithHttpInfo (newConnector ));
128+ return withRetryOnConflictOrRebalance (super .createConnectorWithHttpInfo (newConnector ));
103129 }
104130
105131 @ Override
106132 public Mono <Void > deleteConnector (String connectorName ) throws WebClientResponseException {
107- return withRetryOnConflict (super .deleteConnector (connectorName ));
133+ return withRetryOnConflictOrRebalance (super .deleteConnector (connectorName ));
108134 }
109135
110136 @ Override
111137 public Mono <ResponseEntity <Void >> deleteConnectorWithHttpInfo (String connectorName )
112138 throws WebClientResponseException {
113- return withRetryOnConflict (super .deleteConnectorWithHttpInfo (connectorName ));
139+ return withRetryOnConflictOrRebalance (super .deleteConnectorWithHttpInfo (connectorName ));
114140 }
115141
116142
117143 @ Override
118144 public Mono <Connector > getConnector (String connectorName ) throws WebClientResponseException {
119- return withRetryOnConflict (super .getConnector (connectorName ));
145+ return withRetryOnConflictOrRebalance (super .getConnector (connectorName ));
120146 }
121147
122148 @ Override
123149 public Mono <ResponseEntity <Connector >> getConnectorWithHttpInfo (String connectorName )
124150 throws WebClientResponseException {
125- return withRetryOnConflict (super .getConnectorWithHttpInfo (connectorName ));
151+ return withRetryOnConflictOrRebalance (super .getConnectorWithHttpInfo (connectorName ));
126152 }
127153
128154 @ Override
129155 public Mono <Map <String , Object >> getConnectorConfig (String connectorName ) throws WebClientResponseException {
130- return withRetryOnConflict (super .getConnectorConfig (connectorName ));
156+ return withRetryOnConflictOrRebalance (super .getConnectorConfig (connectorName ));
131157 }
132158
133159 @ Override
134160 public Mono <ResponseEntity <Map <String , Object >>> getConnectorConfigWithHttpInfo (String connectorName )
135161 throws WebClientResponseException {
136- return withRetryOnConflict (super .getConnectorConfigWithHttpInfo (connectorName ));
162+ return withRetryOnConflictOrRebalance (super .getConnectorConfigWithHttpInfo (connectorName ));
137163 }
138164
139165 @ Override
140166 public Flux <ConnectorPlugin > getConnectorPlugins () throws WebClientResponseException {
141- return withRetryOnConflict (super .getConnectorPlugins ());
167+ return withRetryOnConflictOrRebalance (super .getConnectorPlugins ());
142168 }
143169
144170 @ Override
145171 public Mono <ResponseEntity <List <ConnectorPlugin >>> getConnectorPluginsWithHttpInfo ()
146172 throws WebClientResponseException {
147- return withRetryOnConflict (super .getConnectorPluginsWithHttpInfo ());
173+ return withRetryOnConflictOrRebalance (super .getConnectorPluginsWithHttpInfo ());
148174 }
149175
150176 @ Override
151177 public Mono <ConnectorStatus > getConnectorStatus (String connectorName ) throws WebClientResponseException {
152- return withRetryOnConflict (super .getConnectorStatus (connectorName ));
178+ return withRetryOnConflictOrRebalance (super .getConnectorStatus (connectorName ));
153179 }
154180
155181 @ Override
156182 public Mono <ResponseEntity <ConnectorStatus >> getConnectorStatusWithHttpInfo (String connectorName )
157183 throws WebClientResponseException {
158- return withRetryOnConflict (super .getConnectorStatusWithHttpInfo (connectorName ));
184+ return withRetryOnConflictOrRebalance (super .getConnectorStatusWithHttpInfo (connectorName ));
159185 }
160186
161187 @ Override
162188 public Mono <TaskStatus > getConnectorTaskStatus (String connectorName , Integer taskId )
163189 throws WebClientResponseException {
164- return withRetryOnConflict (super .getConnectorTaskStatus (connectorName , taskId ));
190+ return withRetryOnConflictOrRebalance (super .getConnectorTaskStatus (connectorName , taskId ));
165191 }
166192
167193 @ Override
168194 public Mono <ResponseEntity <TaskStatus >> getConnectorTaskStatusWithHttpInfo (String connectorName , Integer taskId )
169195 throws WebClientResponseException {
170- return withRetryOnConflict (super .getConnectorTaskStatusWithHttpInfo (connectorName , taskId ));
196+ return withRetryOnConflictOrRebalance (super .getConnectorTaskStatusWithHttpInfo (connectorName , taskId ));
171197 }
172198
173199 @ Override
174200 public Flux <ConnectorTask > getConnectorTasks (String connectorName ) throws WebClientResponseException {
175- return withRetryOnConflict (super .getConnectorTasks (connectorName ));
201+ return withRetryOnConflictOrRebalance (super .getConnectorTasks (connectorName ));
176202 }
177203
178204 @ Override
179205 public Mono <ResponseEntity <List <ConnectorTask >>> getConnectorTasksWithHttpInfo (String connectorName )
180206 throws WebClientResponseException {
181- return withRetryOnConflict (super .getConnectorTasksWithHttpInfo (connectorName ));
207+ return withRetryOnConflictOrRebalance (super .getConnectorTasksWithHttpInfo (connectorName ));
182208 }
183209
184210 @ Override
185211 public Mono <Map <String , ConnectorTopics >> getConnectorTopics (String connectorName ) throws WebClientResponseException {
186- return withRetryOnConflict (super .getConnectorTopics (connectorName ));
212+ return withRetryOnConflictOrRebalance (super .getConnectorTopics (connectorName ));
187213 }
188214
189215 @ Override
190216 public Mono <ResponseEntity <Map <String , ConnectorTopics >>> getConnectorTopicsWithHttpInfo (String connectorName )
191217 throws WebClientResponseException {
192- return withRetryOnConflict (super .getConnectorTopicsWithHttpInfo (connectorName ));
218+ return withRetryOnConflictOrRebalance (super .getConnectorTopicsWithHttpInfo (connectorName ));
193219 }
194220
195221 @ Override
196222 public Mono <List <String >> getConnectors (String search ) throws WebClientResponseException {
197- return withRetryOnConflict (super .getConnectors (search ));
223+ return withRetryOnConflictOrRebalance (super .getConnectors (search ));
198224 }
199225
200226 @ Override
201227 public Mono <ResponseEntity <List <String >>> getConnectorsWithHttpInfo (String search ) throws WebClientResponseException {
202- return withRetryOnConflict (super .getConnectorsWithHttpInfo (search ));
228+ return withRetryOnConflictOrRebalance (super .getConnectorsWithHttpInfo (search ));
203229 }
204230
205231 @ Override
206232 public Mono <Void > pauseConnector (String connectorName ) throws WebClientResponseException {
207- return withRetryOnConflict (super .pauseConnector (connectorName ));
233+ return withRetryOnConflictOrRebalance (super .pauseConnector (connectorName ));
208234 }
209235
210236 @ Override
211237 public Mono <ResponseEntity <Void >> pauseConnectorWithHttpInfo (String connectorName ) throws WebClientResponseException {
212- return withRetryOnConflict (super .pauseConnectorWithHttpInfo (connectorName ));
238+ return withRetryOnConflictOrRebalance (super .pauseConnectorWithHttpInfo (connectorName ));
213239 }
214240
215241 @ Override
216242 public Mono <Void > restartConnector (String connectorName , Boolean includeTasks , Boolean onlyFailed )
217243 throws WebClientResponseException {
218- return withRetryOnConflict (super .restartConnector (connectorName , includeTasks , onlyFailed ));
244+ return withRetryOnConflictOrRebalance (super .restartConnector (connectorName , includeTasks , onlyFailed ));
219245 }
220246
221247 @ Override
222248 public Mono <ResponseEntity <Void >> restartConnectorWithHttpInfo (String connectorName , Boolean includeTasks ,
223249 Boolean onlyFailed ) throws WebClientResponseException {
224- return withRetryOnConflict (super .restartConnectorWithHttpInfo (connectorName , includeTasks , onlyFailed ));
250+ return withRetryOnConflictOrRebalance (super .restartConnectorWithHttpInfo (connectorName , includeTasks , onlyFailed ));
225251 }
226252
227253 @ Override
228254 public Mono <Void > restartConnectorTask (String connectorName , Integer taskId ) throws WebClientResponseException {
229- return withRetryOnConflict (super .restartConnectorTask (connectorName , taskId ));
255+ return withRetryOnConflictOrRebalance (super .restartConnectorTask (connectorName , taskId ));
230256 }
231257
232258 @ Override
233259 public Mono <ResponseEntity <Void >> restartConnectorTaskWithHttpInfo (String connectorName , Integer taskId )
234260 throws WebClientResponseException {
235- return withRetryOnConflict (super .restartConnectorTaskWithHttpInfo (connectorName , taskId ));
261+ return withRetryOnConflictOrRebalance (super .restartConnectorTaskWithHttpInfo (connectorName , taskId ));
236262 }
237263
238264 @ Override
239265 public Mono <Void > resumeConnector (String connectorName ) throws WebClientResponseException {
240- return super .resumeConnector (connectorName );
266+ return withRetryOnRebalance ( super .resumeConnector (connectorName ) );
241267 }
242268
243269 @ Override
244270 public Mono <ResponseEntity <Void >> resumeConnectorWithHttpInfo (String connectorName )
245271 throws WebClientResponseException {
246- return withRetryOnConflict (super .resumeConnectorWithHttpInfo (connectorName ));
272+ return withRetryOnConflictOrRebalance (super .resumeConnectorWithHttpInfo (connectorName ));
247273 }
248274
249275 @ Override
250276 public Mono <ResponseEntity <Connector >> setConnectorConfigWithHttpInfo (String connectorName ,
251277 Map <String , Object > requestBody )
252278 throws WebClientResponseException {
253- return withRetryOnConflict (super .setConnectorConfigWithHttpInfo (connectorName , requestBody ));
279+ return withRetryOnConflictOrRebalance (super .setConnectorConfigWithHttpInfo (connectorName , requestBody ));
254280 }
255281
256282 @ Override
257283 public Mono <ConnectorPluginConfigValidationResponse > validateConnectorPluginConfig (String pluginName ,
258284 Map <String , Object > requestBody )
259285 throws WebClientResponseException {
260- return withRetryOnConflict (super .validateConnectorPluginConfig (pluginName , requestBody ));
286+ return withRetryOnConflictOrRebalance (super .validateConnectorPluginConfig (pluginName , requestBody ));
261287 }
262288
263289 @ Override
264290 public Mono <ResponseEntity <ConnectorPluginConfigValidationResponse >> validateConnectorPluginConfigWithHttpInfo (
265291 String pluginName , Map <String , Object > requestBody ) throws WebClientResponseException {
266- return withRetryOnConflict (super .validateConnectorPluginConfigWithHttpInfo (pluginName , requestBody ));
292+ return withRetryOnConflictOrRebalance (super .validateConnectorPluginConfigWithHttpInfo (pluginName , requestBody ));
267293 }
268294
269295 private static class RetryingApiClient extends ApiClient {
0 commit comments