diff --git a/pom.xml b/pom.xml
index 77a42b9..2b462e4 100755
--- a/pom.xml
+++ b/pom.xml
@@ -34,8 +34,8 @@
maven-compiler-plugin
3.0
- 1.7
- 1.7
+ 1.8
+ 1.8
@@ -78,6 +78,24 @@
4.11
test
+
+ org.mockito
+ mockito-all
+ 1.9.5
+ test
+
+
+ org.powermock
+ powermock-api-mockito
+ 1.5.4
+ test
+
+
+ org.powermock
+ powermock-module-junit4
+ 1.5.4
+ test
+
org.slf4j
slf4j-simple
diff --git a/src/main/java/net/unit8/wscl/ClassLoaderEndpoint.java b/src/main/java/net/unit8/wscl/ClassLoaderEndpoint.java
index 51ccdf4..4f92cc3 100644
--- a/src/main/java/net/unit8/wscl/ClassLoaderEndpoint.java
+++ b/src/main/java/net/unit8/wscl/ClassLoaderEndpoint.java
@@ -1,11 +1,21 @@
package net.unit8.wscl;
-import net.unit8.wscl.dto.ResourceRequest;
-import net.unit8.wscl.dto.ResourceResponse;
-import net.unit8.wscl.handler.ResourceRequestWriteHandler;
-import net.unit8.wscl.handler.ResourceResponseReadHandler;
-import net.unit8.wscl.util.FressianUtils;
-import net.unit8.wscl.util.PropertyUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.MessageHandler;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+
import org.fressian.FressianReader;
import org.fressian.FressianWriter;
import org.fressian.handlers.ILookup;
@@ -15,12 +25,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.websocket.*;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.*;
+import net.unit8.wscl.dto.ResourceRequest;
+import net.unit8.wscl.dto.ResourceResponse;
+import net.unit8.wscl.handler.ResourceRequestWriteHandler;
+import net.unit8.wscl.handler.ResourceResponseReadHandler;
+import net.unit8.wscl.util.FressianUtils;
+import net.unit8.wscl.util.PropertyUtils;
/**
* @author kawasima
@@ -86,8 +96,8 @@ public Map valAt(Class key) {
logger.debug("fetch class:" + request.getResourceName() + ":" + request.getClassLoaderId());
- final BlockingQueue queue = new ArrayBlockingQueue<>(1);
- waitingResponses.put(request.getResourceName(), queue);
+ waitingResponses.putIfAbsent(request.getResourceName(), new ArrayBlockingQueue<>(5));
+ final BlockingQueue queue = waitingResponses.get(request.getResourceName());
try {
session.getAsyncRemote().sendBinary(ByteBuffer.wrap(baos.toByteArray()));
ResourceResponse response = queue.poll(PropertyUtils.getLongSystemProperty("wscl.timeout", 5000), TimeUnit.MILLISECONDS);
@@ -98,7 +108,10 @@ public Map valAt(Class key) {
} catch(InterruptedException ex) {
throw new IOException("Interrupted in waiting for request." + request.getResourceName(), ex);
} finally {
- waitingResponses.remove(request.getResourceName());
+ if (waitingResponses.get(request.getResourceName()) != null && waitingResponses.get(request.getResourceName()).isEmpty()) {
+ waitingResponses.remove(request.getResourceName());
+ }
+ fw.close();
}
}
diff --git a/src/test/java/net/unit8/wscl/ClassLoaderEndpointTest.java b/src/test/java/net/unit8/wscl/ClassLoaderEndpointTest.java
new file mode 100644
index 0000000..c3780d2
--- /dev/null
+++ b/src/test/java/net/unit8/wscl/ClassLoaderEndpointTest.java
@@ -0,0 +1,124 @@
+package net.unit8.wscl;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.websocket.RemoteEndpoint;
+import javax.websocket.RemoteEndpoint.Async;
+import javax.websocket.Session;
+
+import org.fressian.FressianWriter;
+import org.fressian.Writer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import net.unit8.wscl.dto.ResourceRequest;
+import net.unit8.wscl.dto.ResourceResponse;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FressianWriter.class,ClassLoaderEndpoint.class})
+public class ClassLoaderEndpointTest {
+ private ClassLoaderEndpoint cle;
+ private Session session;
+ private RemoteEndpoint remoteEndpoint;
+ private ResourceRequest resourceRequest;
+ private FressianWriter fressianWriter;
+
+ @Before
+ public void setup() throws Exception{
+ //setup mocks
+ cle = new ClassLoaderEndpoint();
+ session = mock(Session.class);
+ remoteEndpoint = mock(Async.class);
+ doReturn(remoteEndpoint).when(session).getAsyncRemote();
+ ((Async) doReturn(null).when(remoteEndpoint)).sendBinary(any(ByteBuffer.class));
+ resourceRequest = mock(ResourceRequest.class);
+ doReturn(UUID.randomUUID()).when(resourceRequest).getClassLoaderId();
+ doReturn("resouce1").when(resourceRequest).getResourceName();
+ fressianWriter = mock(FressianWriter.class);
+ doReturn(mock(Writer.class)).when(fressianWriter).writeObject(any(ResourceRequest.class));
+ PowerMockito.whenNew(FressianWriter.class).withAnyArguments().thenReturn(fressianWriter );
+ }
+
+
+
+ @Test
+ public void oneRequest() throws Exception {
+ cle.onOpen(session, null);
+ ExecutorService service = Executors.newCachedThreadPool();
+ CompletableFuture asyncResponse = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ Field f = cle.getClass().getDeclaredField("waitingResponses");
+ f.setAccessible(true);
+ ConcurrentHashMap waitingResponse = (ConcurrentHashMap) f.get(cle);
+ Thread.sleep(1000);
+ BlockingQueue queue = (BlockingQueue) waitingResponse.get("resouce1");
+ queue.offer(new ResourceResponse("resoucename1"));
+ assertThat(asyncResponse.get(),is("resoucename1"));
+ }
+
+ @Test
+ public void twoRequestAtTheSameTime() throws Exception {
+ cle.onOpen(session, null);
+ ExecutorService service = Executors.newCachedThreadPool();
+ CompletableFuture asyncResponse1 = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ CompletableFuture asyncResponse2 = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ Field f = cle.getClass().getDeclaredField("waitingResponses");
+ f.setAccessible(true);
+ ConcurrentHashMap waitingResponse = (ConcurrentHashMap) f.get(cle);
+ Thread.sleep(1000);
+ BlockingQueue queue = (BlockingQueue) waitingResponse.get("resouce1");
+ queue.offer(new ResourceResponse("resoucename1"));
+ queue.offer(new ResourceResponse("resoucename1"));
+ assertThat(asyncResponse1.get(),is("resoucename1"));
+ assertThat(asyncResponse2.get(),is("resoucename1"));
+ }
+
+ @Test
+ public void manyRequestAtTheSameTime() throws Exception {
+ cle.onOpen(session, null);
+ ExecutorService service = Executors.newCachedThreadPool();
+ CompletableFuture asyncResponse1 = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ CompletableFuture asyncResponse2 = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ CompletableFuture asyncResponse3 = CompletableFuture.supplyAsync(() -> request(cle,resourceRequest), service);
+ Field f = cle.getClass().getDeclaredField("waitingResponses");
+ f.setAccessible(true);
+ ConcurrentHashMap waitingResponse = (ConcurrentHashMap) f.get(cle);
+ Thread.sleep(1000);
+ BlockingQueue queue = (BlockingQueue) waitingResponse.get("resouce1");
+ queue.offer(new ResourceResponse("resoucename1"));
+ queue.offer(new ResourceResponse("resoucename1"));
+ queue.offer(new ResourceResponse("resoucename1"));
+
+ assertThat(asyncResponse1.get(),is("resoucename1"));
+ assertThat(asyncResponse2.get(),is("resoucename1"));
+ assertThat(asyncResponse3.get(),is("resoucename1"));
+ }
+
+
+ String request(ClassLoaderEndpoint cle, ResourceRequest resourceRequest){
+ try {
+ return cle.request(resourceRequest).getResourceName();
+ } catch (IOException e) {
+ fail();
+ }
+ return null;
+ }
+}