|
23 | 23 | import org.apache.fluss.metadata.TablePath; |
24 | 24 | import org.apache.fluss.utils.CloseableIterator; |
25 | 25 |
|
| 26 | +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| 27 | +import org.apache.flink.table.api.EnvironmentSettings; |
| 28 | +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; |
26 | 29 | import org.apache.flink.types.Row; |
27 | 30 | import org.apache.paimon.catalog.Catalog; |
28 | 31 | import org.apache.paimon.catalog.CatalogContext; |
@@ -60,10 +63,27 @@ protected static void beforeAll() { |
60 | 63 | Configuration configuration = new Configuration(); |
61 | 64 | configuration.setString("type", "paimon"); |
62 | 65 | configuration.setString("warehouse", tempWarehouseDir.toString()); |
| 66 | + configuration.setString("user", "root"); |
| 67 | + configuration.setString("password", "root-password"); |
63 | 68 | lakeStorage = new PaimonLakeStorage(configuration); |
64 | 69 | paimonCatalog = |
65 | 70 | CatalogFactory.createCatalog( |
66 | 71 | CatalogContext.create(Options.fromMap(configuration.toMap()))); |
| 72 | + initPaimonPrivilege(); |
| 73 | + } |
| 74 | + |
| 75 | + // Test for paimon privilege table |
| 76 | + public static void initPaimonPrivilege() { |
| 77 | + StreamTableEnvironment streamTEnv = |
| 78 | + StreamTableEnvironment.create( |
| 79 | + StreamExecutionEnvironment.getExecutionEnvironment(), |
| 80 | + EnvironmentSettings.inStreamingMode()); |
| 81 | + streamTEnv.executeSql( |
| 82 | + String.format( |
| 83 | + "create catalog %s with ('type'='paimon', 'warehouse' = '%s')", |
| 84 | + "paimon_catalog", tempWarehouseDir)); |
| 85 | + streamTEnv.executeSql( |
| 86 | + "CALL paimon_catalog.sys.init_file_based_privilege('root-password');"); |
67 | 87 | } |
68 | 88 |
|
69 | 89 | public void createTable(TablePath tablePath, Schema schema) throws Exception { |
|
0 commit comments