Skip to content

Commit 4072fda

Browse files
committed
Merge branch 'master' into develop
2 parents c76c490 + 3a0313e commit 4072fda

File tree

3 files changed

+134
-2
lines changed

3 files changed

+134
-2
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<groupId>com.cryptomkt.api</groupId>
66
<artifactId>cryptomarket</artifactId>
77
<packaging>jar</packaging>
8-
<version>1.0.8</version>
8+
<version>1.0.9</version>
99
<licenses>
1010
<license>
1111
<name>Apache License, Version 2.0</name>

src/main/java/com/cryptomkt/api/utils/Subscriber.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ public Subscriber(Consumer<JSONObject> callable, SyncJson syncJson) {
1818
@Override
1919
public void run() {
2020
JSONObject data;
21-
while (true) {
21+
boolean interrupted = false;
22+
while (!interrupted) {
2223
try {
2324
synchronized (syncJson) {
2425
syncJson.wait();
@@ -27,6 +28,7 @@ public void run() {
2728
consumer.accept(data);
2829
} catch (InterruptedException e) {
2930
e.printStackTrace();
31+
interrupted = true;
3032
}
3133

3234
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package com.cryptomkt.api;
2+
3+
import com.cryptomkt.api.utils.Subscriber;
4+
import com.cryptomkt.api.utils.SyncJson;
5+
import org.json.JSONException;
6+
import org.json.JSONObject;
7+
import org.junit.Test;
8+
9+
import java.io.Closeable;
10+
import java.util.ArrayList;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
import java.util.function.Consumer;
18+
import java.util.stream.Collectors;
19+
20+
import static junit.framework.TestCase.assertTrue;
21+
22+
public class SubscriberTest {
23+
24+
25+
// Tests that the Subscriber properly terminates execution once the Thread it's running on is interrupted.
26+
@Test
27+
public void testRun() throws InterruptedException {
28+
29+
ClientMock register = new ClientMock();
30+
31+
// Creates several incarnations of the client. If the leak manifests and created threads are left alive waiting, at the end there will be one for every iteration of the loop.
32+
for (int i = 0; i < 7; i++) {
33+
System.out.println("Re-registering listener on socket");
34+
register.restart();
35+
Thread.sleep(100L);
36+
}
37+
38+
register.close();
39+
Thread.sleep(100L);
40+
41+
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
42+
int numberOfThreads = threadGroup.activeCount();
43+
Thread[] threads = new Thread[numberOfThreads];
44+
threadGroup.enumerate(threads);
45+
List<String> subscriberThreads = Arrays.stream(threads)
46+
.map(Thread::getName)
47+
.filter(x -> x.startsWith("subscriber"))
48+
.collect(Collectors.toList());
49+
50+
System.out.println("Subscriber threads alive and waiting after closing: " + subscriberThreads);
51+
52+
assertTrue(subscriberThreads.isEmpty());
53+
}
54+
55+
/**
56+
* This will act as consumer, it represents the client side.
57+
* It holds the callback that will register to consume the json when the event arrives.
58+
*/
59+
private static class ClientMock {
60+
61+
private SocketImplMock socket;
62+
63+
public void restart() {
64+
if (socket != null) {
65+
socket.close();
66+
}
67+
socket = new SocketImplMock();
68+
69+
socket.onBalance(json -> json.keys().forEachRemaining(key -> {
70+
try {
71+
String strKey = (String) key;
72+
JSONObject balanceObject = json.getJSONObject(strKey);
73+
System.out.println("Event consumed: " + balanceObject);
74+
} catch (JSONException e) {
75+
e.printStackTrace();
76+
}
77+
}));
78+
}
79+
80+
public void close() {
81+
System.out.println("About to close socket");
82+
socket.close();
83+
}
84+
}
85+
86+
/**
87+
* This will act as producer with an scheduled thread writing on the balancePub every 1 second.
88+
*/
89+
private static class SocketImplMock implements Closeable {
90+
91+
private static final AtomicInteger seq = new AtomicInteger(1);
92+
93+
final SyncJson balancePub;
94+
95+
private final ScheduledExecutorService scheduledExecutorService;
96+
private final List<Subscriber> subscribers;
97+
98+
public SocketImplMock() {
99+
balancePub = new SyncJson();
100+
101+
scheduledExecutorService = Executors.newScheduledThreadPool(8);
102+
subscribers = new ArrayList<>();
103+
104+
scheduledExecutorService.scheduleAtFixedRate(() -> {
105+
try {
106+
System.out.println("Event produced");
107+
synchronized (balancePub) {
108+
balancePub.setData("{ \"field\": { \"inner\": \"value\"} }");
109+
balancePub.notifyAll();
110+
}
111+
} catch (Exception e) {
112+
e.printStackTrace();
113+
}
114+
}, 1, 1, TimeUnit.SECONDS);
115+
}
116+
117+
public void onBalance(Consumer<JSONObject> consumer) {
118+
Subscriber subscriber = new Subscriber(consumer, balancePub);
119+
subscriber.setName("subscriber-" + seq.getAndIncrement());
120+
this.subscribers.add(subscriber);
121+
subscriber.start();
122+
}
123+
124+
@Override
125+
public void close() {
126+
this.subscribers.forEach(Thread::interrupt);
127+
this.scheduledExecutorService.shutdown();
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)