Skip to content

Commit 86d85f9

Browse files
authored
IGNITE-23901 Added performance statistics for putAllConflict, removeAllConflict operations (#11793)
1 parent 1afc0ad commit 86d85f9

File tree

5 files changed

+97
-16
lines changed

5 files changed

+97
-16
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1969,8 +1969,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte
19691969
return;
19701970

19711971
final boolean statsEnabled = ctx.statisticsEnabled();
1972+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
19721973

1973-
long start = statsEnabled ? System.nanoTime() : 0L;
1974+
long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
19741975

19751976
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
19761977

@@ -1986,6 +1987,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte
19861987

19871988
if (statsEnabled)
19881989
metrics0().addPutAllConflictTimeNanos(System.nanoTime() - start);
1990+
1991+
if (perfStatsEnabled)
1992+
writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start);
19891993
}
19901994

19911995
/** {@inheritDoc} */
@@ -1995,8 +1999,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte
19951999
return new GridFinishedFuture<Object>();
19962000

19972001
final boolean statsEnabled = ctx.statisticsEnabled();
2002+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
19982003

1999-
long start = statsEnabled ? System.nanoTime() : 0L;
2004+
long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
20002005

20012006
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
20022007

@@ -2013,6 +2018,9 @@ protected boolean put0(final K key, final V val, final CacheEntryPredicate filte
20132018
if (statsEnabled)
20142019
fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), start));
20152020

2021+
if (perfStatsEnabled)
2022+
fut.listen(() -> writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
2023+
20162024
return fut;
20172025
}
20182026

@@ -2839,8 +2847,9 @@ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable fina
28392847
return;
28402848

28412849
boolean statsEnabled = ctx.statisticsEnabled();
2850+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
28422851

2843-
long start = statsEnabled ? System.nanoTime() : 0L;
2852+
long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
28442853

28452854
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
28462855

@@ -2856,6 +2865,9 @@ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable fina
28562865

28572866
if (statsEnabled)
28582867
metrics0().addRemoveAllConflictTimeNanos(System.nanoTime() - start);
2868+
2869+
if (perfStatsEnabled)
2870+
writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start);
28592871
}
28602872

28612873
/** {@inheritDoc} */
@@ -2865,8 +2877,9 @@ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable fina
28652877
return new GridFinishedFuture<Object>();
28662878

28672879
final boolean statsEnabled = ctx.statisticsEnabled();
2880+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
28682881

2869-
final long start = statsEnabled ? System.nanoTime() : 0L;
2882+
final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
28702883

28712884
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
28722885

@@ -2883,6 +2896,9 @@ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable fina
28832896
if (statsEnabled)
28842897
fut.listen(new UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
28852898

2899+
if (perfStatsEnabled)
2900+
fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
2901+
28862902
return fut;
28872903
}
28882904

