Skip to content

Commit d03bfc4

Browse files
lazer-devAlex Danilenko
andauthored
server events (#55)
Co-authored-by: Alex Danilenko <[email protected]>
1 parent 4d1ae16 commit d03bfc4

File tree

4 files changed

+206
-77
lines changed

4 files changed

+206
-77
lines changed

core/src/main/java/spotty/server/Server.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import spotty.server.connection.Connection;
77
import spotty.server.connection.socket.SocketFactory;
88
import spotty.server.connection.socket.SpottySocket;
9+
import spotty.server.event.ServerEvents;
910
import spotty.server.handler.request.RequestHandler;
1011
import spotty.server.registry.exception.ExceptionHandlerRegistry;
1112
import spotty.server.worker.ReactorWorker;
@@ -24,20 +25,18 @@
2425
import java.nio.channels.ServerSocketChannel;
2526
import java.nio.channels.SocketChannel;
2627
import java.security.KeyStore;
27-
import java.util.Iterator;
28-
import java.util.concurrent.ExecutorService;
29-
import java.util.concurrent.Executors;
28+
import java.util.Set;
3029
import java.util.concurrent.atomic.AtomicInteger;
3130

3231
import static java.nio.channels.SelectionKey.OP_ACCEPT;
3332
import static java.nio.channels.SelectionKey.OP_CONNECT;
3433
import static java.nio.channels.SelectionKey.OP_READ;
3534
import static java.nio.channels.SelectionKey.OP_WRITE;
36-
import static spotty.common.utils.ThreadUtils.threadPool;
3735
import static spotty.common.validation.Validation.isNotBlank;
3836
import static spotty.common.validation.Validation.notBlank;
3937
import static spotty.common.validation.Validation.notNull;
4038
import static spotty.common.validation.Validation.validate;
39+
import static spotty.server.connection.Connection.Builder.connection;
4140
import static spotty.server.connection.state.ConnectionState.CLOSED;
4241
import static spotty.server.connection.state.ConnectionState.DATA_REMAINING;
4342
import static spotty.server.connection.state.ConnectionState.INITIALIZED;
@@ -48,20 +47,19 @@
4847
public final class Server implements Closeable {
4948
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
5049

51-
private final ExecutorService SERVER_RUN = Executors.newSingleThreadExecutor(threadPool("spotty-main", false));
52-
5350
private volatile boolean running = false;
5451
private volatile boolean started = false;
5552
private volatile boolean enabledHttps = false;
5653

5754
private final AtomicInteger connections = new AtomicInteger();
5855

5956
private final SocketFactory socketFactory = new SocketFactory();
57+
private final ServerEvents serverEvents = new ServerEvents();
6058

6159
private final int maxRequestBodySize;
6260
private final RequestHandler requestHandler;
6361
private final ExceptionHandlerRegistry exceptionHandlerRegistry;
64-
private final ReactorWorker reactorWorker ;
62+
private final ReactorWorker reactorWorker;
6563
private final InetSocketAddress socketAddress;
6664

6765
public Server(int port, int maxRequestBodySize, RequestHandler requestHandler, ExceptionHandlerRegistry exceptionHandlerRegistry, ReactorWorker reactorWorker) {
@@ -80,7 +78,9 @@ public synchronized void start() {
8078
return;
8179
}
8280

83-
SERVER_RUN.execute(this::serverInit);
81+
final Thread main = new Thread(this::serverRun, "spotty-main");
82+
main.setDaemon(false);
83+
main.start();
8484
}
8585

8686
public void enableHttps(String keyStorePath, String keyStorePassword, String trustStorePath, String trustStorePassword) {
@@ -130,7 +130,6 @@ public void enableHttps(String keyStorePath, String keyStorePassword, String tru
130130
@Override
131131
public synchronized void close() {
132132
stop();
133-
SERVER_RUN.shutdownNow();
134133
reactorWorker.close();
135134
}
136135

@@ -186,7 +185,7 @@ public String hostUrl() {
186185
return sb.toString();
187186
}
188187

189-
private void serverInit() {
188+
private void serverRun() {
190189
try (final ServerSocketChannel serverSocket = ServerSocketChannel.open();
191190
final Selector selector = Selector.open()) {
192191
// Binding this server on the port
@@ -202,14 +201,19 @@ private void serverInit() {
202201
final Thread currentThread = Thread.currentThread();
203202
while (running && !currentThread.isInterrupted()) {
204203
selector.select(1000);
205-
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
206-
while (keys.hasNext()) {
207-
final SelectionKey key = keys.next();
208-
keys.remove();
209204

205+
final Set<SelectionKey> keys = selector.selectedKeys();
206+
serverEvents.add(keys);
207+
keys.clear();
208+
209+
SelectionKey key;
210+
while ((key = serverEvents.poll()) != null) {
210211
if (!key.isValid()) {
211212
final Connection connection = (Connection) key.attachment();
212-
connection.close();
213+
if (connection != null) {
214+
connection.close();
215+
}
216+
213217
key.cancel();
214218

215219
continue;
@@ -243,7 +247,15 @@ private void accept(SelectionKey acceptKey) throws IOException {
243247

244248
final SpottySocket socket = socketFactory.createSocket(channel);
245249

246-
final Connection connection = new Connection(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize);
250+
final Connection connection = connection()
251+
.socket(socket)
252+
.serverEvents(serverEvents)
253+
.requestHandler(requestHandler)
254+
.reactorWorker(reactorWorker)
255+
.exceptionHandlerRegistry(exceptionHandlerRegistry)
256+
.maxRequestBodySize(maxRequestBodySize)
257+
.build();
258+
247259
LOG.debug("{} accepted, count={}", connection, connections.incrementAndGet());
248260

249261
connection.whenStateIs(CLOSED, () -> {

core/src/main/java/spotty/server/connection/Connection.java

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import spotty.common.utils.ExceptionalRunnable;
3636
import spotty.server.connection.socket.SpottySocket;
3737
import spotty.server.connection.state.ConnectionState;
38+
import spotty.server.event.ServerEvents;
3839
import spotty.server.handler.exception.ExceptionHandler;
3940
import spotty.server.handler.request.RequestHandler;
4041
import spotty.server.registry.exception.ExceptionHandlerRegistry;
@@ -108,36 +109,26 @@ public final class Connection extends StateMachine<ConnectionState> implements C
108109
private SpottySocket socket;
109110
private final ReactorWorker reactorWorker;
110111
private final ExceptionHandlerRegistry exceptionHandlerRegistry;
112+
private final ServerEvents serverEvents;
111113
private final int maxRequestBodySize;
112114
private ByteBuffer readBuffer;
113115
private RequestHandler requestHandler;
116+
private SelectionKey selectionKey;
114117

115118
private ByteBuffer headersByteBuffer;
116119
private ByteBuffer bodyByteBuffer;
117120

118-
public Connection(SpottySocket socket,
119-
RequestHandler requestHandler,
120-
ReactorWorker reactorWorker,
121-
ExceptionHandlerRegistry exceptionHandlerRegistry,
122-
int maxRequestBodySize) throws SpottyStreamException {
123-
this(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize, DEFAULT_BUFFER_SIZE);
124-
}
125-
126-
public Connection(SpottySocket socket,
127-
RequestHandler requestHandler,
128-
ReactorWorker reactorWorker,
129-
ExceptionHandlerRegistry exceptionHandlerRegistry,
130-
int maxRequestBodySize,
131-
int bufferSize) throws SpottyStreamException {
121+
private Connection(Builder builder) throws SpottyStreamException {
132122
super(INITIALIZED);
133123

134-
this.socket = notNull("socket", socket);
135-
this.requestHandler = notNull("requestHandler", requestHandler);
136-
this.reactorWorker = notNull("reactorWorker", reactorWorker);
137-
this.exceptionHandlerRegistry = notNull("exceptionHandlerService", exceptionHandlerRegistry);
138-
this.maxRequestBodySize = maxRequestBodySize;
124+
this.socket = notNull("socket", builder.socket);
125+
this.requestHandler = notNull("requestHandler", builder.requestHandler);
126+
this.reactorWorker = notNull("reactorWorker", builder.reactorWorker);
127+
this.exceptionHandlerRegistry = notNull("exceptionHandlerService", builder.exceptionHandlerRegistry);
128+
this.serverEvents = notNull("serverEvents", builder.serverEvents);
129+
this.maxRequestBodySize = builder.maxRequestBodySize;
139130

140-
this.readBuffer = ByteBuffer.allocate(bufferSize);
131+
this.readBuffer = ByteBuffer.allocate(builder.bufferSize);
141132

142133
this.stateHandlerGraph
143134
.filter(
@@ -204,7 +195,11 @@ public void after() {
204195
}
205196

206197
public SelectionKey register(Selector selector) {
207-
return exceptionHandler(() -> socket.register(selector, OP_CONNECT, this));
198+
if (selectionKey != null) {
199+
throw new SpottyException("connection has been registered already");
200+
}
201+
202+
return selectionKey = exceptionHandler(() -> socket.register(selector, OP_CONNECT, this));
208203
}
209204

210205
public void markDataRemaining() {
@@ -218,12 +213,15 @@ public void markReadyToRead() {
218213
}
219214

220215
public void handle() {
221-
do {
222-
exceptionHandler(
223-
handleState,
224-
afterExceptionHandler // if exception respond error to the client
225-
);
226-
} while (socket.readBufferHasRemaining() && state().isReading());
216+
exceptionHandler(
217+
handleState,
218+
afterExceptionHandler // if exception respond error to the client
219+
);
220+
221+
// if after connection handing socket buffer still has data then run handing in next tick
222+
if (socket.readBufferHasRemaining() && state().isReading()) {
223+
runHandleNextTick();
224+
}
227225
}
228226

229227
// optimization to not spawn callback objects each time
@@ -583,14 +581,18 @@ private boolean responseWriteCompleted() {
583581

584582
changeState(READY_TO_READ);
585583

586-
// has something did not process
584+
// has something did not process, add it to handle in next tick
587585
if (readBuffer.position() > 0 || socket.readBufferHasRemaining()) {
588-
handle();
586+
runHandleNextTick();
589587
}
590588

591589
return false;
592590
}
593591

592+
private void runHandleNextTick() {
593+
serverEvents.add(selectionKey);
594+
}
595+
594596
private void resetResponse() {
595597
this.response.reset();
596598
this.responseHeadersBuffer.reset();
@@ -680,4 +682,57 @@ private <T> T exceptionHandler(ExceptionalCallable<T> runnable, Runnable afterEx
680682
return null;
681683
}
682684

685+
public static class Builder {
686+
private SpottySocket socket;
687+
private RequestHandler requestHandler;
688+
private ReactorWorker reactorWorker;
689+
private ExceptionHandlerRegistry exceptionHandlerRegistry;
690+
private ServerEvents serverEvents;
691+
private int maxRequestBodySize;
692+
private int bufferSize = DEFAULT_BUFFER_SIZE;
693+
694+
public static Builder connection() {
695+
return new Builder();
696+
}
697+
698+
public Builder socket(SpottySocket socket) {
699+
this.socket = socket;
700+
return this;
701+
}
702+
703+
public Builder requestHandler(RequestHandler requestHandler) {
704+
this.requestHandler = requestHandler;
705+
return this;
706+
}
707+
708+
public Builder reactorWorker(ReactorWorker reactorWorker) {
709+
this.reactorWorker = reactorWorker;
710+
return this;
711+
}
712+
713+
public Builder exceptionHandlerRegistry(ExceptionHandlerRegistry exceptionHandlerRegistry) {
714+
this.exceptionHandlerRegistry = exceptionHandlerRegistry;
715+
return this;
716+
}
717+
718+
public Builder serverEvents(ServerEvents serverEvents) {
719+
this.serverEvents = serverEvents;
720+
return this;
721+
}
722+
723+
public Builder maxRequestBodySize(int maxRequestBodySize) {
724+
this.maxRequestBodySize = maxRequestBodySize;
725+
return this;
726+
}
727+
728+
public Builder bufferSize(int bufferSize) {
729+
this.bufferSize = bufferSize;
730+
return this;
731+
}
732+
733+
public Connection build() {
734+
return new Connection(this);
735+
}
736+
}
737+
683738
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2022 - Alex Danilenko
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package spotty.server.event;
17+
18+
import java.nio.channels.SelectionKey;
19+
import java.util.Collection;
20+
import java.util.LinkedList;
21+
import java.util.Queue;
22+
23+
public final class ServerEvents {
24+
private final Queue<SelectionKey> queue = new LinkedList<>();
25+
26+
public void add(SelectionKey key) {
27+
queue.add(key);
28+
}
29+
30+
public void add(Collection<SelectionKey> keys) {
31+
queue.addAll(keys);
32+
}
33+
34+
public SelectionKey poll() {
35+
return queue.poll();
36+
}
37+
38+
public boolean isEmpty() {
39+
return queue.isEmpty();
40+
}
41+
}

0 commit comments

Comments
 (0)