|
17 | 17 |
|
18 | 18 | package org.apache.fluss.flink.security.acl; |
19 | 19 |
|
| 20 | +import org.apache.commons.lang3.RandomUtils; |
| 21 | +import org.apache.flink.table.api.EnvironmentSettings; |
| 22 | +import org.apache.flink.table.api.TableEnvironment; |
| 23 | +import org.apache.flink.test.util.AbstractTestBase; |
| 24 | +import org.apache.flink.types.Row; |
| 25 | +import org.apache.flink.util.CollectionUtil; |
20 | 26 | import org.apache.fluss.config.ConfigOptions; |
21 | 27 | import org.apache.fluss.config.Configuration; |
22 | 28 | import org.apache.fluss.config.MemorySize; |
|
36 | 42 | import org.apache.fluss.utils.FileUtils; |
37 | 43 | import org.apache.fluss.utils.ParentResourceBlockingClassLoader; |
38 | 44 | import org.apache.fluss.utils.TemporaryClassLoaderContext; |
39 | | - |
40 | | -import org.apache.commons.lang3.RandomUtils; |
41 | | -import org.apache.flink.table.api.EnvironmentSettings; |
42 | | -import org.apache.flink.table.api.TableEnvironment; |
43 | | -import org.apache.flink.test.util.AbstractTestBase; |
44 | | -import org.apache.flink.types.Row; |
45 | | -import org.apache.flink.util.CollectionUtil; |
46 | 45 | import org.apache.hadoop.minikdc.MiniKdc; |
47 | 46 | import org.junit.jupiter.api.AfterEach; |
48 | 47 | import org.junit.jupiter.api.BeforeAll; |
@@ -277,6 +276,62 @@ void testKerberosAuthorization() throws Exception { |
277 | 276 | tEnv.executeSql("show databases").collect()); |
278 | 277 | assertThat(databases).contains(Row.of(testDB)); |
279 | 278 | }); |
| 279 | + |
| 280 | + // Test: Invalid credentials should fail |
| 281 | + String invalidPrincipal = String.format("nonexistent@%s", realm); |
| 282 | + String invalidJaas = |
| 283 | + String.format( |
| 284 | + "com.sun.security.auth.module.Krb5LoginModule required " |
| 285 | + + "useKeyTab=true storeKey=true useTicketCache=false " |
| 286 | + + "keyTab=\"%s\" principal=\"%s\";", |
| 287 | + keytab.getAbsolutePath(), invalidPrincipal); |
| 288 | + |
| 289 | + String createInvalidCatalogDDL = |
| 290 | + String.format( |
| 291 | + "create catalog invalid_kerberos_catalog with ( \n" |
| 292 | + + "'type' = 'fluss', \n" |
| 293 | + + "'bootstrap.servers' = '%s', \n" |
| 294 | + + "'client.security.protocol' = 'sasl', \n" |
| 295 | + + "'client.security.sasl.mechanism' = 'GSSAPI', \n" |
| 296 | + + "'client.security.sasl.jaas.config' = '%s' \n" |
| 297 | + + ")", |
| 298 | + bootstrapServers, invalidJaas); |
| 299 | + |
| 300 | + assertThatThrownBy( |
| 301 | + () -> { |
| 302 | + tEnv.executeSql(createInvalidCatalogDDL).await(); |
| 303 | + tEnv.executeSql("use catalog invalid_kerberos_catalog").await(); |
| 304 | + tEnv.executeSql("show databases").await(); |
| 305 | + }) |
| 306 | + .hasRootCauseInstanceOf(javax.security.auth.login.LoginException.class); |
| 307 | + |
| 308 | + // Test: keytab/principal config options |
| 309 | + String keytabPrincipalCatalogDDL = |
| 310 | + String.format( |
| 311 | + "create catalog keytab_principal_catalog with ( \n" |
| 312 | + + "'type' = 'fluss', \n" |
| 313 | + + "'bootstrap.servers' = '%s', \n" |
| 314 | + + "'client.security.protocol' = 'sasl', \n" |
| 315 | + + "'client.security.sasl.mechanism' = 'GSSAPI', \n" |
| 316 | + + "'client.security.kerberos.keytab' = '%s', \n" |
| 317 | + + "'client.security.kerberos.principal' = '%s' \n" |
| 318 | + + ")", |
| 319 | + bootstrapServers, keytab.getAbsolutePath(), clientPrincipal); |
| 320 | + |
| 321 | + tEnv.executeSql(keytabPrincipalCatalogDDL).await(); |
| 322 | + tEnv.executeSql("use catalog keytab_principal_catalog").await(); |
| 323 | + |
| 324 | + String keytabTestDB = "test_keytab_principal_db"; |
| 325 | + tEnv.executeSql("create database " + keytabTestDB).await(); |
| 326 | + |
| 327 | + org.apache.fluss.testutils.common.CommonTestUtils.retry( |
| 328 | + java.time.Duration.ofSeconds(60), |
| 329 | + () -> { |
| 330 | + List<Row> databases = |
| 331 | + CollectionUtil.iteratorToList( |
| 332 | + tEnv.executeSql("show databases").collect()); |
| 333 | + assertThat(databases).contains(Row.of(keytabTestDB)); |
| 334 | + }); |
280 | 335 | } finally { |
281 | 336 | kerberosFluss.close(); |
282 | 337 | } |
|
0 commit comments