|
28 | 28 | import java.util.Map; |
29 | 29 | import java.util.Objects; |
30 | 30 | import java.util.Optional; |
| 31 | +import java.util.concurrent.CopyOnWriteArrayList; |
31 | 32 | import java.util.concurrent.ThreadLocalRandom; |
32 | 33 | import java.util.concurrent.TimeUnit; |
33 | 34 | import java.util.function.Predicate; |
|
43 | 44 | import reactor.core.publisher.FluxSink; |
44 | 45 | import reactor.core.publisher.Mono; |
45 | 46 | import reactor.core.publisher.MonoSink; |
46 | | -import reactor.core.publisher.ReplayProcessor; |
47 | 47 | import reactor.core.scheduler.Scheduler; |
48 | 48 |
|
49 | 49 | public final class MembershipProtocolImpl implements MembershipProtocol { |
@@ -742,15 +742,22 @@ public static class JmxMonitorMBean extends AbstractMonitorMBean implements Moni |
742 | 742 | public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42; |
743 | 743 |
|
744 | 744 | private final MembershipProtocolImpl membershipProtocol; |
745 | | - private final ReplayProcessor<MembershipEvent> removedMembersHistory; |
| 745 | + private final List<MembershipEvent> removedMembersHistory; |
746 | 746 |
|
747 | 747 | private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) { |
748 | 748 | this.membershipProtocol = membershipProtocol; |
749 | | - this.removedMembersHistory = ReplayProcessor.create(REMOVED_MEMBERS_HISTORY_SIZE); |
| 749 | + this.removedMembersHistory = new CopyOnWriteArrayList<>(); |
| 750 | + |
750 | 751 | membershipProtocol |
751 | 752 | .listen() |
752 | 753 | .filter(MembershipEvent::isRemoved) |
753 | | - .subscribe(removedMembersHistory); |
| 754 | + .subscribe( |
| 755 | + event -> { |
| 756 | + removedMembersHistory.add(event); |
| 757 | + if (removedMembersHistory.size() > REMOVED_MEMBERS_HISTORY_SIZE) { |
| 758 | + removedMembersHistory.remove(0); |
| 759 | + } |
| 760 | + }); |
754 | 761 | } |
755 | 762 |
|
756 | 763 | @Override |
@@ -782,9 +789,9 @@ public String getSuspectedMembersAsString() { |
782 | 789 |
|
783 | 790 | @Override |
784 | 791 | public List<String> getDeadMembers() { |
785 | | - List<String> deadMembers = new ArrayList<>(); |
786 | | - removedMembersHistory.map(MembershipEvent::toString).subscribe(deadMembers::add); |
787 | | - return deadMembers; |
| 792 | + return removedMembersHistory.stream() |
| 793 | + .map(MembershipEvent::toString) |
| 794 | + .collect(Collectors.toList()); |
788 | 795 | } |
789 | 796 |
|
790 | 797 | @Override |
|
0 commit comments