Skip to content

Commit 0f674fd

Browse files
authored
Merge pull request #60 from eed3si9n/wip/async
Non-blocking read support on Windows
2 parents 290368d + c7822e3 commit 0f674fd

10 files changed

+81
-39
lines changed

jni/org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@
1818
(*env)->ThrowNew(env, exClass, _buf); \
1919
} \
2020
} while (0);
21+
#define THROW_SOCKET_TIMEOUT(prefix, ...) \
22+
do { \
23+
char _buf[1024]; \
24+
snprintf(_buf, 1024, prefix ? prefix : "%s", __VA_ARGS__); \
25+
jclass exClass = (*env)->FindClass(env, "java/net/SocketTimeoutException"); \
26+
if (exClass != NULL) { \
27+
(*env)->ThrowNew(env, exClass, _buf); \
28+
} \
29+
} while (0);
2130

2231
#define FILL_ERROR(prefix, buf) \
2332
do { \
@@ -123,8 +132,8 @@ jlong JNICALL
123132
Java_org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider_PeekNamedPipeNative(
124133
UNUSED JNIEnv *env, UNUSED jobject object, jlong handlePointer) {
125134
DWORD n = 0;
126-
BOOL immediate = PeekNamedPipe((HANDLE)handlePointer, NULL, 0, NULL, &n, NULL);
127-
if (!immediate) {
135+
BOOL ok = PeekNamedPipe((HANDLE)handlePointer, NULL, 0, NULL, &n, NULL);
136+
if (!ok) {
128137
if (GetLastError() != ERROR_IO_PENDING) {
129138
char buf[256];
130139
FILL_ERROR("PeekNamedPipe() failed: %s (error code %ld)", buf);
@@ -137,7 +146,8 @@ Java_org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider_PeekNamedPipeNative
137146
jint JNICALL
138147
Java_org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider_readNative(
139148
JNIEnv *env, UNUSED jobject object, jlong waitable, jlong hFile,
140-
jbyteArray buffer, jint offset, jint length, jboolean strict) {
149+
jbyteArray buffer, jint offset, jint length, jboolean strict,
150+
jint timeoutMillis) {
141151
HANDLE handle = (HANDLE)hFile;
142152
OVERLAPPED olap = {0};
143153
olap.hEvent = (HANDLE)waitable;
@@ -151,8 +161,16 @@ Java_org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider_readNative(
151161
FILL_ERROR("ReadFile() failed: %s (error code %ld)", buf);
152162
THROW_IO(NULL, buf);
153163
}
164+
if (timeoutMillis > 0) {
165+
DWORD res = WaitForSingleObject(olap.hEvent, timeoutMillis);
166+
if (res == WAIT_TIMEOUT) {
167+
CancelIoEx(handle, NULL);
168+
char buf[256];
169+
FILL_ERROR("ReadFile() timed out: %s (error code %ld)", buf);
170+
THROW_SOCKET_TIMEOUT(NULL, buf);
171+
}
172+
}
154173
}
155-
156174
if (!GetOverlappedResult(handle, &olap, &bytes_read, TRUE)) {
157175
char buf[256];
158176
FILL_ERROR(

jni/org_scalasbt_ipcsocket_JNIWin32NamedPipeLibraryProvider.h

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/main/java/org/scalasbt/ipcsocket/JNAWin32NamedPipeLibraryProvider.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.scalasbt.ipcsocket;
22

33
import java.io.IOException;
4+
import java.net.SocketTimeoutException;
45
import java.nio.ByteBuffer;
56

67
import com.sun.jna.*;
@@ -107,12 +108,10 @@ public boolean DisconnectNamedPipe(Handle handle) {
107108
public long PeekNamedPipe(Handle hFile) throws IOException {
108109
HANDLE handle = getHandle(hFile);
109110
IntByReference n = new IntByReference();
110-
boolean immediate = delegate.PeekNamedPipe(handle, null, 0, null, n, null);
111-
if (!immediate) {
111+
boolean ok = delegate.PeekNamedPipe(handle, null, 0, null, n, null);
112+
if (!ok) {
112113
int lastError = delegate.GetLastError();
113-
if (lastError != WinError.ERROR_IO_PENDING) {
114-
throw new IOException("ReadFile() failed: " + lastError);
115-
}
114+
throw new IOException("PeekNamedPipe() failed: " + lastError);
116115
}
117116
return n.getValue();
118117
}
@@ -124,7 +123,8 @@ public int read(
124123
byte[] buffer,
125124
int offset,
126125
int len,
127-
boolean requireStrictLength)
126+
boolean requireStrictLength,
127+
int timeoutMillis)
128128
throws IOException {
129129
HANDLE readerWaitable = getHandle(waitable);
130130
HANDLE handle = getHandle(hFile);
@@ -140,8 +140,13 @@ public int read(
140140
if (lastError != WinError.ERROR_IO_PENDING) {
141141
throw new IOException("ReadFile() failed: " + lastError);
142142
}
143+
if (timeoutMillis > 0) {
144+
int res = delegate.WaitForSingleObject(olap.hEvent, timeoutMillis);
145+
if (res == WinError.WAIT_TIMEOUT) {
146+
throw new SocketTimeoutException("ReadFile() timed out " + res);
147+
}
148+
}
143149
}
144-
145150
IntByReference r = new IntByReference();
146151
if (!delegate.GetOverlappedResult(handle, olap.getPointer(), r, true)) {
147152
int lastError = delegate.GetLastError();

src/main/java/org/scalasbt/ipcsocket/JNIWin32NamedPipeLibraryProvider.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,27 @@ public int read(
9797
byte[] buffer,
9898
int offset,
9999
int len,
100-
boolean requireStrictLength)
100+
boolean requireStrictLength,
101+
int timeoutMillis)
101102
throws IOException {
102103
return readNative(
103104
getHandlePointer(waitable),
104105
getHandlePointer(hFile),
105106
buffer,
106107
offset,
107108
len,
108-
requireStrictLength);
109+
requireStrictLength,
110+
timeoutMillis);
109111
}
110112

111113
native int readNative(
112-
long waitable, long hFile, byte[] buffer, int offset, int len, boolean requireStrictLength)
114+
long waitable,
115+
long hFile,
116+
byte[] buffer,
117+
int offset,
118+
int len,
119+
boolean requireStrictLength,
120+
int timeoutMillis)
113121
throws IOException;
114122

115123
@Override

src/main/java/org/scalasbt/ipcsocket/SocketChannels.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public static String readLine(SocketChannel channel) throws IOException {
8383
}
8484

8585
/** Utility function to read until newline from a non-blocking channel. */
86-
public static String readLine(SocketChannel channel, int readTimeoutMilis) throws IOException {
86+
public static String readLine(SocketChannel channel, int readTimeoutMillis) throws IOException {
8787
int readBytes;
8888
byte b;
8989
final List<Byte> values = new ArrayList<>();
@@ -96,19 +96,22 @@ public static String readLine(SocketChannel channel, int readTimeoutMilis) throw
9696
if (isJava17Plus() && !ServerSocketChannels.isWin) {
9797
if (!channel.isBlocking()) {
9898
channel.register(sel, SelectionKey.OP_READ);
99-
numOfKeys = sel.select(readTimeoutMilis);
99+
numOfKeys = sel.select(readTimeoutMillis);
100100
}
101101
} else {
102-
if (readTimeoutMilis > 0) {
103-
throw new IOException("timeout requires JDK 17 and non-Windows");
104-
}
105-
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
106-
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMilis));
102+
if (readTimeoutMillis > 0) {
103+
if (ServerSocketChannels.isWin) {
104+
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
105+
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
106+
}
107+
} else {
108+
throw new IOException("timeout requires JDK 17 or Windows");
109+
}
107110
}
108111
}
109112
if (numOfKeys == 0) {
110113
throw new SocketTimeoutException(
111-
"readLine timed out after " + Integer.toString(readTimeoutMilis) + " msec");
114+
"readLine timed out after " + Integer.toString(readTimeoutMillis) + " msec");
112115
} else {
113116
readBytes = channel.read(buf);
114117
}
@@ -133,7 +136,8 @@ public static ByteBuffer readAll(SocketChannel channel) throws IOException {
133136
}
134137

135138
/** Utility function to read all buffer from a non-blocking channel. */
136-
public static ByteBuffer readAll(SocketChannel channel, int readTimeoutMilis) throws IOException {
139+
public static ByteBuffer readAll(SocketChannel channel, int readTimeoutMillis)
140+
throws IOException {
137141
int readBytes;
138142
final List<Byte> values = new ArrayList<>();
139143
final int bufSize = 1024 * 1024;
@@ -145,22 +149,25 @@ public static ByteBuffer readAll(SocketChannel channel, int readTimeoutMilis) th
145149
if (isJava17Plus() && !ServerSocketChannels.isWin) {
146150
if (!channel.isBlocking()) {
147151
channel.register(sel, SelectionKey.OP_READ);
148-
numOfKeys = sel.select(readTimeoutMilis);
152+
numOfKeys = sel.select(readTimeoutMillis);
149153
}
150154
} else {
151-
if (readTimeoutMilis > 0) {
152-
throw new IOException("timeout requires JDK 17 and non-Windows");
153-
}
154-
// The following operation gets blocked on JDK 8
155-
// channel.register(sel, SelectionKey.OP_READ);
156-
// numOfKeys = sel.select(readTimeoutMilis);
157-
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
158-
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMilis));
155+
if (readTimeoutMillis > 0) {
156+
// The following operation gets blocked on JDK 8
157+
// channel.register(sel, SelectionKey.OP_READ);
158+
// numOfKeys = sel.select(readTimeoutMillis);
159+
if (ServerSocketChannels.isWin) {
160+
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
161+
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
162+
}
163+
} else {
164+
throw new IOException("timeout requires JDK 17 or Windows");
165+
}
159166
}
160167
}
161168
if (numOfKeys == 0) {
162169
throw new SocketTimeoutException(
163-
"readAll timed out after " + Integer.toString(readTimeoutMilis) + " msec");
170+
"readAll timed out after " + Integer.toString(readTimeoutMillis) + " msec");
164171
} else {
165172
readBytes = channel.read(buf);
166173
}

src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeLibrary.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ boolean GetOverlappedResult(
8989

9090
boolean CancelIoEx(HANDLE hObject, Pointer lpOverlapped);
9191

92+
int WaitForSingleObject(HANDLE hObject, int dwMilliseconds);
93+
9294
HANDLE CreateEvent(
9395
SECURITY_ATTRIBUTES lpEventAttributes,
9496
boolean bManualReset,

src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeLibraryProvider.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ int read(
2727
byte[] buffer,
2828
int offset,
2929
int len,
30-
boolean requireStrictLength)
30+
boolean requireStrictLength,
31+
int timeoutMillis)
3132
throws IOException;
3233

3334
void write(Handle waitable, Handle hFile, byte[] lpBuffer, int offset, int len)

src/main/java/org/scalasbt/ipcsocket/Win32NamedPipeSocket.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ public int read() throws IOException {
140140

141141
@Override
142142
public int read(byte[] b, int off, int len) throws IOException {
143-
return provider.read(readerWaitable, handle, b, off, len, requireStrictLength);
143+
return provider.read(
144+
readerWaitable, handle, b, off, len, requireStrictLength, getSoTimeout());
144145
}
145146
}
146147

720 Bytes
Binary file not shown.

src/test/java/org/scalasbt/ipcsocket/SocketChannelTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public void testNonBlockingEchoServer() throws IOException, InterruptedException
2121
"SocketChannelTest#testNonBlockingEchoServer(" + Boolean.toString(useJNI()) + ")");
2222
withSocket(
2323
sock -> {
24-
if (isJava17Plus() && !ServerSocketChannels.isWin) {
24+
if (isJava17Plus() || ServerSocketChannels.isWin) {
2525
String line = nonBlockingEchoServerTest(sock, 100, 600);
2626
assertEquals("echo did not return the content", "hello", line);
2727
}
@@ -34,7 +34,7 @@ public void testTimeout() throws IOException, InterruptedException {
3434
System.out.println("SocketChannelTest#testTimeout(" + Boolean.toString(useJNI()) + ")");
3535
withSocket(
3636
sock -> {
37-
if (isJava17Plus() && !ServerSocketChannels.isWin) {
37+
if (isJava17Plus() || ServerSocketChannels.isWin) {
3838
String line = nonBlockingEchoServerTest(sock, 6000, 600);
3939
assertEquals("echo did not timeout", "<unavailable>", line);
4040
}

0 commit comments

Comments
 (0)