Skip to content

Commit 970489e

Browse files
IGNITE-26755 Allow to create table for already existing cache with defined schema - Fixes #12453.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 6dbf744 commit 970489e

File tree

6 files changed

+124
-23
lines changed

6 files changed

+124
-23
lines changed

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractDdlIntegrationTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.ignite.internal.processors.query.calcite.integration;
1818

19+
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
1920
import org.apache.ignite.cluster.ClusterState;
2021
import org.apache.ignite.configuration.DataRegionConfiguration;
2122
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -57,6 +58,7 @@ public class AbstractDdlIntegrationTest extends AbstractBasicIntegrationTest {
5758
return super.getConfiguration(igniteInstanceName)
5859
.setSqlConfiguration(
5960
new SqlConfiguration().setSqlSchemas("MY_SCHEMA")
61+
.setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration())
6062
)
6163
.setDataStorageConfiguration(
6264
new DataStorageConfiguration()

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/TableDdlIntegrationTest.java

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.sql.Date;
2121
import java.sql.Time;
2222
import java.sql.Timestamp;
23+
import java.util.Collections;
2324
import java.util.HashSet;
2425
import java.util.LinkedHashMap;
2526
import java.util.LinkedHashSet;
@@ -31,9 +32,9 @@
3132
import org.apache.ignite.IgniteCache;
3233
import org.apache.ignite.cache.CacheAtomicityMode;
3334
import org.apache.ignite.cache.CacheMode;
34-
import org.apache.ignite.cache.CachePeekMode;
3535
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
3636
import org.apache.ignite.cache.QueryEntity;
37+
import org.apache.ignite.cache.query.SqlFieldsQuery;
3738
import org.apache.ignite.configuration.CacheConfiguration;
3839
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
3940
import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -392,15 +393,84 @@ public void createTableCustomSchema() {
392393
*/
393394
@Test
394395
public void createTableOnExistingCache() {
395-
IgniteCache<Object, Object> cache = client.getOrCreateCache("my_cache");
396+
// Cache without SQL configuration.
397+
IgniteCache<Object, Object> cache = client.getOrCreateCache("my_cache0");
396398

397-
sql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache\"");
399+
// DDL with explicit schema.
400+
sql("create table my_schema.my_table (f1 int, f2 varchar) with cache_name=\"my_cache0\"");
398401

399-
sql("insert into my_schema.my_table(f1, f2) values (1, '1'),(2, '2')");
402+
insertAndCheckSize(cache, "my_schema");
403+
404+
// Cache without SQL configuration.
405+
cache = client.getOrCreateCache("my_cache1");
406+
407+
// DDL with implicit PUBLIC schema.
408+
sql("create table my_table (f1 int, f2 varchar) with cache_name=\"my_cache1\"");
409+
410+
insertAndCheckSize(cache, "public");
411+
412+
// Cache with defined schema.
413+
cache = client.getOrCreateCache(new CacheConfiguration<>("my_cache2").setSqlSchema("my_schema2"));
414+
415+
// DDL with explicit correct schema.
416+
sql("create table my_schema2.my_table (f1 int, f2 varchar) with cache_name=\"my_cache2\"");
417+
418+
insertAndCheckSize(cache, "my_schema2");
419+
420+
// Cache with defined schema.
421+
cache = client.getOrCreateCache(new CacheConfiguration<>("my_cache3").setSqlSchema("my_schema3"));
422+
423+
// DDL with explicit wrong schema.
424+
assertThrows("create table my_schema.my_table2 (f1 int, f2 varchar) with cache_name=\"my_cache3\"",
425+
IgniteSQLException.class, "Invalid schema: MY_SCHEMA");
426+
427+
// DDL with explicit wrong schema.
428+
assertThrows("create table public.my_table2 (f1 int, f2 varchar) with cache_name=\"my_cache3\"",
429+
IgniteSQLException.class, "Invalid schema: PUBLIC");
430+
431+
// DDL with implicit wrong schema.
432+
assertThrows("create table my_table2 (f1 int, f2 varchar) with cache_name=\"my_cache3\"",
433+
IgniteSQLException.class, "Invalid schema: PUBLIC");
434+
435+
// DDL with implicit cache schema.
436+
cache.query(new SqlFieldsQuery("create table my_table (f1 int, f2 varchar) with cache_name=\"my_cache3\""));
437+
438+
insertAndCheckSize(cache, "my_schema3");
439+
440+
// Cache with defined SQL functions, schema is defined by cache name.
441+
cache = client.getOrCreateCache(new CacheConfiguration<>("my_cache4").setSqlFunctionClasses(getClass()));
442+
443+
// DDL with explicit wrong schema.
444+
assertThrows("create table public.my_table2 (f1 int, f2 varchar) with cache_name=\"my_cache4\"",
445+
IgniteSQLException.class, "Invalid schema: PUBLIC");
446+
447+
// DDL with explicit correct schema.
448+
sql("create table \"my_cache4\".my_table (f1 int, f2 varchar) with cache_name=\"my_cache4\"");
449+
450+
insertAndCheckSize(cache, "\"my_cache4\"");
451+
452+
// Cache with defined query entities.
453+
client.getOrCreateCache(new CacheConfiguration<>("my_cache5")
454+
.setQueryEntities(Collections.singleton(new QueryEntity(Integer.class, Integer.class))));
455+
456+
assertThrows("create table \"my_cache5\".my_table (f1 int, f2 varchar) with cache_name=\"my_cache5\"",
457+
IgniteSQLException.class, "Cache is already indexed");
458+
459+
// Cache with indexed types.
460+
client.getOrCreateCache(new CacheConfiguration<>("my_cache6")
461+
.setIndexedTypes(Integer.class, Integer.class));
462+
463+
assertThrows("create table \"my_cache6\".my_table (f1 int, f2 varchar) with cache_name=\"my_cache6\"",
464+
IgniteSQLException.class, "Cache is already indexed");
465+
}
466+
467+
/** */
468+
private void insertAndCheckSize(IgniteCache<?, ?> cache, String schema) {
469+
sql("insert into " + schema + ".my_table(f1, f2) values (1, '1'),(2, '2')");
400470

401-
assertThat(sql("select * from my_schema.my_table"), hasSize(2));
471+
assertThat(sql("select * from " + schema + ".my_table"), hasSize(2));
402472

403-
assertEquals(2, cache.size(CachePeekMode.PRIMARY));
473+
assertEquals(2, cache.size());
404474
}
405475

406476
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/IgniteQueryErrorCode.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public final class IgniteQueryErrorCode {
9595
/** View does not exist. */
9696
public static final int VIEW_NOT_FOUND = 3018;
9797

98+
/** Schema is invalid. */
99+
public static final int INVALID_SCHEMA = 3019;
100+
98101
/* 4xxx - cache related runtime errors */
99102

100103
/** Attempt to INSERT a key that is already in cache. */

modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
250250
/** Coordinator node (initialized lazily). */
251251
private ClusterNode crd;
252252

253-
/** Registered cache names. */
254-
private final Collection<String> cacheNames = ConcurrentHashMap.newKeySet();
253+
/** Registered cache names to schema mapping. */
254+
private final Map<String, String> cacheNamesToSchema = new ConcurrentHashMap();
255255

256256
/** ID history for index create/drop discovery messages. */
257257
private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> dscoMsgIdHist =
@@ -976,7 +976,7 @@ private void startSchemaChange(SchemaOperation schemaOp) {
976976

977977
boolean cacheExists = cacheDesc != null && Objects.equals(msg.deploymentId(), cacheDesc.deploymentId());
978978

979-
boolean cacheRegistered = cacheExists && cacheNames.contains(cacheName);
979+
boolean cacheRegistered = cacheExists && cacheNamesToSchema.containsKey(cacheName);
980980

981981
// Validate schema state and decide whether we should proceed or not.
982982
SchemaAbstractOperation op = msg.operation();
@@ -1696,8 +1696,21 @@ else if (op instanceof SchemaAlterTableDropColumnOperation) {
16961696
}
16971697
}
16981698
else if (op instanceof SchemaAddQueryEntityOperation) {
1699-
if (cacheNames.contains(op.cacheName()))
1700-
err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, op.cacheName());
1699+
String cacheSchema = cacheNamesToSchema.get(cacheName);
1700+
1701+
if (cacheSchema != null) {
1702+
if (!Objects.equals(cacheSchema, op.schemaName()))
1703+
err = new SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA, op.schemaName());
1704+
else {
1705+
for (QueryTypeIdKey t : types.keySet()) {
1706+
if (Objects.equals(t.cacheName(), cacheName)) {
1707+
err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, cacheName);
1708+
1709+
break;
1710+
}
1711+
}
1712+
}
1713+
}
17011714
}
17021715
else
17031716
err = new SchemaOperationException("Unsupported operation: " + op);
@@ -1745,8 +1758,12 @@ private T2<Boolean, SchemaOperationException> prepareChangeOnNotStartedCache(
17451758
SchemaOperationException err = null;
17461759

17471760
if (op instanceof SchemaAddQueryEntityOperation) {
1748-
if (cacheSupportSql(desc.cacheConfiguration()))
1761+
CacheConfiguration<?, ?> ccfg = desc.cacheConfiguration();
1762+
1763+
if (!F.isEmpty(ccfg.getQueryEntities()))
17491764
err = new SchemaOperationException(SchemaOperationException.CODE_CACHE_ALREADY_INDEXED, desc.cacheName());
1765+
else if (!F.isEmpty(ccfg.getSqlSchema()) && !Objects.equals(ccfg.getSqlSchema(), op.schemaName()))
1766+
err = new SchemaOperationException(SchemaOperationException.CODE_INVALID_SCHEMA, op.schemaName());
17501767

17511768
return new T2<>(nop, err);
17521769
}
@@ -2173,15 +2190,13 @@ else if (op instanceof SchemaAlterTableDropColumnOperation) {
21732190
else if (op instanceof SchemaAddQueryEntityOperation) {
21742191
SchemaAddQueryEntityOperation op0 = (SchemaAddQueryEntityOperation)op;
21752192

2176-
if (!cacheNames.contains(op0.cacheName())) {
2177-
cacheInfo.onSchemaAddQueryEntity(op0);
2193+
cacheInfo.onSchemaAddQueryEntity(op0);
21782194

2179-
T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
2180-
candRes = createQueryCandidates(op0.cacheName(), op0.schemaName(), cacheInfo, op0.entities(),
2181-
op0.isSqlEscape());
2195+
T3<Collection<QueryTypeCandidate>, Map<String, QueryTypeDescriptorImpl>, Map<String, QueryTypeDescriptorImpl>>
2196+
candRes = createQueryCandidates(op0.cacheName(), op0.schemaName(), cacheInfo, op0.entities(),
2197+
op0.isSqlEscape());
21822198

2183-
registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
2184-
}
2199+
registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
21852200

21862201
if (idxRebuildFutStorage.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty())
21872202
rebuildIndexesFromHash0(cacheInfo.cacheContext(), false, cancelTok);
@@ -2350,7 +2365,7 @@ private void registerCache0(
23502365
boolean isSql
23512366
) throws IgniteCheckedException {
23522367
synchronized (stateMux) {
2353-
if (moduleEnabled()) {
2368+
if (moduleEnabled() && !cacheNamesToSchema.containsKey(cacheName)) {
23542369
ctx.indexProcessor().idxRowCacheRegistry().onCacheRegistered(cacheInfo);
23552370

23562371
schemaMgr.onCacheCreated(cacheName, schemaName, cacheInfo.config().getSqlFunctionClasses());
@@ -2391,7 +2406,7 @@ private void registerCache0(
23912406
schemaMgr.onCacheTypeCreated(cacheInfo, desc, isSql);
23922407
}
23932408

2394-
cacheNames.add(CU.mask(cacheName));
2409+
cacheNamesToSchema.putIfAbsent(CU.mask(cacheName), schemaName);
23952410
}
23962411
catch (IgniteCheckedException | RuntimeException e) {
23972412
onCacheStop0(cacheInfo, true, true);
@@ -2410,7 +2425,7 @@ private void registerCache0(
24102425
* @param clearIdx Clear flag.
24112426
*/
24122427
public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, boolean clearIdx) {
2413-
if (!moduleEnabled() || !cacheNames.contains(cacheInfo.name()))
2428+
if (!moduleEnabled() || !cacheNamesToSchema.containsKey(cacheInfo.name()))
24142429
return;
24152430

24162431
String cacheName = cacheInfo.name();
@@ -2462,7 +2477,7 @@ public void onCacheStop0(GridCacheContextInfo cacheInfo, boolean destroy, boolea
24622477
U.error(log, "Failed to clear schema manager on cache unregister (will ignore): " + cacheName, e);
24632478
}
24642479

2465-
cacheNames.remove(cacheName);
2480+
cacheNamesToSchema.remove(cacheName);
24662481

24672482
Iterator<Long> missedCacheTypeIter = missedCacheTypes.iterator();
24682483

modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1700,6 +1700,11 @@ public static IgniteSQLException convert(SchemaOperationException e) {
17001700

17011701
break;
17021702

1703+
case SchemaOperationException.CODE_INVALID_SCHEMA:
1704+
sqlCode = IgniteQueryErrorCode.INVALID_SCHEMA;
1705+
1706+
break;
1707+
17031708
default:
17041709
sqlCode = IgniteQueryErrorCode.UNKNOWN;
17051710
}

modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public class SchemaOperationException extends IgniteCheckedException {
6363
/** Code: schema not found. */
6464
public static final int CODE_SCHEMA_NOT_FOUND = 11;
6565

66+
/** Code: schema not found. */
67+
public static final int CODE_INVALID_SCHEMA = 12;
68+
6669
/** Error code. */
6770
private final int code;
6871

@@ -165,6 +168,9 @@ private static String message(int code, String objName) {
165168
case CODE_SCHEMA_NOT_FOUND:
166169
return "Schema doesn't exist: " + objName;
167170

171+
case CODE_INVALID_SCHEMA:
172+
return "Invalid schema: " + objName;
173+
168174
default:
169175
assert false;
170176

0 commit comments

Comments
 (0)