Skip to content

Commit c67dcd4

Browse files
committed
revert
1 parent 9735710 commit c67dcd4

File tree

1 file changed

+68
-41
lines changed

1 file changed

+68
-41
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 68 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Objects;
2323
import javax.annotation.Nullable;
2424
import lombok.extern.slf4j.Slf4j;
25+
import org.apache.commons.lang3.StringUtils;
2526
import org.springframework.http.ResponseEntity;
2627
import org.springframework.util.unit.DataSize;
2728
import org.springframework.web.client.RestClientException;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
5152
(WebClientResponseException.Conflict) signal.failure()));
5253
}
5354

54-
private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
55-
return publisher.retryWhen(conflictCodeRetry());
55+
private static @NotNull Retry retryOnRebalance() {
56+
return Retry.fixedDelay(MAX_RETRIES, RETRIES_DELAY).filter(e -> {
57+
58+
if (e instanceof WebClientResponseException.InternalServerError exception) {
59+
final var errorMessage = getMessage(exception);
60+
return StringUtils.equals(errorMessage,
61+
// From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
62+
"Request cannot be completed because a rebalance is expected");
63+
}
64+
return false;
65+
});
66+
}
67+
68+
private static <T> Mono<T> withRetryOnConflictOrRebalance(Mono<T> publisher) {
69+
return publisher
70+
.retryWhen(retryOnRebalance())
71+
.retryWhen(conflictCodeRetry());
5672
}
5773

58-
private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
59-
return publisher.retryWhen(conflictCodeRetry());
74+
private static <T> Flux<T> withRetryOnConflictOrRebalance(Flux<T> publisher) {
75+
return publisher
76+
.retryWhen(retryOnRebalance())
77+
.retryWhen(conflictCodeRetry());
6078
}
6179

80+
private static <T> Mono<T> withRetryOnRebalance(Mono<T> publisher) {
81+
return publisher.retryWhen(retryOnRebalance());
82+
}
83+
84+
6285
private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
6386
return publisher
6487
.onErrorResume(WebClientResponseException.BadRequest.class,
@@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
7396
}
7497

7598
private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
99+
return Mono.error(new ValidationException(getMessage(parseException)));
100+
}
101+
102+
private static String getMessage(WebClientResponseException parseException) {
76103
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
77-
return Mono.error(new ValidationException(
78-
Objects.requireNonNull(errorMessage,
79-
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80-
"This should not happen according to the ConnectExceptionMapper")
81-
.message()));
104+
return Objects.requireNonNull(errorMessage,
105+
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
106+
"This should not happen according to the ConnectExceptionMapper")
107+
.message();
82108
}
83109

84110
@Override
85111
public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
86112
return withBadRequestErrorHandling(
87-
super.createConnector(newConnector)
113+
withRetryOnRebalance(super.createConnector(newConnector))
88114
);
89115
}
90116

91117
@Override
92118
public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
93119
throws RestClientException {
94120
return withBadRequestErrorHandling(
95-
super.setConnectorConfig(connectorName, requestBody)
121+
withRetryOnRebalance(super.setConnectorConfig(connectorName, requestBody))
96122
);
97123
}
98124

99125
@Override
100126
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
101127
throws WebClientResponseException {
102-
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
128+
return withRetryOnConflictOrRebalance(super.createConnectorWithHttpInfo(newConnector));
103129
}
104130

105131
@Override
106132
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
107-
return withRetryOnConflict(super.deleteConnector(connectorName));
133+
return withRetryOnConflictOrRebalance(super.deleteConnector(connectorName));
108134
}
109135

110136
@Override
111137
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
112138
throws WebClientResponseException {
113-
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
139+
return withRetryOnConflictOrRebalance(super.deleteConnectorWithHttpInfo(connectorName));
114140
}
115141

116142

117143
@Override
118144
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
119-
return withRetryOnConflict(super.getConnector(connectorName));
145+
return withRetryOnConflictOrRebalance(super.getConnector(connectorName));
120146
}
121147

122148
@Override
123149
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
124150
throws WebClientResponseException {
125-
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
151+
return withRetryOnConflictOrRebalance(super.getConnectorWithHttpInfo(connectorName));
126152
}
127153

128154
@Override
129155
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
130-
return withRetryOnConflict(super.getConnectorConfig(connectorName));
156+
return withRetryOnConflictOrRebalance(super.getConnectorConfig(connectorName));
131157
}
132158

