Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,19 @@ when defined(windows) or defined(nimdoc):

var gDisp{.threadvar.}: owned PDispatcher ## Global dispatcher

proc close*(disp: PDispatcher) =
if disp.isNil: return
assert disp.callbacks.len == 0
assert disp.timers.len == 0
assert disp.handles.len == 0
if disp.ioPort != 0:
if closeHandle(disp.ioPort) == 0:
raiseOSError(osLastError())
disp.ioPort = 0

proc setGlobalDispatcher*(disp: sink PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
close(gDisp)
gDisp = disp
initCallSoonProc()

Expand Down Expand Up @@ -1120,14 +1130,29 @@ when defined(windows) or defined(nimdoc):
doAssert(ev.hWaiter != 0, "Event is not registered in the queue!")
let p = getGlobalDispatcher()
p.handles.excl(AsyncFD(ev.hEvent))
if unregisterWait(ev.hWaiter) == 0:
let waitFd = ev.hWaiter
ev.hWaiter = 0
if unregisterWait(waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
ev.hWaiter = 0
else:
deallocShared(cast[pointer](ev.pcd))
ev.pcd = nil

proc close*(ev: AsyncEvent) =
## Closes event `ev`.
if ev.hWaiter != 0:
unregister(ev)
if closeHandle(ev.hEvent) == 0:
raiseOSError(osLastError())
if ev.pcd != nil:
# Unregistration completed asynchronously; event callback will free the
# registration state and then release the event object itself.
ev.hEvent = 0
else:
deallocShared(cast[pointer](ev))
return
let res = closeHandle(ev.hEvent)
deallocShared(cast[pointer](ev))
if res == 0:
Expand All @@ -1146,23 +1171,40 @@ when defined(windows) or defined(nimdoc):
proc eventcb(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if ev.hWaiter != 0:
if cb(fd):
# we need this check to avoid exception, if `unregister(event)` was
# called in callback.
deallocShared(cast[pointer](pcd))
if ev.hWaiter != 0:
unregister(ev)
let waitFd = ev.hWaiter
ev.hWaiter = 0
p.handles.excl(fd)
if unregisterWait(waitFd) == 0:
let err = osLastError()
if err.int32 != ERROR_IO_PENDING:
raiseOSError(err)
deallocShared(cast[pointer](pcd))
ev.pcd = nil
if ev.hEvent == 0:
deallocShared(cast[pointer](ev))
else:
# if callback returned `false`, then it wants to be called again, so
# we need to ref and protect `pcd.ovl` again, because it will be
# unrefed and disposed in `poll()`.
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
if ev.hWaiter != 0:
GC_ref(pcd.ovl)
pcd.ovl.data.cell = system.protect(rawEnv(pcd.ovl.data.cb))
else:
deallocShared(cast[pointer](pcd))
ev.pcd = nil
if ev.hEvent == 0:
deallocShared(cast[pointer](ev))
else:
# if ev.hWaiter == 0, then event was unregistered before `poll()` call.
deallocShared(cast[pointer](pcd))
ev.pcd = nil
if ev.hEvent == 0:
deallocShared(cast[pointer](ev))

registerWaitableHandle(p, hEvent, flags, pcd, INFINITE, eventcb)
ev.hWaiter = pcd.waitFd
ev.pcd = pcd

initAll()
else:
Expand Down Expand Up @@ -1218,16 +1260,27 @@ else:

when defined(nuttx):
import std/exitprocs
proc close*(disp: PDispatcher)

proc cleanDispatcher() {.noconv.} =
close(gDisp)
gDisp = nil

proc addFinalyzer() =
addExitProc(cleanDispatcher)

proc close*(disp: PDispatcher) =
if disp.isNil: return
assert disp.callbacks.len == 0
assert disp.timers.len == 0
assert disp.selector.isNil or disp.selector.isEmpty()
if not disp.selector.isNil:
disp.selector.close()
disp.selector = nil

proc setGlobalDispatcher*(disp: owned PDispatcher) =
if not gDisp.isNil:
assert gDisp.callbacks.len == 0
close(gDisp)
gDisp = disp
initCallSoonProc()

Expand Down
1 change: 1 addition & 0 deletions lib/pure/ioselects/ioselectors_kqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ proc close*[T](s: Selector[T]) =
let res2 = posix.close(s.sock)
when hasThreadSupport:
deinitLock(s.changesLock)
deallocSharedArray(s.changes)
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
if res1 != 0 or res2 != 0:
Expand Down
21 changes: 21 additions & 0 deletions tests/async/tasyncdispatch_dispatcher_fdleak.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
discard """
output: "closed"
"""

when defined(windows):
echo "closed"
else:
import asyncdispatch, selectors, posix

block:
let disp = newDispatcher()
let fd = getFd(getIoHandler(disp))
disp.close()
doAssert fcntl(fd.cint, F_GETFD) == -1

block:
let fd = getFd(getIoHandler(getGlobalDispatcher()))
setGlobalDispatcher(nil)
doAssert fcntl(fd.cint, F_GETFD) == -1

echo "closed"
27 changes: 27 additions & 0 deletions tests/async/tasyncevent_memleak.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
discard """
cmd: "nim c -r --threads:on --mm:orc $file"
output: "0"
"""

when defined(windows):
import asyncdispatch

let before = getOccupiedSharedMem()

block:
let ev = newAsyncEvent()
addEvent(ev) do (fd: AsyncFD) -> bool:
true
ev.close()

block:
let ev = newAsyncEvent()
addEvent(ev) do (fd: AsyncFD) -> bool:
true
ev.unregister()
ev.close()

setGlobalDispatcher(nil)
echo getOccupiedSharedMem() - before
else:
echo 0
14 changes: 14 additions & 0 deletions tests/async/tioselectors_memleak.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
discard """
cmd: "nim c -r --threads:on -d:threadsafe --mm:orc $file"
output: "0"
"""

import selectors

let before = getOccupiedSharedMem()

block:
let selector = newSelector[int]()
selector.close()

echo getOccupiedSharedMem() - before
Loading