Skip to content
This repository was archived by the owner on Apr 23, 2023. It is now read-only.

Commit eae717c

Browse files
author
Jack Tang
committed
added running check in poll(), added channelMaxItems
1 parent aaf876d commit eae717c

File tree

1 file changed

+39
-18
lines changed

1 file changed

+39
-18
lines changed

src/threadproxy.nim

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,31 @@ type
1111
threadName: string
1212

1313
ReceiverNotFoundError* = object of ThreadProxyError
14-
## raised whenever thread with name not found
14+
## Raised whenever thread with name not found
1515
sender: string
1616
receiver: string
1717

1818
MessageUndeliveredError* = object of ThreadProxyError
19-
## raised when message cannot be send to channel
19+
## Raised when message cannot be send to channel
2020
kind: ThreadMessageKind
2121
action: string
2222
sender: string
2323
data: JsonNode
2424

2525
ActionConflictError* = object of ThreadProxyError
26-
## raised when registering action with non-unique name
26+
## Raised when registering action with non-unique name
2727
threadName: string
2828
action: string
2929

3030
NameConflictError* = object of ThreadProxyError
31-
## raise when creating thread with non-unique name
31+
## Raise when creating thread with non-unique name
3232
threadName: string
3333

34-
ThreadMessageKind* = enum
34+
PollConflictError* = object of ThreadProxyError
35+
## Raise when poll() is called while ThreadProxy is running
36+
threadName: string
37+
38+
ThreadMessageKind = enum
3539
EMIT, REQUEST, REPLY, SYS
3640

3741
SysMsg = enum
@@ -85,10 +89,10 @@ proc finalize(ch: ThreadChannelWrapper) =
8589
ch.self[].close()
8690
deallocShared(ch.self)
8791

88-
proc newThreadChannel(): ThreadChannelWrapper =
92+
proc newThreadChannel(channelMaxItems: int): ThreadChannelWrapper =
8993
result.new(finalize)
9094
result.self = cast[ThreadChannelPtr](allocShared0(sizeof(ThreadChannel)))
91-
result.self[].open()
95+
result.self[].open(channelMaxItems)
9296

9397
proc finalize(th: ThreadWrapper) =
9498
dealloc(th.self)
@@ -102,6 +106,7 @@ proc defaultAction(action: string, data: JsonNode): Future[JsonNode] {.async.} =
102106
return nil
103107

104108
proc newThreadProxy*(token: ThreadToken): ThreadProxy =
109+
## Create a new ThreadProxy
105110
ThreadProxy(
106111
name: token.name,
107112
mainChannel: token.mainChannel,
@@ -112,8 +117,9 @@ proc newThreadProxy*(token: ThreadToken): ThreadProxy =
112117
defaultAction: defaultAction
113118
)
114119

115-
proc newMainThreadProxy*(name: string): MainThreadProxy =
116-
let ch = newThreadChannel()
120+
proc newMainThreadProxy*(name: string, channelMaxItems: int = 0): MainThreadProxy =
121+
## Create a new MainThreadProxy
122+
let ch = newThreadChannel(channelMaxItems)
117123
var channels = initTable[string, ThreadChannelWrapper]()
118124
channels[name] = ch
119125
MainThreadProxy(
@@ -127,7 +133,7 @@ proc newMainThreadProxy*(name: string): MainThreadProxy =
127133
channels: channels
128134
)
129135

