Skip to content

Commit 954bb32

Browse files
authored
Merge branch 'main' into issues/53
2 parents ad7c9a1 + 22a5053 commit 954bb32

File tree

8 files changed

+55
-17
lines changed

8 files changed

+55
-17
lines changed

api/src/main/java/io/kafbat/ui/controller/ApplicationConfigController.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import io.kafbat.ui.api.ApplicationConfigApi;
77
import io.kafbat.ui.config.ClustersProperties;
8+
import io.kafbat.ui.model.ActionDTO;
89
import io.kafbat.ui.model.ApplicationConfigDTO;
910
import io.kafbat.ui.model.ApplicationConfigPropertiesDTO;
1011
import io.kafbat.ui.model.ApplicationConfigValidationDTO;
@@ -18,6 +19,7 @@
1819
import io.kafbat.ui.util.ApplicationRestarter;
1920
import io.kafbat.ui.util.DynamicConfigOperations;
2021
import java.util.Map;
22+
import java.util.Optional;
2123
import javax.annotation.Nullable;
2224
import lombok.RequiredArgsConstructor;
2325
import lombok.extern.slf4j.Slf4j;
@@ -46,6 +48,12 @@ interface PropertiesMapper {
4648
DynamicConfigOperations.PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
4749

4850
ApplicationConfigPropertiesDTO toDto(DynamicConfigOperations.PropertiesStructure propertiesStructure);
51+
52+
default ActionDTO stringToActionDto(String str) {
53+
return Optional.ofNullable(str)
54+
.map(s -> Enum.valueOf(ActionDTO.class, s.toUpperCase()))
55+
.orElseThrow();
56+
}
4957
}
5058

