Skip to content

Commit 37d32dd

Browse files
committed
Improve threading.
1 parent e75209d commit 37d32dd

File tree

3 files changed

+76
-74
lines changed

3 files changed

+76
-74
lines changed
Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.redhat.hacbs.domainproxy.client;
22

33
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.LOCALHOST;
4-
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.createChannelToChannelBiDirectionalHandler;
4+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.TIMEOUT_MS;
5+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.channelToChannelBiDirectionalHandler;
6+
import static java.lang.Thread.currentThread;
57

68
import java.io.IOException;
79
import java.net.InetSocketAddress;
@@ -14,7 +16,6 @@
1416
import java.util.Iterator;
1517
import java.util.concurrent.ExecutorService;
1618
import java.util.concurrent.Executors;
17-
import java.util.concurrent.atomic.AtomicBoolean;
1819

1920
import jakarta.annotation.PostConstruct;
2021
import jakarta.annotation.PreDestroy;
@@ -43,49 +44,49 @@ public class DomainProxyClient {
4344
@ConfigProperty(name = "byte-buffer-size")
4445
int byteBufferSize;
4546

46-
private final AtomicBoolean running = new AtomicBoolean(true);
47-
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
47+
private ExecutorService executor;
4848

4949
@PostConstruct
5050
public void start() {
5151
Log.info("Starting domain proxy client...");
5252
Log.infof("Byte buffer size %d", byteBufferSize); // TODO Remove
53-
new Thread(() -> {
54-
try (final ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.INET);
55-
final Selector selector = Selector.open()) {
56-
final InetSocketAddress address = new InetSocketAddress(LOCALHOST, clientHttpPort);
57-
serverChannel.bind(address);
58-
serverChannel.configureBlocking(false);
59-
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
60-
while (running.get()) {
61-
if (selector.selectNow() > 0) {
62-
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
63-
while (keys.hasNext()) {
64-
final SelectionKey key = keys.next();
65-
keys.remove();
66-
if (key.isAcceptable()) {
67-
if (key.channel() instanceof final ServerSocketChannel keyChannel) {
68-
final SocketChannel httpClientChannel = keyChannel.accept();
69-
final SocketChannel domainSocketChannel = SocketChannel
70-
.open(UnixDomainSocketAddress.of(domainSocket));
71-
executor.submit(
72-
createChannelToChannelBiDirectionalHandler(byteBufferSize, httpClientChannel,
73-
domainSocketChannel));
74-
}
75-
}
53+
executor = Executors.newVirtualThreadPerTaskExecutor();
54+
executor.submit(this::startClient);
55+
}
56+
57+
private void startClient() {
58+
try (final ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.INET);
59+
final Selector selector = Selector.open()) {
60+
currentThread().setName("connectionHandler");
61+
final InetSocketAddress address = new InetSocketAddress(LOCALHOST, clientHttpPort);
62+
serverChannel.bind(address);
63+
serverChannel.configureBlocking(false);
64+
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
65+
while (!currentThread().isInterrupted()) {
66+
if (selector.select(TIMEOUT_MS) > 0) {
67+
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
68+
while (keys.hasNext()) {
69+
final SelectionKey key = keys.next();
70+
keys.remove();
71+
if (key.isAcceptable()) {
72+
final ServerSocketChannel keyChannel = (ServerSocketChannel) key.channel();
73+
final SocketChannel httpClientChannel = keyChannel.accept();
74+
final SocketChannel domainSocketChannel = SocketChannel
75+
.open(UnixDomainSocketAddress.of(domainSocket));
76+
executor.submit(channelToChannelBiDirectionalHandler(byteBufferSize, httpClientChannel,
77+
domainSocketChannel));
7678
}
7779
}
7880
}
79-
} catch (final IOException e) {
80-
Log.errorf(e, "Error initialising domain proxy client");
8181
}
82-
Quarkus.asyncExit();
83-
}).start();
82+
} catch (final IOException e) {
83+
Log.errorf(e, "Error initialising domain proxy client");
84+
}
85+
Quarkus.asyncExit();
8486
}
8587

8688
@PreDestroy
8789
public void stop() {
88-
running.set(false);
89-
executor.shutdown();
90+
executor.shutdownNow();
9091
}
9192
}

java-components/domain-proxy/common/src/main/java/com/redhat/hacbs/domainproxy/common/CommonIOUtil.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.redhat.hacbs.domainproxy.common;
22

3+
import static java.lang.Thread.currentThread;
4+
35
import java.io.IOException;
46
import java.nio.ByteBuffer;
57
import java.nio.channels.SelectionKey;
@@ -30,11 +32,11 @@ private enum Operation {
3032
}
3133
}
3234

33-
public static Runnable createChannelToChannelBiDirectionalHandler(final int byteBufferSize,
35+
public static Runnable channelToChannelBiDirectionalHandler(final int byteBufferSize,
3436
final SocketChannel leftChannel,
3537
final SocketChannel rightChannel) {
3638
return () -> {
37-
Thread.currentThread().setName("ChannelToChannelBiDirectionalHandler");
39+
currentThread().setName("channelToChannelHandler");
3840
LOG.info("Connections opened");
3941
int bytesReadLeft = 0;
4042
int bytesReadRight = 0;
Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.redhat.hacbs.domainproxy.server;
22

33
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.LOCALHOST;
4-
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.createChannelToChannelBiDirectionalHandler;
4+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.TIMEOUT_MS;
5+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.channelToChannelBiDirectionalHandler;
6+
import static java.lang.Thread.currentThread;
57

68
import java.io.IOException;
79
import java.net.InetSocketAddress;
@@ -16,7 +18,6 @@
1618
import java.util.Iterator;
1719
import java.util.concurrent.ExecutorService;
1820
import java.util.concurrent.Executors;
19-
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import jakarta.annotation.PostConstruct;
2223
import jakarta.annotation.PreDestroy;
@@ -45,54 +46,52 @@ public class DomainProxyServer {
4546
@ConfigProperty(name = "byte-buffer-size")
4647
int byteBufferSize;
4748

48-
private final AtomicBoolean running = new AtomicBoolean(true);
49-
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
49+
private ExecutorService executor;
5050

5151
@PostConstruct
5252
public void start() {
5353
Log.infof("Byte buffer size %d", byteBufferSize); // TODO Remove
54-
new Thread(() -> {
55-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
56-
try {
57-
Files.delete(Path.of(domainSocket));
58-
} catch (final IOException e) {
59-
Log.errorf(e, "Error deleting domain socket");
60-
}
61-
}));
62-
try (final ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
63-
final Selector selector = Selector.open()) {
64-
serverChannel.bind(UnixDomainSocketAddress.of(domainSocket));
65-
serverChannel.configureBlocking(false);
66-
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
67-
while (running.get()) {
68-
if (selector.selectNow() > 0) {
69-
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
70-
while (keys.hasNext()) {
71-
final SelectionKey key = keys.next();
72-
keys.remove();
73-
if (key.isAcceptable()) {
74-
if (key.channel() instanceof final ServerSocketChannel keyChannel) {
75-
final SocketChannel domainSocketChannel = keyChannel.accept();
76-
final SocketChannel httpServerChannel = SocketChannel
77-
.open(new InetSocketAddress(LOCALHOST, httpServerPort));
78-
executor.submit(
79-
createChannelToChannelBiDirectionalHandler(byteBufferSize, httpServerChannel,
80-
domainSocketChannel));
81-
}
82-
}
54+
executor = Executors.newVirtualThreadPerTaskExecutor();
55+
executor.submit(this::startServer);
56+
}
57+
58+
private void startServer() {
59+
try (final ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX);
60+
final Selector selector = Selector.open()) {
61+
currentThread().setName("connectionHandler");
62+
serverChannel.bind(UnixDomainSocketAddress.of(domainSocket));
63+
serverChannel.configureBlocking(false);
64+
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
65+
while (!currentThread().isInterrupted()) {
66+
if (selector.select(TIMEOUT_MS) > 0) {
67+
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
68+
while (keys.hasNext()) {
69+
final SelectionKey key = keys.next();
70+
keys.remove();
71+
if (key.isAcceptable()) {
72+
final ServerSocketChannel keyChannel = (ServerSocketChannel) key.channel();
73+
final SocketChannel domainSocketChannel = keyChannel.accept();
74+
final SocketChannel httpServerChannel = SocketChannel
75+
.open(new InetSocketAddress(LOCALHOST, httpServerPort));
76+
executor.submit(channelToChannelBiDirectionalHandler(byteBufferSize, httpServerChannel,
77+
domainSocketChannel));
8378
}
8479
}
8580
}
86-
} catch (final IOException e) {
87-
Log.errorf(e, "Error initialising domain proxy server");
8881
}
89-
Quarkus.asyncExit();
90-
}).start();
82+
} catch (final IOException e) {
83+
Log.errorf(e, "Error initialising domain proxy server");
84+
}
85+
Quarkus.asyncExit();
9186
}
9287

9388
@PreDestroy
9489
public void stop() {
95-
running.set(false);
96-
executor.shutdown();
90+
executor.shutdownNow();
91+
try {
92+
Files.deleteIfExists(Path.of(domainSocket));
93+
} catch (final IOException e) {
94+
Log.errorf(e, "Error deleting domain socket");
95+
}
9796
}
9897
}

0 commit comments

Comments
 (0)