Skip to content

Commit 6c1d423

Browse files
steveguryrobertroeser
authored andcommitted
Fix bug in FairLeaseGovernor (#147)
***Problem*** There is a bug in FairLeaseGovernor, the number of ticket per Responder is never decreased. ***Solution*** Fix the bug, add two unit-tests.
1 parent 47780c9 commit 6c1d423

File tree

2 files changed

+43
-11
lines changed

2 files changed

+43
-11
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/lease/FairLeaseGovernor.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.reactivesocket.lease;
217

318
import io.reactivesocket.Frame;
@@ -15,11 +30,11 @@
1530
* Distribute evenly a static number of tickets to all connected clients.
1631
*/
1732
public class FairLeaseGovernor implements LeaseGovernor {
18-
private static ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);
19-
2033
private final int tickets;
2134
private final long period;
2235
private final TimeUnit unit;
36+
private final ScheduledExecutorService executor;
37+
2338
private final Map<Responder, Integer> responders;
2439
private ScheduledFuture<?> runningTask;
2540

@@ -41,19 +56,29 @@ private synchronized void distribute(int ttlMs) {
4156
}
4257
}
4358

44-
public FairLeaseGovernor(int tickets, long period, TimeUnit unit) {
59+
public FairLeaseGovernor(int tickets, long period, TimeUnit unit, ScheduledExecutorService executor) {
4560
this.tickets = tickets;
4661
this.period = period;
4762
this.unit = unit;
63+
this.executor = executor;
4864
responders = new HashMap<>();
4965
}
5066

67+
public FairLeaseGovernor(int tickets, long period, TimeUnit unit) {
68+
this(tickets, period, unit, Executors.newScheduledThreadPool(2, runnable -> {
69+
Thread thread = new Thread(runnable);
70+
thread.setDaemon(true);
71+
thread.setName("FairLeaseGovernor");
72+
return thread;
73+
}));
74+
}
75+
5176
@Override
5277
public synchronized void register(Responder responder) {
5378
responders.put(responder, 0);
5479
if (runningTask == null) {
5580
final int ttl = (int)TimeUnit.NANOSECONDS.convert(period, unit);
56-
runningTask = EXECUTOR.scheduleAtFixedRate(() -> distribute(ttl), 0, period, unit);
81+
runningTask = executor.scheduleAtFixedRate(() -> distribute(ttl), 0, period, unit);
5782
}
5883
}
5984

@@ -68,8 +93,13 @@ public synchronized void unregister(Responder responder) {
6893

6994
@Override
7095
public synchronized boolean accept(Responder responder, Frame frame) {
71-
boolean valid;
72-
final Integer remainingTickets = responders.get(responder);
73-
return remainingTickets == null || remainingTickets > 0;
96+
Integer remainingTickets = responders.get(responder);
97+
if (remainingTickets != null) {
98+
remainingTickets--;
99+
} else {
100+
remainingTickets = -1;
101+
}
102+
responders.put(responder, remainingTickets);
103+
return remainingTickets >= 0;
74104
}
75105
}

reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivesocket.ReactiveSocket;
2222
import io.reactivesocket.RequestHandler;
2323
import io.reactivesocket.client.ClientBuilder;
24+
import io.reactivesocket.lease.FairLeaseGovernor;
2425
import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
2526
import io.reactivesocket.util.Unsafe;
2627
import io.reactivesocket.test.TestUtil;
@@ -80,13 +81,14 @@ public void cancel() {}
8081
.build();
8182

8283
SocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
83-
TcpReactiveSocketServer.StartedServer server = TcpReactiveSocketServer.create(addr).start(setupHandler);
84-
SocketAddress serverAddress = server.getServerAddress();
85-
return serverAddress;
84+
FairLeaseGovernor leaseGovernor = new FairLeaseGovernor(5000, 100, TimeUnit.MILLISECONDS);
85+
TcpReactiveSocketServer.StartedServer server =
86+
TcpReactiveSocketServer.create(addr).start(setupHandler, leaseGovernor);
87+
return server.getServerAddress();
8688
}
8789

8890
private static Publisher<Collection<SocketAddress>> getServersList() {
89-
Observable<Collection<SocketAddress>> serverAddresses = Observable.interval(2, TimeUnit.SECONDS)
91+
Observable<Collection<SocketAddress>> serverAddresses = Observable.interval(0, 2, TimeUnit.SECONDS)
9092
.map(new Func1<Long, Collection<SocketAddress>>() {
9193
List<SocketAddress> addresses = new ArrayList<>();
9294

0 commit comments

Comments
 (0)