@@ -155,6 +155,7 @@ public class SaslAuthenticatorTest {
155155 private static final long CONNECTIONS_MAX_REAUTH_MS_VALUE = 100L ;
156156 private static final int BUFFER_SIZE = 4 * 1024 ;
157157 private static Time time = Time .SYSTEM ;
158+ private static boolean needLargeExpiration = false ;
158159
159160 private NioEchoServer server ;
160161 private Selector selector ;
@@ -178,6 +179,7 @@ public void setup() throws Exception {
178179
179180 @ AfterEach
180181 public void teardown () throws Exception {
182+ needLargeExpiration = false ;
181183 if (server != null )
182184 this .server .close ();
183185 if (selector != null )
@@ -1607,6 +1609,42 @@ public void testCannotReauthenticateWithDifferentPrincipal() throws Exception {
16071609 server .verifyReauthenticationMetrics (0 , 1 );
16081610 }
16091611
1612+ @ Test
1613+ public void testReauthenticateWithLargeReauthValue () throws Exception {
1614+ // enable it, we'll get a large expiration timestamp token
1615+ needLargeExpiration = true ;
1616+ String node = "0" ;
1617+ SecurityProtocol securityProtocol = SecurityProtocol .SASL_SSL ;
1618+
1619+ configureMechanisms (OAuthBearerLoginModule .OAUTHBEARER_MECHANISM ,
1620+ List .of (OAuthBearerLoginModule .OAUTHBEARER_MECHANISM ));
1621+ // set a large re-auth timeout in server side
1622+ saslServerConfigs .put (BrokerSecurityConfigs .CONNECTIONS_MAX_REAUTH_MS_CONFIG , Long .MAX_VALUE );
1623+ server = createEchoServer (securityProtocol );
1624+
1625+ // set to default value for sasl login configs for initialization in ExpiringCredentialRefreshConfig
1626+ saslClientConfigs .put (SaslConfigs .SASL_LOGIN_REFRESH_WINDOW_FACTOR , SaslConfigs .DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR );
1627+ saslClientConfigs .put (SaslConfigs .SASL_LOGIN_REFRESH_WINDOW_JITTER , SaslConfigs .DEFAULT_LOGIN_REFRESH_WINDOW_JITTER );
1628+ saslClientConfigs .put (SaslConfigs .SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS , SaslConfigs .DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS );
1629+ saslClientConfigs .put (SaslConfigs .SASL_LOGIN_REFRESH_BUFFER_SECONDS , SaslConfigs .DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS );
1630+ saslClientConfigs .put (SaslConfigs .SASL_LOGIN_CALLBACK_HANDLER_CLASS , AlternateLoginCallbackHandler .class );
1631+
1632+ createCustomClientConnection (securityProtocol , OAuthBearerLoginModule .OAUTHBEARER_MECHANISM , node , true );
1633+
1634+ // channel should be not null before sasl handshake
1635+ assertNotNull (selector .channel (node ));
1636+
1637+ TestUtils .waitForCondition (() -> {
1638+ selector .poll (1000 );
1639+ // this channel should be closed due to session timeout calculation overflow
1640+ return selector .channel (node ) == null ;
1641+ }, "channel didn't close with large re-authentication value" );
1642+
1643+ // ensure metrics are as expected
1644+ server .verifyAuthenticationMetrics (0 , 0 );
1645+ server .verifyReauthenticationMetrics (0 , 0 );
1646+ }
1647+
16101648 @ Test
16111649 public void testCorrelationId () {
16121650 SaslClientAuthenticator authenticator = new SaslClientAuthenticator (
@@ -1936,7 +1974,7 @@ private void createClientConnection(SecurityProtocol securityProtocol, String sa
19361974 if (enableSaslAuthenticateHeader )
19371975 createClientConnection (securityProtocol , node );
19381976 else
1939- createClientConnectionWithoutSaslAuthenticateHeader (securityProtocol , saslMechanism , node );
1977+ createCustomClientConnection (securityProtocol , saslMechanism , node , false );
19401978 }
19411979
19421980 private NioEchoServer startServerApiVersionsUnsupportedByClient (final SecurityProtocol securityProtocol , String saslMechanism ) throws Exception {
@@ -2024,15 +2062,13 @@ protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
20242062 return server ;
20252063 }
20262064
2027- private void createClientConnectionWithoutSaslAuthenticateHeader (final SecurityProtocol securityProtocol ,
2028- final String saslMechanism , String node ) throws Exception {
2029-
2030- final ListenerName listenerName = ListenerName .forSecurityProtocol (securityProtocol );
2031- final Map <String , ?> configs = Collections .emptyMap ();
2032- final JaasContext jaasContext = JaasContext .loadClientContext (configs );
2033- final Map <String , JaasContext > jaasContexts = Collections .singletonMap (saslMechanism , jaasContext );
2034-
2035- SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder (ConnectionMode .CLIENT , jaasContexts ,
2065+ private SaslChannelBuilder saslChannelBuilderWithoutHeader (
2066+ final SecurityProtocol securityProtocol ,
2067+ final String saslMechanism ,
2068+ final Map <String , JaasContext > jaasContexts ,
2069+ final ListenerName listenerName
2070+ ) {
2071+ return new SaslChannelBuilder (ConnectionMode .CLIENT , jaasContexts ,
20362072 securityProtocol , listenerName , false , saslMechanism ,
20372073 null , null , null , time , new LogContext (), null ) {
20382074
@@ -2059,6 +2095,42 @@ protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVe
20592095 };
20602096 }
20612097 };
2098+ }
2099+
2100+ private void createCustomClientConnection (
2101+ final SecurityProtocol securityProtocol ,
2102+ final String saslMechanism ,
2103+ String node ,
2104+ boolean withSaslAuthenticateHeader
2105+ ) throws Exception {
2106+
2107+ final ListenerName listenerName = ListenerName .forSecurityProtocol (securityProtocol );
2108+ final Map <String , ?> configs = Collections .emptyMap ();
2109+ final JaasContext jaasContext = JaasContext .loadClientContext (configs );
2110+ final Map <String , JaasContext > jaasContexts = Collections .singletonMap (saslMechanism , jaasContext );
2111+
2112+ SaslChannelBuilder clientChannelBuilder ;
2113+ if (!withSaslAuthenticateHeader ) {
2114+ clientChannelBuilder = saslChannelBuilderWithoutHeader (securityProtocol , saslMechanism , jaasContexts , listenerName );
2115+ } else {
2116+ clientChannelBuilder = new SaslChannelBuilder (ConnectionMode .CLIENT , jaasContexts ,
2117+ securityProtocol , listenerName , false , saslMechanism ,
2118+ null , null , null , time , new LogContext (), null ) {
2119+
2120+ @ Override
2121+ protected SaslClientAuthenticator buildClientAuthenticator (Map <String , ?> configs ,
2122+ AuthenticateCallbackHandler callbackHandler ,
2123+ String id ,
2124+ String serverHost ,
2125+ String servicePrincipal ,
2126+ TransportLayer transportLayer ,
2127+ Subject subject ) {
2128+
2129+ return new SaslClientAuthenticator (configs , callbackHandler , id , subject ,
2130+ servicePrincipal , serverHost , saslMechanism , transportLayer , time , new LogContext ());
2131+ }
2132+ };
2133+ }
20622134 clientChannelBuilder .configure (saslClientConfigs );
20632135 this .selector = NetworkTestUtils .createSelector (clientChannelBuilder , time );
20642136 InetSocketAddress addr = new InetSocketAddress ("localhost" , server .port ());
@@ -2507,10 +2579,11 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
25072579 + ++numInvocations ;
25082580 String headerJson = "{" + claimOrHeaderJsonText ("alg" , "none" ) + "}" ;
25092581 /*
2510- * Use a short lifetime so the background refresh thread replaces it before we
2582+ * If we're testing large expiration scenario, use a large lifetime.
2583+ * Otherwise, use a short lifetime so the background refresh thread replaces it before we
25112584 * re-authenticate
25122585 */
2513- String lifetimeSecondsValueToUse = "1" ;
2586+ String lifetimeSecondsValueToUse = needLargeExpiration ? String . valueOf ( Long . MAX_VALUE ) : "1" ;
25142587 String claimsJson ;
25152588 try {
25162589 claimsJson = String .format ("{%s,%s,%s}" ,
0 commit comments