Skip to content

Commit 8b11e00

Browse files
authored
[hotfix] Add authentication to the admin APIs that currently lack it (#2350)
1 parent 7fd7b33 commit 8b11e00

File tree

6 files changed

+191
-70
lines changed

6 files changed

+191
-70
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.client.admin;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
2122
import org.apache.fluss.client.metadata.KvSnapshots;
2223
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -649,4 +650,14 @@ private static void sendListOffsetsRequest(
649650
}
650651
});
651652
}
653+
654+
@VisibleForTesting
655+
public AdminGateway getAdminGateway() {
656+
return gateway;
657+
}
658+
659+
@VisibleForTesting
660+
public AdminReadOnlyGateway getAdminReadOnlyGateway() {
661+
return readOnlyGateway;
662+
}
652663
}

fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.ConnectionFactory;
2222
import org.apache.fluss.client.FlussConnection;
2323
import org.apache.fluss.client.admin.Admin;
24+
import org.apache.fluss.client.admin.FlussAdmin;
2425
import org.apache.fluss.client.table.Table;
2526
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
2627
import org.apache.fluss.client.table.writer.AppendWriter;
@@ -33,6 +34,9 @@
3334
import org.apache.fluss.config.cluster.AlterConfigOpType;
3435
import org.apache.fluss.config.cluster.ConfigEntry;
3536
import org.apache.fluss.exception.AuthorizationException;
37+
import org.apache.fluss.exception.KvSnapshotNotExistException;
38+
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
39+
import org.apache.fluss.exception.TableNotPartitionedException;
3640
import org.apache.fluss.metadata.DataLakeFormat;
3741
import org.apache.fluss.metadata.DatabaseDescriptor;
3842
import org.apache.fluss.metadata.TableBucket;
@@ -43,9 +47,11 @@
4347
import org.apache.fluss.rpc.GatewayClientProxy;
4448
import org.apache.fluss.rpc.RpcClient;
4549
import org.apache.fluss.rpc.gateway.AdminGateway;
50+
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4651
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
4752
import org.apache.fluss.rpc.gateway.TabletServerGateway;
4853
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
54+
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
4955
import org.apache.fluss.rpc.messages.InitWriterRequest;
5056
import org.apache.fluss.rpc.messages.InitWriterResponse;
5157
import org.apache.fluss.rpc.messages.MetadataRequest;
@@ -60,13 +66,18 @@
6066
import org.apache.fluss.security.acl.Resource;
6167
import org.apache.fluss.security.acl.ResourceFilter;
6268
import org.apache.fluss.server.testutils.FlussClusterExtension;
69+
import org.apache.fluss.server.zk.ZooKeeperClient;
70+
import org.apache.fluss.server.zk.data.TableRegistration;
6371
import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
6472
import org.apache.fluss.utils.CloseableIterator;
6573

74+
import org.assertj.core.api.ThrowableAssert;
6675
import org.junit.jupiter.api.AfterEach;
6776
import org.junit.jupiter.api.BeforeEach;
6877
import org.junit.jupiter.api.Test;
6978
import org.junit.jupiter.api.extension.RegisterExtension;
79+
import org.junit.jupiter.params.ParameterizedTest;
80+
import org.junit.jupiter.params.provider.ValueSource;
7081

7182
import java.time.Duration;
7283
import java.util.Arrays;
@@ -77,8 +88,10 @@
7788

7889
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
7990
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
91+
import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
8092
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
8193
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
94+
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
8295
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
8396
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
8497
import static org.apache.fluss.security.acl.AccessControlEntry.WILD_CARD_HOST;
@@ -390,10 +403,26 @@ void testAlterTable() throws Exception {
390403
}
391404

