Skip to content

Commit bca461f

Browse files
Iterating on test tools
1 parent 5c3dac3 commit bca461f

File tree

7 files changed

+275
-189
lines changed

7 files changed

+275
-189
lines changed

sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncClientTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,36 @@
99
import com.azure.cosmos.implementation.Configs;
1010
import com.azure.cosmos.implementation.ConnectionPolicy;
1111
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
12+
import com.azure.cosmos.implementation.RxDocumentClientImpl;
1213
import com.azure.cosmos.implementation.guava27.Strings;
14+
import io.netty.buffer.PooledByteBufAllocator;
15+
import io.netty.util.internal.PlatformDependent;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
1318
import org.testng.ITest;
19+
import org.testng.annotations.AfterClass;
1420
import org.testng.annotations.AfterMethod;
21+
import org.testng.annotations.BeforeClass;
1522
import org.testng.annotations.BeforeMethod;
1623

24+
import java.lang.management.BufferPoolMXBean;
25+
import java.lang.management.ManagementFactory;
1726
import java.lang.reflect.Method;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.concurrent.atomic.AtomicInteger;
1831

1932
public abstract class CosmosEncryptionAsyncClientTest implements ITest {
33+
protected static Logger logger = LoggerFactory.getLogger(CosmosEncryptionAsyncClientTest.class.getSimpleName());
34+
protected static final int SUITE_SETUP_TIMEOUT = 120000;
2035
private static final ImplementationBridgeHelpers.CosmosClientBuilderHelper.CosmosClientBuilderAccessor cosmosClientBuilderAccessor =
2136
ImplementationBridgeHelpers.CosmosClientBuilderHelper.getCosmosClientBuilderAccessor();
37+
private final static AtomicInteger instancesUsed = new AtomicInteger(0);
2238

2339
private final CosmosClientBuilder clientBuilder;
2440
private String testName;
41+
private volatile Map<Integer, String> activeClientsAtBegin = new HashMap<>();
2542

2643
public CosmosEncryptionAsyncClientTest() {
2744
this(new CosmosClientBuilder());
@@ -35,6 +52,78 @@ public final CosmosClientBuilder getClientBuilder() {
3552
return this.clientBuilder;
3653
}
3754

55+
@BeforeClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
56+
57+
public void beforeClassSetupLeakDetection() {
58+
if (instancesUsed.getAndIncrement() == 0) {
59+
this.activeClientsAtBegin = RxDocumentClientImpl.getActiveClientsSnapshot();
60+
this.logMemoryUsage("BEFORE");
61+
}
62+
}
63+
64+
private void logMemoryUsage(String name) {
65+
long pooledDirectBytes = PooledByteBufAllocator.DEFAULT.metric()
66+
.directArenas().stream()
67+
.mapToLong(io.netty.buffer.PoolArenaMetric::numActiveBytes)
68+
.sum();
69+
70+
long used = PlatformDependent.usedDirectMemory();
71+
long max = PlatformDependent.maxDirectMemory();
72+
logger.info("MEMORY USAGE: {}:{}", this.getClass().getCanonicalName(), name);
73+
logger.info("Netty Direct Memory: {}/{}/{} bytes", used, pooledDirectBytes, max);
74+
for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
75+
logger.info("Pool {}: used={} bytes, capacity={} bytes, count={}",
76+
pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity(), pool.getCount());
77+
}
78+
}
79+
80+
@AfterClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
81+
public void afterClassSetupLeakDetection() {
82+
if (instancesUsed.decrementAndGet() == 0) {
83+
Map<Integer, String> leakedClientSnapshotNow = RxDocumentClientImpl.getActiveClientsSnapshot();
84+
StringBuilder sb = new StringBuilder();
85+
Map<Integer, String> leakedClientSnapshotAtBegin = activeClientsAtBegin;
86+
87+
for (Integer clientId : leakedClientSnapshotNow.keySet()) {
88+
if (!leakedClientSnapshotAtBegin.containsKey(clientId)) {
89+
// this client was leaked in this class
90+
sb
91+
.append("CosmosClient [")
92+
.append(clientId)
93+
.append("] leaked. Callstack of initialization:\n")
94+
.append(leakedClientSnapshotNow.get(clientId))
95+
.append("\n\n");
96+
}
97+
}
98+
99+
if (sb.length() > 0) {
100+
String msg = "\"COSMOS CLIENT LEAKS detected in test class: "
101+
+ this.getClass().getCanonicalName()
102+
+ "\n\n"
103+
+ sb;
104+
105+
logger.error(msg);
106+
// fail(msg);
107+
}
108+
109+
List<String> nettyLeaks = CosmosNettyLeakDetectorFactory.resetIdentifiedLeaks();
110+
if (nettyLeaks.size() > 0) {
111+
sb.append("\n");
112+
for (String leak : nettyLeaks) {
113+
sb.append(leak).append("\n");
114+
}
115+
116+
String msg = "NETTY LEAKS detected in test class: "
117+
+ this.getClass().getCanonicalName()
118+
+ sb;
119+
120+
logger.error(msg);
121+
// fail(msg);
122+
}
123+
this.logMemoryUsage("AFTER");
124+
}
125+
}
126+
38127
@Override
39128
public final String getTestName() {
40129
return this.testName;

sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/TestSuiteBase.java

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.azure.cosmos.implementation.ConnectionPolicy;
2727
import com.azure.cosmos.implementation.InternalObjectNode;
2828
import com.azure.cosmos.implementation.PathParser;
29-
import com.azure.cosmos.implementation.RxDocumentClientImpl;
3029
import com.azure.cosmos.implementation.TestConfigurations;
3130
import com.azure.cosmos.implementation.Utils;
3231
import com.azure.cosmos.implementation.directconnectivity.Protocol;
@@ -60,12 +59,8 @@
6059
import org.apache.commons.lang3.ObjectUtils;
6160
import org.apache.commons.lang3.StringUtils;
6261
import org.mockito.stubbing.Answer;
63-
import org.slf4j.Logger;
64-
import org.slf4j.LoggerFactory;
6562
import org.testng.ITestContext;
66-
import org.testng.annotations.AfterClass;
6763
import org.testng.annotations.AfterSuite;
68-
import org.testng.annotations.BeforeClass;
6964
import org.testng.annotations.BeforeSuite;
7065
import org.testng.annotations.DataProvider;
7166
import org.testng.annotations.Listeners;
@@ -76,17 +71,14 @@
7671
import java.time.Duration;
7772
import java.util.ArrayList;
7873
import java.util.Collections;
79-
import java.util.HashMap;
8074
import java.util.List;
81-
import java.util.Map;
8275
import java.util.UUID;
8376
import java.util.concurrent.TimeUnit;
8477
import java.util.stream.Collectors;
8578

8679
import static com.azure.cosmos.BridgeInternal.extractConfigs;
8780
import static com.azure.cosmos.BridgeInternal.injectConfigs;
8881
import static org.assertj.core.api.Assertions.assertThat;
89-
import static org.assertj.core.api.Fail.fail;
9082
import static org.mockito.Mockito.doAnswer;
9183
import static org.mockito.Mockito.spy;
9284

@@ -96,13 +88,11 @@ public class TestSuiteBase extends CosmosEncryptionAsyncClientTest {
9688
private static final int DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL = 500;
9789
private static final ObjectMapper objectMapper = new ObjectMapper();
9890

99-
protected static Logger logger = LoggerFactory.getLogger(TestSuiteBase.class.getSimpleName());
10091
protected static final int TIMEOUT = 40000;
10192
protected static final int FEED_TIMEOUT = 40000;
10293
protected static final int SETUP_TIMEOUT = 60000;
10394
protected static final int SHUTDOWN_TIMEOUT = 24000;
10495

105-
protected static final int SUITE_SETUP_TIMEOUT = 120000;
10696
protected static final int SUITE_SHUTDOWN_TIMEOUT = 60000;
10797

10898
protected static final int WAIT_REPLICA_CATCH_UP_IN_MILLIS = 4000;
@@ -116,8 +106,6 @@ public class TestSuiteBase extends CosmosEncryptionAsyncClientTest {
116106

117107
protected int subscriberValidationTimeout = TIMEOUT;
118108

119-
private volatile Map<Integer, String> activeClientsAtBegin = new HashMap<>();
120-
121109
private static CosmosAsyncDatabase SHARED_DATABASE;
122110
private static CosmosAsyncContainer SHARED_MULTI_PARTITION_COLLECTION_WITH_ID_AS_PARTITION_KEY;
123111
private static CosmosAsyncContainer SHARED_MULTI_PARTITION_COLLECTION;
@@ -234,55 +222,6 @@ public CosmosAsyncDatabase getDatabase(String id) {
234222
}
235223
}
236224

237-
@BeforeClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
238-
239-
public void beforeClassSetupLeakDetection() {
240-
this.activeClientsAtBegin = RxDocumentClientImpl.getActiveClientsSnapshot();
241-
}
242-
243-
@AfterClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
244-
public void afterClassSetupLeakDetection() {
245-
246-
Map<Integer, String> leakedClientSnapshotNow = RxDocumentClientImpl.getActiveClientsSnapshot();
247-
StringBuilder sb = new StringBuilder();
248-
Map<Integer, String> leakedClientSnapshotAtBegin = activeClientsAtBegin;
249-
250-
for (Integer clientId : leakedClientSnapshotNow.keySet()) {
251-
if (!leakedClientSnapshotAtBegin.containsKey(clientId)) {
252-
// this client was leaked in this class
253-
sb
254-
.append("CosmosClient [")
255-
.append(clientId)
256-
.append("] leaked. Callstack of initialization:\n")
257-
.append(leakedClientSnapshotNow.get(clientId));
258-
}
259-
}
260-
261-
if (sb.length() > 0) {
262-
String msg = "\"COSMOS CLIENT LEAKS detected in test class: "
263-
+ this.getClass().getCanonicalName()
264-
+ sb;
265-
266-
logger.error(msg);
267-
// fail(msg);
268-
}
269-
270-
List<String> nettyLeaks = CosmosNettyLeakDetectorFactory.resetIdentifiedLeaks();
271-
if (nettyLeaks.size() > 0) {
272-
sb.append("\n");
273-
for (String leak : nettyLeaks) {
274-
sb.append(leak).append("\n");
275-
}
276-
277-
String msg = "NETTY LEAKS detected in test class: "
278-
+ this.getClass().getCanonicalName()
279-
+ sb;
280-
281-
logger.error(msg);
282-
// fail(msg);
283-
}
284-
}
285-
286225
@BeforeSuite(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
287226
public void beforeSuite() {
288227

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosAsyncClientTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,42 @@
44
package com.azure.cosmos;
55

66
import com.azure.cosmos.implementation.ConnectionPolicy;
7+
import com.azure.cosmos.implementation.RxDocumentClientImpl;
78
import com.azure.cosmos.implementation.guava27.Strings;
89
import com.azure.cosmos.models.CosmosItemRequestOptions;
910
import com.azure.cosmos.models.CosmosItemResponse;
1011
import com.azure.cosmos.models.PartitionKey;
12+
import io.netty.buffer.PooledByteBufAllocator;
13+
import io.netty.util.internal.PlatformDependent;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1116
import org.testng.ITest;
17+
import org.testng.annotations.AfterClass;
1218
import org.testng.annotations.AfterMethod;
1319
import org.testng.annotations.BeforeClass;
1420
import org.testng.annotations.BeforeMethod;
1521

22+
import java.lang.management.BufferPoolMXBean;
23+
import java.lang.management.ManagementFactory;
1624
import java.lang.reflect.Method;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.atomic.AtomicInteger;
1729

1830
import static org.assertj.core.api.Assertions.assertThat;
1931

2032
public abstract class CosmosAsyncClientTest implements ITest {
2133

2234
public static final String ROUTING_GATEWAY_EMULATOR_PORT = ":8081";
2335
public static final String COMPUTE_GATEWAY_EMULATOR_PORT = ":9999";
36+
37+
protected static Logger logger = LoggerFactory.getLogger(CosmosAsyncClientTest.class.getSimpleName());
38+
protected static final int SUITE_SETUP_TIMEOUT = 120000;
39+
private final static AtomicInteger instancesUsed = new AtomicInteger(0);
2440
private final CosmosClientBuilder clientBuilder;
2541
private String testName;
42+
private volatile Map<Integer, String> activeClientsAtBegin = new HashMap<>();
2643

2744
public CosmosAsyncClientTest() {
2845
this(new CosmosClientBuilder());
@@ -32,6 +49,81 @@ public CosmosAsyncClientTest(CosmosClientBuilder clientBuilder) {
3249
this.clientBuilder = clientBuilder;
3350
}
3451

52+
@BeforeClass(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator",
53+
"emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
54+
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master"}, timeOut = SUITE_SETUP_TIMEOUT, alwaysRun = true)
55+
public void beforeClassSetupLeakDetection() {
56+
if (instancesUsed.getAndIncrement() == 0) {
57+
this.activeClientsAtBegin = RxDocumentClientImpl.getActiveClientsSnapshot();
58+
this.logMemoryUsage("BEFORE");
59+
}
60+
}
61+
62+
private void logMemoryUsage(String name) {
63+
long pooledDirectBytes = PooledByteBufAllocator.DEFAULT.metric()
64+
.directArenas().stream()
65+
.mapToLong(io.netty.buffer.PoolArenaMetric::numActiveBytes)
66+
.sum();
67+
68+
long used = PlatformDependent.usedDirectMemory();
69+
long max = PlatformDependent.maxDirectMemory();
70+
logger.info("MEMORY USAGE: {}:{}", this.getClass().getCanonicalName(), name);
71+
logger.info("Netty Direct Memory: {}/{}/{} bytes", used, pooledDirectBytes, max);
72+
for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
73+
logger.info("Pool {}: used={} bytes, capacity={} bytes, count={}",
74+
pool.getName(), pool.getMemoryUsed(), pool.getTotalCapacity(), pool.getCount());
75+
}
76+
}
77+
78+
@AfterClass(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator",
79+
"emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
80+
"circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master"}, timeOut = SUITE_SETUP_TIMEOUT, alwaysRun = true)
81+
public void afterClassSetupLeakDetection() {
82+
if (instancesUsed.decrementAndGet() == 0) {
83+
Map<Integer, String> leakedClientSnapshotNow = RxDocumentClientImpl.getActiveClientsSnapshot();
84+
StringBuilder sb = new StringBuilder();
85+
Map<Integer, String> leakedClientSnapshotAtBegin = activeClientsAtBegin;
86+
87+
for (Integer clientId : leakedClientSnapshotNow.keySet()) {
88+
if (!leakedClientSnapshotAtBegin.containsKey(clientId)) {
89+
// this client was leaked in this class
90+
sb
91+
.append("CosmosClient [")
92+
.append(clientId)
93+
.append("] leaked. Callstack of initialization:\n")
94+
.append(leakedClientSnapshotNow.get(clientId))
95+
.append("\n\n");
96+
}
97+
}
98+
99+
if (sb.length() > 0) {
100+
String msg = "\"COSMOS CLIENT LEAKS detected in test class: "
101+
+ this.getClass().getCanonicalName()
102+
+ "\n\n"
103+
+ sb;
104+
105+
logger.error(msg);
106+
// fail(msg);
107+
}
108+
109+
List<String> nettyLeaks = CosmosNettyLeakDetectorFactory.resetIdentifiedLeaks();
110+
if (nettyLeaks.size() > 0) {
111+
sb.append("\n");
112+
for (String leak : nettyLeaks) {
113+
sb.append(leak).append("\n");
114+
}
115+
116+
String msg = "\"NETTY LEAKS detected in test class: "
117+
+ this.getClass().getCanonicalName()
118+
+ sb;
119+
120+
logger.error(msg);
121+
// fail(msg);
122+
}
123+
this.logMemoryUsage("AFTER");
124+
}
125+
}
126+
35127
public final CosmosClientBuilder getClientBuilder() {
36128
return this.clientBuilder;
37129
}

0 commit comments

Comments
 (0)