Skip to content

Commit a44fd56

Browse files
committed
Asynchronous code refactoring (Linux)
1 parent 3ba8488 commit a44fd56

File tree

9 files changed

+319
-261
lines changed

9 files changed

+319
-261
lines changed

java-does-usb/src/main/java/net/codecrete/usb/common/EndpointInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected EndpointInputStream(USBDeviceImpl device, int endpointNumber) {
7373
final var transfer = device.createTransfer();
7474
transfer.data = arena.allocate(bufferSize, 8);
7575
transfer.dataSize = bufferSize;
76-
transfer.completionHandler = this::onCompletion;
76+
transfer.completion = this::onCompletion;
7777

7878
if (i == 0) {
7979
currentTransfer = transfer;

java-does-usb/src/main/java/net/codecrete/usb/common/EndpointOutputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected EndpointOutputStream(USBDeviceImpl device, int endpointNumber) {
7575
for (int i = 0; i < MAX_OUTSTANDING_TRANSFERS; i++) {
7676
final var transfer = device.createTransfer();
7777
transfer.data = arena.allocate(bufferSize, 8);
78-
transfer.completionHandler = this::onCompletion;
78+
transfer.completion = this::onCompletion;
7979

8080
if (i == 0) {
8181
currentTransfer = transfer;

java-does-usb/src/main/java/net/codecrete/usb/common/Transfer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,5 @@ public class Transfer {
3535
/**
3636
* Completion handler to call when the transfer is complete.
3737
*/
38-
public TransferCompletion completionHandler;
38+
public TransferCompletion completion;
3939
}
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
//
2+
// Java Does USB
3+
// Copyright (c) 2023 Manuel Bleichenbacher
4+
// Licensed under MIT License
5+
// https://opensource.org/licenses/MIT
6+
//
7+
8+
package net.codecrete.usb.linux;
9+
10+
import net.codecrete.usb.linux.gen.errno.errno;
11+
import net.codecrete.usb.linux.gen.poll.poll;
12+
import net.codecrete.usb.linux.gen.poll.pollfd;
13+
import net.codecrete.usb.linux.gen.usbdevice_fs.usbdevfs_urb;
14+
15+
import java.lang.foreign.Arena;
16+
import java.lang.foreign.MemorySegment;
17+
import java.lang.foreign.SegmentAllocator;
18+
import java.lang.foreign.SegmentScope;
19+
import java.util.ArrayList;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import static java.lang.foreign.ValueLayout.ADDRESS;
25+
import static java.lang.foreign.ValueLayout.JAVA_LONG;
26+
import static net.codecrete.usb.linux.LinuxUSBException.throwException;
27+
import static net.codecrete.usb.linux.LinuxUSBException.throwLastError;
28+
import static net.codecrete.usb.linux.USBDevFS.*;
29+
import static net.codecrete.usb.linux.gen.usbdevice_fs.usbdevice_fs.USBDEVFS_URB_TYPE_BULK;
30+
31+
/**
32+
* Background task for handling asynchronous transfers.
33+
* <p>
34+
* Each USB device must register its file handler with this task.
35+
* </p>
36+
* <p>
37+
* The task keeps track of the submitted transfers by remembering the
38+
* URB address (USB request buffer) in order to match it to the
39+
* completion.
40+
* </p>
41+
* <p>
42+
* URBs are allocated but never freed. To limit the memory usage,
43+
* URBs are reused. So the maximum number of outstanding transfers
44+
* determines the number of allocated URBs.
45+
* </p>
46+
*/
47+
public class LinuxAsyncTask {
48+
private static LinuxAsyncTask singletonInstance;
49+
50+
/**
51+
* Singleton instance of background task.
52+
*
53+
* @return background task
54+
*/
55+
static synchronized LinuxAsyncTask instance() {
56+
if (singletonInstance == null)
57+
singletonInstance = new LinuxAsyncTask();
58+
return singletonInstance;
59+
}
60+
61+
private final SegmentAllocator GLOBAL_ALLOCATOR = SegmentAllocator.nativeAllocator(SegmentScope.global());
62+
/// available URBs
63+
private final List<MemorySegment> availableURBs = new ArrayList<>();
64+
/// map of URB addresses to transfer (for outstanding transfers)
65+
private final Map<Long, LinuxTransfer> transfersByURB = new HashMap<>();
66+
/// array of file descriptors using asynchronous completion
67+
private int[] asyncFds;
68+
/// file descriptor to notify async IO background thread about an update
69+
private int asyncIOUpdateEventFd;
70+
71+
/**
72+
* Background task for handling asynchronous IO completions.
73+
*/
74+
private void asyncCompletionTask() {
75+
76+
try (var arena = Arena.openConfined()) {
77+
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
78+
var asyncPolls = pollfd.allocateArray(100, arena);
79+
var urbPointerHolder = arena.allocate(ADDRESS);
80+
var eventfdValueHolder = arena.allocate(JAVA_LONG);
81+
82+
while (true) {
83+
84+
// get current file descriptor array
85+
int[] fds;
86+
synchronized (this) {
87+
fds = asyncFds;
88+
}
89+
90+
// prepare pollfd struct array
91+
var n = fds.length;
92+
for (int i = 0; i < n; i++) {
93+
pollfd.fd$set(asyncPolls, i, fds[i]);
94+
pollfd.events$set(asyncPolls, i, (short) (poll.POLLIN() | poll.POLLOUT()));
95+
pollfd.revents$set(asyncPolls, i, (short) 0);
96+
}
97+
98+
pollfd.fd$set(asyncPolls, n, asyncIOUpdateEventFd);
99+
pollfd.events$set(asyncPolls, n, (short) poll.POLLIN());
100+
pollfd.revents$set(asyncPolls, n, (short) 0);
101+
102+
// poll for event
103+
int res = poll.poll(asyncPolls, n + 1, -1);
104+
if (res < 0)
105+
throwException("internal error (poll)");
106+
107+
// check for events
108+
for (int i = 0; i < n + 1; i++) {
109+
var revent = pollfd.revents$get(asyncPolls, i);
110+
if (revent == 0)
111+
continue;
112+
113+
if ((revent & poll.POLLERR()) != 0) {
114+
// most likely the device has been disconnected; ignore
115+
continue;
116+
}
117+
118+
if (i != n) {
119+
// reap URB
120+
int fd = pollfd.fd$get(asyncPolls, i);
121+
reapURB(fd, urbPointerHolder, errnoState);
122+
123+
} else {
124+
// wakeup to refresh list of file descriptors
125+
res = IO.eventfd_read(asyncIOUpdateEventFd, eventfdValueHolder, errnoState);
126+
if (res < 0)
127+
throwLastError(errnoState, "internal error (eventfd_read)");
128+
}
129+
}
130+
131+
}
132+
}
133+
}
134+
135+
private void reapURB(int fd, MemorySegment urbPointerHolder, MemorySegment errnoState) {
136+
int res;
137+
res = IO.ioctl(fd, REAPURB, urbPointerHolder, errnoState);
138+
if (res < 0) {
139+
var err = Linux.getErrno(errnoState);
140+
if (err == errno.EBADF())
141+
return; // ignore, device might have been closed
142+
throwException(err, "internal error (reap URB)");
143+
}
144+
145+
// call completion handler
146+
var urbAddr = urbPointerHolder.get(JAVA_LONG, 0);
147+
var transfer = getTransferResult(urbAddr);
148+
transfer.completion.completed(transfer);
149+
}
150+
151+
/**
152+
* Notifies background process about changed FD list
153+
*/
154+
private void notifyAsyncIOTask() {
155+
// start background process if needed
156+
if (asyncIOUpdateEventFd == 0) {
157+
startAsyncIOHandler();
158+
return;
159+
}
160+
161+
try (var arena = Arena.openConfined()) {
162+
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
163+
if (IO.eventfd_write(asyncIOUpdateEventFd, 1, errnoState) < 0)
164+
throwLastError(errnoState, "internal error (eventfd_write)");
165+
}
166+
}
167+
168+
/**
169+
* Register a device for asynchronous IO completion handling
170+
*
171+
* @param device USB device
172+
*/
173+
synchronized void addForAsyncIOCompletion(LinuxUSBDevice device) {
174+
int n = asyncFds != null ? asyncFds.length : 0;
175+
int[] fds = new int[n + 1];
176+
if (n > 0)
177+
System.arraycopy(asyncFds, 0, fds, 0, n);
178+
fds[n] = device.fileDescriptor();
179+
180+
// activate new array
181+
asyncFds = fds;
182+
notifyAsyncIOTask();
183+
}
184+
185+
/**
186+
* Unregisters a device from asynchronous IO completion handling.
187+
*
188+
* @param device USB device
189+
*/
190+
synchronized void removeFromAsyncIOCompletion(LinuxUSBDevice device) {
191+
// copy file descriptor (except the device's) into new array
192+
int n = asyncFds.length;
193+
if (n == 0) {
194+
System.err.println("internal error (file descriptor not found) - ignoring");
195+
return;
196+
}
197+
198+
int fd = device.fileDescriptor();
199+
int[] fds = new int[n - 1];
200+
int tgt = 0;
201+
for (int asyncFd : asyncFds) {
202+
if (asyncFd != fd) {
203+
if (tgt == n) {
204+
System.err.println("internal error (file descriptor not found) - ignoring");
205+
return;
206+
}
207+
fds[tgt] = asyncFd;
208+
tgt += 1;
209+
}
210+
}
211+
212+
// make new array to active one
213+
asyncFds = fds;
214+
notifyAsyncIOTask();
215+
}
216+
217+
synchronized void submitBulkTransfer(LinuxUSBDevice device, int endpointAddress, LinuxTransfer transfer) {
218+
219+
addURB(transfer);
220+
var urb = transfer.urb;
221+
222+
usbdevfs_urb.type$set(urb, (byte) USBDEVFS_URB_TYPE_BULK());
223+
usbdevfs_urb.endpoint$set(urb, (byte) endpointAddress);
224+
usbdevfs_urb.buffer$set(urb, transfer.data);
225+
usbdevfs_urb.buffer_length$set(urb, transfer.dataSize);
226+
usbdevfs_urb.usercontext$set(urb, MemorySegment.ofAddress(device.fileDescriptor()));
227+
228+
try (var arena = Arena.openConfined()) {
229+
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
230+
if (IO.ioctl(device.fileDescriptor(), SUBMITURB, urb, errnoState) < 0) {
231+
String action = endpointAddress >= 128 ? "reading from" : "writing to";
232+
throwLastError(errnoState, "failed %s endpoint %d", action, endpointAddress);
233+
}
234+
}
235+
}
236+
237+
private void addURB(LinuxTransfer transfer) {
238+
MemorySegment urb;
239+
int size = availableURBs.size();
240+
if (size > 0) {
241+
urb = availableURBs.remove(size - 1);
242+
} else {
243+
urb = usbdevfs_urb.allocate(GLOBAL_ALLOCATOR);
244+
}
245+
246+
transfer.urb = urb;
247+
transfersByURB.put(urb.address(), transfer);
248+
}
249+
250+
private synchronized LinuxTransfer getTransferResult(long urbAddr) {
251+
var transfer = transfersByURB.remove(urbAddr);
252+
if (transfer == null)
253+
throwException("internal error (unknown URB)");
254+
255+
transfer.resultCode = usbdevfs_urb.status$get(transfer.urb);
256+
transfer.resultSize = usbdevfs_urb.actual_length$get(transfer.urb);
257+
258+
availableURBs.add(transfer.urb);
259+
transfer.urb = null;
260+
return transfer;
261+
}
262+
263+
synchronized void abortTransfers(LinuxUSBDevice device, byte endpointAddress) {
264+
int fd = device.fileDescriptor();
265+
try (var arena = Arena.openConfined()) {
266+
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
267+
268+
for (var urbAddress : transfersByURB.keySet()) {
269+
var urb = usbdevfs_urb.ofAddress(MemorySegment.ofAddress(urbAddress), SegmentScope.global());
270+
if (fd != (int) usbdevfs_urb.usercontext$get(urb).address())
271+
continue;
272+
if (endpointAddress != usbdevfs_urb.endpoint$get(urb))
273+
continue;
274+
275+
if (IO.ioctl(fd, DISCARDURB, urb, errnoState) < 0)
276+
throwLastError(errnoState, "failed to cancel transfer");
277+
}
278+
}
279+
}
280+
281+
private void startAsyncIOHandler() {
282+
try (var arena = Arena.openConfined()) {
283+
var errnoState = arena.allocate(Linux.ERRNO_STATE.layout());
284+
asyncIOUpdateEventFd = IO.eventfd(0, 0, errnoState);
285+
if (asyncIOUpdateEventFd == -1) {
286+
asyncIOUpdateEventFd = 0;
287+
throwLastError(errnoState, "internal error (eventfd)");
288+
}
289+
}
290+
291+
// start background thread for handling IO completion
292+
Thread t = new Thread(this::asyncCompletionTask, "USB async IO");
293+
t.setDaemon(true);
294+
t.start();
295+
}
296+
}

0 commit comments

Comments
 (0)