Skip to content

Commit 0cc787f

Browse files
lhotarimanas-ctds
authored andcommitted
[fix][test] Fix multiple ByteBuf leaks in tests (apache#24281)
(cherry picked from commit 669c642) (cherry picked from commit 564618d)
1 parent fbdfc3b commit 0cc787f

File tree

14 files changed

+184
-58
lines changed

14 files changed

+184
-58
lines changed

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2899,6 +2899,9 @@ void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedExceptio
28992899
EntryImpl entry5 = EntryImpl.create(markDeletedPosition.getLedgerId(), markDeletedPosition.getEntryId() + 7,
29002900
ByteBufAllocator.DEFAULT.buffer(0));
29012901
List<Entry> entries = Lists.newArrayList(entry1, entry2, entry3, entry4, entry5);
2902+
// release data buffers since EntryImpl.create will retain the buffer
2903+
entries.forEach(entry -> entry.getDataBuffer().release());
2904+
29022905
c1.trimDeletedEntries(entries);
29032906
assertEquals(entries.size(), 1);
29042907
assertEquals(entries.get(0).getPosition(), PositionFactory.create(markDeletedPosition.getLedgerId(),
@@ -2908,6 +2911,9 @@ void testTrimDeletedEntries() throws ManagedLedgerException, InterruptedExceptio
29082911
assertEquals(entry2.refCnt(), 0);
29092912
assertEquals(entry3.refCnt(), 0);
29102913
assertEquals(entry4.refCnt(), 0);
2914+
2915+
// release remaining entry
2916+
entries.forEach(Entry::release);
29112917
}
29122918

29132919
@Test(timeOut = 20000)
@@ -5286,6 +5292,7 @@ public void testApplyMaxSizeCap() throws Exception {
52865292
@Test
52875293
void testForceCursorRecovery() throws Exception {
52885294
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
5295+
factory.shutdown();
52895296
factory = new ManagedLedgerFactoryImpl(metadataStore, bk);
52905297
ManagedLedgerConfig config = new ManagedLedgerConfig();
52915298
config.setLedgerForceRecovery(true);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3778,4 +3778,9 @@ public void incrementThrottleCount() {
37783778
public void decrementThrottleCount() {
37793779
throttleTracker.decrementThrottleCount();
37803780
}
3781+
3782+
@VisibleForTesting
3783+
void setAuthState(AuthenticationState authState) {
3784+
this.authState = authState;
3785+
}
37813786
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ public void testAddBrokerEntryMetadata() throws Exception {
124124
assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex());
125125
List<Entry> entryList = cursor.readEntries(numberOfEntries);
126126
for (int i = 0 ; i < numberOfEntries; i ++) {
127+
Entry entry = entryList.get(i);
127128
BrokerEntryMetadata metadata =
128-
Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer());
129+
Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
129130
assertNotNull(metadata);
130131
assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1);
132+
entry.release();
131133
}
132134

133135
cursor.close();
@@ -151,7 +153,9 @@ public void testMessagePayloadProcessor() throws Exception {
151153
ledger.addEntry("Test Message".getBytes());
152154
factory.getEntryCacheManager().clear();
153155
List<Entry> entryList = cursor.readEntries(1);
154-
String message = new String(entryList.get(0).getData());
156+
Entry entry = entryList.get(0);
157+
String message = new String(entry.getData());
158+
entry.release();
155159
Assert.assertTrue(message.equals("Test Message"));
156160
cursor.close();
157161
ledger.close();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
108108

109109
List<Entry> entries = new ArrayList<>();
110110

111-
Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
111+
ByteBuf message = createMessage("message1", 1);
112+
Entry e = EntryImpl.create(1, 2, message);
113+
message.release();
112114
long expectedBytePermits = e.getLength();
113115
entries.add(e);
114116
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
@@ -124,7 +126,9 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
124126
@Test
125127
public void testFilterEntriesForConsumerOfTxnMsgAbort() {
126128
List<Entry> entries = new ArrayList<>();
127-
entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1", 1)));
129+
ByteBuf message = createTnxAbortMessage("message1", 1);
130+
entries.add(EntryImpl.create(1, 1, message));
131+
message.release();
128132

129133
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
130134
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -140,7 +144,9 @@ public void testFilterEntriesForConsumerOfTxnBufferAbort() {
140144
when(mockTopic.isTxnAborted(any(TxnID.class), any())).thenReturn(true);
141145

142146
List<Entry> entries = new ArrayList<>();
143-
entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
147+
ByteBuf message = createTnxMessage("message1", 1);
148+
entries.add(EntryImpl.create(1, 1, message));
149+
message.release();
144150

145151
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
146152
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -154,6 +160,7 @@ public void testFilterEntriesForConsumerOfServerOnlyMarker() {
154160
ByteBuf markerMessage =
155161
Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
156162
entries.add(EntryImpl.create(1, 1, markerMessage));
163+
markerMessage.release();
157164

158165
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
159166
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
@@ -164,7 +171,9 @@ public void testFilterEntriesForConsumerOfServerOnlyMarker() {
164171
@Test
165172
public void testFilterEntriesForConsumerOfDelayedMsg() {
166173
List<Entry> entries = new ArrayList<>();
167-
entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1", 1)));
174+
ByteBuf message = createDelayedMessage("message1", 1);
175+
entries.add(EntryImpl.create(1, 1, message));
176+
message.release();
168177

169178
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
170179
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public static byte[] createMessageWrittenToLedger(String msg, long messageTimest
9898
headers.writeInt(msgMetadataSize);
9999
messageMetadata.writeTo(headers);
100100
ByteBuf headersAndPayload = ByteBufPair.coalesce(ByteBufPair.get(headers, data));
101-
byte[] byteMessage = headersAndPayload.nioBuffer().array();
101+
byte[] byteMessage = new byte[headersAndPayload.readableBytes()];
102+
headersAndPayload.readBytes(byteMessage);
102103
headersAndPayload.release();
103104
return byteMessage;
104105
}
@@ -123,7 +124,8 @@ public static ByteBuf createMessageByteBufWrittenToLedger(String msg) throws Exc
123124
public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception {
124125
ByteBuf msgWithEntryMeta =
125126
Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors(), 1);
126-
byte[] byteMessage = msgWithEntryMeta.nioBuffer().array();
127+
byte[] byteMessage = new byte[msgWithEntryMeta.readableBytes()];
128+
msgWithEntryMeta.readBytes(byteMessage);
127129
msgWithEntryMeta.release();
128130
return byteMessage;
129131
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@
4747
import io.netty.buffer.ByteBuf;
4848
import io.netty.buffer.Unpooled;
4949
import io.netty.channel.Channel;
50+
import io.netty.channel.ChannelFuture;
5051
import io.netty.channel.ChannelHandlerContext;
5152
import io.netty.channel.DefaultEventLoopGroup;
5253
import io.netty.channel.EventLoopGroup;
54+
import io.netty.util.ReferenceCountUtil;
5355
import java.lang.reflect.Field;
5456
import java.lang.reflect.Method;
5557
import java.net.InetAddress;
@@ -172,6 +174,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
172174

173175
private EventLoopGroup eventLoopGroup;
174176
private ManagedLedgerFactory managedLedgerFactory;
177+
private ChannelHandlerContext ctx;
175178

176179
@BeforeMethod(alwaysRun = true)
177180
public void setup() throws Exception {
@@ -209,7 +212,7 @@ public void setup() throws Exception {
209212
doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
210213
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
211214
.when(serverCnx).getCommandSender();
212-
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
215+
ctx = mock(ChannelHandlerContext.class);
213216
Channel channel = mock(Channel.class);
214217

215218
eventLoopGroup = new DefaultEventLoopGroup();
@@ -2278,16 +2281,26 @@ public void testGetReplicationClusters() throws MetadataStoreException {
22782281
@Test
22792282
public void testSendProducerTxnPrechecks() throws Exception {
22802283
PersistentTopic topic = mock(PersistentTopic.class);
2284+
CountDownLatch latch = new CountDownLatch(1);
2285+
doAnswer(invocation -> {
2286+
Object msg = invocation.getArgument(0);
2287+
ReferenceCountUtil.safeRelease(msg);
2288+
latch.countDown();
2289+
return mock(ChannelFuture.class);
2290+
}).when(ctx).writeAndFlush(any(), any());
22812291
String role = "appid1";
22822292
Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
22832293
role, false, null, SchemaVersion.Latest, 0, true,
22842294
ProducerAccessMode.Shared, Optional.empty(), true);
22852295
producer1.close(false).get();
2296+
ByteBuf headersAndPayload = Unpooled.wrappedBuffer("test".getBytes());
22862297
producer1.publishTxnMessage(
22872298
new TxnID(1L, 0L),
2288-
1, 1, 1, null, 1, false, false
2299+
1, 1, 1, headersAndPayload, 1, false, false
22892300
);
22902301
verify(topic, times(0)).publishTxnMessage(any(), any(), any());
2302+
// wait for the writeAndFlush to be called so that ByteBuf leak isn't reported
2303+
assertTrue(latch.await(5, TimeUnit.SECONDS));
22912304
}
22922305

22932306
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.function.Predicate;
7373
import java.util.function.Supplier;
7474
import java.util.stream.Collectors;
75+
import javax.naming.AuthenticationException;
7576
import lombok.AllArgsConstructor;
7677
import lombok.extern.slf4j.Slf4j;
7778
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -96,9 +97,11 @@
9697
import org.apache.pulsar.broker.auth.MockAuthorizationProvider;
9798
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
9899
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
100+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
99101
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
100102
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
101103
import org.apache.pulsar.broker.authentication.AuthenticationService;
104+
import org.apache.pulsar.broker.authentication.AuthenticationState;
102105
import org.apache.pulsar.broker.authorization.AuthorizationService;
103106
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
104107
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -117,7 +120,6 @@
117120
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
118121
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
119122
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
120-
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
121123
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
122124
import org.apache.pulsar.common.api.proto.CommandConnected;
123125
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
@@ -332,7 +334,7 @@ public void testConnectCommandWithProxyVersion() throws Exception {
332334

333335
assertEquals(serverCnx.getState(), State.Connected);
334336
assertEquals(serverCnx.getProxyVersion(), "my-pulsar-proxy");
335-
channel.finish();
337+
channel.finishAndReleaseAll();
336338
}
337339

338340
@DataProvider(name = "clientVersions")
@@ -3094,7 +3096,7 @@ public void testDelayedClosedProducer() throws Exception {
30943096
Object response = getResponse();
30953097
assertTrue(response instanceof CommandSuccess);
30963098

3097-
channel.finish();
3099+
channel.finishAndReleaseAll();
30983100
}
30993101

31003102
@Test(timeOut = 30000)
@@ -3399,21 +3401,42 @@ public boolean isCompletedExceptionally() {
33993401
}
34003402

34013403
@Test
3402-
public void testHandleAuthResponseWithoutClientVersion() {
3403-
ServerCnx cnx = mock(ServerCnx.class, CALLS_REAL_METHODS);
3404-
CommandAuthResponse authResponse = mock(CommandAuthResponse.class);
3405-
org.apache.pulsar.common.api.proto.AuthData authData = mock(org.apache.pulsar.common.api.proto.AuthData.class);
3406-
when(authResponse.getResponse()).thenReturn(authData);
3407-
when(authResponse.hasResponse()).thenReturn(true);
3408-
when(authResponse.getResponse().hasAuthMethodName()).thenReturn(true);
3409-
when(authResponse.getResponse().hasAuthData()).thenReturn(true);
3410-
when(authResponse.hasClientVersion()).thenReturn(false);
3411-
try {
3412-
cnx.handleAuthResponse(authResponse);
3413-
} catch (Exception ignore) {
3414-
}
3415-
verify(authResponse, times(1)).hasClientVersion();
3416-
verify(authResponse, times(0)).getClientVersion();
3404+
public void testHandleAuthResponseWithoutClientVersion() throws Exception {
3405+
resetChannel();
3406+
// use a dummy authentication provider
3407+
AuthenticationProvider authenticationProvider = new AuthenticationProvider() {
3408+
@Override
3409+
public void initialize(ServiceConfiguration config) throws IOException {
3410+
3411+
}
3412+
3413+
@Override
3414+
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
3415+
return "role";
3416+
}
3417+
3418+
@Override
3419+
public String getAuthMethodName() {
3420+
return "dummy";
3421+
}
3422+
3423+
@Override
3424+
public void close() throws IOException {
3425+
3426+
}
3427+
};
3428+
AuthData clientData = AuthData.of(new byte[0]);
3429+
AuthenticationState authenticationState =
3430+
authenticationProvider.newAuthState(clientData, null, null);
3431+
// inject the AuthenticationState instance so that auth response can be processed
3432+
serverCnx.setAuthState(authenticationState);
3433+
// send the auth response with no client version
3434+
String clientVersion = null;
3435+
ByteBuf authResponse =
3436+
Commands.newAuthResponse("token", clientData, Commands.getCurrentProtocolVersion(), clientVersion);
3437+
channel.writeInbound(authResponse);
3438+
CommandConnected response = (CommandConnected) getResponse();
3439+
assertNotNull(response);
34173440
}
34183441

34193442
@Test(expectedExceptions = IllegalArgumentException.class)

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.Collections;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.Queue;
33+
import java.util.concurrent.LinkedBlockingDeque;
3234
import java.util.concurrent.atomic.AtomicLong;
3335
import java.util.function.Supplier;
3436
import java.util.stream.Collectors;
@@ -37,6 +39,7 @@
3739
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
3840
import org.apache.pulsar.common.api.proto.MessageMetadata;
3941
import org.apache.pulsar.common.protocol.Commands;
42+
import org.testng.annotations.AfterMethod;
4043
import org.testng.annotations.BeforeMethod;
4144
import org.testng.annotations.Test;
4245

@@ -46,6 +49,8 @@ public class SharedConsumerAssignorTest {
4649
private final ConsumerSelector roundRobinConsumerSelector = new ConsumerSelector();
4750
private final List<EntryAndMetadata> entryAndMetadataList = new ArrayList<>();
4851
private final List<EntryAndMetadata> replayQueue = new ArrayList<>();
52+
private final Queue<EntryAndMetadata> cleanupQueue = new LinkedBlockingDeque<>();
53+
4954
private SharedConsumerAssignor assignor;
5055

5156
@BeforeMethod
@@ -77,6 +82,16 @@ public void prepareData() {
7782
// P.S. In the table above, The uuid represents the "<producer-name>-<sequence-id>" for non-chunks
7883
assertEquals(toString(entryAndMetadataList), Arrays.asList(
7984
"0:0@A-0", "0:1@A-1-0-3", "0:2@A-1-1-3", "0:3@B-0", "0:4@B-1-0-2", "0:5@A-1-2-3", "0:6@B-1-1-2"));
85+
entryAndMetadataList.forEach(entry -> assertEquals(entry.getDataBuffer().refCnt(), 1));
86+
cleanupQueue.addAll(entryAndMetadataList);
87+
}
88+
89+
@AfterMethod
90+
public void releaseEntries() {
91+
EntryAndMetadata entry;
92+
while ((entry = cleanupQueue.poll()) != null) {
93+
entry.release();
94+
}
8095
}
8196

8297
@Test
@@ -199,9 +214,13 @@ void sendChunk(int chunkId, int numChunks) {
199214

200215
private static EntryAndMetadata createEntryAndMetadata(final long entryId,
201216
final MessageMetadata metadata) {
202-
final ByteBuf payload = Commands.serializeMetadataAndPayload(
203-
Commands.ChecksumType.Crc32c, metadata, PulsarByteBufAllocator.DEFAULT.buffer());
204-
return EntryAndMetadata.create(EntryImpl.create(0L, entryId, payload));
217+
ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer();
218+
final ByteBuf data = Commands.serializeMetadataAndPayload(
219+
Commands.ChecksumType.Crc32c, metadata, payload);
220+
payload.release();
221+
EntryAndMetadata entryAndMetadata = EntryAndMetadata.create(EntryImpl.create(0L, entryId, data));
222+
data.release();
223+
return entryAndMetadata;
205224
}
206225

207226
private static MessageMetadata createMetadata(final String producerName,

0 commit comments

Comments
 (0)