Skip to content

Commit b5b53d1

Browse files
committed
add data token test in RESTCatalogITCase
1 parent ec78235 commit b5b53d1

File tree

4 files changed

+73
-31
lines changed

4 files changed

+73
-31
lines changed

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232
import org.apache.paimon.catalog.TableMetadata;
3333
import org.apache.paimon.fs.FileIO;
3434
import org.apache.paimon.fs.Path;
35+
import org.apache.paimon.fs.local.LocalFileIO;
36+
import org.apache.paimon.fs.local.LocalFileIOLoader;
3537
import org.apache.paimon.operation.Lock;
38+
import org.apache.paimon.options.CatalogOptions;
3639
import org.apache.paimon.options.Options;
3740
import org.apache.paimon.partition.Partition;
3841
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
@@ -118,12 +121,11 @@ public class RESTCatalogServer {
118121
public final Map<String, List<Partition>> tablePartitionsStore = new HashMap<>();
119122
public final Map<String, View> viewStore = new HashMap<>();
120123
public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
121-
public final Map<String, FileIO> tableFileIOStore = new HashMap<>();
122124
public final ConfigResponse configResponse;
123125
public final String serverId;
124126

125127
public RESTCatalogServer(
126-
String warehouse, String initToken, ConfigResponse config, String serverId) {
128+
String dataPath, String initToken, ConfigResponse config, String serverId) {
127129
this.serverId = serverId;
128130
this.configResponse = config;
129131
this.prefix =
@@ -132,12 +134,12 @@ public RESTCatalogServer(
132134
authToken = initToken;
133135
Options conf = new Options();
134136
this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k, v));
135-
conf.setString("warehouse", warehouse);
137+
conf.setString(CatalogOptions.WAREHOUSE.key(), dataPath);
136138
CatalogContext context = CatalogContext.create(conf);
137-
Path warehousePath = new Path(warehouse);
139+
Path warehousePath = new Path(dataPath);
138140
FileIO fileIO;
139141
try {
140-
fileIO = FileIO.get(warehousePath, context);
142+
fileIO = new LocalFileIO();
141143
fileIO.checkOrMkdirs(warehousePath);
142144
} catch (IOException e) {
143145
throw new UncheckedIOException(e);
@@ -169,10 +171,6 @@ public void setDataToken(Identifier identifier, RESTToken token) {
169171
DataTokenStore.putDataToken(serverId, identifier.getFullName(), token);
170172
}
171173

172-
public void setFileIO(Identifier identifier, FileIO fileIO) {
173-
tableFileIOStore.put(identifier.getFullName(), fileIO);
174-
}
175-
176174
public void removeDataToken(Identifier identifier) {
177175
DataTokenStore.removeDataToken(serverId, identifier.getFullName());
178176
}
@@ -525,7 +523,7 @@ private FileStoreTable getFileTable(Identifier identifier) {
525523
catalog instanceof SupportsSnapshots,
526524
catalog instanceof SupportsBranches);
527525
Path path = new Path(schema.options().get(PATH.key()));
528-
FileIO dataFileIO = tableFileIOStore.get(identifier.getFullName());
526+
FileIO dataFileIO = catalog.fileIO();
529527
FileStoreTable table = FileStoreTableFactory.create(dataFileIO, path, schema, catalogEnv);
530528
return table;
531529
}
@@ -956,7 +954,9 @@ private TableMetadata createTableMetadata(
956954
Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) {
957955
Map<String, String> options = new HashMap<>(schema.options());
958956
Path path = catalog.getTableLocation(identifier);
959-
options.put(PATH.key(), path.toString());
957+
String restPath =
958+
path.toString().replaceFirst(LocalFileIOLoader.SCHEME, RESTFileIOTestLoader.SCHEME);
959+
options.put(PATH.key(), restPath);
960960
TableSchema tableSchema =
961961
new TableSchema(
962962
schemaId,

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ class RESTCatalogTest extends CatalogTestBase {
8484
@Override
8585
public void setUp() throws Exception {
8686
super.setUp();
87-
warehouse = RESTFileIOTestLoader.SCHEME + "://" + tmpFilePath;
8887
String serverId = UUID.randomUUID().toString();
8988
this.config =
9089
new ConfigResponse(
@@ -237,7 +236,6 @@ public void testDataTokenExpired() throws Exception {
237236
System.currentTimeMillis() - 100_000);
238237
restCatalogServer.setDataToken(identifier, expiredDataToken);
239238
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(identifier);
240-
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
241239
List<Integer> data = Lists.newArrayList(12);
242240
Exception exception =
243241
assertThrows(UncheckedIOException.class, () -> batchWrite(tableTestWrite, data));
@@ -264,7 +262,6 @@ public void testDataTokenUnExistInServer() throws Exception {
264262
// as RESTTokenFileIO is lazy so we need to call isObjectStore() to init fileIO
265263
restTokenFileIO.isObjectStore();
266264
restCatalogServer.removeDataToken(identifier);
267-
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
268265
Exception exception =
269266
assertThrows(UncheckedIOException.class, () -> batchWrite(tableTestWrite, data));
270267
assertEquals(RESTTestFileIO.TOKEN_UN_EXIST_MSG, exception.getCause().getMessage());
@@ -304,7 +301,6 @@ public void testBatchRecordsWrite() throws Exception {
304301
Identifier tableIdentifier = Identifier.create("my_db", "my_table");
305302
createTable(tableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1"));
306303
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(tableIdentifier);
307-
restCatalogServer.setFileIO(tableIdentifier, tableTestWrite.fileIO());
308304
// write
309305
BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
310306
BatchTableWrite write = writeBuilder.newWrite();

paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.paimon.fs.PositionOutputStream;
2626
import org.apache.paimon.fs.SeekableInputStream;
2727
import org.apache.paimon.fs.local.LocalFileIO;
28+
import org.apache.paimon.fs.local.LocalFileIOLoader;
29+
import org.apache.paimon.options.CatalogOptions;
2830
import org.apache.paimon.options.Options;
2931

3032
import java.io.IOException;
@@ -113,11 +115,17 @@ private void checkDataToken(Path path) throws IOException {
113115
}
114116

115117
private RESTToken getToken(Path path) {
116-
String basePath = options.get("warehouse").replaceAll("rest-test-file-io://", "");
118+
String dataPath = options.get(CatalogOptions.WAREHOUSE.key());
119+
String basePath = "";
120+
if (dataPath.startsWith(RESTFileIOTestLoader.SCHEME)) {
121+
basePath = dataPath.replaceAll(RESTFileIOTestLoader.SCHEME + "://", "");
122+
} else if (dataPath.startsWith(LocalFileIOLoader.SCHEME)) {
123+
basePath = dataPath.replaceAll(LocalFileIOLoader.SCHEME + "://", "");
124+
}
117125
String filePath = path.toString().split(":")[1].replaceAll(basePath, "");
118126
String[] paths = filePath.split("/");
119-
String database = paths[1].replaceAll("\\.db", "");
120-
String table = paths[2];
127+
String database = paths[0].replaceAll("\\.db", "");
128+
String table = paths[1];
121129
return DataTokenStore.getDataToken(
122130
options.get("catalog-server-id"), Identifier.create(database, table).getFullName());
123131
}

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
package org.apache.paimon.flink;
2020

21+
import org.apache.paimon.catalog.Identifier;
2122
import org.apache.paimon.options.CatalogOptions;
2223
import org.apache.paimon.rest.RESTCatalogInternalOptions;
2324
import org.apache.paimon.rest.RESTCatalogOptions;
2425
import org.apache.paimon.rest.RESTCatalogServer;
26+
import org.apache.paimon.rest.RESTToken;
2527
import org.apache.paimon.rest.auth.AuthProviderEnum;
2628
import org.apache.paimon.rest.responses.ConfigResponse;
2729

@@ -40,6 +42,7 @@
4042
import java.util.UUID;
4143

4244
import static org.assertj.core.api.Assertions.assertThat;
45+
import static org.junit.jupiter.api.Assertions.assertThrows;
4346

4447
/** ITCase for REST catalog. */
4548
class RESTCatalogITCase extends CatalogITCaseBase {
@@ -57,16 +60,20 @@ class RESTCatalogITCase extends CatalogITCaseBase {
5760
public void before() throws IOException {
5861
String initToken = "init_token";
5962
warehouse = tempFile.toUri().toString();
63+
String serverId = UUID.randomUUID().toString();
6064
ConfigResponse config =
6165
new ConfigResponse(
6266
ImmutableMap.of(
6367
RESTCatalogInternalOptions.PREFIX.key(),
6468
"paimon",
6569
CatalogOptions.WAREHOUSE.key(),
66-
warehouse),
70+
warehouse,
71+
RESTCatalogOptions.DATA_TOKEN_ENABLED.key(),
72+
"true",
73+
"catalog-server-id",
74+
serverId),
6775
ImmutableMap.of());
68-
restCatalogServer =
69-
new RESTCatalogServer(warehouse, initToken, config, UUID.randomUUID().toString());
76+
restCatalogServer = new RESTCatalogServer(warehouse, initToken, config, serverId);
7077
restCatalogServer.start();
7178
serverUrl = restCatalogServer.getUrl();
7279
super.before();
@@ -109,19 +116,38 @@ void testAlterTable() {
109116
DATABASE_NAME, TABLE_NAME));
110117
}
111118

112-
@Override
113-
protected Map<String, String> catalogOptions() {
114-
String initToken = "init_token";
115-
Map<String, String> options = new HashMap<>();
116-
options.put("metastore", "rest");
117-
options.put(RESTCatalogOptions.URI.key(), serverUrl);
118-
options.put(RESTCatalogOptions.TOKEN.key(), initToken);
119-
options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.BEAR.identifier());
120-
return options;
119+
@Test
120+
public void testWriteAndRead() {
121+
batchSql(
122+
String.format(
123+
"INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
124+
DATABASE_NAME, TABLE_NAME));
125+
assertThat(batchSql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, TABLE_NAME)))
126+
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
121127
}
122128

123129
@Test
124-
public void testWriteAndRead() {
130+
public void testExpiredDataToken() {
131+
Identifier identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
132+
RESTToken expiredDataToken =
133+
new RESTToken(
134+
ImmutableMap.of(
135+
"akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
136+
System.currentTimeMillis() - 100_000);
137+
restCatalogServer.setDataToken(identifier, expiredDataToken);
138+
assertThrows(
139+
RuntimeException.class,
140+
() ->
141+
batchSql(
142+
String.format(
143+
"INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
144+
DATABASE_NAME, TABLE_NAME)));
145+
// update token and retry
146+
RESTToken dataToken =
147+
new RESTToken(
148+
ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()),
149+
System.currentTimeMillis() + 100_000);
150+
restCatalogServer.setDataToken(identifier, dataToken);
125151
batchSql(
126152
String.format(
127153
"INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
@@ -130,6 +156,18 @@ public void testWriteAndRead() {
130156
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
131157
}
132158

159+
@Override
160+
protected Map<String, String> catalogOptions() {
161+
String initToken = "init_token";
162+
Map<String, String> options = new HashMap<>();
163+
options.put("metastore", "rest");
164+
options.put(RESTCatalogOptions.URI.key(), serverUrl);
165+
options.put(RESTCatalogOptions.TOKEN.key(), initToken);
166+
options.put(RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.BEAR.identifier());
167+
options.put(RESTCatalogOptions.DATA_TOKEN_ENABLED.key(), "true");
168+
return options;
169+
}
170+
133171
@Override
134172
protected String getTempDirPath() {
135173
return this.warehouse;

0 commit comments

Comments
 (0)