Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions bench/static_http/hello.d
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"name" : "hello",
"dependencies": {
"photon": { "path" : "../.." },
"photon-http": "0.5.5"
"photon-http": "0.5.6"
}
}
+/
Expand Down Expand Up @@ -34,27 +34,31 @@ void server_worker(Socket client) {
}

void server() {
Socket server = new TcpSocket();
server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
server.bind(new InternetAddress("0.0.0.0", 8080));
server.listen(1000);

debug writeln("Started server");
try {
Socket server = new TcpSocket();
server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
server.bind(new InternetAddress("0.0.0.0", 8080));
server.listen(1000);

void processClient(Socket client) {
go(() => server_worker(client));
}
debug writeln("Started server");

while(true) {
try {
debug writeln("Waiting for server.accept()");
Socket client = server.accept();
debug writeln("New client accepted");
processClient(client);
void processClient(Socket client) {
go(() => server_worker(client));
}
catch(Exception e) {
writefln("Failure to accept %s", e);

while(true) {
try {
debug writeln("Waiting for server.accept()");
Socket client = server.accept();
debug writeln("New client accepted");
processClient(client);
}
catch(Exception e) {
writefln("Failure to accept %s", e);
}
}
} catch (Exception e) {
stderr.writefln("Got exception while setting up the server: %s", e);
}
}

Expand Down
84 changes: 56 additions & 28 deletions src/photon/windows/core.d
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import std.concurrency;

import rewind.map;

import mecca.time_queue;

import photon.ds.common;
import photon.ds.intrusive_queue;
import photon.windows.support;
Expand Down Expand Up @@ -210,25 +212,12 @@ public auto semaphore(int count) {
return cast(shared)Semaphore(count);
}

extern(Windows) VOID timerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_TIMER Timer) {
FiberExt fiber = cast(FiberExt)Context;
fiber.schedule(size_t.max, WAKE_TIMER);
}

///
struct Timer {
@trusted:
alias Callback = void delegate() @safe nothrow;
void wait(Duration dur) {
auto timer = CreateThreadpoolTimer(&timerCallback, cast(void*)currentFiber, &environ);
wenforce(timer != null, "Failed to create threadpool timer");
FILETIME time;
long hnsecs = -dur.total!"hnsecs";
time.dwHighDateTime = cast(DWORD)(hnsecs >> 32);
time.dwLowDateTime = hnsecs & 0xFFFF_FFFF;
SetThreadpoolTimer(timer, &time, 0, 0);
FiberExt.yield();
CloseThreadpoolTimer(timer);
delay(dur);
}

void stop() nothrow {}
Expand Down Expand Up @@ -282,6 +271,8 @@ enum int WAKE_TRIGGER = -1;
enum int WAKE_TIMER = -2;
enum int WAKE_JOIN = -3;

enum TIMER_NUM_BINS = 256;
enum TIMER_NUM_LEVELS = 4;

struct SchedulerBlock {
shared IntrusiveQueue!(FiberExt, RawEvent) queue;
Expand Down Expand Up @@ -339,6 +330,20 @@ class FiberExt : Fiber {
}
}

struct TimedFiber {
shared FiberExt* fiber;
TscTimePoint timePoint;
timeQueue.OwnerAttrType _owner;
TimedFiber* _next, _prev;

void schedule(size_t numSched) {
auto f = cast()steal(*fiber);
if (f) {
f.schedule(numSched, WAKE_TIMER);
}
}
}

shared SchedulerBlock[] scheds;

enum MAX_THREADPOOL_SIZE = 100;
Expand All @@ -348,6 +353,11 @@ __gshared PTP_POOL threadPool; // for synchronious syscalls
__gshared TP_CALLBACK_ENVIRON_V3 environ; // callback environment for the pool
shared int alive; // count of non-terminated Fibers scheduled

CascadingTimeQueue!(TimedFiber*, TIMER_NUM_BINS, TIMER_NUM_LEVELS, true) timeQueue; // thread-local

TimedFiber timerEntry(FiberExt* fiber, Duration delay) nothrow {
return TimedFiber(cast(shared)fiber, TscTimePoint.hardNow() + delay);
}

public void initPhoton() {
SYSTEM_INFO info;
Expand Down Expand Up @@ -432,11 +442,19 @@ package(photon) void schedulerEntry(size_t n)
// TODO: handle NUMA case
wenforce(SetThreadAffinityMask(GetCurrentThread(), 1L<<n), "failed to set affinity");
shared SchedulerBlock* sched = scheds.ptr + n;
timeQueue.open(100.usecs);
while (alive > 0) {
for(;;) {
TscTimePoint t;
for (;;) {
t = TscTimePoint.hardNow();
for (;;) {
TimedFiber* f = timeQueue.pop(t);
if (f == null) break;
f.schedule(n);
}
FiberExt f = sched.queue.drain();
if (f is null) break; // drained an empty queue, time to sleep
do {
if (f is null) break;
while (f) {
auto next = f.next; //save next, it will be reused on scheduling
currentFiber = f;
logf("Fiber %x started", cast(void*)f);
Expand All @@ -455,9 +473,9 @@ package(photon) void schedulerEntry(size_t n)
abort();
}
f = next;
} while(f !is null);
}
}
processEventsEntry(n);
processEventsEntry(n, timeQueue.timeTillNextEntry(t));
}
foreach (i; 0..scheds.length) {
notifyEventloop(i);
Expand All @@ -466,10 +484,11 @@ package(photon) void schedulerEntry(size_t n)

enum int MAX_COMPLETIONS = 500;

void processEventsEntry(size_t n) {
void processEventsEntry(size_t n, Duration wait) {
OVERLAPPED_ENTRY[MAX_COMPLETIONS] entries = void;
uint count = 0;
while(GetQueuedCompletionStatusEx(cast(HANDLE)scheds[n].iocp, entries.ptr, MAX_COMPLETIONS, &count, 0, FALSE)) {
uint ms = wait > 10.hours ? 0 : cast(uint) wait.total!"msecs";
while(GetQueuedCompletionStatusEx(cast(HANDLE)scheds[n].iocp, entries.ptr, MAX_COMPLETIONS, &count, ms, FALSE)) {
logf("Dequeued I/O events=%d", count);
foreach (e; entries[0..count]) {
if (e.lpCompletionKey == 0) {
Expand Down Expand Up @@ -501,8 +520,7 @@ void notifyEventloop(size_t n) nothrow {

extern(Windows) SOCKET socket(int af, int type, int protocol) {
logf("Intercepted socket!");
SOCKET s = WSASocketW(af, type, protocol, null, 0, WSA_FLAG_OVERLAPPED);
registerSocket(s);
SOCKET s = cast(SOCKET)WSASocketW(af, type, protocol, null, 0, WSA_FLAG_OVERLAPPED);
return s;
}

Expand All @@ -518,9 +536,7 @@ extern(Windows) VOID acceptJob(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PT
AcceptState* state = cast(AcceptState*)Context;
logf("Started threadpool job");
SOCKET resp = WSAAccept(state.socket, state.addr, state.addrlen, null, 0);
if (resp != INVALID_SOCKET) {
registerSocket(resp);
}
logf("Got accept response %s", resp);
state.socket = resp;
state.fiber.schedule(size_t.max, WAKE_TRIGGER);
}
Expand All @@ -532,6 +548,10 @@ extern(Windows) SOCKET accept(SOCKET s, sockaddr* addr, LPINT addrlen) {
state.addr = addr;
state.addrlen = addrlen;
state.fiber = currentFiber;
if (s !in ioWaiters) {
registerSocket(s);
}
ioWaiters[s] = currentFiber;
PTP_WORK work = CreateThreadpoolWork(&acceptJob, &state, &environ);
wenforce(work != null, "Failed to create work for threadpool");
SubmitThreadpoolWork(work);
Expand All @@ -548,6 +568,9 @@ void registerSocket(SOCKET s) {
extern(Windows) int recv(SOCKET s, void* buf, int len, int flags) {
OVERLAPPED overlapped;
WSABUF wsabuf = WSABUF(cast(uint)len, buf);
if (s !in ioWaiters) {
registerSocket(s);
}
ioWaiters[s] = currentFiber;
uint received = 0;
int ret = WSARecv(s, &wsabuf, 1, &received, cast(uint*)&flags, cast(LPWSAOVERLAPPED)&overlapped, null);
Expand All @@ -571,6 +594,9 @@ extern(Windows) int recv(SOCKET s, void* buf, int len, int flags) {
extern(Windows) int send(SOCKET s, void* buf, int len, int flags) {
OVERLAPPED overlapped;
WSABUF wsabuf = WSABUF(cast(uint)len, buf);
if (s !in ioWaiters) {
registerSocket(s);
}
ioWaiters[s] = currentFiber;
uint sent = 0;
int ret = WSASend(s, &wsabuf, 1, &sent, flags, cast(LPWSAOVERLAPPED)&overlapped, null);
Expand All @@ -594,8 +620,10 @@ extern(Windows) int send(SOCKET s, void* buf, int len, int flags) {

extern(Windows) void Sleep(DWORD dwMilliseconds) {
if (currentFiber !is null) {
auto tm = timer();
tm.wait(dwMilliseconds.msecs);
FiberExt fiber = currentFiber;
auto tm = timerEntry(&fiber, dwMilliseconds * 1.msecs);
timeQueue.insert(&tm);
FiberExt.yield();
} else {
SleepEx(dwMilliseconds, FALSE);
}
Expand Down
2 changes: 1 addition & 1 deletion src/photon/windows/support.d
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern(Windows) SOCKET WSASocketW(
int type,
int protocol,
void* lpProtocolInfo,
WORD g,
DWORD g,
DWORD dwFlags
) nothrow;

Expand Down
Loading