Skip to content

Commit 446ca0b

Browse files
Test changes to add leak detection
1 parent 02f8821 commit 446ca0b

32 files changed

+604
-2
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.encryption;
4+
5+
import com.azure.cosmos.implementation.StackTraceUtil;
6+
import io.netty.util.ResourceLeakDetector;
7+
import io.netty.util.ResourceLeakDetectorFactory;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.testng.IExecutionListener;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
15+
public final class CosmosNettyLeakDetectorFactory extends ResourceLeakDetectorFactory implements IExecutionListener {
16+
protected static Logger logger = LoggerFactory.getLogger(CosmosNettyLeakDetectorFactory.class.getSimpleName());
17+
private final static List<String> identifiedLeaks = new ArrayList<>();
18+
private final static Object staticLock = new Object();
19+
private static volatile boolean isLeakDetectionDisabled = false;
20+
private static volatile boolean isInitialized = false;
21+
22+
private CosmosNettyLeakDetectorFactory() {
23+
}
24+
25+
@Override
26+
public void onExecutionStart() {
27+
ingestIntoNetty();
28+
}
29+
30+
@Override
31+
public void onExecutionFinish() {
32+
// Run GC to force finalizers to run - only in finalizers Netty would actually detect any leaks.
33+
System.gc();
34+
}
35+
36+
// This method must be called as early as possible in the lifecycle of a process
37+
// before any Netty ByteBNuf has been allocated
38+
public static void ingestIntoNetty() {
39+
if (isInitialized) {
40+
return;
41+
}
42+
43+
synchronized (staticLock) {
44+
if (isInitialized) {
45+
return;
46+
}
47+
48+
// Must run before any Netty ByteBuf is allocated
49+
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
50+
// sample every allocation
51+
System.setProperty("io.netty.leakDetection.samplingInterval", "1");
52+
// install custom reporter
53+
ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new CosmosNettyLeakDetectorFactory());
54+
55+
isInitialized = true;
56+
}
57+
}
58+
59+
public static List<String> resetIdentifiedLeaks() {
60+
// Run GC to force finalizers to run - only in finalizers Netty would actually detect any leaks.
61+
System.gc();
62+
synchronized (staticLock) {
63+
List<String> leaksSnapshot = new ArrayList<>(identifiedLeaks);
64+
65+
identifiedLeaks.clear();
66+
return leaksSnapshot;
67+
}
68+
}
69+
70+
public static AutoCloseable createDisableLeakDetectionScope() {
71+
synchronized (staticLock) {
72+
logger.info("Disabling Leak detection: {}", StackTraceUtil.currentCallStack());
73+
return new DisableLeakDetectionScope();
74+
}
75+
}
76+
77+
@Override
78+
public <T> ResourceLeakDetector<T> newResourceLeakDetector(
79+
Class<T> resource, int samplingInterval, long maxActive) {
80+
81+
return new ResourceLeakDetector<T>(resource, samplingInterval, maxActive) {
82+
@Override
83+
protected void reportTracedLeak(String resourceType, String records) {
84+
synchronized (staticLock) {
85+
if (!isLeakDetectionDisabled) {
86+
String msg = "NETTY LEAK (traced) type="
87+
+ resourceType
88+
+ "records=\n"
89+
+ records;
90+
91+
identifiedLeaks.add(msg);
92+
logger.error(msg);
93+
}
94+
}
95+
}
96+
97+
@Override
98+
protected void reportUntracedLeak(String resourceType) {
99+
synchronized (staticLock) {
100+
String msg = "NETTY LEAK (untraced) type=" + resourceType;
101+
102+
identifiedLeaks.add(msg);
103+
logger.error(msg);
104+
}
105+
}
106+
107+
@Override
108+
protected void reportInstancesLeak(String resourceType) {
109+
synchronized (staticLock) {
110+
String msg = "NETTY LEAK (instances) type=" + resourceType;
111+
112+
identifiedLeaks.add(msg);
113+
logger.error(msg);
114+
}
115+
}
116+
};
117+
}
118+
119+
private static final class DisableLeakDetectionScope implements AutoCloseable {
120+
@Override
121+
public void close() {
122+
synchronized (staticLock) {
123+
isLeakDetectionDisabled = false;
124+
logger.info("Leak detection enabled again.");
125+
}
126+
}
127+
}
128+
}

sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/TestSuiteBase.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.azure.cosmos.implementation.ConnectionPolicy;
2727
import com.azure.cosmos.implementation.InternalObjectNode;
2828
import com.azure.cosmos.implementation.PathParser;
29+
import com.azure.cosmos.implementation.RxDocumentClientImpl;
2930
import com.azure.cosmos.implementation.TestConfigurations;
3031
import com.azure.cosmos.implementation.Utils;
3132
import com.azure.cosmos.implementation.directconnectivity.Protocol;
@@ -62,7 +63,9 @@
6263
import org.slf4j.Logger;
6364
import org.slf4j.LoggerFactory;
6465
import org.testng.ITestContext;
66+
import org.testng.annotations.AfterClass;
6567
import org.testng.annotations.AfterSuite;
68+
import org.testng.annotations.BeforeClass;
6669
import org.testng.annotations.BeforeSuite;
6770
import org.testng.annotations.DataProvider;
6871
import org.testng.annotations.Listeners;
@@ -73,14 +76,17 @@
7376
import java.time.Duration;
7477
import java.util.ArrayList;
7578
import java.util.Collections;
79+
import java.util.HashMap;
7680
import java.util.List;
81+
import java.util.Map;
7782
import java.util.UUID;
7883
import java.util.concurrent.TimeUnit;
7984
import java.util.stream.Collectors;
8085

8186
import static com.azure.cosmos.BridgeInternal.extractConfigs;
8287
import static com.azure.cosmos.BridgeInternal.injectConfigs;
8388
import static org.assertj.core.api.Assertions.assertThat;
89+
import static org.assertj.core.api.Fail.fail;
8490
import static org.mockito.Mockito.doAnswer;
8591
import static org.mockito.Mockito.spy;
8692

@@ -110,6 +116,8 @@ public class TestSuiteBase extends CosmosEncryptionAsyncClientTest {
110116

111117
protected int subscriberValidationTimeout = TIMEOUT;
112118

119+
private volatile Map<Integer, String> activeClientsAtBegin = new HashMap<>();
120+
113121
private static CosmosAsyncDatabase SHARED_DATABASE;
114122
private static CosmosAsyncContainer SHARED_MULTI_PARTITION_COLLECTION_WITH_ID_AS_PARTITION_KEY;
115123
private static CosmosAsyncContainer SHARED_MULTI_PARTITION_COLLECTION;
@@ -226,6 +234,56 @@ public CosmosAsyncDatabase getDatabase(String id) {
226234
}
227235
}
228236

