Skip to content

Commit e02f987

Browse files
authored
Merge branch 'main' into pnpm-9.15.4
2 parents 51bbbfb + a159ef6 commit e02f987

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+555
-171
lines changed

.github/workflows/frontend_tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ jobs:
2828
- name: Install node
2929
uses: actions/[email protected]
3030
with:
31-
node-version: "18.17.1"
31+
node-version: "22.12.0"
3232
cache: "pnpm"
3333
cache-dependency-path: "./frontend/pnpm-lock.yaml"
3434

.mvn/jvm.config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-Djava.net.useSystemProxies=true

api/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@
492492
</goals>
493493
<configuration>
494494
<arguments>build</arguments>
495+
<pnpmInheritsProxyConfigFromMaven>false</pnpmInheritsProxyConfigFromMaven>
495496
</configuration>
496497
</execution>
497498
</executions>

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,16 @@ public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorNam
238238
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
239239
}
240240

241+
@Override
242+
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
243+
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
244+
}
245+
246+
@Override
247+
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
248+
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
249+
}
250+
241251
@Override
242252
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
243253
throws WebClientResponseException {
@@ -261,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
261271
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
262272
}
263273

274+
@Override
275+
public Mono<Void> resetConnectorOffsets(String connectorName)
276+
throws WebClientResponseException {
277+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
278+
}
279+
280+
@Override
281+
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
282+
throws WebClientResponseException {
283+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
284+
}
285+
264286
@Override
265287
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
266288
return withRetryOnRebalance(super.resumeConnector(connectorName));

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART;
44
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
55
import static io.kafbat.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
6+
import static io.kafbat.ui.model.rbac.permission.ConnectAction.RESET_OFFSETS;
7+
import static io.kafbat.ui.model.rbac.permission.ConnectAction.VIEW;
68

79
import io.kafbat.ui.api.KafkaConnectApi;
810
import io.kafbat.ui.model.ConnectDTO;
@@ -285,4 +287,23 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
285287
default -> defaultComparator;
286288
};
287289
}
290+
291+
@Override
292+
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
293+
String connectorName,
294+
ServerWebExchange exchange) {
295+
296+
var context = AccessContext.builder()
297+
.cluster(clusterName)
298+
.connectActions(connectName, VIEW, RESET_OFFSETS)
299+
.operationName("resetConnectorOffsets")
300+
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
301+
.build();
302+
303+
return validateAccess(context).then(
304+
kafkaConnectService
305+
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
306+
.map(ResponseEntity::ok))
307+
.doOnEach(sig -> audit(context, sig));
308+
}
288309
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.kafbat.ui.exception;
2+
3+
public class ConnectorOffsetsResetException extends CustomBaseException {
4+
5+
public ConnectorOffsetsResetException(String message) {
6+
super(message);
7+
}
8+
9+
@Override
10+
public ErrorCode getErrorCode() {
11+
return ErrorCode.CONNECTOR_OFFSETS_RESET_ERROR;
12+
}
13+
}

api/src/main/java/io/kafbat/ui/exception/ErrorCode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum ErrorCode {
3232
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST),
3333
FILE_UPLOAD_EXCEPTION(4019, HttpStatus.INTERNAL_SERVER_ERROR),
3434
CEL_ERROR(4020, HttpStatus.BAD_REQUEST),
35+
CONNECTOR_OFFSETS_RESET_ERROR(4021, HttpStatus.BAD_REQUEST),
3536
;
3637

3738
static {

api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.kafbat.ui.model;
22

3-
import io.kafbat.ui.model.MetricDTO;
43
import java.util.List;
54
import lombok.Builder;
65
import lombok.Data;

api/src/main/java/io/kafbat/ui/model/CleanupPolicy.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,10 @@ public enum CleanupPolicy {
2020
this.policies = policies;
2121
}
2222

23-
public String getPolicy() {
24-
return policies.get(0);
25-
}
26-
2723
public static CleanupPolicy fromString(String string) {
2824
return Arrays.stream(CleanupPolicy.values())
29-
.filter(v ->
30-
v.policies.stream().anyMatch(
31-
s -> s.equals(string.replace(" ", "")
32-
)
33-
)
34-
).findFirst()
25+
.filter(v -> v.policies.stream().anyMatch(s -> s.equals(string.replace(" ", ""))))
26+
.findFirst()
3527
.orElse(UNKNOWN);
3628
}
3729
}

api/src/main/java/io/kafbat/ui/model/InternalBrokerConfig.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.kafbat.ui.model;
22

3-
43
import java.util.List;
54
import lombok.Builder;
65
import lombok.Data;
@@ -17,13 +16,13 @@ public class InternalBrokerConfig {
1716
private final List<ConfigEntry.ConfigSynonym> synonyms;
1817

1918
public static InternalBrokerConfig from(ConfigEntry configEntry, boolean readOnlyCluster) {
20-
InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder()
19+
return InternalBrokerConfig.builder()
2120
.name(configEntry.name())
2221
.value(configEntry.value())
2322
.source(configEntry.source())
2423
.isReadOnly(readOnlyCluster || configEntry.isReadOnly())
2524
.isSensitive(configEntry.isSensitive())
26-
.synonyms(configEntry.synonyms());
27-
return builder.build();
25+
.synonyms(configEntry.synonyms())
26+
.build();
2827
}
2928
}

0 commit comments

Comments
 (0)