Skip to content
This repository was archived by the owner on Apr 23, 2023. It is now read-only.

Commit d7d737d

Browse files
author
Jack Tang
committed
add cleanThread
1 parent a042b9c commit d7d737d

File tree

2 files changed

+145
-46
lines changed

2 files changed

+145
-46
lines changed

src/threadproxy.nim

Lines changed: 108 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
when not(compileOption("threads")):
22
{.fatal: "--threads:on is required for threadproxy".}
3-
4-
import json, tables, asyncdispatch
3+
4+
import json, tables, asyncdispatch, sets
55

66
# almost always use together
77
export json, asyncdispatch
@@ -111,24 +111,27 @@ type
111111
## Signature for entry function of thread
112112

113113
proc finalize(ch: ThreadChannelWrapper) =
114-
ch.self[].close()
115-
deallocShared(ch.self)
114+
if not ch.self.isNil:
115+
ch.self[].close()
116+
deallocShared(ch.self)
117+
ch.self = nil
116118

117119
proc newThreadChannel(channelMaxItems: int): ThreadChannelWrapper =
118120
result.new(finalize)
119121
result.self = cast[ThreadChannelPtr](allocShared0(sizeof(ThreadChannel)))
120122
result.self[].open(channelMaxItems)
121123

122124
proc finalize(th: ThreadWrapper) =
123-
dealloc(th.self)
125+
if not th.self.isNil:
126+
dealloc(th.self)
127+
th.self = nil
124128

125129
proc newThread(): ThreadWrapper =
126130
result.new(finalize)
127131
result.self = cast[ptr Thread[ThreadProxy]](alloc0(sizeof(Thread[ThreadProxy])))
128132

129133
proc defaultAction(action: string, data: JsonNode): Future[JsonNode] {.async.} =
130134
# default action should ignore the message
131-
# echo "No handler for action: " & action & ", " & $data
132135
return nil
133136

134137
proc newThreadProxy*(token: ThreadToken): ThreadProxy =
@@ -195,14 +198,14 @@ proc newFailureMessage(
195198
)
196199