@@ -6165,7 +6181,7 @@ public InvokeAllTimeStatClosure(CacheMetricsImpl metrics, final long start) {
61656181
* @param op Operation type.
61666182
* @param start Start time in nanoseconds.
61676183
*/
6168-
private void writeStatistics(OperationType op, long start) {
6184+
protected void writeStatistics(OperationType op, long start) {
61696185
ctx.kernalContext().performanceStatistics().cacheOperation(
61706186
op,
61716187
ctx.cacheId(),

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
9797
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
9898
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
99+
import org.apache.ignite.internal.processors.performancestatistics.OperationType;
99100
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
100101
import org.apache.ignite.internal.util.GridLongList;
101102
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -695,9 +696,10 @@ private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal(
695696

696697
/** {@inheritDoc} */
697698
@Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) {
698-
final boolean statsEnabled = ctx.statisticsEnabled();
699+
boolean statsEnabled = ctx.statisticsEnabled();
700+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
699701

700-
long start = statsEnabled ? System.nanoTime() : 0L;
702+
long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
701703

702704
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
703705

@@ -716,6 +718,9 @@ private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal(
716718
if (statsEnabled)
717719
fut.listen(new UpdatePutAllConflictTimeStatClosure<>(metrics0(), start));
718720

721+
if (perfStatsEnabled)
722+
fut.listen(() -> writeStatistics(OperationType.CACHE_PUT_ALL_CONFLICT, start));
723+
719724
return fut;
720725
}
721726

@@ -760,8 +765,9 @@ private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal(
760765
/** {@inheritDoc} */
761766
@Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) {
762767
final boolean statsEnabled = ctx.statisticsEnabled();
768+
boolean perfStatsEnabled = ctx.kernalContext().performanceStatistics().enabled();
763769

764-
final long start = statsEnabled ? System.nanoTime() : 0L;
770+
final long start = (statsEnabled || perfStatsEnabled) ? System.nanoTime() : 0L;
765771

766772
ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size());
767773

@@ -770,6 +776,9 @@ private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal(
770776
if (statsEnabled)
771777
fut.listen(new UpdateRemoveAllConflictTimeStatClosure<>(metrics0(), start));
772778

779+
if (perfStatsEnabled)
780+
fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE_ALL_CONFLICT, start));
781+
773782
return fut;
774783
}
775784

modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,19 @@ public enum OperationType {
9393
/** Custom query property. */
9494
QUERY_PROPERTY(21),
9595

96+
/** Cache put all conflict. */
97+
CACHE_PUT_ALL_CONFLICT(22),
98+
99+
/** Cache remove all conflict. */
100+
CACHE_REMOVE_ALL_CONFLICT(23),
101+
96102
/** Version. */
97103
VERSION(255);
98104

99105
/** Cache operations. */
100106
public static final EnumSet<OperationType> CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE,
101107
CACHE_GET_AND_PUT, CACHE_GET_AND_REMOVE, CACHE_INVOKE, CACHE_LOCK, CACHE_GET_ALL, CACHE_PUT_ALL,
102-
CACHE_REMOVE_ALL, CACHE_INVOKE_ALL);
108+
CACHE_REMOVE_ALL, CACHE_INVOKE_ALL, CACHE_PUT_ALL_CONFLICT, CACHE_REMOVE_ALL_CONFLICT);
103109

104110
/** Transaction operations. */
105111
public static final EnumSet<OperationType> TX_OPS = EnumSet.of(TX_COMMIT, TX_ROLLBACK);

modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsThinClientTest.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
import java.util.Collection;
2121
import java.util.Collections;
22+
import java.util.EnumSet;
2223
import java.util.HashMap;
2324
import java.util.UUID;
2425
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.function.Consumer;
2626
import org.apache.ignite.Ignition;
27+
import org.apache.ignite.cache.CacheAtomicityMode;
2728
import org.apache.ignite.client.ClientCache;
29+
import org.apache.ignite.client.ClientCacheConfiguration;
2830
import org.apache.ignite.client.ClientTransaction;
2931
import org.apache.ignite.client.Config;
3032
import org.apache.ignite.client.IgniteClient;
@@ -33,26 +35,38 @@
3335
import org.apache.ignite.configuration.IgniteConfiguration;
3436
import org.apache.ignite.configuration.ThinClientConfiguration;
3537
import org.apache.ignite.internal.IgniteEx;
38+
import org.apache.ignite.internal.client.thin.TcpClientCache;
3639
import org.apache.ignite.internal.client.thin.TestTask;
40+
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
3741
import org.apache.ignite.internal.util.GridIntList;
42+
import org.apache.ignite.internal.util.lang.ConsumerX;
3843
import org.apache.ignite.internal.util.typedef.F;
44+
import org.apache.ignite.internal.util.typedef.T3;
3945
import org.apache.ignite.internal.util.typedef.internal.CU;
4046
import org.apache.ignite.internal.util.typedef.internal.U;
4147
import org.apache.ignite.lang.IgniteUuid;
4248
import org.junit.Test;
49+
import org.junit.runner.RunWith;
50+
import org.junit.runners.Parameterized;
4351

52+
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
53+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
4454
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
4555
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
4656
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
4757
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
4858
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
4959
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
60+
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL_CONFLICT;
5061
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
5162
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
63+
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL_CONFLICT;
64+
import static org.junit.Assume.assumeTrue;
5265

5366
/**
5467
* Tests thin client performance statistics.
5568
*/
69+
@RunWith(Parameterized.class)
5670
public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStatisticsTest {
5771
/** Test task name. */
5872
public static final String TEST_TASK_NAME = "TestTask";
@@ -66,12 +80,20 @@ public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStat
6680
/** Thin client. */
6781
private static IgniteClient thinClient;
6882

83+
/** */
84+
@Parameterized.Parameter
85+
public CacheAtomicityMode atomicityMode;
86+
87+
/** */
88+
@Parameterized.Parameters(name = "atomicityMode={0}")
89+
public static Collection<?> parameters() {
90+
return EnumSet.of(ATOMIC, TRANSACTIONAL);
91+
}
92+
6993
/** {@inheritDoc} */
7094
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
7195
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
7296

73-
cfg.setCacheConfiguration(defaultCacheConfiguration());
74-
7597
cfg.setClientConnectorConfiguration(
7698
new ClientConnectorConfiguration().setThinClientConfiguration(
7799
new ThinClientConfiguration().setMaxActiveComputeTasksPerConnection(ACTIVE_TASKS_LIMIT)));
@@ -96,9 +118,22 @@ public class PerformanceStatisticsThinClientTest extends AbstractPerformanceStat
96118
@Override protected void afterTestsStopped() throws Exception {
97119
super.afterTestsStopped();
98120

121+
stopAllGrids();
99122
thinClient.close();
100123
}
101124

125+
/** {@inheritDoc} */
126+
@Override protected void beforeTest() throws Exception {
127+
thinClient.createCache(new ClientCacheConfiguration()
128+
.setName(DEFAULT_CACHE_NAME)
129+
.setAtomicityMode(atomicityMode));
130+
}
131+
132+
/** {@inheritDoc} */
133+
@Override protected void afterTest() throws Exception {
134+
thinClient.destroyCache(DEFAULT_CACHE_NAME);
135+
}
136+
102137
/** @throws Exception If failed. */
103138
@Test
104139
public void testCompute() throws Exception {
@@ -169,10 +204,24 @@ public void testCacheOperation() throws Exception {
169204
checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAll(Collections.singleton(3)));
170205

171206
checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5));
207+
208+
GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2);
209+
210+
checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache)
211+
.putAllConflict(F.asMap(6, new T3<>(1, confl, CU.EXPIRE_TIME_ETERNAL))));
212+
213+
checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache)
214+
.removeAllConflict(F.asMap(6, confl)));
215+
216+
checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache)
217+
.putAllConflictAsync(F.asMap(7, new T3<>(2, confl, CU.EXPIRE_TIME_ETERNAL))).get());
218+
219+
checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache)
220+
.removeAllConflictAsync(F.asMap(7, confl)).get());
172221
}
173222

