Skip to content

Commit f6a0a0c

Browse files
authored
Merge pull request #180 from tonivade/fix/single-threaded
fix #179
2 parents 3ef66f2 + 977085c commit f6a0a0c

File tree

5 files changed

+18
-18
lines changed

5 files changed

+18
-18
lines changed

src/main/java/com/github/tonivade/resp/RespServerContext.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public RespServerContext(String host, int port, CommandSuite commands) {
4343
this(host, port, commands, nullListener());
4444
}
4545

46-
public RespServerContext(String host, int port, CommandSuite commands,
47-
SessionListener sessionListener) {
46+
public RespServerContext(String host, int port, CommandSuite commands, SessionListener sessionListener) {
4847
this.host = checkNonEmpty(host);
4948
this.port = checkRange(port, 1024, 65535);
5049
this.commands = checkNonNull(commands);
@@ -106,9 +105,9 @@ Session getSession(String sourceKey, Function<String, Session> factory) {
106105
void processCommand(Request request) {
107106
LOGGER.debug("received command: {}", request);
108107

109-
RespCommand command = getCommand(request.getCommand());
108+
var command = getCommand(request.getCommand());
110109
try {
111-
executeOn(execute(command, request))
110+
enqueue(Observable.fromCallable(() -> executeCommand(command, request)))
112111
.subscribe(response -> processResponse(request, response),
113112
ex -> LOGGER.error("error executing command: " + request, ex));
114113
} catch (RuntimeException ex) {
@@ -135,7 +134,7 @@ protected RedisToken executeCommand(RespCommand command, Request request) {
135134
return command.execute(request);
136135
}
137136

138-
protected <T> Observable<T> executeOn(Observable<T> observable) {
137+
protected <T> Observable<T> enqueue(Observable<T> observable) {
139138
return observable.subscribeOn(scheduler);
140139
}
141140

@@ -146,13 +145,6 @@ private void processResponse(Request request, RedisToken token) {
146145
}
147146
}
148147

149-
private Observable<RedisToken> execute(RespCommand command, Request request) {
150-
return Observable.create(observer -> {
151-
observer.onNext(executeCommand(command, request));
152-
observer.onComplete();
153-
});
154-
}
155-
156148
private void clear() {
157149
clients.clear();
158150
state.clear();

src/main/java/com/github/tonivade/resp/command/server/PingCommand.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ public class PingCommand implements RespCommand {
2121
public RedisToken execute(Request request) {
2222
if (request.getLength() > 0) {
2323
return string(request.getParam(0));
24-
} else {
25-
return status(PONG);
2624
}
25+
return status(PONG);
2726
}
2827
}

src/main/java/com/github/tonivade/resp/protocol/RedisSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
public class RedisSerializer {
2020

21-
private static final Recycler<ByteBufferBuilder> RECYCLER = new Recycler<ByteBufferBuilder>() {
21+
private static final Recycler<ByteBufferBuilder> RECYCLER = new Recycler<>() {
2222
@Override
2323
protected ByteBufferBuilder newObject(Recycler.Handle<ByteBufferBuilder> handle) {
2424
return new ByteBufferBuilder(handle);

src/main/java/com/github/tonivade/resp/protocol/SafeString.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public SafeString(ByteBuffer buffer) {
3838
}
3939

4040
public byte[] getBytes() {
41-
ByteBuffer copy = buffer.duplicate();
41+
var copy = buffer.duplicate();
4242
byte[] bytes = new byte[copy.remaining()];
4343
copy.get(bytes);
4444
return bytes;
@@ -73,7 +73,7 @@ public String toString() {
7373
}
7474

7575
public String toHexString() {
76-
StringBuilder sb = new StringBuilder();
76+
var sb = new StringBuilder();
7777
byte[] bytes = getBytes();
7878
for (int i = 0; i < bytes.length; i++) {
7979
int v = bytes[i] & 0xFF;
@@ -99,7 +99,7 @@ public static List<SafeString> safeAsList(String... strings) {
9999
}
100100

101101
public static SafeString append(SafeString stringA, SafeString stringB) {
102-
ByteBuffer byteBuffer = ByteBuffer.allocate(checkNonNull(stringA).length() + checkNonNull(stringB).length());
102+
var byteBuffer = ByteBuffer.allocate(checkNonNull(stringA).length() + checkNonNull(stringB).length());
103103
byteBuffer.put(stringA.getBytes());
104104
byteBuffer.put(stringB.getBytes());
105105
((Buffer) byteBuffer).rewind();
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.github.tonivade.resp;
2+
3+
public class Main {
4+
5+
public static void main(String[] args) {
6+
RespServer.builder().build().start();
7+
}
8+
9+
}

0 commit comments

Comments
 (0)