197200
proc newSysMessage(
198-
action: string,
201+
action: SysMsg,
199202
channel: ThreadChannelPtr,
200203
data: JsonNode = nil,
201204
callbackId: int = 0
202205
): ThreadMessage =
203206
ThreadMessage(
204207
kind: SYS,
205-
action: action,
208+
action: $action,
206209
channel: channel,
207210
json: if data.isNil: newJNull() else: data,
208211
callbackId: callbackId
@@ -260,14 +263,14 @@ template onDefaultData*(proxy: ThreadProxy, body: untyped) =
260263
let action {.inject.} = a
261264
let data {.inject.} = j
262265
`body`
263-
266+
264267
proc send(proxy: ThreadProxy, target: ThreadChannelPtr, action: string, data: JsonNode): Future[void] =
265268
result = newFuture[void]("send")
266269
let sent = target[].trySend proxy.newMessage(EMIT, action, data)
267270
if sent:
268271
result.complete()
269272
else:
270-
let err = newException(MessageUndeliveredError, "failed to send")
273+
let err = newException(MessageUndeliveredError, "Failed to send message to target")
271274
err.action = action
272275
err.kind = EMIT
273276
err.data = data
@@ -281,7 +284,7 @@ proc ask(proxy: ThreadProxy, target: ThreadChannelPtr, action: string, data: Jso
281284
let sent = target[].trySend proxy.newMessage(REQUEST, action, data, id)
282285
if not sent:
283286
proxy.jsonCallbacks.del(id)
284-
let err = newException(MessageUndeliveredError, "failed to send")
287+
let err = newException(MessageUndeliveredError, "Failed to send message to target")
285288
err.action = action
286289
err.kind = EMIT
287290
err.data = data
@@ -294,10 +297,10 @@ proc getChannel(proxy: ThreadProxy, name: string): Future[ThreadChannelPtr] =
294297
if proxy.isMainThreadProxy:
295298
# get from channels
296299
let mainProxy = cast[MainThreadProxy](proxy)
297-
let chp = mainProxy.channels.getOrDefault(name, nil)
298-
if not chp.isNil:
300+
let ch = mainProxy.directory.getOrDefault(name, nil)
301+
if not ch.isNil:
299302
result = newFuture[ThreadChannelPtr]("getChannel")
300-
result.complete(chp.channel.self)
303+
result.complete(ch)
301304
else:
302305
var err = newException(ReceiverNotFoundError, "Cannot not find " & name)
303306
err.sender = proxy.name
@@ -316,7 +319,7 @@ proc getChannel(proxy: ThreadProxy, name: string): Future[ThreadChannelPtr] =
316319
result = newFuture[ThreadChannelPtr]("getChannel")
317320
proxy.channelCallbacks[id] = result
318321
# reply to fast Channel
319-
proxy.mainFastChannel[].send newSysMessage($GET_NAME_REQ, proxy.fastChannel, %name, id)
322+
proxy.mainFastChannel[].send newSysMessage(GET_NAME_REQ, proxy.fastChannel, %name, id)
320323

321324

322325
proc send*(proxy: ThreadProxy, target: string, action: string, data: JsonNode = nil): Future[void] {.async.} =
@@ -336,7 +339,7 @@ proc ask*(proxy: ThreadProxy, target: string, action: string, data: JsonNode = n
336339

337340
proc processEvent(proxy: ThreadProxy, event: ThreadMessage) =
338341
# for debug
339-
# echo proxy.name, event
342+
# echo proxy.name, event[]
340343

341344
case event.kind:
342345
of EMIT:
@@ -394,16 +397,16 @@ proc processEvent(proxy: ThreadProxy, event: ThreadMessage) =
394397
let chp = mainProxy.channels.getOrDefault(name, nil)
395398
if chp.isNil:
396399
# cannot find name, send back with sender channel pointer
397-
sender[].send newSysMessage($GET_NAME_REP, sender, %name, event.callbackId)
400+
sender[].send newSysMessage(GET_NAME_REP, sender, %name, event.callbackId)
398401
else:
399402
# found and reply
400-
sender[].send newSysMessage($GET_NAME_REP, chp.fastChannel.self, %name, event.callbackId)
403+
sender[].send newSysMessage(GET_NAME_REP, chp.fastChannel.self, %name, event.callbackId)
401404
of $GET_NAME_REP:
402405
let name = event.json.getStr()
403406
let ch = event.channel
404407
let id = event.callbackId
405408
let cb = proxy.channelCallbacks.getOrDefault(id, nil)
406-
if ch == proxy.channel:
409+
if ch == proxy.fastChannel:
407410
# not found
408411
if not cb.isNil:
409412
proxy.channelCallbacks.del id
@@ -418,17 +421,22 @@ proc processEvent(proxy: ThreadProxy, event: ThreadMessage) =
418421
proxy.channelCallbacks.del id
419422
cb.complete(ch)
420423
of $DEL_NAME_REQ:
421-
let name = event.json.getStr()
422-
let sender = event.channel
424+
let names = event.json
425+
let mainChannel = event.channel
423426
let id = event.callbackId
424-
proxy.directory.del name
425-
sender[].send newSysMessage($DEL_NAME_REP, nil, nil, id)
426-
# of $DEL_NAME_REP:
427-
427+
for name in names:
428+
proxy.directory.del name.getStr()
429+
mainChannel[].send newSysMessage(DEL_NAME_REP, proxy.fastChannel, %proxy.name, id)
430+
of $DEL_NAME_REP:
431+
let id = event.callbackId
432+
let future = proxy.jsonCallbacks[id]
433+
if likely(not future.isNil):
434+
proxy.jsonCallbacks.del id
435+
future.complete(event.json)
428436
else:
429437
raise newException(Defect, "Unknown system action " & event.action)
430438

431-
proc process*(proxy: ThreadProxy): bool =
439+
proc process*(proxy: ThreadProxy): bool {.gcsafe.} =
432440
## Process one message on channel. Return false if channel is empty, otherwise true.
433441

434442
# process fast channel first if tryRecv success
@@ -491,6 +499,7 @@ proc createToken*(proxy: MainThreadProxy, name: string, channelMaxItems: int = 0
491499
channel: ch,
492500
fastChannel: fch
493501
)
502+
proxy.directory[name] = ch.self
494503
return ThreadToken(
495504
name: name,
496505
mainFastChannel: proxy.fastChannel,
@@ -523,28 +532,84 @@ proc pinToCpu*(proxy: MainThreadProxy, name: string, cpu: Natural) =
523532

524533
proc isThreadRunning*(proxy: MainThreadProxy, name: string): bool =
525534
## Check whether thread is running. Applicable only to threads created with `createThread`
526-
if name in proxy.threads:
535+
if name notin proxy.threads:
527536
let err = newException(ThreadNotFoundError, "Cannot find thread with name " & name)
528537
err.threadName = name
529538
raise err
530539
let thread = proxy.threads[name]
531540
result = running(thread.self[])
532541

533-
proc cleanThread*(proxy: MainThreadProxy, name: string): Future[void] =
534-
## Unreference reousrces associated with a non-running thread of `name`.
535-
## The resources will be released upon GC.
536-
##
537-
## Call GC_fullCollect() right after cleanupThread() if you want to release the resource immediately.
538-
##
539-
## Raise ThreadUncleanableError if the thread is running
540-
##
541-
if proxy.isThreadRunning(name):
542-
var err = newException(ThreadUncleanableError, "Cannot clean up a running thread")
543-
err.threadName = name
544-
raise err
545-
546-
# todo ask all threads to remove cache of name
547-
#
542+
# import sequtils
543+
# proc debugPrint*(proxy: ThreadProxy) =
544+
# echo proxy.name
545+
# if proxy of MainThreadProxy:
546+
# let proxy = cast[MainThreadProxy](proxy)
547+
# echo " channels: ", toSeq(proxy.channels.keys)
548+
# echo " threads: ", toSeq(proxy.threads.keys)
549+
# echo " directory: ", toSeq(proxy.directory.keys)
550+
551+
proc deleteThread(proxy: MainThreadProxy, name: string) =
552+
# this is do not clean up directory, use cleanThread
553+
let th = proxy.threads[name]
554+
th.finalize()
548555
proxy.threads.del(name)
556+
557+
let ch = proxy.channels[name]
558+
ch.channel.finalize()
559+
ch.fastChannel.finalize()
549560
proxy.channels.del(name)
550-
# GC_fullCollect()
561+
562+
proc cleanThreads*(proxy: MainThreadProxy): Future[void] =
563+
## Clean up resource of non-running threads
564+
565+
let ret = newFuture[void]("cleanThread")
566+
result = ret
567+
568+
# separate active and inactive threads
569+
var
570+
actives: seq[string]
571+
inactives: seq[string]
572+
for n, _ in proxy.directory:
573+
if proxy.isThreadRunning(n):
574+
actives.add n
575+
else:
576+
inactives.add n
577+
578+
# return if nothing to do
579+
if inactives.len == 0:
580+
ret.complete()
581+
return
582+
583+
# delete from directory to prevent new GET_NAME_REQ
584+
for n in inactives:
585+
proxy.directory.del n
586+
587+
# ask threads to delete directory of inactives
588+
var targets = actives.toHashSet()
589+
for target in actives:
590+
# ask all threads to delete directory
591+
let id = proxy.nextCallbackId()
592+
# todo: add timeout for failure detection
593+
let future = newFuture[JsonNode]("cleanThread")
594+
proxy.jsonCallbacks[id] = future
595+
future.addCallback proc(f: Future[JsonNode]) =
596+
if f.failed:
597+
ret.fail(f.readError)
598+
else:
599+
targets.excl f.read.getStr()
600+
if targets.len == 0:
601+
# after recieve all responds, delete resources
602+
for n in inactives:
603+
proxy.deleteThread n
604+
ret.complete()
605+
606+
# send to thread
607+
let chp = proxy.channels[target]
608+
chp.fastChannel.self[].send newSysMessage(DEL_NAME_REQ, proxy.fastChannel, %inactives, id)
609+
610+
# no thread to wait, just complete
611+
if actives.len == 0:
612+
for n in inactives:
613+
proxy.deleteThread n
614+
ret.complete()
615+

tests/test.nim

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,23 @@ import threadproxy
33
import json
44

55
proc workerMain(proxy: ThreadProxy) {.thread.} =
6+
var active = true
67

78
proxy.onData "ping":
89
return data
910

11+
proxy.onData "pingTo":
12+
let future = proxy.ask(data.getStr(), "ping", %proxy.name)
13+
yield future
14+
return %( not future.failed )
15+
1016
proxy.onData "resend":
1117
asyncCheck proxy.send("main", "recev", data)
1218

19+
proxy.onData "stop":
20+
active = false
21+
proxy.stop()
22+
1323
proxy.on "failure", proc(j: JsonNode): Future[JsonNode] {.gcsafe,async.} =
1424
if true:
1525
raise newException(ValueError, j.getStr())
@@ -20,7 +30,7 @@ proc workerMain(proxy: ThreadProxy) {.thread.} =
2030
"data": data
2131
}
2232

23-
while true:
33+
while active:
2434
try:
2535
waitFor proxy.poll()
2636
except:
@@ -92,6 +102,30 @@ suite "threadproxy":
92102
# assert err.msg == errMsg
93103
waitFor run()
94104

105+
test "cleanThreads":
106+
let run = proc() {.async.} =
107+
let proxy = newMainThreadProxy("main")
108+
asyncCheck proxy.poll()
109+
proxy.createThread("worker1", workerMain)
110+
proxy.createThread("worker2", workerMain)
111+
proxy.createThread("worker3", workerMain)
112+
113+
block:
114+
# worker2 pre-cache worker1 channel, while worker3 not
115+
let canPing21 = await proxy.ask("worker2", "pingTo", %"worker1")
116+
assert canPing21.getBool()
117+
118+
await proxy.send("worker1", "stop")
119+
while proxy.isThreadRunning("worker1"):
120+
discard
121+
await proxy.cleanThreads()
122+
123+
block:
124+
let canPing21 = await proxy.ask("worker2", "pingTo", %"worker1")
125+
assert not canPing21.getBool()
126+
let canPing31 = await proxy.ask("worker3", "pingTo", %"worker1")
127+
assert not canPing31.getBool()
128+
129+
waitFor run()
130+
95131

96-
97-

0 commit comments

Comments
 (0)