Skip to content

Commit 79849e9

Browse files
IGNITE-27938 Java thin: Add ability to configure cache partitions count
1 parent bebb4d8 commit 79849e9

File tree

9 files changed

+230
-105
lines changed

9 files changed

+230
-105
lines changed

modules/core/src/main/java/org/apache/ignite/client/ClientCacheConfiguration.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ public final class ClientCacheConfiguration implements Serializable {
131131
/** @serial Expiry policy. */
132132
private ExpiryPolicy expiryPlc;
133133

134+
/** @serial Partitions count. */
135+
private int partitions = -1;
136+
134137
/**
135138
* Root directories where partition files are stored.
136139
* @see DataStorageConfiguration#setStoragePath(String)
@@ -193,6 +196,7 @@ public ClientCacheConfiguration(ClientCacheConfiguration ccfg) {
193196
writeSynchronizationMode = ccfg.getWriteSynchronizationMode();
194197
storagePaths = ccfg.getStoragePaths();
195198
idxPath = ccfg.getIndexPath();
199+
partitions = ccfg.getPartitions();
196200
}
197201

198202
/**
@@ -793,6 +797,27 @@ public ClientCacheConfiguration setExpiryPolicy(ExpiryPolicy expiryPlc) {
793797
return this;
794798
}
795799

800+
/**
801+
* Gets partitions count.
802+
*
803+
* @return Partitions count.
804+
*/
805+
public int getPartitions() {
806+
return partitions;
807+
}
808+
809+
/**
810+
* Sets partitions count.
811+
*
812+
* @param partitions Partitions count.
813+
* @return {@code this} for chaining.
814+
*/
815+
public ClientCacheConfiguration setPartitions(int partitions) {
816+
this.partitions = partitions;
817+
818+
return this;
819+
}
820+
796821
/**
797822
* @return A path to the root directory where the Persistent Store for cache group will persist data and indexes.
798823
*/

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.ignite.internal.util.typedef.T2;
6363
import org.jetbrains.annotations.Nullable;
6464

65+
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.CACHE_CFG_PARTITIONS;
6566
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.CACHE_STORAGES;
6667
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_INITIATOR_ID;
6768
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_PARTITIONS_BATCH_SIZE;
@@ -382,6 +383,11 @@ else if (cfg.getExpiryPolicy() != null) {
382383
else if (!F.isEmpty(cfg.getStoragePaths()) || !F.isEmpty(cfg.getIndexPath()))
383384
throw new ClientFeatureNotSupportedByServerException("Cache storages are not supported by the server");
384385

386+
if (protocolCtx.isFeatureSupported(CACHE_CFG_PARTITIONS))
387+
itemWriter.accept(CfgItem.PARTITIONS, w -> w.writeInt(cfg.getPartitions()));
388+
else if (cfg.getPartitions() > 0)
389+
throw new ClientProtocolError("Partitions configuration by thin client is not supported by the server");
390+
385391
writer.writeInt(origPos, out.position() - origPos - 4); // configuration length
386392
writer.writeInt(origPos + 4, propCnt.get()); // properties count
387393
}
@@ -518,7 +524,8 @@ ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, ProtocolContex
518524
: reader.readStringArray())
519525
.setIndexPath(!protocolCtx.isFeatureSupported(CACHE_STORAGES)
520526
? null
521-
: reader.readString());
527+
: reader.readString())
528+
.setPartitions(!protocolCtx.isFeatureSupported(CACHE_CFG_PARTITIONS) ? null : reader.readInt());
522529
}
523530
}
524531

