Skip to content

Commit ec78235

Browse files
committed
check data token before call fileIO function
1 parent 0280985 commit ec78235

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.junit.jupiter.api.Test;
5656

5757
import java.io.IOException;
58+
import java.io.UncheckedIOException;
5859
import java.util.ArrayList;
5960
import java.util.Collections;
6061
import java.util.HashMap;
@@ -238,7 +239,9 @@ public void testDataTokenExpired() throws Exception {
238239
FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(identifier);
239240
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
240241
List<Integer> data = Lists.newArrayList(12);
241-
assertThrows(IOException.class, () -> batchWrite(tableTestWrite, data));
242+
Exception exception =
243+
assertThrows(UncheckedIOException.class, () -> batchWrite(tableTestWrite, data));
244+
assertEquals(RESTTestFileIO.TOKEN_EXPIRED_MSG, exception.getCause().getMessage());
242245
RESTToken dataToken =
243246
new RESTToken(
244247
ImmutableMap.of("akId", "akId", "akSecret", UUID.randomUUID().toString()),
@@ -262,21 +265,18 @@ public void testDataTokenUnExistInServer() throws Exception {
262265
restTokenFileIO.isObjectStore();
263266
restCatalogServer.removeDataToken(identifier);
264267
restCatalogServer.setFileIO(identifier, tableTestWrite.fileIO());
265-
assertThrows(IOException.class, () -> batchWrite(tableTestWrite, data));
268+
Exception exception =
269+
assertThrows(UncheckedIOException.class, () -> batchWrite(tableTestWrite, data));
270+
assertEquals(RESTTestFileIO.TOKEN_UN_EXIST_MSG, exception.getCause().getMessage());
266271
}
267272

268273
private void batchWrite(FileStoreTable tableTestWrite, List<Integer> data) throws Exception {
269274
BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
270275
BatchTableWrite write = writeBuilder.newWrite();
271-
data.forEach(
272-
i -> {
273-
GenericRow record1 = GenericRow.of(i);
274-
try {
275-
write.write(record1);
276-
} catch (Exception e) {
277-
throw new RuntimeException(e);
278-
}
279-
});
276+
for (Integer i : data) {
277+
GenericRow record = GenericRow.of(i);
278+
write.write(record);
279+
}
280280
List<CommitMessage> messages = write.prepareCommit();
281281
BatchTableCommit commit = writeBuilder.newCommit();
282282
commit.commit(messages);

paimon-core/src/test/java/org/apache/paimon/rest/RESTTestFileIO.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,60 +54,71 @@ public void configure(CatalogContext context) {
5454

5555
@Override
5656
public SeekableInputStream newInputStream(Path path) throws IOException {
57-
boolean isDataTokenEnabled = options.getOptional(DATA_TOKEN_ENABLED).orElse(false);
58-
if (isDataTokenEnabled) {
59-
RESTToken token = getToken(path);
60-
if (token == null) {
61-
throw new IOException(TOKEN_UN_EXIST_MSG);
62-
} else if (token.expireAtMillis() < System.currentTimeMillis()) {
63-
throw new IOException(TOKEN_EXPIRED_MSG);
64-
}
65-
}
57+
checkDataToken(path);
6658
return super.newInputStream(path);
6759
}
6860

69-
private RESTToken getToken(Path path) {
70-
String basePath = options.get("warehouse").replaceAll("rest-test-file-io://", "");
71-
String filePath = path.toString().split(":")[1].replaceAll(basePath, "");
72-
String[] paths = filePath.split("/");
73-
String database = paths[1].replaceAll("\\.db", "");
74-
String table = paths[2];
75-
return DataTokenStore.getDataToken(
76-
options.get("catalog-server-id"), Identifier.create(database, table).getFullName());
77-
}
78-
7961
@Override
8062
public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
63+
checkDataToken(path);
8164
return super.newOutputStream(path, overwrite);
8265
}
8366

8467
@Override
8568
public FileStatus getFileStatus(Path path) throws IOException {
69+
checkDataToken(path);
8670
return super.getFileStatus(path);
8771
}
8872

8973
@Override
9074
public FileStatus[] listStatus(Path path) throws IOException {
75+
checkDataToken(path);
9176
return super.listStatus(path);
9277
}
9378

9479
@Override
9580
public boolean exists(Path path) throws IOException {
81+
checkDataToken(path);
9682
return super.exists(path);
9783
}
9884

9985
@Override
10086
public boolean delete(Path path, boolean recursive) throws IOException {
87+
checkDataToken(path);
10188
return super.delete(path, recursive);
10289
}
10390

10491
@Override
10592
public boolean mkdirs(Path path) throws IOException {
93+
checkDataToken(path);
10694
return super.mkdirs(path);
10795
}
10896

10997
@Override
11098
public boolean rename(Path src, Path dst) throws IOException {
99+
checkDataToken(src);
111100
return super.rename(src, dst);
112101
}
102+
103+
private void checkDataToken(Path path) throws IOException {
104+
boolean isDataTokenEnabled = options.getOptional(DATA_TOKEN_ENABLED).orElse(false);
105+
if (isDataTokenEnabled) {
106+
RESTToken token = getToken(path);
107+
if (token == null) {
108+
throw new IOException(TOKEN_UN_EXIST_MSG);
109+
} else if (token.expireAtMillis() < System.currentTimeMillis()) {
110+
throw new IOException(TOKEN_EXPIRED_MSG);
111+
}
112+
}
113+
}
114+
115+
private RESTToken getToken(Path path) {
116+
String basePath = options.get("warehouse").replaceAll("rest-test-file-io://", "");
117+
String filePath = path.toString().split(":")[1].replaceAll(basePath, "");
118+
String[] paths = filePath.split("/");
119+
String database = paths[1].replaceAll("\\.db", "");
120+
String table = paths[2];
121+
return DataTokenStore.getDataToken(
122+
options.get("catalog-server-id"), Identifier.create(database, table).getFullName());
123+
}
113124
}

0 commit comments

Comments
 (0)