|
12 | 12 | import java.util.concurrent.CompletableFuture; |
13 | 13 | import java.util.concurrent.CompletionStage; |
14 | 14 | import java.util.concurrent.TimeUnit; |
| 15 | +import java.util.concurrent.atomic.AtomicBoolean; |
15 | 16 |
|
16 | 17 | import org.apache.kafka.common.Uuid; |
17 | 18 | import org.apache.kafka.common.message.ProduceRequestData; |
@@ -75,6 +76,7 @@ public class FilterHandler extends ChannelDuplexHandler { |
75 | 76 | private CompletableFuture<Void> readFuture = CompletableFuture.completedFuture(null); |
76 | 77 | private @Nullable ChannelHandlerContext ctx; |
77 | 78 | private @Nullable PromiseFactory promiseFactory; |
| 79 | + private static AtomicBoolean deprecationWarningEmitted = new AtomicBoolean(false); |
78 | 80 |
|
79 | 81 | public FilterHandler(FilterAndInvoker filterAndInvoker, |
80 | 82 | long timeoutMs, |
@@ -521,6 +523,13 @@ public Optional<ClientTlsContext> clientTlsContext() { |
521 | 523 | @Override |
522 | 524 | public void clientSaslAuthenticationSuccess(String mechanism, |
523 | 525 | String authorizedId) { |
| 526 | + if (deprecationWarningEmitted.compareAndSet(false, true)) { |
| 527 | + LOGGER.warn("Deprecated clientSaslAuthenticationSuccess(String mechanism, String authorizedId) was invoked by filter '{}'. Instead call " |
| 528 | + + "clientSaslAuthenticationSuccess(String mechanism, Subject subject), ensuring that the Subject contains a {} principal with " |
| 529 | + + "name equal to authorizedId", |
| 530 | + filterAndInvoker.filterName(), |
| 531 | + User.class.getName()); |
| 532 | + } |
524 | 533 | clientSaslAuthenticationSuccess(mechanism, new Subject(Set.of(new User(authorizedId)))); |
525 | 534 | } |
526 | 535 |
|
|
0 commit comments