Skip to content

Commit 9a3759f

Browse files
authored
[#4812] fix(core): Make ensure the schema exists before creating tables and topics (#4826)
### What changes were proposed in this pull request? If the schema is not created by Gravitino, the Gravitino will lack the metadata in the backend storage. If we create a table in this schema, the storage won't contain the metadata. So it will fail to set owner. Because the storage won't store the table. Because the storage won't contain schema id. This won't bring too much performance cost. Because loadSchema will use read lock after first loading. If we have cache, we could be more quick. ### Why are the changes needed? Fix: #4812 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a ut.
1 parent 9a3243a commit 9a3759f

File tree

8 files changed

+248
-151
lines changed

8 files changed

+248
-151
lines changed

core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java

Lines changed: 97 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -143,74 +143,25 @@ public Table createTable(
143143
SortOrder[] sortOrders,
144144
Index[] indexes)
145145
throws NoSuchSchemaException, TableAlreadyExistsException {
146-
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
147-
doWithCatalog(
148-
catalogIdent,
149-
c ->
150-
c.doWithPropertiesMeta(
151-
p -> {
152-
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
153-
return null;
154-
}),
155-
IllegalArgumentException.class);
156-
long uid = idGenerator.nextId();
157-
// Add StringIdentifier to the properties, the specific catalog will handle this
158-
// StringIdentifier to make sure only when the operation is successful, the related
159-
// TableEntity will be visible.
160-
StringIdentifier stringId = StringIdentifier.fromId(uid);
161-
Map<String, String> updatedProperties =
162-
StringIdentifier.newPropertiesWithId(stringId, properties);
163-
164-
doWithCatalog(
165-
catalogIdent,
166-
c ->
167-
c.doWithTableOps(
168-
t ->
169-
t.createTable(
170-
ident,
171-
columns,
172-
comment,
173-
updatedProperties,
174-
partitions == null ? EMPTY_TRANSFORM : partitions,
175-
distribution == null ? Distributions.NONE : distribution,
176-
sortOrders == null ? new SortOrder[0] : sortOrders,
177-
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
178-
NoSuchSchemaException.class,
179-
TableAlreadyExistsException.class);
180-
181-
// Retrieve the Table again to obtain some values generated by underlying catalog
182-
Table table =
183-
doWithCatalog(
184-
catalogIdent,
185-
c -> c.doWithTableOps(t -> t.loadTable(ident)),
186-
NoSuchTableException.class);
187-
188-
TableEntity tableEntity =
189-
TableEntity.builder()
190-
.withId(uid)
191-
.withName(ident.name())
192-
.withNamespace(ident.namespace())
193-
.withAuditInfo(
194-
AuditInfo.builder()
195-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
196-
.withCreateTime(Instant.now())
197-
.build())
198-
.build();
199-
200-
try {
201-
store.put(tableEntity, true /* overwrite */);
202-
} catch (Exception e) {
203-
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
204-
return EntityCombinedTable.of(table)
205-
.withHiddenPropertiesSet(
206-
getHiddenPropertyNames(
207-
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
208-
}
209146

210-
return EntityCombinedTable.of(table, tableEntity)
211-
.withHiddenPropertiesSet(
212-
getHiddenPropertyNames(
213-
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
147+
// Load the schema to make sure the schema exists.
148+
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
149+
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
150+
schemaDispatcher.loadSchema(schemaIdent);
151+
152+
return TreeLockUtils.doWithTreeLock(
153+
NameIdentifier.of(ident.namespace().levels()),
154+
LockType.WRITE,
155+
() ->
156+
internalCreateTable(
157+
ident,
158+
columns,
159+
comment,
160+
properties,
161+
partitions,
162+
distribution,
163+
sortOrders,
164+
indexes));
214165
}
215166

216167
/**
@@ -476,4 +427,83 @@ private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
476427
table.properties()))
477428
.withImported(tableEntity != null);
478429
}
430+
431+
private Table internalCreateTable(
432+
NameIdentifier ident,
433+
Column[] columns,
434+
String comment,
435+
Map<String, String> properties,
436+
Transform[] partitions,
437+
Distribution distribution,
438+
SortOrder[] sortOrders,
439+
Index[] indexes) {
440+
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
441+
doWithCatalog(
442+
catalogIdent,
443+
c ->
444+
c.doWithPropertiesMeta(
445+
p -> {
446+
validatePropertyForCreate(p.tablePropertiesMetadata(), properties);
447+
return null;
448+
}),
449+
IllegalArgumentException.class);
450+
long uid = idGenerator.nextId();
451+
// Add StringIdentifier to the properties, the specific catalog will handle this
452+
// StringIdentifier to make sure only when the operation is successful, the related
453+
// TableEntity will be visible.
454+
StringIdentifier stringId = StringIdentifier.fromId(uid);
455+
Map<String, String> updatedProperties =
456+
StringIdentifier.newPropertiesWithId(stringId, properties);
457+
458+
doWithCatalog(
459+
catalogIdent,
460+
c ->
461+
c.doWithTableOps(
462+
t ->
463+
t.createTable(
464+
ident,
465+
columns,
466+
comment,
467+
updatedProperties,
468+
partitions == null ? EMPTY_TRANSFORM : partitions,
469+
distribution == null ? Distributions.NONE : distribution,
470+
sortOrders == null ? new SortOrder[0] : sortOrders,
471+
indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
472+
NoSuchSchemaException.class,
473+
TableAlreadyExistsException.class);
474+
475+
TableEntity tableEntity =
476+
TableEntity.builder()
477+
.withId(uid)
478+
.withName(ident.name())
479+
.withNamespace(ident.namespace())
480+
.withAuditInfo(
481+
AuditInfo.builder()
482+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
483+
.withCreateTime(Instant.now())
484+
.build())
485+
.build();
486+
487+
// Retrieve the Table again to obtain some values generated by underlying catalog
488+
Table table =
489+
doWithCatalog(
490+
catalogIdent,
491+
c -> c.doWithTableOps(t -> t.loadTable(ident)),
492+
NoSuchTableException.class);
493+
494+
try {
495+
store.put(tableEntity, true /* overwrite */);
496+
} catch (Exception e) {
497+
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
498+
return EntityCombinedTable.of(table)
499+
.withHiddenPropertiesSet(
500+
getHiddenPropertyNames(
501+
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
502+
}
503+
504+
return EntityCombinedTable.of(table, tableEntity)
505+
.withHiddenPropertiesSet(
506+
getHiddenPropertyNames(
507+
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, table.properties()));
508+
}
479509
}

core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -128,61 +128,16 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
128128
public Topic createTopic(
129129
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
130130
throws NoSuchSchemaException, TopicAlreadyExistsException {
131-
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
132-
doWithCatalog(
133-
catalogIdent,
134-
c ->
135-
c.doWithPropertiesMeta(
136-
p -> {
137-
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
138-
return null;
139-
}),
140-
IllegalArgumentException.class);
141-
Long uid = idGenerator.nextId();
142-
StringIdentifier stringId = StringIdentifier.fromId(uid);
143-
Map<String, String> updatedProperties =
144-
StringIdentifier.newPropertiesWithId(stringId, properties);
145131

146-
doWithCatalog(
147-
catalogIdent,
148-
c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
149-
NoSuchSchemaException.class,
150-
TopicAlreadyExistsException.class);
151-
152-
// Retrieve the Topic again to obtain some values generated by underlying catalog
153-
Topic topic =
154-
doWithCatalog(
155-
catalogIdent,
156-
c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
157-
NoSuchTopicException.class);
132+
// Load the schema to make sure the schema exists.
133+
SchemaDispatcher schemaDispatcher = GravitinoEnv.getInstance().schemaDispatcher();
134+
NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
135+
schemaDispatcher.loadSchema(schemaIdent);
158136

159-
TopicEntity topicEntity =
160-
TopicEntity.builder()
161-
.withId(fromProperties(topic.properties()).id())
162-
.withName(ident.name())
163-
.withComment(comment)
164-
.withNamespace(ident.namespace())
165-
.withAuditInfo(
166-
AuditInfo.builder()
167-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
168-
.withCreateTime(Instant.now())
169-
.build())
170-
.build();
171-
172-
try {
173-
store.put(topicEntity, true /* overwrite */);
174-
} catch (Exception e) {
175-
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
176-
return EntityCombinedTopic.of(topic)
177-
.withHiddenPropertiesSet(
178-
getHiddenPropertyNames(
179-
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
180-
}
181-
182-
return EntityCombinedTopic.of(topic, topicEntity)
183-
.withHiddenPropertiesSet(
184-
getHiddenPropertyNames(
185-
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
137+
return TreeLockUtils.doWithTreeLock(
138+
NameIdentifier.of(ident.namespace().levels()),
139+
LockType.WRITE,
140+
() -> internalCreateTopic(ident, comment, dataLayout, properties));
186141
}
187142

188143
/**
@@ -374,4 +329,63 @@ private EntityCombinedTopic internalLoadTopic(NameIdentifier ident) {
374329
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()))
375330
.withImported(topicEntity != null);
376331
}
332+
333+
private Topic internalCreateTopic(
334+
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties) {
335+
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
336+
doWithCatalog(
337+
catalogIdent,
338+
c ->
339+
c.doWithPropertiesMeta(
340+
p -> {
341+
validatePropertyForCreate(p.topicPropertiesMetadata(), properties);
342+
return null;
343+
}),
344+
IllegalArgumentException.class);
345+
Long uid = idGenerator.nextId();
346+
StringIdentifier stringId = StringIdentifier.fromId(uid);
347+
Map<String, String> updatedProperties =
348+
StringIdentifier.newPropertiesWithId(stringId, properties);
349+
350+
doWithCatalog(
351+
catalogIdent,
352+
c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, updatedProperties)),
353+
NoSuchSchemaException.class,
354+
TopicAlreadyExistsException.class);
355+
356+
// Retrieve the Topic again to obtain some values generated by underlying catalog
357+
Topic topic =
358+
doWithCatalog(
359+
catalogIdent,
360+
c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
361+
NoSuchTopicException.class);
362+
363+
TopicEntity topicEntity =
364+
TopicEntity.builder()
365+
.withId(fromProperties(topic.properties()).id())
366+
.withName(ident.name())
367+
.withComment(comment)
368+
.withNamespace(ident.namespace())
369+
.withAuditInfo(
370+
AuditInfo.builder()
371+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
372+
.withCreateTime(Instant.now())
373+
.build())
374+
.build();
375+
376+
try {
377+
store.put(topicEntity, true /* overwrite */);
378+
} catch (Exception e) {
379+
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
380+
return EntityCombinedTopic.of(topic)
381+
.withHiddenPropertiesSet(
382+
getHiddenPropertyNames(
383+
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
384+
}
385+
386+
return EntityCombinedTopic.of(topic, topicEntity)
387+
.withHiddenPropertiesSet(
388+
getHiddenPropertyNames(
389+
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties()));
390+
}
377391
}

core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends TestOperationDispatcher {
3939
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, "TEST_PARTITION_NORMALIZE_TABLE");
4040

4141
@BeforeAll
42-
public static void initialize() {
42+
public static void initialize() throws IllegalAccessException {
4343
TestPartitionOperationDispatcher.prepareTable();
4444
partitionNormalizeDispatcher =
4545
new PartitionNormalizeDispatcher(

core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,19 @@
1818
*/
1919
package org.apache.gravitino.catalog;
2020

21+
import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
22+
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
23+
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
24+
import static org.mockito.Mockito.doReturn;
25+
import static org.mockito.Mockito.mock;
26+
2127
import com.google.common.collect.Maps;
2228
import java.util.Arrays;
29+
import org.apache.commons.lang3.reflect.FieldUtils;
30+
import org.apache.gravitino.Config;
31+
import org.apache.gravitino.GravitinoEnv;
2332
import org.apache.gravitino.NameIdentifier;
33+
import org.apache.gravitino.lock.LockManager;
2434
import org.apache.gravitino.rel.Column;
2535
import org.apache.gravitino.rel.expressions.literals.Literal;
2636
import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends TestOperationDispatcher {
5060
Maps.newHashMap());
5161

5262
@BeforeAll
53-
public static void initialize() {
63+
public static void initialize() throws IllegalAccessException {
5464
prepareTable();
5565
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);
5666

@@ -66,14 +76,22 @@ public static void initialize() {
6676
"Custom class loader is not used");
6777
}
6878

69-
protected static void prepareTable() {
79+
protected static void prepareTable() throws IllegalAccessException {
7080
schemaOperationDispatcher =
7181
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
7282
tableOperationDispatcher =
7383
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
7484
partitionOperationDispatcher =
7585
new PartitionOperationDispatcher(catalogManager, entityStore, idGenerator);
7686

87+
Config config = mock(Config.class);
88+
doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
89+
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
90+
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
91+
FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new LockManager(config), true);
92+
FieldUtils.writeField(
93+
GravitinoEnv.getInstance(), "schemaDispatcher", schemaOperationDispatcher, true);
94+
7795
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, catalog, SCHEMA);
7896
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
7997
Column[] columns =

0 commit comments

Comments
 (0)