392405
@Test
393-
void testListTables() throws Exception {
406+
void testDescribeTableOperation() throws Exception {
407+
// test describe table operations like:
408+
// 1. listTables
409+
// 2. getTableInfo
410+
// 3. getTableSchema
411+
// 4. getLatestKvSnapshots
412+
// 5. listPartitionInfos
413+
// 6. getLatestLakeSnapshot
414+
415+
// first check call these methods without authorization.
394416
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
395417
.isEqualTo(Collections.emptyList());
396-
418+
assertNoTableDescribeAuth(() -> guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get());
419+
assertNoTableDescribeAuth(() -> guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get());
420+
assertNoTableDescribeAuth(() -> guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get());
421+
assertNoTableDescribeAuth(() -> guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get());
422+
assertNoTableDescribeAuth(
423+
() -> guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get());
424+
425+
// add acl to allow guest describe table resource
397426
List<AclBinding> aclBindings =
398427
Collections.singletonList(
399428
new AclBinding(
@@ -405,8 +434,70 @@ void testListTables() throws Exception {
405434
PermissionType.ALLOW)));
406435
rootAdmin.createAcls(aclBindings).all().get();
407436
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
437+
438+
// check call these methods with authorization.
408439
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
409440
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
441+
assertThat(guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get().getTablePath())
442+
.isEqualTo(DATA1_TABLE_INFO_PK.getTablePath());
443+
assertThat(guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get().getSchema())
444+
.isEqualTo(DATA1_SCHEMA_PK);
445+
assertThat(guestAdmin.tableExists(DATA1_TABLE_PATH_PK).get()).isTrue();
446+
assertThat(guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get().getBucketIds())
447+
.containsExactlyInAnyOrder(0, 1, 2);
448+
assertThatThrownBy(() -> guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get())
449+
.rootCause()
450+
.isInstanceOf(TableNotPartitionedException.class)
451+
.hasMessageContaining(
452+
"Table 'test_db_1.test_pk_table_1' is not a partitioned table.");
453+
assertThatThrownBy(() -> guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get())
454+
.rootCause()
455+
.isInstanceOf(LakeTableSnapshotNotExistException.class)
456+
.hasMessageContaining("Lake table snapshot not exist for table");
457+
}
458+
459+
@ParameterizedTest
460+
@ValueSource(strings = {"CoordinatorServer", "TabletServer"})
461+
void testGetKvSnapshotMetadata(String serverType) throws Exception {
462+
AdminReadOnlyGateway readOnlyGateway;
463+
if (serverType.equals("CoordinatorServer")) {
464+
readOnlyGateway = ((FlussAdmin) guestAdmin).getAdminGateway();
465+
} else {
466+
readOnlyGateway = ((FlussAdmin) guestAdmin).getAdminReadOnlyGateway();
467+
}
468+
469+
ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
470+
TableRegistration tableRegistration = zooKeeperClient.getTable(DATA1_TABLE_PATH_PK).get();
471+
long tableId = tableRegistration.tableId;
472+
FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
473+
474+
GetKvSnapshotMetadataRequest request = new GetKvSnapshotMetadataRequest();
475+
request.setTableId(tableId).setBucketId(0).setSnapshotId(0);
476+
// Make sure all tabletServer has ready replica and ready metadata for the table.
477+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(new TableBucket(tableId, 0));
478+
479+
// call getKvSnapshotMetadata without authorization.
480+
assertNoTableDescribeAuth(() -> readOnlyGateway.getKvSnapshotMetadata(request).get());
481+
482+
// add acl to allow guest describe table resource
483+
List<AclBinding> aclBindings =
484+
Collections.singletonList(
485+
new AclBinding(
486+
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
487+
new AccessControlEntry(
488+
guestPrincipal,
489+
"*",
490+
OperationType.DESCRIBE,
491+
PermissionType.ALLOW)));
492+
rootAdmin.createAcls(aclBindings).all().get();
493+
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
494+
495+
// call getKvSnapshotMetadata with authorization. no authorization exception should be
496+
// thrown.
497+
assertThatThrownBy(() -> readOnlyGateway.getKvSnapshotMetadata(request).get())
498+
.rootCause()
499+
.isInstanceOf(KvSnapshotNotExistException.class)
500+
.hasMessageContaining("Failed to get kv snapshot metadata for table bucket");
410501
}
411502

412503
@Test
@@ -572,9 +663,9 @@ void testProduceWithNoWriteAuthorization() throws Exception {
572663
.rootCause()
573664
.hasMessageContaining(
574665
String.format(
575-
"No permission to WRITE table %s in database %s",
576-
noWriteAclTable.getTableName(),
577-
noWriteAclTable.getDatabaseName()));
666+
"Principal FlussPrincipal{name='guest', type='User'} have no authorization to "
667+
+ "operate WRITE on resource Resource{type=TABLE, name='%s'} ",
668+
noWriteAclTable));
578669
}
579670
}
580671