130-
proc newMessage*(
136+
proc newMessage(
131137
proxy: ThreadProxy,
132138
kind: ThreadMessageKind,
133139
action: string,
@@ -142,7 +148,7 @@ proc newMessage*(
142148
callbackId: callbackId
143149
)
144150

145-
proc newSysMessage*(
151+
proc newSysMessage(
146152
action: string,
147153
channel: ThreadChannelPtr,
148154
data: JsonNode = nil,
@@ -162,7 +168,9 @@ proc isNameAvailable*(proxy: MainThreadProxy, name: string): bool {.inline.} = n
162168

163169
proc isMainThreadProxy(proxy: ThreadProxy): bool {.inline.} = proxy.channel == proxy.mainChannel
164170

165-
proc isRunning*(proxy: ThreadProxy): bool {.inline.} = proxy.active
171+
proc isRunning*(proxy: ThreadProxy): bool {.inline.} =
172+
## Check if `proxy` is running
173+
proxy.active
166174

167175
proc nextCallbackId(proxy: ThreadProxy): int {.inline.} =
168176
# start with 1, callbackId = 0 means no callbacks
@@ -183,6 +191,7 @@ proc onDefault*(proxy: ThreadProxy, handler: ThreadDefaultActionHandler) =
183191
proxy.defaultAction = handler
184192

185193
template onData*(proxy: ThreadProxy, action: string, body: untyped): void =
194+
## Template version of `on`
186195
proxy.on(
187196
action,
188197
proc(json: JsonNode): Future[JsonNode] {.gcsafe,async.} =
@@ -191,12 +200,15 @@ template onData*(proxy: ThreadProxy, action: string, body: untyped): void =
191200
)
192201

193202
template onDefaultData*(proxy: ThreadProxy, body: untyped) =
203+
## Template version of `onDefault`
194204
proxy.onDefault proc(a: string, j: JsonNode): Future[JsonNode] {.gcsafe,async.} =
195205
let action {.inject.} = a
196206
let data {.inject.} = j
197207
`body`
198208

199209
proc send*(proxy: ThreadProxy, target: ThreadChannelPtr, action: string, data: JsonNode): Future[void] =
210+
## Send `data` to `target` channel and then complete.
211+
## Raise MessageUndeliveredError if cannot put on to target channel.
200212
result = newFuture[void]("send")
201213
let sent = target[].trySend proxy.newMessage(EMIT, action, data)
202214
if sent:
@@ -210,6 +222,7 @@ proc send*(proxy: ThreadProxy, target: ThreadChannelPtr, action: string, data: J
210222
result.fail(err)
211223

212224
proc ask*(proxy: ThreadProxy, target: ThreadChannelPtr, action: string, data: JsonNode = nil): Future[JsonNode] =
225+
## Send `data` to `target` channel and then wait for reply
213226
let id = proxy.nextCallbackId()
214227
result = newFuture[JsonNode]("ask")
215228
proxy.jsonCallbacks[id] = result
@@ -227,6 +240,7 @@ proc getChannel*(proxy: ThreadProxy, name: string): Future[ThreadChannelPtr] =
227240
## Resolve name to channel
228241

229242
if proxy.isMainThreadProxy:
243+
# get from channels
230244
let mainProxy = cast[MainThreadProxy](proxy)
231245
let ch = mainProxy.channels.getOrDefault(name, nil)
232246
if not ch.isNil:
@@ -341,6 +355,8 @@ proc process*(proxy: ThreadProxy): bool =
341355
proc stop*(proxy: ThreadProxy) {.inline.} = proxy.active = false
342356

343357
proc poll*(proxy: ThreadProxy, interval: int = 16): Future[void] =
358+
## Start processing channel messages.
359+
## Raise PollConflictError if proxy is already running
344360
var future = newFuture[void]("poll")
345361
result = future
346362

@@ -356,10 +372,15 @@ proc poll*(proxy: ThreadProxy, interval: int = 16): Future[void] =
356372
else:
357373
future.complete()
358374

359-
proxy.active = true
360-
loop()
375+
if proxy.active:
376+
var err = newException(PollConflictError, "ThreadProxy is already started")
377+
err.threadName = proxy.name
378+
result.fail(err)
379+
else:
380+
proxy.active = true
381+
loop()
361382

362-
proc createToken*(proxy: MainThreadProxy, name: string): ThreadToken =
383+
proc createToken*(proxy: MainThreadProxy, name: string, channelMaxItems: int = 0): ThreadToken =
363384
## Create token for new threadproxy
364385

365386
# Check name availability
@@ -369,19 +390,19 @@ proc createToken*(proxy: MainThreadProxy, name: string): ThreadToken =
369390
raise err
370391

371392
# check new channel
372-
let ch = newThreadChannel()
393+
let ch = newThreadChannel(channelMaxItems)
373394
proxy.channels[name] = ch
374395
return ThreadToken(
375396
name: name,
376397
mainChannel: proxy.channel,
377398
channel: ch.self
378399
)
379400

380-
proc createThread*(proxy: MainThreadProxy, name: string, main: ThreadMainProc) =
401+
proc createThread*(proxy: MainThreadProxy, name: string, main: ThreadMainProc, channelMaxItems: int = 0) =
381402
## Create new thread managed by `proxy` with an unique `name`
382403

383404
# createToken will check name availability
384-
let token = proxy.createToken(name)
405+
let token = proxy.createToken(name, channelMaxItems)
385406

386407
# create thread wrapper to prevent GC
387408
let thread = newThread()

0 commit comments

Comments
 (0)