Skip to content

Commit 87ca7f3

Browse files
TakaHiR07fanjianyemerlimat
authored
Fix Memory Leak In Netty Recycler of Bookie Client (#4609)
* make recycler static to avoid OOM problem * Fixed missing license headers * Fixed checkstyle * More checkstyle --------- Co-authored-by: fanjianye <[email protected]> Co-authored-by: Matteo Merli <[email protected]>
1 parent 7a29abf commit 87ca7f3

14 files changed

+1235
-912
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
package org.apache.bookkeeper.proto;
23+
24+
import io.netty.util.Recycler;
25+
import org.apache.bookkeeper.client.BKException;
26+
import org.apache.bookkeeper.common.util.MathUtils;
27+
import org.apache.bookkeeper.net.BookieId;
28+
import org.slf4j.MDC;
29+
30+
class AddCompletion extends CompletionValue implements BookkeeperInternalCallbacks.WriteCallback {
31+
32+
static AddCompletion acquireAddCompletion(final CompletionKey key,
33+
final BookkeeperInternalCallbacks.WriteCallback originalCallback,
34+
final Object originalCtx,
35+
final long ledgerId, final long entryId,
36+
PerChannelBookieClient perChannelBookieClient) {
37+
AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
38+
completion.reset(key, originalCallback, originalCtx, ledgerId, entryId, perChannelBookieClient);
39+
return completion;
40+
}
41+
42+
final Recycler.Handle<AddCompletion> handle;
43+
44+
CompletionKey key = null;
45+
BookkeeperInternalCallbacks.WriteCallback originalCallback = null;
46+
47+
AddCompletion(Recycler.Handle<AddCompletion> handle) {
48+
super("Add", null, -1, -1, null);
49+
this.handle = handle;
50+
}
51+
52+
void reset(final CompletionKey key,
53+
final BookkeeperInternalCallbacks.WriteCallback originalCallback,
54+
final Object originalCtx,
55+
final long ledgerId, final long entryId,
56+
PerChannelBookieClient perChannelBookieClient) {
57+
this.key = key;
58+
this.originalCallback = originalCallback;
59+
this.ctx = originalCtx;
60+
this.ledgerId = ledgerId;
61+
this.entryId = entryId;
62+
this.startTime = org.apache.bookkeeper.common.util.MathUtils.nowInNano();
63+
64+
this.opLogger = perChannelBookieClient.addEntryOpLogger;
65+
this.timeoutOpLogger = perChannelBookieClient.addTimeoutOpLogger;
66+
this.perChannelBookieClient = perChannelBookieClient;
67+
this.mdcContextMap = perChannelBookieClient.preserveMdcForTaskExecution ? MDC.getCopyOfContextMap() : null;
68+
}
69+
70+
@Override
71+
public void release() {
72+
this.ctx = null;
73+
this.opLogger = null;
74+
this.timeoutOpLogger = null;
75+
this.perChannelBookieClient = null;
76+
this.mdcContextMap = null;
77+
handle.recycle(this);
78+
}
79+
80+
@Override
81+
public void writeComplete(int rc, long ledgerId, long entryId,
82+
BookieId addr,
83+
Object ctx) {
84+
logOpResult(rc);
85+
originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
86+
key.release();
87+
this.release();
88+
}
89+
90+
@Override
91+
boolean maybeTimeout() {
92+
if (MathUtils.elapsedNanos(startTime) >= perChannelBookieClient.addEntryTimeoutNanos) {
93+
timeout();
94+
return true;
95+
} else {
96+
return false;
97+
}
98+
}
99+
100+
@Override
101+
public void errorOut() {
102+
errorOut(BKException.Code.BookieHandleNotAvailableException);
103+
}
104+
105+
@Override
106+
public void errorOut(final int rc) {
107+
errorOutAndRunCallback(
108+
() -> writeComplete(rc, ledgerId, entryId, perChannelBookieClient.bookieId, ctx));
109+
}
110+
111+
@Override
112+
public void setOutstanding() {
113+
perChannelBookieClient.addEntryOutstanding.inc();
114+
}
115+
116+
@Override
117+
public void handleV2Response(
118+
long ledgerId, long entryId, BookkeeperProtocol.StatusCode status,
119+
BookieProtocol.Response response) {
120+
perChannelBookieClient.addEntryOutstanding.dec();
121+
handleResponse(ledgerId, entryId, status);
122+
}
123+
124+
@Override
125+
public void handleV3Response(
126+
BookkeeperProtocol.Response response) {
127+
perChannelBookieClient.addEntryOutstanding.dec();
128+
BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
129+
BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK
130+
? addResponse.getStatus() : response.getStatus();
131+
handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(),
132+
status);
133+
}
134+
135+
private void handleResponse(long ledgerId, long entryId,
136+
BookkeeperProtocol.StatusCode status) {
137+
if (LOG.isDebugEnabled()) {
138+
logResponse(status, "ledger", ledgerId, "entry", entryId);
139+
}
140+
141+
int rc = convertStatus(status, BKException.Code.WriteException);
142+
writeComplete(rc, ledgerId, entryId, perChannelBookieClient.bookieId, ctx);
143+
}
144+
145+
private static final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() {
146+
@Override
147+
protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
148+
return new AddCompletion(handle);
149+
}
150+
};
151+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
package org.apache.bookkeeper.proto;
23+
24+
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
25+
26+
import org.apache.bookkeeper.client.BKException;
27+
import org.apache.bookkeeper.util.ByteBufList;
28+
29+
class BatchedReadCompletion extends CompletionValue {
30+
31+
final BookkeeperInternalCallbacks.BatchedReadEntryCallback cb;
32+
33+
public BatchedReadCompletion(final CompletionKey key,
34+
final BookkeeperInternalCallbacks.BatchedReadEntryCallback originalCallback,
35+
final Object originalCtx,
36+
long ledgerId, final long entryId,
37+
PerChannelBookieClient perChannelBookieClient) {
38+
super("BatchedRead", originalCtx, ledgerId, entryId, perChannelBookieClient);
39+
this.opLogger = perChannelBookieClient.readEntryOpLogger;
40+
this.timeoutOpLogger = perChannelBookieClient.readTimeoutOpLogger;
41+
this.cb = (rc, ledgerId1, startEntryId, bufList, ctx) -> {
42+
logOpResult(rc);
43+
originalCallback.readEntriesComplete(rc,
44+
ledgerId1, entryId,
45+
bufList, originalCtx);
46+
key.release();
47+
};
48+
}
49+
50+
@Override
51+
public void errorOut() {
52+
errorOut(BKException.Code.BookieHandleNotAvailableException);
53+
}
54+
55+
@Override
56+
public void errorOut(final int rc) {
57+
errorOutAndRunCallback(
58+
() -> cb.readEntriesComplete(rc, ledgerId,
59+
entryId, null, ctx));
60+
}
61+
62+
@Override
63+
public void handleV2Response(long ledgerId,
64+
long entryId,
65+
BookkeeperProtocol.StatusCode status,
66+
BookieProtocol.Response response) {
67+
68+
perChannelBookieClient.readEntryOutstanding.dec();
69+
if (!(response instanceof BookieProtocol.BatchedReadResponse)) {
70+
return;
71+
}
72+
BookieProtocol.BatchedReadResponse readResponse = (BookieProtocol.BatchedReadResponse) response;
73+
handleBatchedReadResponse(ledgerId, entryId, status, readResponse.getData(),
74+
INVALID_ENTRY_ID, -1L);
75+
}
76+
77+
@Override
78+
public void handleV3Response(BookkeeperProtocol.Response response) {
79+
// V3 protocol haven't supported batched read yet.
80+
}
81+
82+
private void handleBatchedReadResponse(long ledgerId,
83+
long entryId,
84+
BookkeeperProtocol.StatusCode status,
85+
ByteBufList buffers,
86+
long maxLAC, // max known lac piggy-back from bookies
87+
long lacUpdateTimestamp) { // the timestamp when the lac is updated.
88+
int rc = convertStatus(status, BKException.Code.ReadException);
89+
90+
if (maxLAC > INVALID_ENTRY_ID && (ctx instanceof BookkeeperInternalCallbacks.ReadEntryCallbackCtx)) {
91+
((BookkeeperInternalCallbacks.ReadEntryCallbackCtx) ctx).setLastAddConfirmed(maxLAC);
92+
}
93+
if (lacUpdateTimestamp > -1L && (ctx instanceof ReadLastConfirmedAndEntryContext)) {
94+
((ReadLastConfirmedAndEntryContext) ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
95+
}
96+
cb.readEntriesComplete(rc, ledgerId, entryId, buffers, ctx);
97+
}
98+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
22+
package org.apache.bookkeeper.proto;
23+
24+
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
25+
26+
abstract class CompletionKey {
27+
OperationType operationType;
28+
29+
CompletionKey(OperationType operationType) {
30+
this.operationType = operationType;
31+
}
32+
33+
public void release() {}
34+
}

0 commit comments

Comments
 (0)