Skip to content

Commit 121b62f

Browse files
authored
Merge pull request #259 from scalecube/bug/jmx-memory-leak
Fix memory leak in jmx bean
2 parents d1ca70f + c5725d8 commit 121b62f

File tree

1 file changed

+14
-7
lines changed

1 file changed

+14
-7
lines changed

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Objects;
3030
import java.util.Optional;
31+
import java.util.concurrent.CopyOnWriteArrayList;
3132
import java.util.concurrent.ThreadLocalRandom;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.function.Predicate;
@@ -43,7 +44,6 @@
4344
import reactor.core.publisher.FluxSink;
4445
import reactor.core.publisher.Mono;
4546
import reactor.core.publisher.MonoSink;
46-
import reactor.core.publisher.ReplayProcessor;
4747
import reactor.core.scheduler.Scheduler;
4848

4949
public final class MembershipProtocolImpl implements MembershipProtocol {
@@ -742,15 +742,22 @@ public static class JmxMonitorMBean extends AbstractMonitorMBean implements Moni
742742
public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42;
743743

744744
private final MembershipProtocolImpl membershipProtocol;
745-
private final ReplayProcessor<MembershipEvent> removedMembersHistory;
745+
private final List<MembershipEvent> removedMembersHistory;
746746

747747
private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) {
748748
this.membershipProtocol = membershipProtocol;
749-
this.removedMembersHistory = ReplayProcessor.create(REMOVED_MEMBERS_HISTORY_SIZE);
749+
this.removedMembersHistory = new CopyOnWriteArrayList<>();
750+
750751
membershipProtocol
751752
.listen()
752753
.filter(MembershipEvent::isRemoved)
753-
.subscribe(removedMembersHistory);
754+
.subscribe(
755+
event -> {
756+
removedMembersHistory.add(event);
757+
if (removedMembersHistory.size() > REMOVED_MEMBERS_HISTORY_SIZE) {
758+
removedMembersHistory.remove(0);
759+
}
760+
});
754761
}
755762

756763
@Override
@@ -782,9 +789,9 @@ public String getSuspectedMembersAsString() {
782789

783790
@Override
784791
public List<String> getDeadMembers() {
785-
List<String> deadMembers = new ArrayList<>();
786-
removedMembersHistory.map(MembershipEvent::toString).subscribe(deadMembers::add);
787-
return deadMembers;
792+
return removedMembersHistory.stream()
793+
.map(MembershipEvent::toString)
794+
.collect(Collectors.toList());
788795
}
789796

790797
@Override

0 commit comments

Comments
 (0)