@@ -609,10 +700,9 @@ void testProduceAndConsumer() throws Exception {
609700
assertThatThrownBy(() -> batchScanner.pollBatch(Duration.ofMinutes(1)))
610701
.hasMessageContaining(
611702
String.format(
612-
"No permission to %s table %s in database %s",
613-
READ,
614-
DATA1_TABLE_PATH.getTableName(),
615-
DATA1_TABLE_PATH.getDatabaseName()));
703+
"Principal FlussPrincipal{name='guest', type='User'} have no authorization to "
704+
+ "operate %s on resource Resource{type=TABLE, name='%s'}",
705+
READ, DATA1_TABLE_PATH));
616706
}
617707
rootAdmin
618708
.createAcls(
@@ -955,4 +1045,13 @@ private static Configuration initConfig() {
9551045
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
9561046
return conf;
9571047
}
1048+
1049+
private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable callable) {
1050+
assertThatThrownBy(callable)
1051+
.cause()
1052+
.isInstanceOf(AuthorizationException.class)
1053+
.hasMessageContaining(
1054+
"Principal FlussPrincipal{name='guest', type='User'} have no authorization to "
1055+
+ "operate DESCRIBE on resource Resource{type=TABLE, name='test_db_1.test_pk_table_1'}");
1056+
}
9581057
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,9 @@ void testPutAndLookupKvTable() throws Exception {
383383
.rootCause()
384384
.hasMessageContaining(
385385
String.format(
386-
"No permission to READ table %s in database %s",
387-
tablePath.getTableName(), tablePath.getDatabaseName()));
386+
"Principal FlussPrincipal{name='guest', type='User'} have no authorization to "
387+
+ "operate READ on resource Resource{type=TABLE, name='%s'} ",
388+
tablePath));
388389
addAcl(Resource.database(tablePath.getDatabaseName()), READ);
389390
assertQueryResultExactOrder(
390391
tBatchEnv,

fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,20 @@ public ServerType providerType() {
170170
return provider;
171171
}
172172

173+
public abstract void authorizeTable(OperationType operationType, long tableId);
174+
175+
public void authorizeDatabase(OperationType operationType, String databaseName) {
176+
if (authorizer != null) {
177+
authorizer.authorize(currentSession(), operationType, Resource.database(databaseName));
178+
}
179+
}
180+
181+
public void authorizeTable(OperationType operationType, TablePath tablePath) {
182+
if (authorizer != null) {
183+
authorizer.authorize(currentSession(), operationType, Resource.table(tablePath));
184+
}
185+
}
186+
173187
@Override
174188
public CompletableFuture<ApiVersionsResponse> apiVersions(ApiVersionsRequest request) {
175189
Set<ApiKeys> apiKeys = apiManager.enabledApis();
@@ -210,15 +224,11 @@ public CompletableFuture<ListDatabasesResponse> listDatabases(ListDatabasesReque
210224
@Override
211225
public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
212226
GetDatabaseInfoRequest request) {
213-
if (authorizer != null) {
214-
authorizer.authorize(
215-
currentSession(),
216-
OperationType.DESCRIBE,
217-
Resource.database(request.getDatabaseName()));
218-
}
227+
String databaseName = request.getDatabaseName();
228+
authorizeDatabase(OperationType.DESCRIBE, databaseName);
219229

220230
GetDatabaseInfoResponse response = new GetDatabaseInfoResponse();
221-
DatabaseInfo databaseInfo = metadataManager.getDatabase(request.getDatabaseName());
231+
DatabaseInfo databaseInfo = metadataManager.getDatabase(databaseName);
222232
response.setDatabaseJson(databaseInfo.getDatabaseDescriptor().toJsonBytes())
223233
.setCreatedTime(databaseInfo.getCreatedTime())
224234
.setModifiedTime(databaseInfo.getModifiedTime());
@@ -227,6 +237,7 @@ public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
227237

228238
@Override
229239
public CompletableFuture<DatabaseExistsResponse> databaseExists(DatabaseExistsRequest request) {
240+
// By design: database exists not need to check database authorization.
230241
DatabaseExistsResponse response = new DatabaseExistsResponse();
231242
boolean exists = metadataManager.databaseExists(request.getDatabaseName());
232243
response.setExists(exists);
@@ -258,12 +269,7 @@ public CompletableFuture<ListTablesResponse> listTables(ListTablesRequest reques
258269
@Override
259270
public CompletableFuture<GetTableInfoResponse> getTableInfo(GetTableInfoRequest request) {
260271
TablePath tablePath = toTablePath(request.getTablePath());
261-
if (authorizer != null) {
262-
authorizer.authorize(
263-
currentSession(),
264-
OperationType.DESCRIBE,
265-
Resource.table(tablePath.getDatabaseName(), tablePath.getTableName()));
266-
}
272+
authorizeTable(OperationType.DESCRIBE, tablePath);
267273

268274
GetTableInfoResponse response = new GetTableInfoResponse();
269275
TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -278,6 +284,8 @@ public CompletableFuture<GetTableInfoResponse> getTableInfo(GetTableInfoRequest
278284
@Override
279285
public CompletableFuture<GetTableSchemaResponse> getTableSchema(GetTableSchemaRequest request) {
280286
TablePath tablePath = toTablePath(request.getTablePath());
287+
authorizeTable(OperationType.DESCRIBE, tablePath);
288+
281289
final SchemaInfo schemaInfo;
282290
if (request.hasSchemaId()) {
283291
schemaInfo = metadataManager.getSchemaById(tablePath, request.getSchemaId());
@@ -292,6 +300,7 @@ public CompletableFuture<GetTableSchemaResponse> getTableSchema(GetTableSchemaRe
292300

293301
@Override
294302
public CompletableFuture<TableExistsResponse> tableExists(TableExistsRequest request) {
303+
// By design: table exists not need to check table authorization.
295304
TableExistsResponse response = new TableExistsResponse();
296305
boolean exists = metadataManager.tableExists(toTablePath(request.getTablePath()));
297306
response.setExists(exists);
@@ -302,6 +311,8 @@ public CompletableFuture<TableExistsResponse> tableExists(TableExistsRequest req
302311
public CompletableFuture<GetLatestKvSnapshotsResponse> getLatestKvSnapshots(
303312
GetLatestKvSnapshotsRequest request) {
304313
TablePath tablePath = toTablePath(request.getTablePath());
314+
authorizeTable(OperationType.DESCRIBE, tablePath);
315+
305316
// get table info
306317
TableInfo tableInfo = metadataManager.getTable(tablePath);
307318

@@ -363,9 +374,12 @@ private long getPartitionId(TablePath tablePath, String partitionName) {
363374
@Override
364375
public CompletableFuture<GetKvSnapshotMetadataResponse> getKvSnapshotMetadata(
365376
GetKvSnapshotMetadataRequest request) {
377+
long tableId = request.getTableId();
378+
authorizeTable(OperationType.DESCRIBE, tableId);
379+
366380
TableBucket tableBucket =
367381
new TableBucket(
368-
request.getTableId(),
382+
tableId,
369383
request.hasPartitionId() ? request.getPartitionId() : null,
370384
request.getBucketId());
371385
long snapshotId = request.getSnapshotId();
@@ -412,6 +426,8 @@ public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecuri
412426
public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
413427
ListPartitionInfosRequest request) {
414428
TablePath tablePath = toTablePath(request.getTablePath());
429+
authorizeTable(OperationType.DESCRIBE, tablePath);
430+
415431
Map<String, Long> partitionNameAndIds;
416432
if (request.hasPartialPartitionSpec()) {
417433
ResolvedPartitionSpec partitionSpecFromRequest =
@@ -430,8 +446,10 @@ public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
430446
@Override
431447
public CompletableFuture<GetLatestLakeSnapshotResponse> getLatestLakeSnapshot(
432448
GetLatestLakeSnapshotRequest request) {
433-
// get table info
434449
TablePath tablePath = toTablePath(request.getTablePath());
450+
authorizeTable(OperationType.DESCRIBE, tablePath);
451+
452+
// get table info
435453
TableInfo tableInfo = metadataManager.getTable(tablePath);
436454
// get table id
437455
long tableId = tableInfo.getTableId();

0 commit comments

Comments
 (0)