diff --git a/lib/pure/asyncdispatch.nim b/lib/pure/asyncdispatch.nim index 70d94b023e546..37bc78d262c67 100644 --- a/lib/pure/asyncdispatch.nim +++ b/lib/pure/asyncdispatch.nim @@ -353,9 +353,19 @@ when defined(windows) or defined(nimdoc): var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher + proc close*(disp: PDispatcher) = + if disp.isNil: return + assert disp.callbacks.len == 0 + assert disp.timers.len == 0 + assert disp.handles.len == 0 + if disp.ioPort != 0: + if closeHandle(disp.ioPort) == 0: + raiseOSError(osLastError()) + disp.ioPort = 0 + proc setGlobalDispatcher*(disp: sink PDispatcher) = if not gDisp.isNil: - assert gDisp.callbacks.len == 0 + close(gDisp) gDisp = disp initCallSoonProc() @@ -1120,14 +1130,29 @@ when defined(windows) or defined(nimdoc): doAssert(ev.hWaiter != 0, "Event is not registered in the queue!") let p = getGlobalDispatcher() p.handles.excl(AsyncFD(ev.hEvent)) - if unregisterWait(ev.hWaiter) == 0: + let waitFd = ev.hWaiter + ev.hWaiter = 0 + if unregisterWait(waitFd) == 0: let err = osLastError() if err.int32 != ERROR_IO_PENDING: raiseOSError(err) - ev.hWaiter = 0 + else: + deallocShared(cast[pointer](ev.pcd)) + ev.pcd = nil proc close*(ev: AsyncEvent) = ## Closes event `ev`. + if ev.hWaiter != 0: + unregister(ev) + if closeHandle(ev.hEvent) == 0: + raiseOSError(osLastError()) + if ev.pcd != nil: + # Unregistration completed asynchronously; event callback will free the + # registration state and then release the event object itself. + ev.hEvent = 0 + else: + deallocShared(cast[pointer](ev)) + return let res = closeHandle(ev.hEvent) deallocShared(cast[pointer](ev)) if res == 0: @@ -1146,23 +1171,40 @@ when defined(windows) or defined(nimdoc): proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) = if ev.hWaiter != 0: if cb(fd): - # we need this check to avoid exception, if `unregister(event)` was - # called in callback. - deallocShared(cast[pointer](pcd)) if ev.hWaiter != 0: - unregister(ev) + let waitFd = ev.hWaiter + ev.hWaiter = 0 + p.handles.excl(fd) + if unregisterWait(waitFd) == 0: + let err = osLastError() + if err.int32 != ERROR_IO_PENDING: + raiseOSError(err) + deallocShared(cast[pointer](pcd)) + ev.pcd = nil + if ev.hEvent == 0: + deallocShared(cast[pointer](ev)) else: # if callback returned `false`, then it wants to be called again, so # we need to ref and protect `pcd.ovl` again, because it will be # unrefed and disposed in `poll()`. - GC_ref(pcd.ovl) - pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + if ev.hWaiter != 0: + GC_ref(pcd.ovl) + pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb)) + else: + deallocShared(cast[pointer](pcd)) + ev.pcd = nil + if ev.hEvent == 0: + deallocShared(cast[pointer](ev)) else: # if ev.hWaiter == 0, then event was unregistered before `poll()` call. deallocShared(cast[pointer](pcd)) + ev.pcd = nil + if ev.hEvent == 0: + deallocShared(cast[pointer](ev)) registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb) ev.hWaiter = pcd.waitFd + ev.pcd = pcd initAll() else: @@ -1218,16 +1260,27 @@ else: when defined(nuttx): import std/exitprocs + proc close*(disp: PDispatcher) proc cleanDispatcher() {.noconv.} = + close(gDisp) gDisp = nil proc addFinalyzer() = addExitProc(cleanDispatcher) + proc close*(disp: PDispatcher) = + if disp.isNil: return + assert disp.callbacks.len == 0 + assert disp.timers.len == 0 + assert disp.selector.isNil or disp.selector.isEmpty() + if not disp.selector.isNil: + disp.selector.close() + disp.selector = nil + proc setGlobalDispatcher*(disp: owned PDispatcher) = if not gDisp.isNil: - assert gDisp.callbacks.len == 0 + close(gDisp) gDisp = disp initCallSoonProc() diff --git a/lib/pure/ioselects/ioselectors_kqueue.nim b/lib/pure/ioselects/ioselectors_kqueue.nim index 513578eda64c8..70d8a79bc6eb4 100644 --- a/lib/pure/ioselects/ioselectors_kqueue.nim +++ b/lib/pure/ioselects/ioselectors_kqueue.nim @@ -126,6 +126,7 @@ proc close*[T](s: Selector[T]) = let res2 = posix.close(s.sock) when hasThreadSupport: deinitLock(s.changesLock) + deallocSharedArray(s.changes) deallocSharedArray(s.fds) deallocShared(cast[pointer](s)) if res1 != 0 or res2 != 0: diff --git a/tests/async/tasyncdispatch_dispatcher_fdleak.nim b/tests/async/tasyncdispatch_dispatcher_fdleak.nim new file mode 100644 index 0000000000000..3601235e8b97f --- /dev/null +++ b/tests/async/tasyncdispatch_dispatcher_fdleak.nim @@ -0,0 +1,21 @@ +discard """ + output: "closed" +""" + +when defined(windows): + echo "closed" +else: + import asyncdispatch, selectors, posix + + block: + let disp = newDispatcher() + let fd = getFd(getIoHandler(disp)) + disp.close() + doAssert fcntl(fd.cint, F_GETFD) == -1 + + block: + let fd = getFd(getIoHandler(getGlobalDispatcher())) + setGlobalDispatcher(nil) + doAssert fcntl(fd.cint, F_GETFD) == -1 + + echo "closed" diff --git a/tests/async/tasyncevent_memleak.nim b/tests/async/tasyncevent_memleak.nim new file mode 100644 index 0000000000000..f84dcf6f97a41 --- /dev/null +++ b/tests/async/tasyncevent_memleak.nim @@ -0,0 +1,27 @@ +discard """ + cmd: "nim c -r --threads:on --mm:orc $file" + output: "0" +""" + +when defined(windows): + import asyncdispatch + + let before = getOccupiedSharedMem() + + block: + let ev = newAsyncEvent() + addEvent(ev) do (fd: AsyncFD) -> bool: + true + ev.close() + + block: + let ev = newAsyncEvent() + addEvent(ev) do (fd: AsyncFD) -> bool: + true + ev.unregister() + ev.close() + + setGlobalDispatcher(nil) + echo getOccupiedSharedMem() - before +else: + echo 0 diff --git a/tests/async/tioselectors_memleak.nim b/tests/async/tioselectors_memleak.nim new file mode 100644 index 0000000000000..48d4a8e5e436f --- /dev/null +++ b/tests/async/tioselectors_memleak.nim @@ -0,0 +1,14 @@ +discard """ + cmd: "nim c -r --threads:on -d:threadsafe --mm:orc $file" + output: "0" +""" + +import selectors + +let before = getOccupiedSharedMem() + +block: + let selector = newSelector[int]() + selector.close() + +echo getOccupiedSharedMem() - before