Skip to content

Commit 998ad3e

Browse files
authored
[core] RESTCatalog: add table test and fix branch (#5313)
1 parent fc125f7 commit 998ad3e

File tree

10 files changed

+385
-137
lines changed

10 files changed

+385
-137
lines changed

paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import java.util.List;
2626

27+
import static org.apache.paimon.utils.Preconditions.checkArgument;
28+
2729
/** Manager for {@code Branch}. */
2830
public interface BranchManager {
2931

@@ -58,4 +60,31 @@ static String normalizeBranch(String branch) {
5860
static boolean isMainBranch(String branch) {
5961
return branch.equals(DEFAULT_MAIN_BRANCH);
6062
}
63+
64+
static void validateBranch(String branchName) {
65+
checkArgument(
66+
!BranchManager.isMainBranch(branchName),
67+
String.format(
68+
"Branch name '%s' is the default branch and cannot be used.",
69+
DEFAULT_MAIN_BRANCH));
70+
checkArgument(
71+
!StringUtils.isNullOrWhitespaceOnly(branchName),
72+
"Branch name '%s' is blank.",
73+
branchName);
74+
checkArgument(
75+
!branchName.chars().allMatch(Character::isDigit),
76+
"Branch name cannot be pure numeric string but is '%s'.",
77+
branchName);
78+
}
79+
80+
static void fastForwardValidate(String branchName) {
81+
checkArgument(
82+
!branchName.equals(DEFAULT_MAIN_BRANCH),
83+
"Branch name '%s' do not use in fast-forward.",
84+
branchName);
85+
checkArgument(
86+
!StringUtils.isNullOrWhitespaceOnly(branchName),
87+
"Branch name '%s' is blank.",
88+
branchName);
89+
}
6190
}

paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,19 @@ public CatalogBranchManager(CatalogLoader catalogLoader, Identifier identifier)
4040
private void executePost(ThrowingConsumer<Catalog, Exception> func) {
4141
executeGet(
4242
catalog -> {
43-
func.accept(catalog);
44-
return null;
43+
try {
44+
func.accept(catalog);
45+
return null;
46+
} catch (Catalog.BranchNotExistException e) {
47+
throw new IllegalArgumentException(
48+
String.format("Branch name '%s' doesn't exist.", e.branch()));
49+
} catch (Catalog.TagNotExistException e) {
50+
throw new IllegalArgumentException(
51+
String.format("Tag '%s' doesn't exist.", e.tag()));
52+
} catch (Catalog.BranchAlreadyExistException e) {
53+
throw new IllegalArgumentException(
54+
String.format("Branch name '%s' already exists..", e.branch()));
55+
}
4556
});
4657
}
4758

@@ -62,7 +73,11 @@ public void createBranch(String branchName) {
6273

6374
@Override
6475
public void createBranch(String branchName, @Nullable String tagName) {
65-
executePost(catalog -> catalog.createBranch(identifier, branchName, tagName));
76+
executePost(
77+
catalog -> {
78+
BranchManager.validateBranch(branchName);
79+
catalog.createBranch(identifier, branchName, tagName);
80+
});
6681
}
6782

6883
@Override
@@ -72,7 +87,11 @@ public void dropBranch(String branchName) {
7287

7388
@Override
7489
public void fastForward(String branchName) {
75-
executePost(catalog -> catalog.fastForward(identifier, branchName));
90+
executePost(
91+
catalog -> {
92+
BranchManager.fastForwardValidate(branchName);
93+
catalog.fastForward(identifier, branchName);
94+
});
7695
}
7796

7897
@Override

paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ public Path branchPath(String branchName) {
7474
@Override
7575
public void createBranch(String branchName) {
7676
validateBranch(branchName);
77-
7877
try {
7978
TableSchema latestSchema = schemaManager.latest().get();
8079
copySchemasToBranch(branchName, latestSchema.id());
@@ -139,14 +138,7 @@ private boolean fileExists(Path path) {
139138

140139
@Override
141140
public void fastForward(String branchName) {
142-
checkArgument(
143-
!branchName.equals(DEFAULT_MAIN_BRANCH),
144-
"Branch name '%s' do not use in fast-forward.",
145-
branchName);
146-
checkArgument(
147-
!StringUtils.isNullOrWhitespaceOnly(branchName),
148-
"Branch name '%s' is blank.",
149-
branchName);
141+
BranchManager.fastForwardValidate(branchName);
150142
checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName);
151143

152144
Long earliestSnapshotId = snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
@@ -207,6 +199,11 @@ public boolean branchExists(String branchName) {
207199
return fileExists(branchPath);
208200
}
209201

202+
public void validateBranch(String branchName) {
203+
BranchManager.validateBranch(branchName);
204+
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
205+
}
206+
210207
@Override
211208
public List<String> branches() {
212209
try {
@@ -218,23 +215,6 @@ public List<String> branches() {
218215
}
219216
}
220217

221-
private void validateBranch(String branchName) {
222-
checkArgument(
223-
!BranchManager.isMainBranch(branchName),
224-
String.format(
225-
"Branch name '%s' is the default branch and cannot be used.",
226-
DEFAULT_MAIN_BRANCH));
227-
checkArgument(
228-
!StringUtils.isNullOrWhitespaceOnly(branchName),
229-
"Branch name '%s' is blank.",
230-
branchName);
231-
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
232-
checkArgument(
233-
!branchName.chars().allMatch(Character::isDigit),
234-
"Branch name cannot be pure numeric string but is '%s'.",
235-
branchName);
236-
}
237-
238218
private void copySchemasToBranch(String branchName, long schemaId) throws IOException {
239219
for (int i = 0; i <= schemaId; i++) {
240220
if (schemaManager.schemaExists(i)) {

paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java renamed to paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java

Lines changed: 58 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.jupiter.api.Test;
4343

4444
import java.io.File;
45+
import java.io.IOException;
4546
import java.util.HashMap;
4647
import java.util.Map;
4748
import java.util.UUID;
@@ -50,40 +51,29 @@
5051
import static org.junit.jupiter.api.Assertions.assertEquals;
5152

5253
/** Test REST Catalog on Mocked REST server. */
53-
class MockRESTCatalogTest extends RESTCatalogTestBase {
54+
class RESTCatalogMockTest extends RESTCatalogTestBase {
5455

5556
private RESTCatalogServer restCatalogServer;
5657
private final String initToken = "init_token";
5758
private final String serverDefineHeaderName = "test-header";
5859
private final String serverDefineHeaderValue = "test-value";
5960
private String dataPath;
6061
private AuthProvider authProvider;
62+
private Map<String, String> authMap;
6163

6264
@BeforeEach
6365
@Override
6466
public void setUp() throws Exception {
6567
super.setUp();
6668
dataPath = warehouse;
67-
String restWarehouse = UUID.randomUUID().toString();
68-
this.config =
69-
new ConfigResponse(
70-
ImmutableMap.of(
71-
RESTCatalogInternalOptions.PREFIX.key(),
72-
"paimon",
73-
"header." + serverDefineHeaderName,
74-
serverDefineHeaderValue,
75-
CatalogOptions.WAREHOUSE.key(),
76-
restWarehouse),
77-
ImmutableMap.of());
7869
this.authProvider = new BearTokenAuthProvider(initToken);
79-
restCatalogServer =
80-
new RESTCatalogServer(dataPath, authProvider, this.config, restWarehouse);
81-
restCatalogServer.start();
82-
options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
83-
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
84-
options.set(RESTCatalogOptions.TOKEN, initToken);
85-
options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
86-
this.restCatalog = new RESTCatalog(CatalogContext.create(options));
70+
this.authMap =
71+
ImmutableMap.of(
72+
RESTCatalogOptions.TOKEN.key(),
73+
initToken,
74+
RESTCatalogOptions.TOKEN_PROVIDER.key(),
75+
AuthProviderEnum.BEAR.identifier());
76+
this.restCatalog = initCatalog(false);
8777
this.catalog = restCatalog;
8878
}
8979

@@ -105,30 +95,24 @@ void testAuthFail() {
10595

10696
@Test
10797
void testDlfStSTokenAuth() throws Exception {
108-
String restWarehouse = UUID.randomUUID().toString();
10998
String akId = "akId" + UUID.randomUUID();
11099
String akSecret = "akSecret" + UUID.randomUUID();
111100
String securityToken = "securityToken" + UUID.randomUUID();
112101
String region = "cn-hangzhou";
113-
DLFAuthProvider authProvider =
114-
DLFAuthProvider.buildAKToken(akId, akSecret, securityToken, region);
115-
restCatalogServer =
116-
new RESTCatalogServer(dataPath, authProvider, this.config, restWarehouse);
117-
restCatalogServer.start();
118-
options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
119-
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
120-
options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.DLF.identifier());
121-
options.set(RESTCatalogOptions.DLF_REGION, region);
122-
options.set(RESTCatalogOptions.DLF_ACCESS_KEY_ID, akId);
123-
options.set(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET, akSecret);
124-
options.set(RESTCatalogOptions.DLF_SECURITY_TOKEN, securityToken);
125-
RESTCatalog restCatalog = new RESTCatalog(CatalogContext.create(options));
102+
this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret, securityToken, region);
103+
this.authMap =
104+
ImmutableMap.of(
105+
RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.DLF.identifier(),
106+
RESTCatalogOptions.DLF_REGION.key(), region,
107+
RESTCatalogOptions.DLF_ACCESS_KEY_ID.key(), akId,
108+
RESTCatalogOptions.DLF_ACCESS_KEY_SECRET.key(), akSecret,
109+
RESTCatalogOptions.DLF_SECURITY_TOKEN.key(), securityToken);
110+
RESTCatalog restCatalog = initCatalog(false);
126111
testDlfAuth(restCatalog);
127112
}
128113

129114
@Test
130115
void testDlfStSTokenPathAuth() throws Exception {
131-
String restWarehouse = UUID.randomUUID().toString();
132116
String region = "cn-hangzhou";
133117
String tokenPath = dataPath + UUID.randomUUID();
134118
generateTokenAndWriteToFile(tokenPath);
@@ -138,17 +122,13 @@ void testDlfStSTokenPathAuth() throws Exception {
138122
new Options(
139123
ImmutableMap.of(
140124
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath)));
141-
DLFAuthProvider authProvider =
142-
DLFAuthProvider.buildRefreshToken(tokenLoader, 1000_000L, region);
143-
restCatalogServer =
144-
new RESTCatalogServer(dataPath, authProvider, this.config, restWarehouse);
145-
restCatalogServer.start();
146-
options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
147-
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
148-
options.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.DLF.identifier());
149-
options.set(RESTCatalogOptions.DLF_REGION, region);
150-
options.set(RESTCatalogOptions.DLF_TOKEN_PATH, tokenPath);
151-
RESTCatalog restCatalog = new RESTCatalog(CatalogContext.create(options));
125+
this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader, 1000_000L, region);
126+
this.authMap =
127+
ImmutableMap.of(
128+
RESTCatalogOptions.TOKEN_PROVIDER.key(), AuthProviderEnum.DLF.identifier(),
129+
RESTCatalogOptions.DLF_REGION.key(), region,
130+
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath);
131+
RESTCatalog restCatalog = initCatalog(false);
152132
testDlfAuth(restCatalog);
153133
File file = new File(tokenPath);
154134
file.delete();
@@ -184,12 +164,8 @@ private void testDlfAuth(RESTCatalog restCatalog) throws Exception {
184164
}
185165

186166
@Override
187-
protected Catalog newRestCatalogWithDataToken() {
188-
options.set(RESTTokenFileIO.DATA_TOKEN_ENABLED, true);
189-
options.set(
190-
RESTTestFileIO.DATA_PATH_CONF_KEY,
191-
dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
192-
return new RESTCatalog(CatalogContext.create(options));
167+
protected Catalog newRestCatalogWithDataToken() throws IOException {
168+
return initCatalog(true);
193169
}
194170

195171
@Override
@@ -234,4 +210,34 @@ protected void updateSnapshotOnRestServer(
234210
fileCount,
235211
lastFileCreationTime);
236212
}
213+
214+
private RESTCatalog initCatalog(boolean enableDataToken) throws IOException {
215+
String restWarehouse = UUID.randomUUID().toString();
216+
this.config =
217+
new ConfigResponse(
218+
ImmutableMap.of(
219+
RESTCatalogInternalOptions.PREFIX.key(),
220+
"paimon",
221+
"header." + serverDefineHeaderName,
222+
serverDefineHeaderValue,
223+
RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
224+
enableDataToken + "",
225+
CatalogOptions.WAREHOUSE.key(),
226+
restWarehouse),
227+
ImmutableMap.of());
228+
restCatalogServer =
229+
new RESTCatalogServer(dataPath, this.authProvider, this.config, restWarehouse);
230+
restCatalogServer.start();
231+
for (Map.Entry<String, String> entry : this.authMap.entrySet()) {
232+
options.set(entry.getKey(), entry.getValue());
233+
}
234+
options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
235+
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
236+
String path =
237+
enableDataToken
238+
? dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME)
239+
: dataPath;
240+
options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
241+
return new RESTCatalog(CatalogContext.create(options));
242+
}
237243
}

0 commit comments

Comments
 (0)