133159
@Override
134160
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
135161
throws WebClientResponseException {
136-
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
162+
return withRetryOnConflictOrRebalance(super.getConnectorConfigWithHttpInfo(connectorName));
137163
}
138164

139165
@Override
140166
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
141-
return withRetryOnConflict(super.getConnectorPlugins());
167+
return withRetryOnConflictOrRebalance(super.getConnectorPlugins());
142168
}
143169

144170
@Override
145171
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
146172
throws WebClientResponseException {
147-
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
173+
return withRetryOnConflictOrRebalance(super.getConnectorPluginsWithHttpInfo());
148174
}
149175

150176
@Override
151177
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
152-
return withRetryOnConflict(super.getConnectorStatus(connectorName));
178+
return withRetryOnConflictOrRebalance(super.getConnectorStatus(connectorName));
153179
}
154180

155181
@Override
156182
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
157183
throws WebClientResponseException {
158-
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
184+
return withRetryOnConflictOrRebalance(super.getConnectorStatusWithHttpInfo(connectorName));
159185
}
160186

161187
@Override
162188
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
163189
throws WebClientResponseException {
164-
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
190+
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatus(connectorName, taskId));
165191
}
166192

167193
@Override
168194
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
169195
throws WebClientResponseException {
170-
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
196+
return withRetryOnConflictOrRebalance(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
171197
}
172198

173199
@Override
174200
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
175-
return withRetryOnConflict(super.getConnectorTasks(connectorName));
201+
return withRetryOnConflictOrRebalance(super.getConnectorTasks(connectorName));
176202
}
177203

178204
@Override
179205
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
180206
throws WebClientResponseException {
181-
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
207+
return withRetryOnConflictOrRebalance(super.getConnectorTasksWithHttpInfo(connectorName));
182208
}
183209

184210
@Override
185211
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
186-
return withRetryOnConflict(super.getConnectorTopics(connectorName));
212+
return withRetryOnConflictOrRebalance(super.getConnectorTopics(connectorName));
187213
}
188214

189215
@Override
190216
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
191217
throws WebClientResponseException {
192-
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
218+
return withRetryOnConflictOrRebalance(super.getConnectorTopicsWithHttpInfo(connectorName));
193219
}
194220

195221
@Override
196222
public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
197-
return withRetryOnConflict(super.getConnectors(search));
223+
return withRetryOnConflictOrRebalance(super.getConnectors(search));
198224
}
199225

200226
@Override
201227
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
202-
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
228+
return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
203229
}
204230

205231
@Override
206232
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
207-
return withRetryOnConflict(super.pauseConnector(connectorName));
233+
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
208234
}
209235

210236
@Override
211237
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
212-
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
238+
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
213239
}
214240

215241
@Override
216242
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
217243
throws WebClientResponseException {
218-
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
244+
return withRetryOnConflictOrRebalance(super.restartConnector(connectorName, includeTasks, onlyFailed));
219245
}
220246

221247
@Override
222248
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
223249
Boolean onlyFailed) throws WebClientResponseException {
224-
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
250+
return withRetryOnConflictOrRebalance(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
225251
}
226252

227253
@Override
228254
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
229-
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
255+
return withRetryOnConflictOrRebalance(super.restartConnectorTask(connectorName, taskId));
230256
}
231257

232258
@Override
233259
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
234260
throws WebClientResponseException {
235-
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
261+
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
236262
}
237263

238264
@Override
239265
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
240-
return super.resumeConnector(connectorName);
266+
return withRetryOnRebalance(super.resumeConnector(connectorName));
241267
}
242268

243269
@Override
244270
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
245271
throws WebClientResponseException {
246-
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
272+
return withRetryOnConflictOrRebalance(super.resumeConnectorWithHttpInfo(connectorName));
247273
}
248274

249275
@Override
250276
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
251277
Map<String, Object> requestBody)
252278
throws WebClientResponseException {
253-
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
279+
return withRetryOnConflictOrRebalance(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
254280
}
255281

256282
@Override
257283
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
258284
Map<String, Object> requestBody)
259285
throws WebClientResponseException {
260-
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
286+
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfig(pluginName, requestBody));
261287
}
262288

263289
@Override
264290
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
265291
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
266-
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
292+
return withRetryOnConflictOrRebalance(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
267293
}
268294

269295
private static class RetryingApiClient extends ApiClient {
@@ -295,5 +321,6 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
295321
.configureBufferSize(maxBuffSize)
296322
.build();
297323
}
324+
298325
}
299326
}

0 commit comments

Comments
 (0)