Skip to content

Commit 741bbc1

Browse files
iliaxiliaxHaarolean
authored
[BE] Supress kafka authorization errors (#3376)
* wip * wip * typo fix, minor impr * test fixes * wip --------- Co-authored-by: iliax <[email protected]> Co-authored-by: Roman Zabaluev <[email protected]>
1 parent e584b15 commit 741bbc1

File tree

2 files changed

+32
-12
lines changed

2 files changed

+32
-12
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@
6262
import org.apache.kafka.common.TopicPartitionReplica;
6363
import org.apache.kafka.common.acl.AclOperation;
6464
import org.apache.kafka.common.config.ConfigResource;
65+
import org.apache.kafka.common.errors.ClusterAuthorizationException;
6566
import org.apache.kafka.common.errors.GroupIdNotFoundException;
6667
import org.apache.kafka.common.errors.GroupNotEmptyException;
6768
import org.apache.kafka.common.errors.InvalidRequestException;
69+
import org.apache.kafka.common.errors.TopicAuthorizationException;
6870
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
6971
import org.apache.kafka.common.errors.UnsupportedVersionException;
7072
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
@@ -176,6 +178,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
176178
}
177179

178180
//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
181+
//and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
179182
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
180183
var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
181184
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
@@ -196,7 +199,8 @@ private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<Stri
196199
client.describeConfigs(
197200
resources,
198201
new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
199-
UnknownTopicOrPartitionException.class
202+
UnknownTopicOrPartitionException.class,
203+
TopicAuthorizationException.class
200204
).map(config -> config.entrySet().stream()
201205
.collect(toMap(
202206
c -> c.getKey().name(),
@@ -208,11 +212,17 @@ private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClie
208212
.map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
209213
.collect(toList());
210214
return toMono(client.describeConfigs(resources).all())
211-
.doOnError(InvalidRequestException.class,
212-
th -> log.trace("Error while getting broker {} configs", brokerIds, th))
213215
// some kafka backends (like MSK serverless) do not support broker's configs retrieval,
214216
// in that case InvalidRequestException will be thrown
215-
.onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
217+
.onErrorResume(InvalidRequestException.class, th -> {
218+
log.trace("Error while getting broker {} configs", brokerIds, th);
219+
return Mono.just(Map.of());
220+
})
221+
// there are situations when kafka-ui user has no DESCRIBE_CONFIGS permission on cluster
222+
.onErrorResume(ClusterAuthorizationException.class, th -> {
223+
log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
224+
return Mono.just(Map.of());
225+
})
216226
.map(config -> config.entrySet().stream()
217227
.collect(toMap(
218228
c -> Integer.valueOf(c.getKey().name()),
@@ -242,13 +252,16 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
242252

243253
private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
244254
return toMonoWithExceptionFilter(
245-
client.describeTopics(topics).values(),
246-
UnknownTopicOrPartitionException.class
255+
client.describeTopics(topics).topicNameValues(),
256+
UnknownTopicOrPartitionException.class,
257+
// we only describe topics that we see from listTopics() API, so we should have permission to do it,
258+
// but also adding this exception here for rare case when access restricted after we called listTopics()
259+
TopicAuthorizationException.class
247260
);
248261
}
249262

250263
/**
251-
* Returns TopicDescription mono, or Empty Mono if topic not found.
264+
* Returns TopicDescription mono, or Empty Mono if topic not visible.
252265
*/
253266
public Mono<TopicDescription> describeTopic(String topic) {
254267
return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
@@ -262,10 +275,11 @@ public Mono<TopicDescription> describeTopic(String topic) {
262275
* such topics in resulting map.
263276
* <p/>
264277
* This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
265-
* finished with <code>clazz</code> exception and empty Monos.
278+
* finished with <code>classes</code> exceptions and empty Monos.
266279
*/
280+
@SafeVarargs
267281
static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
268-
Class<? extends KafkaException> clazz) {
282+
Class<? extends KafkaException>... classes) {
269283
if (values.isEmpty()) {
270284
return Mono.just(Map.of());
271285
}
@@ -277,7 +291,7 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
277291
.defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
278292
.onErrorResume(
279293
// tracking Monos with suppressible error
280-
th -> th.getClass().isAssignableFrom(clazz),
294+
th -> Stream.of(classes).anyMatch(clazz -> th.getClass().isAssignableFrom(clazz)),
281295
th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
282296
.toList();
283297

@@ -300,6 +314,7 @@ public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> descr
300314
Collection<Integer> brokerIds) {
301315
return toMono(client.describeLogDirs(brokerIds).all())
302316
.onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
317+
.onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
303318
.onErrorResume(th -> true, th -> {
304319
log.warn("Error while calling describeLogDirs", th);
305320
return Mono.just(Map.of());

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,14 @@ public Mono<InternalTopic> getTopicDetails(KafkaCluster cluster, String topicNam
162162
}
163163

164164
public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
165+
// there 2 case that we cover here:
166+
// 1. topic not found/visible - describeTopic() will be empty and we will throw TopicNotFoundException
167+
// 2. topic is visible, but we don't have DESCRIBE_CONFIG permission - we should return empty list
165168
return adminClientService.get(cluster)
166-
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true))
167-
.map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
169+
.flatMap(ac -> ac.describeTopic(topicName)
170+
.switchIfEmpty(Mono.error(new TopicNotFoundException()))
171+
.then(ac.getTopicsConfig(List.of(topicName), true))
172+
.map(m -> m.values().stream().findFirst().orElse(List.of())));
168173
}
169174

170175
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,

0 commit comments

Comments
 (0)