Skip to content

Commit 26535ce

Browse files
committed
Use non-blocking NIO instead. Move DomainProxyServer and associated test classes to correct package.
1 parent 615ca60 commit 26535ce

File tree

8 files changed

+223
-172
lines changed

8 files changed

+223
-172
lines changed

java-components/domain-proxy/client/src/main/java/com/redhat/hacbs/domainproxy/client/DomainProxyClient.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,21 @@
11
package com.redhat.hacbs.domainproxy.client;
22

3-
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.createChannelToSocketWriter;
4-
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.createSocketToChannelWriter;
3+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.LOCALHOST;
4+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.SELECTOR_TIMEOUT_MS;
5+
import static com.redhat.hacbs.domainproxy.common.CommonIOUtil.createChannelToChannelBiDirectionalHandler;
56

67
import java.io.IOException;
7-
import java.net.ServerSocket;
8-
import java.net.Socket;
8+
import java.net.InetSocketAddress;
9+
import java.net.StandardProtocolFamily;
910
import java.net.UnixDomainSocketAddress;
11+
import java.nio.channels.SelectionKey;
12+
import java.nio.channels.Selector;
13+
import java.nio.channels.ServerSocketChannel;
1014
import java.nio.channels.SocketChannel;
15+
import java.util.Iterator;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.Executors;
18+
import java.util.concurrent.atomic.AtomicBoolean;
1119