237+
@BeforeClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
238+
239+
public void beforeClassSetupLeakDetection() {
240+
CosmosNettyLeakDetectorFactory.ingestIntoNetty();
241+
this.activeClientsAtBegin = RxDocumentClientImpl.getActiveClientsSnapshot();
242+
}
243+
244+
@AfterClass(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
245+
public void afterClassSetupLeakDetection() {
246+
247+
Map<Integer, String> leakedClientSnapshotNow = RxDocumentClientImpl.getActiveClientsSnapshot();
248+
StringBuilder sb = new StringBuilder();
249+
Map<Integer, String> leakedClientSnapshotAtBegin = activeClientsAtBegin;
250+
251+
for (Integer clientId : leakedClientSnapshotNow.keySet()) {
252+
if (!leakedClientSnapshotAtBegin.containsKey(clientId)) {
253+
// this client was leaked in this class
254+
sb
255+
.append("CosmosClient [")
256+
.append(clientId)
257+
.append("] leaked. Callstack of initialization:\n")
258+
.append(leakedClientSnapshotNow.get(clientId));
259+
}
260+
}
261+
262+
if (sb.length() > 0) {
263+
String msg = "\"COSMOS CLIENT LEAKS detected in test class: "
264+
+ this.getClass().getCanonicalName()
265+
+ sb;
266+
267+
logger.error(msg);
268+
// fail(msg);
269+
}
270+
271+
List<String> nettyLeaks = CosmosNettyLeakDetectorFactory.resetIdentifiedLeaks();
272+
if (nettyLeaks.size() > 0) {
273+
sb.append("\n");
274+
for (String leak : nettyLeaks) {
275+
sb.append(leak).append("\n");
276+
}
277+
278+
String msg = "\"NETTY LEAKS detected in test class: "
279+
+ this.getClass().getCanonicalName()
280+
+ sb;
281+
282+
logger.error(msg);
283+
// fail(msg);
284+
}
285+
}
286+
229287
@BeforeSuite(groups = {"fast", "long", "direct", "multi-master", "encryption"}, timeOut = SUITE_SETUP_TIMEOUT)
230288
public static void beforeSuite() {
231289

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosAsyncClientTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.azure.cosmos.models.PartitionKey;
1111
import org.testng.ITest;
1212
import org.testng.annotations.AfterMethod;
13+
import org.testng.annotations.BeforeClass;
1314
import org.testng.annotations.BeforeMethod;
1415

1516
import java.lang.reflect.Method;
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos;
4+
5+
import com.azure.cosmos.implementation.StackTraceUtil;
6+
import io.netty.util.ResourceLeakDetector;
7+
import io.netty.util.ResourceLeakDetectorFactory;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.testng.IExecutionListener;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
15+
public final class CosmosNettyLeakDetectorFactory extends ResourceLeakDetectorFactory implements IExecutionListener {
16+
protected static Logger logger = LoggerFactory.getLogger(CosmosNettyLeakDetectorFactory.class.getSimpleName());
17+
private final static List<String> identifiedLeaks = new ArrayList<>();
18+
private final static Object staticLock = new Object();
19+
private static volatile boolean isLeakDetectionDisabled = false;
20+
private static volatile boolean isInitialized = false;
21+
22+
private CosmosNettyLeakDetectorFactory() {
23+
}
24+
25+
@Override
26+
public void onExecutionStart() {
27+
ingestIntoNetty();
28+
}
29+
30+
@Override
31+
public void onExecutionFinish() {
32+
// Run GC to force finalizers to run - only in finalizers Netty would actually detect any leaks.
33+
System.gc();
34+
}
35+
36+
// This method must be called as early as possible in the lifecycle of a process
37+
// before any Netty ByteBNuf has been allocated
38+
public static void ingestIntoNetty() {
39+
if (isInitialized) {
40+
return;
41+
}
42+
43+
synchronized (staticLock) {
44+
if (isInitialized) {
45+
return;
46+
}
47+
48+
// Must run before any Netty ByteBuf is allocated
49+
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
50+
// sample every allocation
51+
System.setProperty("io.netty.leakDetection.samplingInterval", "1");
52+
// install custom reporter
53+
ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new CosmosNettyLeakDetectorFactory());
54+
55+
isInitialized = true;
56+
}
57+
}
58+
59+
public static List<String> resetIdentifiedLeaks() {
60+
// Run GC to force finalizers to run - only in finalizers Netty would actually detect any leaks.
61+
System.gc();
62+
synchronized (staticLock) {
63+
List<String> leaksSnapshot = new ArrayList<>(identifiedLeaks);
64+
65+
identifiedLeaks.clear();
66+
return leaksSnapshot;
67+
}
68+
}
69+
70+
public static AutoCloseable createDisableLeakDetectionScope() {
71+
synchronized (staticLock) {
72+
logger.info("Disabling Leak detection: {}", StackTraceUtil.currentCallStack());
73+
return new DisableLeakDetectionScope();
74+
}
75+
}
76+
77+
@Override
78+
public <T> ResourceLeakDetector<T> newResourceLeakDetector(
79+
Class<T> resource, int samplingInterval, long maxActive) {
80+
81+
return new ResourceLeakDetector<T>(resource, samplingInterval, maxActive) {
82+
@Override
83+
protected void reportTracedLeak(String resourceType, String records) {
84+
synchronized (staticLock) {
85+
if (!isLeakDetectionDisabled) {
86+
String msg = "NETTY LEAK (traced) type="
87+
+ resourceType
88+
+ "records=\n"
89+
+ records;
90+
91+
identifiedLeaks.add(msg);
92+
logger.error(msg);
93+
}
94+
}
95+
}
96+
97+
@Override
98+
protected void reportUntracedLeak(String resourceType) {
99+
synchronized (staticLock) {
100+
String msg = "NETTY LEAK (untraced) type=" + resourceType;
101+
102+
identifiedLeaks.add(msg);
103+
logger.error(msg);
104+
}
105+
}
106+
107+
@Override
108+
protected void reportInstancesLeak(String resourceType) {
109+
synchronized (staticLock) {
110+
String msg = "NETTY LEAK (instances) type=" + resourceType;
111+
112+
identifiedLeaks.add(msg);
113+
logger.error(msg);
114+
}
115+
}
116+
};
117+
}
118+
119+
private static final class DisableLeakDetectionScope implements AutoCloseable {
120+
@Override
121+
public void close() {
122+
synchronized (staticLock) {
123+
isLeakDetectionDisabled = false;
124+
logger.info("Leak detection enabled again.");
125+
}
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)