2222import java .util .concurrent .locks .Condition ;
2323import java .util .concurrent .locks .ReentrantLock ;
2424
25- import static java .lang .foreign .ValueLayout .ADDRESS ;
26- import static java .lang .foreign .ValueLayout .JAVA_INT ;
25+ import static java .lang .foreign .MemorySegment .NULL ;
26+ import static java .lang .foreign .ValueLayout .*;
27+
2728
2829/**
2930 * Background task for handling asynchronous transfers.
@@ -54,6 +55,7 @@ enum TaskState {
5455 private TaskState state = TaskState .NOT_STARTED ;
5556 private MemorySegment asyncIoRunLoop ;
5657 private MemorySegment completionUpcallStub ;
58+ private MemorySegment messagePort ;
5759 private long lastTransferId ;
5860 private final Map <Long , MacosTransfer > transfersById = new HashMap <>();
5961
@@ -67,15 +69,9 @@ void addEventSource(MemorySegment source) {
6769 asyncIoLock .lock ();
6870
6971 if (state != TaskState .RUNNING ) {
70- if (state == TaskState .NOT_STARTED ) {
71- startAsyncIOThread (source );
72- waitForRunLoopReady ();
73- return ;
74-
75- } else {
76- // special case: run loop is not ready yet but background process is already starting
77- waitForRunLoopReady ();
78- }
72+ if (state == TaskState .NOT_STARTED )
73+ startAsyncIOThread ();
74+ waitForRunLoopReady ();
7975 }
8076
8177 CoreFoundation .CFRunLoopAddSource (asyncIoRunLoop , source , IOKit .kCFRunLoopDefaultMode ());
@@ -92,34 +88,60 @@ private void waitForRunLoopReady() {
9288
9389 /**
9490 * Removes an event source from this background task.
95- *
91+ * <p>
92+ * The event source is not immediately removed. Instead, it is posted to a message queue
93+ * processed by the same background thread processing the completion callbacks. This ensures
94+ * that the events from releasing interfaces and closing devices are processed.
95+ * </p>
9696 * @param source event source
9797 */
9898 void removeEventSource (MemorySegment source ) {
99- CoreFoundation .CFRunLoopRemoveSource (asyncIoRunLoop , source , IOKit .kCFRunLoopDefaultMode ());
99+ try (var arena = Arena .ofConfined ()) {
100+ var eventSourceRef = arena .allocate (JAVA_LONG , 1 );
101+ eventSourceRef .set (JAVA_LONG , 0 , source .address ());
102+ var dataRef = CoreFoundation .CFDataCreate (NULL , eventSourceRef , eventSourceRef .byteSize ());
103+ CoreFoundation .CFMessagePortSendRequest (messagePort , 0 , dataRef , 0 , 0 , NULL , NULL );
104+ CoreFoundation .CFRelease (dataRef );
105+ }
100106 }
101107
102108 /**
103109 * Starts the background thread.
104- *
105- * @param firstSource first event source
106110 */
107- private void startAsyncIOThread (MemorySegment firstSource ) {
111+ @ SuppressWarnings ("java:S125" )
112+ private void startAsyncIOThread () {
113+ MemorySegment messagePortSource ;
114+
108115 try {
109116 state = TaskState .STARTING ;
117+
118+ // create descriptor for completion callback function
110119 var completionHandlerFuncDesc = FunctionDescriptor .ofVoid (ADDRESS , JAVA_INT , ADDRESS );
111120 var asyncIOCompletedMH = MethodHandles .lookup ().findVirtual (MacosAsyncTask .class , "asyncIOCompleted" ,
112121 MethodType .methodType (void .class , MemorySegment .class , int .class , MemorySegment .class ));
113122
114123 var methodHandle = asyncIOCompletedMH .bindTo (this );
115- completionUpcallStub = Linker .nativeLinker ().upcallStub (methodHandle , completionHandlerFuncDesc ,
116- Arena .global ());
124+ completionUpcallStub = Linker .nativeLinker ().upcallStub (methodHandle , completionHandlerFuncDesc , Arena .global ());
125+
126+ // create descriptor for message port callback function
127+ var messagePortCallbackFuncDec = FunctionDescriptor .of (ADDRESS , ADDRESS , JAVA_INT , ADDRESS , ADDRESS );
128+ var messagePortCallbackMH = MethodHandles .lookup ().findVirtual (MacosAsyncTask .class , "messagePortCallback" ,
129+ MethodType .methodType (MemorySegment .class , MemorySegment .class , int .class , MemorySegment .class , MemorySegment .class ));
130+ var messagePortCallbackHandle = messagePortCallbackMH .bindTo (this );
131+ var messagePortCallbackStub = Linker .nativeLinker ().upcallStub (messagePortCallbackHandle , messagePortCallbackFuncDec , Arena .global ());
132+
133+ // create local and remote message ports
134+ var pid = ProcessHandle .current ().pid ();
135+ var portName = CoreFoundationHelper .createCFStringRef ("net.codecrete.usb.macos.eventsource." + pid , Arena .global ());
136+ var localPort = CoreFoundation .CFMessagePortCreateLocal (NULL , portName , messagePortCallbackStub , NULL , NULL );
137+ messagePortSource = CoreFoundation .CFMessagePortCreateRunLoopSource (NULL , localPort , 0 );
138+ messagePort = CoreFoundation .CFMessagePortCreateRemote (NULL , portName );
117139
118140 } catch (IllegalAccessException | NoSuchMethodException e ) {
119141 throw new UsbException ("internal error (creating method handle)" , e );
120142 }
121143
122- var thread = new Thread (() -> asyncIOCompletionTask (firstSource ), "USB async IO" );
144+ var thread = new Thread (() -> asyncIOCompletionTask (messagePortSource ), "USB async IO" );
123145 thread .setDaemon (true );
124146 thread .start ();
125147 }
@@ -184,6 +206,20 @@ private void asyncIOCompleted(MemorySegment refcon, int result, MemorySegment ar
184206 transfer .completion ().completed (transfer );
185207 }
186208
209+ /**
210+ * Callback function called when a message is received on the message port.
211+ * <p>
212+ * All messages are related to removing event sources. They just contain the run loop source reference.
213+ * </p>
214+ */
215+ @ SuppressWarnings ({"java:S1144" , "unused" })
216+ private MemorySegment messagePortCallback (MemorySegment local , int msgid , MemorySegment data , MemorySegment info ) {
217+ var runloopSourceRefPtr = CoreFoundation .CFDataGetBytePtr (data );
218+ var runloopSourceRef = MemorySegment .ofAddress (runloopSourceRefPtr .get (JAVA_LONG_UNALIGNED , 0 ));
219+ CoreFoundation .CFRunLoopRemoveSource (asyncIoRunLoop , runloopSourceRef , IOKit .kCFRunLoopDefaultMode ());
220+ return NULL ;
221+ }
222+
187223 /**
188224 * Gets the native IO completion callback function for asynchronous transfers
189225 * to be handled by this background task.
0 commit comments