Skip to content

Commit a4058b1

Browse files
authored
Merge pull request #18 from lwahlmeier/wsServer
Ws server
2 parents 5c63340 + d093745 commit a4058b1

File tree

32 files changed

+1487
-364
lines changed

32 files changed

+1487
-364
lines changed

build-all.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ cd protocol
33
./gradlew clean build install
44
cd ..
55
cd server
6-
./gradlew clean build install --offline
6+
./gradlew clean build install
77
cd ..
88
cd client
99
./gradlew clean build install

client/build.shared

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@ dependencies {
1919
compile (
2020
"org.threadly:threadly:$threadlyVersion",
2121
"org.threadly:litesockets:$litesocketsVersion",
22-
"org.threadly:litesockets-http-protocol:$version",
23-
)
24-
testCompile (
25-
"org.threadly:litesockets-http-server:$version",
22+
"org.threadly:litesockets-http-protocol:$version"
2623
)
2724
}
2825

client/gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-bin.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.4-bin.zip

client/src/main/java/org/threadly/litesockets/client/http/HTTPClient.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.threadly.litesockets.protocols.http.shared.HTTPParsingException;
3838
import org.threadly.litesockets.protocols.http.shared.HTTPRequestType;
3939
import org.threadly.litesockets.protocols.http.shared.HTTPResponseCode;
40+
import org.threadly.litesockets.protocols.ws.WebSocketFrameParser.WebSocketFrame;
4041
import org.threadly.litesockets.utils.IOUtils;
4142
import org.threadly.litesockets.utils.SSLUtils;
4243
import org.threadly.util.AbstractService;
@@ -514,7 +515,7 @@ private class HTTPRequestWrapper implements HTTPResponseCallback {
514515
private long lastRead = Clock.lastKnownForwardProgressingMillis();
515516

516517
public HTTPRequestWrapper(HTTPRequest hr, HTTPAddress ha, ByteBuffer body, long timeout) {
517-
hrp.addHTTPRequestCallback(this);
518+
hrp.addHTTPResponseCallback(this);
518519
this.hr = hr;
519520
this.ha = ha;
520521
this.body = body;
@@ -545,8 +546,8 @@ public void bodyData(ByteBuffer bb) {
545546

546547
@Override
547548
public void finished() {
548-
slf.setResult(new HTTPResponseData(response, responseMBB.duplicateAndClean()));
549-
hrp.removeHTTPRequestCallback(this);
549+
slf.setResult(new HTTPResponseData(HTTPClient.this, hr, response, responseMBB.duplicateAndClean()));
550+
hrp.removeHTTPResponseCallback(this);
550551
inProcess.remove(client);
551552
addBackTCPClient(ha, client);
552553
processQueue();
@@ -555,6 +556,13 @@ public void finished() {
555556
@Override
556557
public void hasError(Throwable t) {
557558
slf.setFailure(t);
559+
client.close();
560+
}
561+
562+
@Override
563+
public void websocketData(WebSocketFrame wsf, ByteBuffer bb) {
564+
slf.setFailure(new Exception("HTTPClient does not currently support websockets!"));
565+
client.close();
558566
}
559567
}
560568

@@ -563,11 +571,23 @@ public void hasError(Throwable t) {
563571
*/
564572
public static class HTTPResponseData {
565573
private final HTTPResponse hr;
574+
private final HTTPRequest origRequest;
566575
private final MergedByteBuffers body;
576+
private final HTTPClient client;
567577

568-
public HTTPResponseData(HTTPResponse hr, MergedByteBuffers bb) {
578+
public HTTPResponseData(HTTPClient client, HTTPRequest origRequest, HTTPResponse hr, MergedByteBuffers bb) {
579+
this.client = client;
569580
this.hr = hr;
570581
this.body = bb;
582+
this.origRequest = origRequest;
583+
}
584+
585+
public HTTPClient getHTTPClient() {
586+
return client;
587+
}
588+
589+
public HTTPRequest getHTTPRequest() {
590+
return origRequest;
571591
}
572592

573593
public HTTPResponse getResponse() {

client/src/main/java/org/threadly/litesockets/client/http/HTTPStreamClient.java

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
import org.threadly.litesockets.SocketExecuter;
1717
import org.threadly.litesockets.TCPClient;
1818
import org.threadly.litesockets.protocols.http.request.HTTPRequest;
19-
import org.threadly.litesockets.protocols.http.request.HTTPRequestBuilder;
2019
import org.threadly.litesockets.protocols.http.response.HTTPResponse;
2120
import org.threadly.litesockets.protocols.http.response.HTTPResponseProcessor;
2221
import org.threadly.litesockets.protocols.http.response.HTTPResponseProcessor.HTTPResponseCallback;
2322
import org.threadly.litesockets.protocols.http.shared.HTTPUtils;
23+
import org.threadly.litesockets.protocols.ws.WebSocketFrameParser.WebSocketFrame;
2424
import org.threadly.litesockets.utils.SSLUtils;
2525

2626
/**
@@ -54,7 +54,6 @@ public class HTTPStreamClient implements StreamingClient {
5454
private final HTTPResponseProcessor httpProcessor;
5555

5656
private volatile boolean isConnected = false;
57-
private volatile boolean headersSent = false;
5857
private volatile HTTPStreamReader httpReader;
5958
private volatile SettableListenableFuture<HTTPResponse> slfResponse;
6059
private volatile HTTPRequest currentHttpRequest;
@@ -65,21 +64,19 @@ public class HTTPStreamClient implements StreamingClient {
6564
* @param client the {@link TCPClient} to use for this connection.
6665
* @param headerSent true if the http headers have already been sent, false if they still need to be sent.
6766
*/
68-
public HTTPStreamClient(TCPClient client, boolean headerSent) {
69-
this(client, client.getRemoteSocketAddress().getHostName(), headerSent);
67+
public HTTPStreamClient(TCPClient client) {
68+
this(client, client.getRemoteSocketAddress().getHostName());
7069
}
7170

72-
public HTTPStreamClient(TCPClient client, String host, boolean headerSent) {
71+
public HTTPStreamClient(TCPClient client, String host) {
7372
this.client = client;
7473
this.host = host;
75-
if(headersSent) {
76-
currentHttpRequest = new HTTPRequestBuilder().build();
77-
}
7874
port = client.getRemoteSocketAddress().getPort();
7975
client.addCloseListener(classCloser);
8076
httpProcessor = new HTTPResponseProcessor();
81-
httpProcessor.addHTTPRequestCallback(requestCB);
82-
77+
httpProcessor.addHTTPResponseCallback(requestCB);
78+
slfResponse = new SettableListenableFuture<HTTPResponse>();
79+
isConnected = true;
8380
}
8481

8582
/**
@@ -99,7 +96,7 @@ public HTTPStreamClient(SocketExecuter se, String host, int port) throws IOExcep
9996
client.setConnectionTimeout(DEFAULT_TIMEOUT);
10097
client.addCloseListener(classCloser);
10198
httpProcessor = new HTTPResponseProcessor();
102-
httpProcessor.addHTTPRequestCallback(requestCB);
99+
httpProcessor.addHTTPResponseCallback(requestCB);
103100
}
104101

105102
@Override
@@ -137,6 +134,17 @@ public int getPort() {
137134
return port;
138135
}
139136

137+
@Override
138+
public void setRequestResponseHeaders(HTTPRequest httpRequest, HTTPResponse httpResponse, boolean writeResponse) {
139+
if(!slfResponse.isDone()) {
140+
currentHttpRequest = httpRequest;
141+
httpProcessor.processData(httpResponse.getByteBuffer());
142+
if(writeResponse) {
143+
client.write(httpResponse.getByteBuffer());
144+
}
145+
}
146+
}
147+
140148
/**
141149
* <p>Tell the client to write an HTTPRequest to the server. This can technically be done
142150
* whenever you want to but obviously use only when you know you can sent a request, right after
@@ -169,7 +177,11 @@ public ListenableFuture<?> write(ByteBuffer bb) {
169177
return client.write(bb);
170178
}
171179
}
172-
180+
181+
public ListenableFuture<?> getLastWriteFuture() {
182+
return client.lastWriteFuture();
183+
}
184+
173185
/**
174186
* Sets the HTTPStreamReader for this client.
175187
*
@@ -274,6 +286,13 @@ public void hasError(Throwable t) {
274286
slfResponse.setFailure(t);
275287
client.close();
276288
}
289+
290+
@Override
291+
public void websocketData(WebSocketFrame wsf, ByteBuffer bb) {
292+
if(httpReader != null) {
293+
httpReader.handle(bb);
294+
}
295+
}
277296
}
278297

279298
/**

client/src/main/java/org/threadly/litesockets/client/http/StreamingClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import javax.net.ssl.SSLEngine;
77

88
import org.threadly.concurrent.future.ListenableFuture;
9+
import org.threadly.litesockets.protocols.http.request.HTTPRequest;
10+
import org.threadly.litesockets.protocols.http.response.HTTPResponse;
911

1012

1113
/**
@@ -37,6 +39,16 @@ public interface StreamingClient {
3739
*/
3840
public void setConnectionTimeout(int timeout);
3941

42+
/**
43+
* Sets the HTTPRequest and HTTPResponse allowing the streaming client to start streaming
44+
* w/o sending these in itself. This is used for the server side of streaming.
45+
*
46+
* @param httpRequest
47+
* @param httpResponse
48+
* @param writeResponse
49+
*/
50+
public void setRequestResponseHeaders(HTTPRequest httpRequest, HTTPResponse httpResponse, boolean writeResponse);
51+
4052
/**
4153
* This performs a write to the connection.
4254
*
@@ -45,6 +57,8 @@ public interface StreamingClient {
4557
*/
4658
public ListenableFuture<?> write(ByteBuffer bb);
4759

60+
public ListenableFuture<?> getLastWriteFuture();
61+
4862
/**
4963
* This is called to connect this client to the server.
5064
* This will also send in the HTTP upgrade request once the TCP connection is finished.

client/src/main/java/org/threadly/litesockets/client/ws/WebSocketClient.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
import org.threadly.litesockets.client.http.HTTPStreamClient;
1919
import org.threadly.litesockets.client.http.HTTPStreamClient.HTTPStreamReader;
2020
import org.threadly.litesockets.client.http.StreamingClient;
21+
import org.threadly.litesockets.protocols.http.request.HTTPRequest;
2122
import org.threadly.litesockets.protocols.http.request.HTTPRequestBuilder;
2223
import org.threadly.litesockets.protocols.http.response.HTTPResponse;
24+
import org.threadly.litesockets.protocols.http.response.HTTPResponseBuilder;
2325
import org.threadly.litesockets.protocols.http.shared.HTTPConstants;
26+
import org.threadly.litesockets.protocols.http.shared.HTTPRequestType;
2427
import org.threadly.litesockets.protocols.http.shared.HTTPResponseCode;
2528
import org.threadly.litesockets.protocols.ws.WebSocketFrameParser;
2629
import org.threadly.litesockets.protocols.ws.WebSocketFrameParser.WebSocketFrame;
@@ -34,7 +37,20 @@
3437
* @author lwahlmeier
3538
*
3639
*/
37-
public class WebSocketClient implements StreamingClient{
40+
public class WebSocketClient implements StreamingClient {
41+
public static final HTTPResponse DEFAULT_WS_RESPONSE = new HTTPResponseBuilder()
42+
.setResponseCode(HTTPResponseCode.SwitchingProtocols)
43+
.setHeader(HTTPConstants.HTTP_KEY_UPGRADE, "websocket")
44+
.setHeader(HTTPConstants.HTTP_KEY_CONNECTION, "Upgrade")
45+
.setHeader(HTTPConstants.HTTP_KEY_WEBSOCKET_ACCEPT, "123456")
46+
.build();
47+
public static final HTTPRequest DEFAULT_WS_REQUEST = new HTTPRequestBuilder()
48+
.setRequestType(HTTPRequestType.GET)
49+
.setHeader(HTTPConstants.HTTP_KEY_UPGRADE, "websocket")
50+
.setHeader(HTTPConstants.HTTP_KEY_CONNECTION, "Upgrade")
51+
.setHeader(HTTPConstants.HTTP_KEY_WEBSOCKET_VERSION, "13")
52+
.setHeader(HTTPConstants.HTTP_KEY_WEBSOCKET_KEY, "")
53+
.build();
3854
public static final String WSS_STRING = "wss";
3955
public static final String WS_STRING = "ws";
4056
public static final int WSS_PORT = 443;
@@ -57,17 +73,13 @@ public class WebSocketClient implements StreamingClient{
5773
* @param client the TCPClient to use for this connection.
5874
* @param alreadyUpgraded true if the connection has already upgraded to do websockets false if the http upgrade is still required.
5975
*/
60-
public WebSocketClient(final TCPClient client, final boolean alreadyUpgraded) {
76+
public WebSocketClient(final TCPClient client) {
6177
if(client.isClosed()) {
6278
throw new IllegalStateException("TCPClient is closed! Can only use an Open TCPClient");
6379
}
6480

65-
hsc = new HTTPStreamClient(client, alreadyUpgraded);
66-
67-
if(alreadyUpgraded) {
68-
sentRequest.set(true);
69-
connectFuture.setResult(true);
70-
}
81+
hsc = new HTTPStreamClient(client);
82+
connectFuture.setResult(true);
7183
}
7284

7385
/**
@@ -266,6 +278,13 @@ public void setWebSocketDataReader(final WebSocketDataReader reader) {
266278
hsc.setHTTPStreamReader(lsr);
267279
}
268280

281+
282+
@Override
283+
public void setRequestResponseHeaders(HTTPRequest httpRequest, HTTPResponse httpResponse, boolean writeRequest) {
284+
hsc.setRequestResponseHeaders(httpRequest, httpResponse, writeRequest);
285+
sentRequest.set(true);
286+
}
287+
269288
@Override
270289
public ListenableFuture<?> write(final ByteBuffer bb) {
271290
return write(bb, this.wsoc.getValue(), defaultMask);
@@ -302,6 +321,10 @@ public ListenableFuture<?> write(final ByteBuffer bb, final byte opCode, final b
302321
throw new IllegalStateException("Must be connected first!");
303322
}
304323
}
324+
325+
public ListenableFuture<?> getLastWriteFuture() {
326+
return hsc.getLastWriteFuture();
327+
}
305328

306329
@Override
307330
public ListenableFuture<Boolean> connect() {
@@ -407,4 +430,5 @@ public interface WebSocketDataReader {
407430
public void onData(WebSocketFrame wsf, ByteBuffer bb);
408431
}
409432

433+
410434
}

0 commit comments

Comments
 (0)