1220
import jakarta.annotation.PostConstruct;
1321
import jakarta.annotation.PreDestroy;
@@ -36,21 +44,37 @@ public class DomainProxyClient {
3644
@ConfigProperty(name = "byte-buffer-size")
3745
int byteBufferSize;
3846

39-
private volatile boolean running = true;
47+
private final AtomicBoolean running = new AtomicBoolean(true);
48+
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
4049

4150
@PostConstruct
4251
public void start() {
4352
Log.info("Starting domain proxy client...");
44-
Log.infof("Byte buffer size %d", byteBufferSize); // TODO remove
45-
try (final ServerSocket serverSocket = new ServerSocket(clientHttpPort)) {
46-
while (running) {
47-
final Socket socket = serverSocket.accept();
48-
final UnixDomainSocketAddress address = UnixDomainSocketAddress.of(domainSocket);
49-
final SocketChannel channel = SocketChannel.open(address);
50-
// Write from socket to channel
51-
Thread.startVirtualThread(createSocketToChannelWriter(byteBufferSize, socket, channel));
52-
// Write from channel to socket
53-
Thread.startVirtualThread(createChannelToSocketWriter(byteBufferSize, channel, socket));
53+
Log.infof("Byte buffer size %d", byteBufferSize); // TODO Remove
54+
try (final ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.INET);
55+
final Selector selector = Selector.open()) {
56+
final InetSocketAddress address = new InetSocketAddress(LOCALHOST, clientHttpPort);
57+
serverChannel.bind(address);
58+
serverChannel.configureBlocking(false);
59+
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
60+
while (running.get()) {
61+
if (selector.select(SELECTOR_TIMEOUT_MS) > 0) {
62+
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
63+
while (keys.hasNext()) {
64+
final SelectionKey key = keys.next();
65+
keys.remove();
66+
if (key.isAcceptable()) {
67+
if (key.channel() instanceof final ServerSocketChannel keyChannel) {
68+
final SocketChannel httpClientChannel = keyChannel.accept();
69+
final SocketChannel domainSocketChannel = SocketChannel
70+
.open(UnixDomainSocketAddress.of(domainSocket));
71+
executor.submit(
72+
() -> createChannelToChannelBiDirectionalHandler(byteBufferSize, httpClientChannel,
73+
domainSocketChannel).run());
74+
}
75+
}
76+
}
77+
}
5478
}
5579
} catch (final IOException e) {
5680
Log.errorf(e, "Error initialising domain proxy client");
@@ -60,6 +84,7 @@ public void start() {
6084

6185
@PreDestroy
6286
public void stop() {
63-
running = false;
87+
running.set(false);
88+
executor.shutdown();
6489
}
6590
}
Lines changed: 79 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,106 @@
11
package com.redhat.hacbs.domainproxy.common;
22

33
import java.io.IOException;
4-
import java.net.Socket;
5-
import java.net.SocketException;
6-
import java.net.SocketTimeoutException;
74
import java.nio.ByteBuffer;
8-
import java.nio.channels.ClosedChannelException;
5+
import java.nio.channels.SelectionKey;
6+
import java.nio.channels.Selector;
97
import java.nio.channels.SocketChannel;
8+
import java.util.Iterator;
109

1110
import org.jboss.logging.Logger;
1211

1312
public final class CommonIOUtil {
1413

1514
private static final Logger LOG = Logger.getLogger(CommonIOUtil.class);
16-
private static final Object CLOSE_LOCK = new Object();
1715

18-
public static Runnable createSocketToChannelWriter(final int byteBufferSize, final Socket socket,
19-
final SocketChannel channel) {
20-
// Write from socket to channel
21-
return () -> {
22-
Thread.currentThread().setName("SocketToChannelWriter");
23-
int r;
24-
final byte[] buf = new byte[byteBufferSize];
25-
int bytesWritten = 0;
26-
LOG.info("Writing from socket to channel");
27-
try {
28-
while ((r = socket.getInputStream().read(buf)) > 0) {
29-
LOG.infof("Read %d bytes from socket", r);
30-
channel.write(ByteBuffer.wrap(buf, 0, r));
31-
LOG.infof("Wrote %d bytes to channel", r);
32-
bytesWritten += r;
33-
}
34-
} catch (final ClosedChannelException ignore) {
35-
LOG.info("Channel closed");
36-
} catch (final SocketException ignore) {
37-
LOG.info("Socket closed");
38-
} catch (final SocketTimeoutException ignore) {
39-
LOG.info("Socket timed out");
40-
} catch (final IOException e) {
41-
LOG.errorf(e, "Error writing from socket to channel");
42-
} finally {
43-
closeSocketAndChannel(socket, channel);
44-
}
45-
LOG.infof("Wrote %d bytes from socket to channel", bytesWritten);
46-
};
16+
public static final String LOCALHOST = "localhost";
17+
public static final int SELECTOR_TIMEOUT_MS = 1000;
18+
19+
private CommonIOUtil() {
4720
}
4821

49-
public static Runnable createChannelToSocketWriter(final int byteBufferSize, final SocketChannel channel,
50-
final Socket socket) {
51-
// Write from channel to socket
22+
public static Runnable createChannelToChannelBiDirectionalHandler(final int byteBufferSize,
23+
final SocketChannel leftChannel,
24+
final SocketChannel rightChannel) {
5225
return () -> {
53-
Thread.currentThread().setName("ChannelToSocketWriter");
54-
int r;
55-
final ByteBuffer buf = ByteBuffer.allocate(byteBufferSize);
56-
int bytesWritten = 0;
57-
LOG.info("Writing from channel to socket");
58-
try {
59-
while ((r = channel.read(buf)) > 0) {
60-
LOG.infof("Read %d bytes from channel", r);
61-
buf.flip();
62-
socket.getOutputStream().write(buf.array(), buf.arrayOffset(), buf.remaining());
63-
LOG.infof("Wrote %d bytes to socket", r);
64-
buf.clear();
65-
bytesWritten += r;
26+
Thread.currentThread().setName("ChannelToChannelBiDirectionalHandler");
27+
int bytesRead = 0, bytesWritten = 0;
28+
final ByteBuffer buffer = ByteBuffer.allocate(byteBufferSize);
29+
try (final Selector selector = Selector.open()) {
30+
leftChannel.configureBlocking(false);
31+
rightChannel.configureBlocking(false);
32+
leftChannel.register(selector, SelectionKey.OP_READ);
33+
rightChannel.register(selector, SelectionKey.OP_WRITE);
34+
while (leftChannel.isOpen() && rightChannel.isOpen()) {
35+
if (selector.select(SELECTOR_TIMEOUT_MS) > 0) {
36+
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
37+
while (keys.hasNext()) {
38+
final SelectionKey key = keys.next();
39+
keys.remove();
40+
if (key.isReadable()) {
41+
if (key.channel() == leftChannel) {
42+
final int bytes = transferData(leftChannel, rightChannel, buffer);
43+
if (bytes == 0) {
44+
return;
45+
}
46+
bytesRead += bytes;
47+
} else if (key.channel() == rightChannel) {
48+
final int bytes = transferData(rightChannel, leftChannel, buffer);
49+
if (bytes == 0) {
50+
return;
51+
}
52+
bytesRead += bytes;
53+
}
54+
}
55+
if (key.isWritable()) {
56+
if (key.channel() == leftChannel) {
57+
bytesWritten += transferData(leftChannel, rightChannel, buffer);
58+
} else if (key.channel() == rightChannel) {
59+
bytesWritten += transferData(rightChannel, leftChannel, buffer);
60+
}
61+
}
62+
}
63+
}
6664
}
67-
} catch (final ClosedChannelException ignore) {
68-
LOG.info("Channel closed");
69-
} catch (final SocketException ignore) {
70-
LOG.info("Socket closed");
71-
} catch (final SocketTimeoutException ignore) {
72-
LOG.info("Socket timed out");
7365
} catch (final IOException e) {
74-
LOG.errorf(e, "Error writing from channel to socket");
66+
LOG.errorf(e, "Error in bi-directional channel handling");
7567
} finally {
76-
closeSocketAndChannel(socket, channel);
68+
closeSocketChannel(leftChannel, rightChannel);
69+
LOG.infof("Read %d bytes between channels", bytesRead);
70+
LOG.infof("Wrote %d bytes between channels", bytesWritten);
7771
}
78-
LOG.infof("Wrote %d bytes from channel to socket", bytesWritten);
7972
};
8073
}
8174

82-
private static void closeSocketAndChannel(final Socket socket, final SocketChannel channel) {
83-
synchronized (CLOSE_LOCK) { // TODO is this necessary?
84-
try {
85-
if (channel != null && channel.isOpen()) {
86-
channel.close();
87-
}
88-
} catch (final IOException e) {
89-
LOG.errorf(e, "Error closing channel");
75+
private static int transferData(final SocketChannel sourceChannel, final SocketChannel destinationChannel,
76+
final ByteBuffer buffer)
77+
throws IOException {
78+
buffer.clear();
79+
final int bytesRead = sourceChannel.read(buffer);
80+
if (bytesRead == -1) {
81+
return 0;
82+
}
83+
buffer.flip();
84+
while (buffer.hasRemaining()) {
85+
destinationChannel.write(buffer);
86+
}
87+
return bytesRead;
88+
}
89+
90+
private static void closeSocketChannel(final SocketChannel sourceChannel, final SocketChannel destinationChannel) {
91+
try {
92+
if (sourceChannel != null && sourceChannel.isOpen()) {
93+
sourceChannel.close();
9094
}
91-
try {
92-
if (socket != null && !socket.isClosed()) {
93-
socket.close();
94-
}
95-
} catch (final IOException e) {
96-
LOG.errorf(e, "Error closing socket");
95+
} catch (final IOException e) {
96+
LOG.errorf(e, "Error closing source channel");
97+
}
98+
try {
99+
if (destinationChannel != null && destinationChannel.isOpen()) {
100+
destinationChannel.close();
97101
}
102+
} catch (final IOException e) {
103+
LOG.errorf(e, "Error closing destination channel");
98104
}
99105
}
100106
}

java-components/domain-proxy/server/src/main/java/com/redhat/hacbs/domainproxy/DomainProxyServer.java

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)