@@ -798,7 +805,10 @@ private enum CfgItem {
798805
STORAGE_PATH(408),
799806

800807
/** Index path. */
801-
IDX_PATH(409);
808+
IDX_PATH(409),
809+
810+
/** Affinity function. */
811+
PARTITIONS(410);
802812

803813
/** Code. */
804814
private final short code;

modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,10 @@ public enum ProtocolBitmaskFeature {
115115
DC_AWARE(22),
116116

117117
/** SqlFieldsQuery initiatorId property. */
118-
QRY_INITIATOR_ID(23);
118+
QRY_INITIATOR_ID(23),
119+
120+
/** Partitions count in cache configuration. */
121+
CACHE_CFG_PARTITIONS(24);
119122

120123
/** */
121124
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.ignite.cache.CachePartialUpdateException;
5353
import org.apache.ignite.cache.CacheServerNotFoundException;
5454
import org.apache.ignite.cache.QueryEntity;
55+
import org.apache.ignite.cache.affinity.AffinityFunction;
5556
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
5657
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
5758
import org.apache.ignite.cache.store.CacheStoreSessionListener;
@@ -1634,6 +1635,14 @@ public static void validateKeyConfigiration(
16341635
}
16351636
}
16361637

1638+
/**
1639+
* @param partitions Partitions count.
1640+
* @return Default affinity function with predefined partitions count.
1641+
*/
1642+
public static AffinityFunction defaultAffinity(int partitions) {
1643+
return new RendezvousAffinityFunction(false, partitions);
1644+
}
1645+
16371646
/**
16381647
* @param log Logger.
16391648
* @param cfg Initializes cache configuration with proper defaults.
@@ -1650,15 +1659,10 @@ public static void initializeConfigDefaults(IgniteLogger log, CacheConfiguration
16501659
cfg.setNodeFilter(CacheConfiguration.ALL_NODES);
16511660

16521661
if (cfg.getAffinity() == null) {
1653-
if (cfg.getCacheMode() == PARTITIONED) {
1654-
RendezvousAffinityFunction aff = new RendezvousAffinityFunction();
1655-
1656-
cfg.setAffinity(aff);
1657-
}
1662+
if (cfg.getCacheMode() == PARTITIONED)
1663+
cfg.setAffinity(defaultAffinity(RendezvousAffinityFunction.DFLT_PARTITION_COUNT));
16581664
else {
1659-
RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512);
1660-
1661-
cfg.setAffinity(aff);
1665+
cfg.setAffinity(defaultAffinity(512));
16621666

16631667
cfg.setBackups(Integer.MAX_VALUE);
16641668
}

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
112112
DC_AWARE(22),
113113

114114
/** SqlFieldsQuery initiatorId property. */
115-
QRY_INITIATOR_ID(23);
115+
QRY_INITIATOR_ID(23),
116+
117+
/** Partitions count in cache configuration. */
118+
CACHE_CFG_PARTITIONS(24);
116119

117120
/** */
118121
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheConfigurationSerializer.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.ignite.cache.PartitionLossPolicy;
3434
import org.apache.ignite.cache.QueryEntity;
3535
import org.apache.ignite.cache.QueryIndex;
36+
import org.apache.ignite.cache.affinity.AffinityFunction;
3637
import org.apache.ignite.client.ClientException;
3738
import org.apache.ignite.configuration.CacheConfiguration;
3839
import org.apache.ignite.internal.binary.BinaryWriterEx;
@@ -41,6 +42,7 @@
4142
import org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature;
4243
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
4344
import org.apache.ignite.internal.util.typedef.T2;
45+
import org.apache.ignite.internal.util.typedef.internal.CU;
4446

4547
import static java.util.Optional.ofNullable;
4648
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.QUERY_ENTITY_PRECISION_AND_SCALE;
@@ -148,6 +150,9 @@ public class ClientCacheConfigurationSerializer {
148150
/** */
149151
private static final short IDX_PATH = 409;
150152

153+
/** */
154+
private static final short PARTITIONS = 410;
155+
151156
/**
152157
* Writes the cache configuration.
153158
* @param writer Writer.
@@ -226,6 +231,11 @@ static void write(BinaryWriterEx writer, CacheConfiguration cfg, ClientProtocolC
226231
writer.writeString(cfg.getIndexPath());
227232
}
228233

234+
if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.CACHE_CFG_PARTITIONS)) {
235+
AffinityFunction aff = cfg.getAffinity();
236+
writer.writeInt(aff == null ? -1 : aff.partitions());
237+
}
238+
229239
// Write length (so that part of the config can be skipped).
230240
writer.writeInt(pos, writer.out().position() - pos - 4);
231241
}
@@ -471,6 +481,14 @@ static T2<CacheConfiguration, Boolean> read(BinaryRawReader reader, ClientProtoc
471481
case IDX_PATH:
472482
cfg.setIndexPath(reader.readString());
473483
break;
484+
485+
case PARTITIONS:
486+
int partitions = reader.readInt();
487+
488+
if (partitions > 0)
489+
cfg.setAffinity(CU.defaultAffinity(partitions));
490+
491+
break;
474492
}
475493
}
476494

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.client.thin;
19+
20+
import java.util.AbstractMap;
21+
import java.util.Collections;
22+
import java.util.LinkedHashMap;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
25+
import org.apache.ignite.Ignite;
26+
import org.apache.ignite.cache.CacheAtomicityMode;
27+
import org.apache.ignite.cache.CacheKeyConfiguration;
28+
import org.apache.ignite.cache.CacheMode;
29+
import org.apache.ignite.cache.CacheRebalanceMode;
30+
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
31+
import org.apache.ignite.cache.PartitionLossPolicy;
32+
import org.apache.ignite.cache.QueryEntity;
33+
import org.apache.ignite.cache.QueryIndex;
34+
import org.apache.ignite.client.ClientCache;
35+
import org.apache.ignite.client.ClientCacheConfiguration;
36+
import org.apache.ignite.client.Comparers;
37+
import org.apache.ignite.client.IgniteClient;
38+
import org.apache.ignite.configuration.CacheConfiguration;
39+
import org.apache.ignite.configuration.DataRegionConfiguration;
40+
import org.apache.ignite.configuration.DataStorageConfiguration;
41+
import org.apache.ignite.configuration.IgniteConfiguration;
42+
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
43+
import org.junit.Test;
44+
45+
/**
46+
* Thin client cache configuration tests.
47+
*/
48+
public class CacheConfigurationTest extends AbstractThinClientTest {
49+
/**
50+
* Tested API:
51+
* <ul>
52+
* <li>{@link ClientCache#getName()}</li>
53+
* <li>{@link ClientCache#getConfiguration()}</li>
54+
* </ul>
55+
*/
56+
@Test
57+
public void testCacheConfiguration() throws Exception {
58+
final String dataRegionName = "functional-test-data-region";
59+
60+
IgniteConfiguration cfg = getConfiguration()
61+
.setDataStorageConfiguration(new DataStorageConfiguration()
62+
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
63+
.setName(dataRegionName)));
64+
65+
try (Ignite ignite = startGrid(cfg); IgniteClient client = startClient(ignite)) {
66+
final String CACHE_NAME = "testCacheConfiguration";
67+
68+
ClientCacheConfiguration cacheCfgTemplate = new ClientCacheConfiguration().setName(CACHE_NAME)
69+
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
70+
.setBackups(3)
71+
.setCacheMode(CacheMode.PARTITIONED)
72+
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
73+
.setEagerTtl(false)
74+
.setGroupName("FunctionalTest")
75+
.setDefaultLockTimeout(12345)
76+
.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
77+
.setReadFromBackup(true)
78+
.setRebalanceBatchSize(67890)
79+
.setRebalanceBatchesPrefetchCount(102938)
80+
.setRebalanceDelay(54321)
81+
.setRebalanceMode(CacheRebalanceMode.SYNC)
82+
.setRebalanceOrder(2)
83+
.setRebalanceThrottle(564738)
84+
.setRebalanceTimeout(142536)
85+
.setKeyConfiguration(new CacheKeyConfiguration("Employee", "orgId"))
86+
.setQueryEntities(new QueryEntity(int.class.getName(), "Employee")
87+
.setTableName("EMPLOYEE")
88+
.setFields(
89+
Stream.of(
90+
new AbstractMap.SimpleEntry<>("id", Integer.class.getName()),
91+
new AbstractMap.SimpleEntry<>("orgId", Integer.class.getName())
92+
).collect(Collectors.toMap(
93+
AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue, (a, b) -> a, LinkedHashMap::new
94+
))
95+
)
96+
// During query normalization null keyFields become empty set.
97+
// Set empty collection for comparator.
98+
.setKeyFields(Collections.emptySet())
99+
.setKeyFieldName("id")
100+
.setNotNullFields(Collections.singleton("id"))
101+
.setDefaultFieldValues(Collections.singletonMap("id", 0))
102+
.setIndexes(Collections.singletonList(new QueryIndex("id", true, "IDX_EMPLOYEE_ID")))
103+
.setAliases(Stream.of("id", "orgId").collect(Collectors.toMap(f -> f, String::toUpperCase)))
104+
)
105+
.setExpiryPolicy(new PlatformExpiryPolicy(10, 20, 30))
106+
.setCopyOnRead(!CacheConfiguration.DFLT_COPY_ON_READ)
107+
.setDataRegionName(dataRegionName)
108+
.setMaxConcurrentAsyncOperations(4)
109+
.setMaxQueryIteratorsCount(4)
110+
.setOnheapCacheEnabled(true)
111+
.setQueryDetailMetricsSize(1024)
112+
.setQueryParallelism(4)
113+
.setSqlEscapeAll(true)
114+
.setSqlIndexMaxInlineSize(1024)
115+
.setSqlSchema("functional-test-schema")
116+
.setStatisticsEnabled(true);
117+
118+
ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration(cacheCfgTemplate);
119+
120+
ClientCache<Object, Object> cache = client.createCache(cacheCfg);
121+
122+
assertEquals(CACHE_NAME, cache.getName());
123+
124+
assertTrue(Comparers.equal(cacheCfgTemplate, cache.getConfiguration()));
125+
}
126+
}
127+
128+
/** Tests cache partitions configuration. */
129+
@Test
130+
public void testCachePartitionsConfiguration() throws Exception {
131+
try (Ignite ignite = startGrid(0); IgniteClient client = startClient(ignite)) {
132+
// Explicit partitions count test.
133+
String cacheName = "test";
134+
135+
// Client to server propagation.
136+
client.createCache(new ClientCacheConfiguration().setName(cacheName).setPartitions(100));
137+
138+
assertEquals(100, ignite.cache(cacheName)
139+
.getConfiguration(CacheConfiguration.class).getAffinity().partitions());
140+
141+
// Server to client propagation.
142+
assertEquals(100, client.cache(cacheName).getConfiguration().getPartitions());
143+
144+
// Implicit partitions count test.
145+
cacheName = "test2";
146+
147+
client.createCache(new ClientCacheConfiguration().setName(cacheName));
148+
149+
assertEquals(ignite.cache(cacheName).getConfiguration(CacheConfiguration.class)
150+
.getAffinity().partitions(), client.cache(cacheName).getConfiguration().getPartitions());
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)