Skip to content

Commit 330a632

Browse files
Sleep with reactor's timer queue
1 parent c0f9a5a commit 330a632

File tree

1 file changed

+44
-23
lines changed

1 file changed

+44
-23
lines changed

src/photon/windows/core.d

Lines changed: 44 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import std.concurrency;
1818

1919
import rewind.map;
2020

21+
import mecca.time_queue;
22+
2123
import photon.ds.common;
2224
import photon.ds.intrusive_queue;
2325
import photon.windows.support;
@@ -210,25 +212,12 @@ public auto semaphore(int count) {
210212
return cast(shared)Semaphore(count);
211213
}
212214

213-
extern(Windows) VOID timerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_TIMER Timer) {
214-
FiberExt fiber = cast(FiberExt)Context;
215-
fiber.schedule(size_t.max, WAKE_TIMER);
216-
}
217-
218215
///
219216
struct Timer {
220217
@trusted:
221218
alias Callback = void delegate() @safe nothrow;
222219
void wait(Duration dur) {
223-
auto timer = CreateThreadpoolTimer(&timerCallback, cast(void*)currentFiber, &environ);
224-
wenforce(timer != null, "Failed to create threadpool timer");
225-
FILETIME time;
226-
long hnsecs = -dur.total!"hnsecs";
227-
time.dwHighDateTime = cast(DWORD)(hnsecs >> 32);
228-
time.dwLowDateTime = hnsecs & 0xFFFF_FFFF;
229-
SetThreadpoolTimer(timer, &time, 0, 0);
230-
FiberExt.yield();
231-
CloseThreadpoolTimer(timer);
220+
delay(dur);
232221
}
233222

234223
void stop() nothrow {}
@@ -282,6 +271,8 @@ enum int WAKE_TRIGGER = -1;
282271
enum int WAKE_TIMER = -2;
283272
enum int WAKE_JOIN = -3;
284273

274+
enum TIMER_NUM_BINS = 256;
275+
enum TIMER_NUM_LEVELS = 4;
285276

286277
struct SchedulerBlock {
287278
shared IntrusiveQueue!(FiberExt, RawEvent) queue;
@@ -339,6 +330,20 @@ class FiberExt : Fiber {
339330
}
340331
}
341332

333+
struct TimedFiber {
334+
shared FiberExt* fiber;
335+
TscTimePoint timePoint;
336+
timeQueue.OwnerAttrType _owner;
337+
TimedFiber* _next, _prev;
338+
339+
void schedule(size_t numSched) {
340+
auto f = cast()steal(*fiber);
341+
if (f) {
342+
f.schedule(numSched, WAKE_TIMER);
343+
}
344+
}
345+
}
346+
342347
shared SchedulerBlock[] scheds;
343348

344349
enum MAX_THREADPOOL_SIZE = 100;
@@ -348,6 +353,11 @@ __gshared PTP_POOL threadPool; // for synchronious syscalls
348353
__gshared TP_CALLBACK_ENVIRON_V3 environ; // callback environment for the pool
349354
shared int alive; // count of non-terminated Fibers scheduled
350355

356+
CascadingTimeQueue!(TimedFiber*, TIMER_NUM_BINS, TIMER_NUM_LEVELS, true) timeQueue; // thread-local
357+
358+
TimedFiber timerEntry(FiberExt* fiber, Duration delay) nothrow {
359+
return TimedFiber(cast(shared)fiber, TscTimePoint.hardNow() + delay);
360+
}
351361

352362
public void initPhoton() {
353363
SYSTEM_INFO info;
@@ -432,11 +442,19 @@ package(photon) void schedulerEntry(size_t n)
432442
// TODO: handle NUMA case
433443
wenforce(SetThreadAffinityMask(GetCurrentThread(), 1L<<n), "failed to set affinity");
434444
shared SchedulerBlock* sched = scheds.ptr + n;
445+
timeQueue.open(100.usecs);
435446
while (alive > 0) {
436-
for(;;) {
447+
TscTimePoint t;
448+
for (;;) {
449+
t = TscTimePoint.hardNow();
450+
for (;;) {
451+
TimedFiber* f = timeQueue.pop(t);
452+
if (f == null) break;
453+
f.schedule(n);
454+
}
437455
FiberExt f = sched.queue.drain();
438-
if (f is null) break; // drained an empty queue, time to sleep
439-
do {
456+
if (f is null) break;
457+
while (f) {
440458
auto next = f.next; //save next, it will be reused on scheduling
441459
currentFiber = f;
442460
logf("Fiber %x started", cast(void*)f);
@@ -455,9 +473,9 @@ package(photon) void schedulerEntry(size_t n)
455473
abort();
456474
}
457475
f = next;
458-
} while(f !is null);
476+
}
459477
}
460-
processEventsEntry(n);
478+
processEventsEntry(n, timeQueue.timeTillNextEntry(t));
461479
}
462480
foreach (i; 0..scheds.length) {
463481
notifyEventloop(i);
@@ -466,10 +484,11 @@ package(photon) void schedulerEntry(size_t n)
466484

467485
enum int MAX_COMPLETIONS = 500;
468486

469-
void processEventsEntry(size_t n) {
487+
void processEventsEntry(size_t n, Duration wait) {
470488
OVERLAPPED_ENTRY[MAX_COMPLETIONS] entries = void;
471489
uint count = 0;
472-
while(GetQueuedCompletionStatusEx(cast(HANDLE)scheds[n].iocp, entries.ptr, MAX_COMPLETIONS, &count, 0, FALSE)) {
490+
uint ms = wait > 10.hours ? 0 : cast(uint) wait.total!"msecs";
491+
while(GetQueuedCompletionStatusEx(cast(HANDLE)scheds[n].iocp, entries.ptr, MAX_COMPLETIONS, &count, ms, FALSE)) {
473492
logf("Dequeued I/O events=%d", count);
474493
foreach (e; entries[0..count]) {
475494
if (e.lpCompletionKey == 0) {
@@ -601,8 +620,10 @@ extern(Windows) int send(SOCKET s, void* buf, int len, int flags) {
601620

602621
extern(Windows) void Sleep(DWORD dwMilliseconds) {
603622
if (currentFiber !is null) {
604-
auto tm = timer();
605-
tm.wait(dwMilliseconds.msecs);
623+
FiberExt fiber = currentFiber;
624+
auto tm = timerEntry(&fiber, dwMilliseconds * 1.msecs);
625+
timeQueue.insert(&tm);
626+
FiberExt.yield();
606627
} else {
607628
SleepEx(dwMilliseconds, FALSE);
608629
}

0 commit comments

Comments
 (0)