Skip to content

Commit 0280985

Browse files
committed
add data token test
1 parent c1248bf commit 0280985

File tree

4 files changed

+108
-8
lines changed

4 files changed

+108
-8
lines changed

paimon-core/src/test/java/org/apache/paimon/rest/DataTokenStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,12 @@ public static RESTToken getDataToken(String serverId, String tableFullName) {
4242
}
4343
return dataTokenData.get(tableFullName);
4444
}
45+
46+
public static void removeDataToken(String serverId, String tableFullName) {
47+
Map<String, RESTToken> dataTokenData = serverName2DataTokenData.get(serverId);
48+
if (dataTokenData != null && dataTokenData.containsKey(tableFullName)) {
49+
dataTokenData.remove(tableFullName);
50+
serverName2DataTokenData.put(serverId, dataTokenData);
51+
}
52+
}
4553
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public RESTCatalogServer(
131131
this.databaseUri = String.format("/v1/%s/databases", prefix);
132132
authToken = initToken;
133133
Options conf = new Options();
134+
this.configResponse.getDefaults().forEach((k, v) -> conf.setString(k, v));
134135
conf.setString("warehouse", warehouse);
135136
CatalogContext context = CatalogContext.create(conf);
136137
Path warehousePath = new Path(warehouse);
@@ -168,6 +169,14 @@ public void setDataToken(Identifier identifier, RESTToken token) {
168169
DataTokenStore.putDataToken(serverId, identifier.getFullName(), token);
169170
}
170171

172+
public void setFileIO(Identifier identifier, FileIO fileIO) {
173+
tableFileIOStore.put(identifier.getFullName(), fileIO);
174+
}
175+
176+
public void removeDataToken(Identifier identifier) {
177+
DataTokenStore.removeDataToken(serverId, identifier.getFullName());
178+
}
179+
171180
public RESTToken getDataToken(Identifier identifier) {
172181
return DataTokenStore.getDataToken(serverId, identifier.getFullName());
173182
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.junit.jupiter.api.BeforeEach;
5555
import org.junit.jupiter.api.Test;
5656

57+
import java.io.IOException;
5758
import java.util.ArrayList;
5859
import java.util.Collections;
5960
import java.util.HashMap;
@@ -67,6 +68,7 @@
6768
import static org.assertj.core.api.Assertions.assertThat;
6869
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6970
import static org.junit.jupiter.api.Assertions.assertEquals;
71+
import static org.junit.jupiter.api.Assertions.assertThrows;
7072

7173
/** Test for REST Catalog. */
7274
class RESTCatalogTest extends CatalogTestBase {
@@ -92,8 +94,6 @@ public void setUp() throws Exception {
9294
warehouse,
9395
"header." + serverDefineHeaderName,
9496
serverDefineHeaderValue,
95-
RESTCatalogOptions.DATA_TOKEN_ENABLED.key(),
96-
"true",
9797
"catalog-server-id",
9898
serverId),
9999
ImmutableMap.of());
@@ -224,13 +224,87 @@ void testSnapshotFromREST() throws Exception {
224224
}
225225

226226
@Test
227-
public void testBatchRecordsWrite() throws Exception {
227+
public void testDataTokenExpired() throws Exception {
228+
this.catalog = initDataTokenCatalog();
229+
Identifier identifier =
230+
Identifier.create("test_data_token", "table_for_expired_date_token");
231+
createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
232+
RESTToken expiredDataToken =
233+
new RESTToken(
234+
ImmutableMap.of(
235+
"akId", "akId-expire", "akSecret", UUID.randomUUID().toString()),
236+
System.currentTimeMillis() - 100_000);
237+
restCatalogServer.setDataToken(identifier, expiredDataToken);
238+
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(identifier);
239+
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
240+
List<Integer> data = Lists.newArrayList(12);
241+
assertThrows(IOException.class, () -> batchWrite(tableTestWrite, data));
242+
RESTToken dataToken =
243+
new RESTToken(
244+
ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()),
245+
System.currentTimeMillis() + 100_000);
246+
restCatalogServer.setDataToken(identifier, dataToken);
247+
batchWrite(tableTestWrite, data);
248+
List<String> actual = batchRead(tableTestWrite);
249+
assertThat(actual).containsExactlyInAnyOrder("+I[12]");
250+
}
251+
252+
@Test
253+
public void testDataTokenUnExistInServer() throws Exception {
254+
this.catalog = initDataTokenCatalog();
255+
Identifier identifier =
256+
Identifier.create("test_data_token", "table_for_un_exist_date_token");
257+
createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
258+
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(identifier);
259+
RESTTokenFileIO restTokenFileIO = (RESTTokenFileIO) tableTestWrite.fileIO();
260+
List<Integer> data = Lists.newArrayList(12);
261+
// as RESTTokenFileIO is lazy so we need to call isObjectStore() to init fileIO
262+
restTokenFileIO.isObjectStore();
263+
restCatalogServer.removeDataToken(identifier);
264+
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
265+
assertThrows(IOException.class, () -> batchWrite(tableTestWrite, data));
266+
}
228267

268+
private void batchWrite(FileStoreTable tableTestWrite, List<Integer> data) throws Exception {
269+
BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
270+
BatchTableWrite write = writeBuilder.newWrite();
271+
data.forEach(
272+
i -> {
273+
GenericRow record1 = GenericRow.of(i);
274+
try {
275+
write.write(record1);
276+
} catch (Exception e) {
277+
throw new RuntimeException(e);
278+
}
279+
});
280+
List<CommitMessage> messages = write.prepareCommit();
281+
BatchTableCommit commit = writeBuilder.newCommit();
282+
commit.commit(messages);
283+
write.close();
284+
commit.close();
285+
}
286+
287+
private List<String> batchRead(FileStoreTable tableTestWrite) throws IOException {
288+
ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
289+
List<Split> splits = readBuilder.newScan().plan().splits();
290+
TableRead read = readBuilder.newRead();
291+
RecordReader<InternalRow> reader = read.createReader(splits);
292+
List<String> result = new ArrayList<>();
293+
reader.forEachRemaining(
294+
row -> {
295+
String rowStr =
296+
String.format("%s[%d]", row.getRowKind().shortString(), row.getInt(0));
297+
result.add(rowStr);
298+
});
299+
return result;
300+
}
301+
302+
@Test
303+
public void testBatchRecordsWrite() throws Exception {
229304
Identifier tableIdentifier = Identifier.create("my_db", "my_table");
230305
createTable(tableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1"));
231306
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(tableIdentifier);
232-
restCatalogServer.tableFileIOStore.put(
233-
tableIdentifier.getFullName(), tableTestWrite.fileIO());
307+
restCatalogServer.setFileIO(tableIdentifier, tableTestWrite.fileIO());
234308
// write
235309
BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
236310
BatchTableWrite write = writeBuilder.newWrite();

paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@
2929

3030
import java.io.IOException;
3131

32+
import static org.apache.paimon.rest.RESTCatalogOptions.DATA_TOKEN_ENABLED;
33+
3234
/**
3335
* A {@link org.apache.paimon.fs.FileIO} implementation for testing.
3436
*
3537
* <p>It is used to test the RESTFileIO.
3638
*/
3739
public class RESTTestFileIO extends LocalFileIO {
40+
public static final String TOKEN_UN_EXIST_MSG = "token is null";
41+
public static final String TOKEN_EXPIRED_MSG = "token is expired";
3842
private Options options;
3943

4044
@Override
@@ -50,9 +54,14 @@ public void configure(CatalogContext context) {
5054

5155
@Override
5256
public SeekableInputStream newInputStream(Path path) throws IOException {
53-
RESTToken token = getToken(path);
54-
if (token.expireAtMillis() < System.currentTimeMillis()) {
55-
throw new RuntimeException("token expired");
57+
boolean isDataTokenEnabled = options.getOptional(DATA_TOKEN_ENABLED).orElse(false);
58+
if (isDataTokenEnabled) {
59+
RESTToken token = getToken(path);
60+
if (token == null) {
61+
throw new IOException(TOKEN_UN_EXIST_MSG);
62+
} else if (token.expireAtMillis() < System.currentTimeMillis()) {
63+
throw new IOException(TOKEN_EXPIRED_MSG);
64+
}
5665
}
5766
return super.newInputStream(path);
5867
}

0 commit comments

Comments
 (0)