5159
private final DynamicConfigOperations dynamicConfigOperations;
@@ -75,7 +83,7 @@ public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExch
7583
@Override
7684
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
7785
ServerWebExchange exchange) {
78-
var context = AccessContext.builder()
86+
var context = AccessContext.builder()
7987
.applicationConfigActions(EDIT)
8088
.operationName("restartWithConfig")
8189
.build();

api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import java.util.HashSet;
77
import java.util.Map;
88
import java.util.Set;
9+
import java.util.stream.Collectors;
910
import lombok.Getter;
1011
import lombok.extern.slf4j.Slf4j;
1112
import org.apache.commons.lang3.mutable.MutableLong;
1213
import org.apache.kafka.clients.consumer.Consumer;
1314
import org.apache.kafka.common.TopicPartition;
15+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1416

1517
@Slf4j
1618
@Getter
@@ -34,7 +36,7 @@ class OffsetsInfo {
3436

3537
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
3638
this.consumer = consumer;
37-
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
39+
this.beginOffsets = firstOffsetsForPolling(consumer, targetPartitions);
3840
this.endOffsets = consumer.endOffsets(targetPartitions);
3941
endOffsets.forEach((tp, endOffset) -> {
4042
var beginningOffset = beginOffsets.get(tp);
@@ -46,6 +48,28 @@ class OffsetsInfo {
4648
});
4749
}
4850

51+
52+
private Map<TopicPartition, Long> firstOffsetsForPolling(Consumer<?, ?> consumer,
53+
Collection<TopicPartition> partitions) {
54+
try {
55+
// we try to use offsetsForTimes() to find earliest offsets, since for
56+
// some topics (like compacted) beginningOffsets() ruturning 0 offsets
57+
// even when effectively first offset can be very high
58+
var offsets = consumer.offsetsForTimes(
59+
partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L))
60+
);
61+
// result of offsetsForTimes() can be null, if message version < 0.10.0
62+
if (offsets.entrySet().stream().noneMatch(e -> e.getValue() == null)) {
63+
return offsets.entrySet().stream()
64+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
65+
}
66+
} catch (UnsupportedOperationException | UnsupportedVersionException e) {
67+
// offsetsForTimes() not supported
68+
}
69+
//falling back to beginningOffsets() if offsetsForTimes() not supported
70+
return consumer.beginningOffsets(partitions);
71+
}
72+
4973
boolean assignedPartitionsFullyPolled() {
5074
for (var tp : consumer.assignment()) {
5175
Preconditions.checkArgument(endOffsets.containsKey(tp));

api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import io.kafbat.ui.model.ConsumerPosition;
44
import io.kafbat.ui.model.TopicMessageEventDTO;
55
import java.util.ArrayList;
6+
import java.util.HashSet;
67
import java.util.List;
8+
import java.util.Set;
79
import java.util.TreeMap;
810
import java.util.function.Supplier;
911
import lombok.extern.slf4j.Slf4j;
@@ -84,20 +86,22 @@ private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
8486
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
8587

8688
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
87-
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
89+
Set<TopicPartition> paused = new HashSet<>();
90+
while (!sink.isCancelled() && paused.size() < range.size()) {
8891
var polledRecords = poll(sink, consumer);
8992
range.forEach((tp, fromTo) -> {
9093
polledRecords.records(tp).stream()
9194
.filter(r -> r.offset() < fromTo.to)
9295
.forEach(result::add);
9396

9497
//next position is out of target range -> pausing partition
95-
if (consumer.position(tp) >= fromTo.to) {
98+
if (!paused.contains(tp) && consumer.position(tp) >= fromTo.to) {
99+
paused.add(tp);
96100
consumer.pause(List.of(tp));
97101
}
98102
});
99103
}
100-
consumer.resume(consumer.paused());
104+
consumer.resume(paused);
101105
return result;
102106
}
103107
}

api/src/main/java/io/kafbat/ui/model/rbac/Permission.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
44

55
import com.google.common.base.Preconditions;
6+
import io.kafbat.ui.model.ActionDTO;
67
import io.kafbat.ui.model.rbac.permission.PermissibleAction;
78
import java.util.List;
89
import java.util.regex.Pattern;
@@ -50,7 +51,7 @@ public void transform() {
5051
if (value != null) {
5152
this.compiledValuePattern = Pattern.compile(value);
5253
}
53-
if (actions.stream().anyMatch("ALL"::equalsIgnoreCase)) {
54+
if (actions.stream().anyMatch(ActionDTO.ALL.name()::equalsIgnoreCase)) {
5455
this.parsedActions = resource.allActions();
5556
} else {
5657
this.parsedActions = resource.parseActionsWithDependantsUnnest(actions);

contract/src/main/resources/swagger/kafbat-ui-api.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3821,6 +3821,7 @@ components:
38213821
Action:
38223822
type: string
38233823
enum:
3824+
- ALL
38243825
- VIEW
38253826
- EDIT
38263827
- CREATE

frontend/src/components/Connect/Details/Actions/Actions.tsx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const Actions: React.FC = () => {
7878
permission={{
7979
resource: ResourceType.CONNECT,
8080
action: Action.EDIT,
81-
value: routerProps.connectorName,
81+
value: routerProps.connectName,
8282
}}
8383
>
8484
Pause
@@ -91,7 +91,7 @@ const Actions: React.FC = () => {
9191
permission={{
9292
resource: ResourceType.CONNECT,
9393
action: Action.EDIT,
94-
value: routerProps.connectorName,
94+
value: routerProps.connectName,
9595
}}
9696
>
9797
Resume
@@ -103,7 +103,7 @@ const Actions: React.FC = () => {
103103
permission={{
104104
resource: ResourceType.CONNECT,
105105
action: Action.RESTART,
106-
value: routerProps.connectorName,
106+
value: routerProps.connectName,
107107
}}
108108
>
109109
Restart Connector
@@ -114,7 +114,7 @@ const Actions: React.FC = () => {
114114
permission={{
115115
resource: ResourceType.CONNECT,
116116
action: Action.RESTART,
117-
value: routerProps.connectorName,
117+
value: routerProps.connectName,
118118
}}
119119
>
120120
Restart All Tasks
@@ -125,7 +125,7 @@ const Actions: React.FC = () => {
125125
permission={{
126126
resource: ResourceType.CONNECT,
127127
action: Action.RESTART,
128-
value: routerProps.connectorName,
128+
value: routerProps.connectName,
129129
}}
130130
>
131131
Restart Failed Tasks
@@ -139,7 +139,7 @@ const Actions: React.FC = () => {
139139
permission={{
140140
resource: ResourceType.CONNECT,
141141
action: Action.DELETE,
142-
value: routerProps.connectorName,
142+
value: routerProps.connectName,
143143
}}
144144
>
145145
Delete

frontend/src/components/Connect/Details/Tasks/ActionsCellTasks.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const ActionsCellTasks: React.FC<CellContext<Task, unknown>> = ({ row }) => {
2626
permission={{
2727
resource: ResourceType.CONNECT,
2828
action: Action.RESTART,
29-
value: routerProps.connectorName,
29+
value: routerProps.connectName,
3030
}}
3131
>
3232
<span>Restart task</span>

frontend/src/components/Connect/List/ActionsCell.tsx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
6767
permission={{
6868
resource: ResourceType.CONNECT,
6969
action: Action.EDIT,
70-
value: name,
70+
value: connect,
7171
}}
7272
>
7373
Resume
@@ -79,7 +79,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
7979
permission={{
8080
resource: ResourceType.CONNECT,
8181
action: Action.RESTART,
82-
value: name,
82+
value: connect,
8383
}}
8484
>
8585
Restart Connector
@@ -90,7 +90,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
9090
permission={{
9191
resource: ResourceType.CONNECT,
9292
action: Action.RESTART,
93-
value: name,
93+
value: connect,
9494
}}
9595
>
9696
Restart All Tasks
@@ -101,7 +101,7 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
101101
permission={{
102102
resource: ResourceType.CONNECT,
103103
action: Action.RESTART,
104-
value: name,
104+
value: connect,
105105
}}
106106
>
107107
Restart Failed Tasks

0 commit comments

Comments
 (0)