Skip to content

Commit 0773a32

Browse files
committed
Separate async IO class (Windows)
1 parent ddd9b88 commit 0773a32

File tree

12 files changed

+255
-179
lines changed

12 files changed

+255
-179
lines changed

java-does-usb/jextract/windows/gen_win.cmd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ call %JEXTRACT% --source --output ../../src/main/java ^
3131
--include-constant FORMAT_MESSAGE_ALLOCATE_BUFFER ^
3232
--include-constant FORMAT_MESSAGE_FROM_SYSTEM ^
3333
--include-constant FORMAT_MESSAGE_IGNORE_INSERTS ^
34+
--include-constant FORMAT_MESSAGE_FROM_HMODULE ^
3435
--include-constant INFINITE ^
3536
--include-struct _GUID ^
3637
--include-typedef GUID ^

java-does-usb/src/main/java/net/codecrete/usb/common/EndpointInputStream.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ public abstract class EndpointInputStream extends InputStream {
6262
protected EndpointInputStream(USBDeviceImpl device, int endpointNumber) {
6363
this.device = device;
6464
this.endpointNumber = endpointNumber;
65-
65+
arena = Arena.openShared();
6666
bufferSize = 4 * device.getEndpoint(USBDirection.IN, endpointNumber).packetSize();
67+
68+
configureEndpoint();
69+
6770
completedTransferQueue = new ArrayBlockingQueue<>(MAX_OUTSTANDING_TRANSFERS);
6871

6972
// create all transfers, and submit them except one
70-
arena = Arena.openShared();
7173
try {
7274
for (int i = 0; i < MAX_OUTSTANDING_TRANSFERS; i++) {
7375
final var transfer = device.createTransfer();
@@ -204,4 +206,7 @@ private void collectOutstandingTransfers() {
204206
}
205207

206208
protected abstract void submitTransferIn(Transfer transfer);
209+
210+
protected void configureEndpoint() {
211+
}
207212
}

java-does-usb/src/main/java/net/codecrete/usb/common/EndpointOutputStream.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,13 @@ public abstract class EndpointOutputStream extends OutputStream {
6464
protected EndpointOutputStream(USBDeviceImpl device, int endpointNumber) {
6565
this.device = device;
6666
this.endpointNumber = endpointNumber;
67-
6867
packetSize = device.getEndpoint(USBDirection.OUT, endpointNumber).packetSize();
6968
bufferSize = packetSize;
69+
arena = Arena.openShared();
70+
71+
configureEndpoint();
7072

7173
availableTransferQueue = new ArrayBlockingQueue<>(MAX_OUTSTANDING_TRANSFERS);
72-
arena = Arena.openShared();
7374

7475
// prefill transfer queue
7576
for (int i = 0; i < MAX_OUTSTANDING_TRANSFERS; i++) {
@@ -245,4 +246,7 @@ private synchronized void onCompletion(Transfer transfer) {
245246
}
246247

247248
protected abstract void submitTransferOut(Transfer request);
249+
250+
protected void configureEndpoint() {
251+
}
248252
}

java-does-usb/src/main/java/net/codecrete/usb/linux/LinuxAsyncTask.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@
3232
/**
3333
* Background task for handling asynchronous transfers.
3434
* <p>
35-
* Each USB device must register its file handler with this task.
35+
* Each USB device must register its file handle with this task.
3636
* </p>
3737
* <p>
38-
* The task keeps track of the submitted transfers by remembering the
39-
* URB address (USB request block) in order to match it to the
40-
* completion.
38+
* The task keeps track of the submitted transfers by indexing them
39+
* by URB address (USB request block).
4140
* </p>
4241
* <p>
4342
* URBs are allocated but never freed. To limit the memory usage,
@@ -174,7 +173,7 @@ private void reapURB(int fd, MemorySegment urbPointerHolder, MemorySegment errno
174173
private void notifyAsyncIOTask() {
175174
// start background process if needed
176175
if (asyncIOUpdateEventFd == 0) {
177-
startAsyncIOHandler();
176+
startAsyncIOTask();
178177
return;
179178
}
180179

@@ -307,7 +306,7 @@ synchronized void abortTransfers(LinuxUSBDevice device, byte endpointAddress) {
307306
}
308307
}
309308

310-
private void startAsyncIOHandler() {
309+
private void startAsyncIOTask() {
311310
try (var arena = Arena.openConfined()) {
312311
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
313312
asyncIOUpdateEventFd = IO.eventfd(0, 0, errnoState);

java-does-usb/src/main/java/net/codecrete/usb/windows/USBHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class USBHelper {
4545
// USB_PIPE_INFO PipeList[0];/* OUTPUT */
4646
//} USB_NODE_CONNECTION_INFORMATION_EX, *PUSB_NODE_CONNECTION_INFORMATION_EX;
4747
public static final GroupLayout USB_NODE_CONNECTION_INFORMATION_EX$Struct = structLayout(JAVA_INT.withName(
48-
"ConnectionIndex"), DeviceDescriptor.LAYOUT.withName("DeviceDescriptor"), JAVA_BYTE.withName(
48+
"ConnectionIndex"), DeviceDescriptor.LAYOUT.withName("DeviceDescriptor"), JAVA_BYTE.withName(
4949
"CurrentConfigurationValue"), JAVA_BYTE.withName("Speed"), JAVA_BYTE.withName("DeviceIsHub"),
5050
JAVA_SHORT_UNALIGNED.withName("DeviceAddress"), JAVA_INT.withName("NumberOfOpenPipes"),
5151
JAVA_INT.withName("ConnectionStatus")
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
//
2+
// Java Does USB
3+
// Copyright (c) 2023 Manuel Bleichenbacher
4+
// Licensed under MIT License
5+
// https://opensource.org/licenses/MIT
6+
//
7+
8+
package net.codecrete.usb.windows;
9+
10+
import net.codecrete.usb.windows.gen.kernel32.Kernel32;
11+
import net.codecrete.usb.windows.gen.kernel32.OVERLAPPED;
12+
import net.codecrete.usb.windows.winsdk.Kernel32B;
13+
14+
import java.lang.foreign.Arena;
15+
import java.lang.foreign.MemorySegment;
16+
import java.util.ArrayList;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
import static java.lang.foreign.MemorySegment.NULL;
22+
import static java.lang.foreign.ValueLayout.*;
23+
import static net.codecrete.usb.windows.WindowsUSBException.throwLastError;
24+
25+
/**
26+
* Background task for handling asynchronous transfers.
27+
* <p>
28+
* Each USB device must register its handle with this task.
29+
* </p>
30+
* <p>
31+
* The task keeps track of the submitted transfers by indexing them
32+
* by OVERLAPPED struct address.
33+
* </p>
34+
* <p>
35+
* OVERLAPPED structs are allocated but never freed. To limit the memory usage,
36+
* OVERLAPPED structs are reused. So the maximum number of outstanding transfers
37+
* determines the number of allocated OVERLAPPED structs.
38+
* </p>
39+
*/
40+
public class WindowsAsyncTask {
41+
42+
private static WindowsAsyncTask singletonInstance;
43+
44+
/**
45+
* Singleton instance of background task.
46+
*
47+
* @return background task
48+
*/
49+
static synchronized WindowsAsyncTask instance() {
50+
if (singletonInstance == null)
51+
singletonInstance = new WindowsAsyncTask();
52+
return singletonInstance;
53+
}
54+
55+
// Currently outstanding transfer requests,
56+
// indexed by OVERLAPPED address.
57+
private Map<Long, WindowsTransfer> requestsByOverlapped;
58+
// available OVERLAPPED data structures
59+
private List<MemorySegment> availableOverlappedStructs;
60+
// Arena used to allocate OVERLAPPED data structures
61+
private Arena arena;
62+
63+
/**
64+
* Windows completion port for asynchronous/overlapped IO
65+
*/
66+
private MemorySegment asyncIoCompletionPort = NULL;
67+
68+
/**
69+
* Background task for handling asynchronous IO completions.
70+
*/
71+
private void asyncCompletionTask() {
72+
73+
try (var arena = Arena.openConfined()) {
74+
75+
var overlappedHolder = arena.allocate(ADDRESS, NULL);
76+
var numBytesHolder = arena.allocate(JAVA_INT, 0);
77+
var completionKeyHolder = arena.allocate(JAVA_LONG, 0);
78+
var lastErrorState = arena.allocate(Win.LAST_ERROR_STATE.layout());
79+
80+
while (true) {
81+
overlappedHolder.set(ADDRESS, 0, MemorySegment.NULL);
82+
completionKeyHolder.set(JAVA_LONG, 0, 0);
83+
84+
int res = Kernel32B.GetQueuedCompletionStatus(asyncIoCompletionPort, numBytesHolder,
85+
completionKeyHolder, overlappedHolder, Kernel32.INFINITE(), lastErrorState);
86+
var overlappedAddr = overlappedHolder.get(JAVA_LONG, 0);
87+
88+
if (res == 0 && overlappedAddr == 0)
89+
throwLastError(lastErrorState, "Internal error (SetupDiGetDeviceInterfaceDetailW)");
90+
91+
if (overlappedAddr == 0)
92+
return; // registry closing?
93+
94+
completeTransfer(overlappedAddr);
95+
}
96+
}
97+
}
98+
99+
/**
100+
* Add a Windows handle (of a USB device) to the completion port.
101+
* <p>
102+
* The handle is removed by closing it.
103+
* </p>
104+
*
105+
* @param handle Windows handle
106+
*/
107+
synchronized void addDevice(MemorySegment handle) {
108+
109+
try (var arena = Arena.openConfined()) {
110+
var lastErrorState = arena.allocate(Win.LAST_ERROR_STATE.layout());
111+
112+
// Creates a new port if it doesn't exist; adds handle to existing port if it exists
113+
MemorySegment portHandle = Kernel32B.CreateIoCompletionPort(handle, asyncIoCompletionPort,
114+
handle.address(), 0, lastErrorState);
115+
if (portHandle == MemorySegment.NULL)
116+
throwLastError(lastErrorState, "internal error (CreateIoCompletionPort)");
117+
118+
if (asyncIoCompletionPort == MemorySegment.NULL) {
119+
asyncIoCompletionPort = portHandle;
120+
startAsyncIOTask();
121+
}
122+
}
123+
}
124+
125+
private void startAsyncIOTask() {
126+
availableOverlappedStructs = new ArrayList<>();
127+
arena = Arena.openShared();
128+
requestsByOverlapped = new HashMap<>();
129+
130+
// start background thread for handling IO completion
131+
Thread t = new Thread(this::asyncCompletionTask, "USB async IO");
132+
t.setDaemon(true);
133+
t.start();
134+
}
135+
136+
/**
137+
* Prepare a transfer for submission by adding the OVERLAPPED struct.
138+
*
139+
* @param transfer transfer to prepare
140+
*/
141+
synchronized void prepareForSubmission(WindowsTransfer transfer) {
142+
MemorySegment overlapped;
143+
int size = availableOverlappedStructs.size();
144+
if (size == 0) {
145+
overlapped = arena.allocate(OVERLAPPED.$LAYOUT());
146+
} else {
147+
overlapped = availableOverlappedStructs.remove(size - 1);
148+
}
149+
150+
transfer.overlapped = overlapped;
151+
transfer.resultSize = -1;
152+
requestsByOverlapped.put(overlapped.address(), transfer);
153+
}
154+
155+
/**
156+
* Completes the transfer by calling the completion handler.
157+
*
158+
* @param overlappedAddr address of OVERLAPPED struct
159+
*/
160+
private synchronized void completeTransfer(long overlappedAddr) {
161+
var transfer = requestsByOverlapped.remove(overlappedAddr);
162+
if (transfer == null)
163+
return;
164+
165+
transfer.resultCode = (int) OVERLAPPED.Internal$get(transfer.overlapped);
166+
transfer.resultSize = (int) OVERLAPPED.InternalHigh$get(transfer.overlapped);
167+
168+
availableOverlappedStructs.add(transfer.overlapped);
169+
transfer.overlapped = null;
170+
transfer.completion.completed(transfer);
171+
}
172+
}

java-does-usb/src/main/java/net/codecrete/usb/windows/WindowsEndpointInputStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ public class WindowsEndpointInputStream extends EndpointInputStream {
1515

1616
WindowsEndpointInputStream(WindowsUSBDevice device, int endpointNumber) {
1717
super(device, endpointNumber);
18-
device.configureForAsyncIo(USBDirection.IN, endpointNumber);
1918
}
2019

2120
@Override
2221
protected void submitTransferIn(Transfer transfer) {
2322
((WindowsUSBDevice) device).submitTransferIn(endpointNumber, (WindowsTransfer) transfer);
2423
}
24+
25+
@Override
26+
protected void configureEndpoint() {
27+
((WindowsUSBDevice) device).configureForAsyncIo(USBDirection.IN, endpointNumber);
28+
}
2529
}

java-does-usb/src/main/java/net/codecrete/usb/windows/WindowsEndpointOutputStream.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ public class WindowsEndpointOutputStream extends EndpointOutputStream {
1515

1616
WindowsEndpointOutputStream(WindowsUSBDevice device, int endpointNumber) {
1717
super(device, endpointNumber);
18-
device.configureForAsyncIo(USBDirection.OUT, endpointNumber);
1918
}
2019

2120
@Override
2221
protected void submitTransferOut(Transfer request) {
2322
((WindowsUSBDevice) device).submitTransferOut(endpointNumber, (WindowsTransfer) request);
2423
}
24+
25+
@Override
26+
protected void configureEndpoint() {
27+
((WindowsUSBDevice) device).configureForAsyncIo(USBDirection.OUT, endpointNumber);
28+
}
2529
}

0 commit comments

Comments
 (0)