@@ -19,63 +19,6 @@ const heap_d = UInt32(8)
1919const heaps = [Vector {taskheap} (undef, 0 ), Vector {taskheap} (undef, 0 )]
2020const heaps_lock = [SpinLock (), SpinLock ()]
2121
22-
23- """
24- cong(max::UInt32)
25-
26- Return a random UInt32 in the range `1:max` except if max is 0, in that case return 0.
27- """
28- cong (max:: UInt32 ) = iszero (max) ? UInt32 (0 ) : rand_ptls (max) + UInt32 (1 ) # TODO : make sure users don't use 0 and remove this check
29-
30- get_ptls_rng () = ccall (:jl_get_ptls_rng , UInt64, ())
31-
32- set_ptls_rng (seed:: UInt64 ) = ccall (:jl_set_ptls_rng , Cvoid, (UInt64,), seed)
33-
34- """
35- rand_ptls(max::UInt32)
36-
37- Return a random UInt32 in the range `0:max-1` using the thread-local RNG
38- state. Max must be greater than 0.
39- """
40- Base. @assume_effects :removable :inaccessiblememonly :notaskstate function rand_ptls (max:: UInt32 )
41- rngseed = get_ptls_rng ()
42- val, seed = rand_uniform_max_int32 (max, rngseed)
43- set_ptls_rng (seed)
44- return val % UInt32
45- end
46-
47- # This implementation is based on OpenSSLs implementation of rand_uniform
48- # https://github.com/openssl/openssl/blob/1d2cbd9b5a126189d5e9bc78a3bdb9709427d02b/crypto/rand/rand_uniform.c#L13-L99
49- # Comments are vendored from their implementation as well.
50- # For the original developer check the PR to swift https://github.com/apple/swift/pull/39143.
51-
52- # Essentially it boils down to incrementally generating a fixed point
53- # number on the interval [0, 1) and multiplying this number by the upper
54- # range limit. Once it is certain what the fractional part contributes to
55- # the integral part of the product, the algorithm has produced a definitive
56- # result.
57- """
58- rand_uniform_max_int32(max::UInt32, seed::UInt64)
59-
60- Return a random UInt32 in the range `0:max-1` using the given seed.
61- Max must be greater than 0.
62- """
63- Base. @assume_effects :total function rand_uniform_max_int32 (max:: UInt32 , seed:: UInt64 )
64- if max == UInt32 (1 )
65- return UInt32 (0 ), seed
66- end
67- # We are generating a fixed point number on the interval [0, 1).
68- # Multiplying this by the range gives us a number on [0, upper).
69- # The high word of the multiplication result represents the integral part
70- # This is not completely unbiased as it's missing the fractional part of the original implementation but it's good enough for our purposes
71- seed = UInt64 (69069 ) * seed + UInt64 (362437 )
72- prod = (UInt64 (max)) * (seed % UInt32) # 64 bit product
73- i = prod >> 32 % UInt32 # integral part
74- return i % UInt32, seed
75- end
76-
77-
78-
7922function multiq_sift_up (heap:: taskheap , idx:: Int32 )
8023 while idx > Int32 (1 )
8124 parent = (idx - Int32 (2 )) ÷ heap_d + Int32 (1 )
@@ -147,10 +90,10 @@ function multiq_insert(task::Task, priority::UInt16)
14790
14891 task. priority = priority
14992
150- rn = cong (heap_p)
93+ rn = Base . Scheduler . cong (heap_p)
15194 tpheaps = heaps[tp]
15295 while ! trylock (tpheaps[rn]. lock)
153- rn = cong (heap_p)
96+ rn = Base . Scheduler . cong (heap_p)
15497 end
15598
15699 heap = tpheaps[rn]
@@ -190,8 +133,8 @@ function multiq_deletemin()
190133 if i == heap_p
191134 return nothing
192135 end
193- rn1 = cong (heap_p)
194- rn2 = cong (heap_p)
136+ rn1 = Base . Scheduler . cong (heap_p)
137+ rn2 = Base . Scheduler . cong (heap_p)
195138 prio1 = tpheaps[rn1]. priority
196139 prio2 = tpheaps[rn2]. priority
197140 if prio1 > prio2
@@ -211,7 +154,21 @@ function multiq_deletemin()
211154 heap = tpheaps[rn1]
212155 task = heap. tasks[1 ]
213156 if ccall (:jl_set_task_tid , Cint, (Any, Cint), task, tid- 1 ) == 0
157+ # This task is stuck to a thread that's likely sleeping, move the task to it's private queue and wake it up
158+ # We move this out of the queue to avoid spinning on it
159+ ntasks = heap. ntasks
160+ @atomic :monotonic heap. ntasks = ntasks - Int32 (1 )
161+ heap. tasks[1 ] = heap. tasks[ntasks]
162+ Base. _unsetindex! (heap. tasks, Int (ntasks))
163+ prio1 = typemax (UInt16)
164+ if ntasks > 1
165+ multiq_sift_down (heap, Int32 (1 ))
166+ prio1 = heap. tasks[1 ]. priority
167+ end
168+ @atomic :monotonic heap. priority = prio1
169+ push! (Base. workqueue_for (tid), t)
214170 unlock (heap. lock)
171+ ccall (:jl_wakeup_thread , Cvoid, (Int16,), (tid - 1 ) % Int16)
215172 @goto retry
216173 end
217174 ntasks = heap. ntasks
@@ -243,4 +200,9 @@ function multiq_check_empty()
243200 return true
244201end
245202
203+
204+ enqueue! (t:: Task ) = multiq_insert (t, t. priority)
205+ dequeue! () = multiq_deletemin ()
206+ checktaskempty () = multiq_check_empty ()
207+
246208end
0 commit comments