174223
/** Checks cache operation. */
175-
private void checkCacheOperation(OperationType op, Consumer<ClientCache<Object, Object>> clo) throws Exception {
224+
private void checkCacheOperation(OperationType op, ConsumerX<ClientCache<Object, Object>> clo) throws Exception {
176225
long startTime = U.currentTimeMillis();
177226

178227
cleanPerformanceStatisticsDir();
@@ -202,6 +251,8 @@ private void checkCacheOperation(OperationType op, Consumer<ClientCache<Object,
202251
/** @throws Exception If failed. */
203252
@Test
204253
public void testTransaction() throws Exception {
254+
assumeTrue(atomicityMode == TRANSACTIONAL);
255+
205256
checkTx(true);
206257

207258
checkTx(false);

modules/core/src/test/java/org/apache/ignite/internal/util/lang/ConsumerX.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
package org.apache.ignite.internal.util.lang;
1919

2020
import java.util.function.Consumer;
21-
import org.jetbrains.annotations.Nullable;
2221

2322
/**
2423
* Represents an operation that accepts a single input argument and returns
2524
* no result. Unlike most other functional interfaces,
2625
* {@code ConsumerX} is expected to operate via side-effects.
27-
*
26+
* <p>
2827
* Also it is able to throw {@link Exception} unlike {@link Consumer}.
2928
*
3029
* @param <T> The type of the input to the operation.
@@ -36,5 +35,5 @@ public interface ConsumerX<T> {
3635
*
3736
* @param t the input argument.
3837
*/
39-
public void accept(@Nullable T t) throws Exception;
38+
public void accept(T t) throws Exception;
4039
}

0 commit comments

Comments
 (0)