1
+ import core.time : msecs;
2
+ import std.parallelism : totalCPUs;
3
+ import std.process : environment;
4
+
5
+ import juptune.core.util ,
6
+ juptune.core.ds ,
7
+ juptune.event,
8
+ juptune.event.fiber,
9
+ juptune.http;
10
+
11
+ import tests.common : log;
12
+ import tests.plaintext;
13
+
14
+ /+ +++ Constant config ++++/
15
+
16
+ enum SOCKET_BACKLOG_PER_THREAD = 1000 ;
17
+ enum FIBER_CALL_STACK_BYTES = 1024 * 100 ;
18
+ enum HTTP_READ_BUFFER_BYTES = 1024 ;
19
+ enum HTTP_WRITE_BUFFER_BYTES = 1024 ;
20
+
21
+ enum HTTP_CONFIG = Http1Config()
22
+ .withReadTimeout(1000. msecs)
23
+ .withWriteTimeout(1000. msecs);
24
+
25
+ static assert (
26
+ HTTP_READ_BUFFER_BYTES + HTTP_WRITE_BUFFER_BYTES < FIBER_CALL_STACK_BYTES / 4 ,
27
+ " To be safe, please ensure the buffer bytes are only a quarter the size of a fiber call stack."
28
+ );
29
+
30
+ /+ +++ Globals ++++/
31
+
32
+ __gshared TcpSocket server; // Currently there's no mechanism to directly pass data to new threads, so global state has to be used.
33
+
34
+ /+ +++ Functions ++++/
35
+
36
+ void main ()
37
+ {
38
+ auto loop = EventLoop(
39
+ EventLoopConfig()
40
+ .withFiberAllocatorConfig(
41
+ FiberAllocatorConfig()
42
+ .withBlockStackSize(FIBER_CALL_STACK_BYTES )
43
+ )
44
+ );
45
+
46
+ // open() and listen() can't be ran outside of an event loop thread, so currently this is the janky way to setup the server.
47
+ loop.addNoGCThread(() @nogc nothrow {
48
+ server.open().resultAssert;
49
+ server.listen(" 0.0.0.0:8080" , SOCKET_BACKLOG_PER_THREAD * totalCPUs).resultAssert;
50
+ juptuneEventLoopCancelThread();
51
+ });
52
+ loop.join();
53
+
54
+ // Then we can setup the proper loop threads.
55
+ foreach (i; 0 .. totalCPUs)
56
+ loop.addGCThread(&router);
57
+ loop.join();
58
+ }
59
+
60
+ // Juptune currently does not provide higher-level server features out of the box, so we have
61
+ // to hand-make a custom router.
62
+ //
63
+ // This is realistic in the sense that building a custom router is a completely valid, supported pattern
64
+ // for people who want/need something very specialised.
65
+ //
66
+ // This is unrealistic in the sense that once Juptune has a native router, the native router would
67
+ // almost certainly be used in a case like this (but since that's a TODO, this will have to do for now).
68
+ void router () nothrow
69
+ {
70
+ try
71
+ {
72
+ enum Route
73
+ {
74
+ FAILSAFE ,
75
+ plaintext,
76
+ }
77
+
78
+ enum Method
79
+ {
80
+ FAILSAFE ,
81
+ get
82
+ }
83
+
84
+ union RouteInput
85
+ {
86
+ PlainTextHeaderInput plaintext;
87
+ }
88
+
89
+ while (! juptuneEventLoopIsThreadCanceled())
90
+ {
91
+ TcpSocket client;
92
+
93
+ auto result = server.accept(client);
94
+ if (result.isError)
95
+ {
96
+ log(" error accepting socket: " , result);
97
+ continue ;
98
+ }
99
+
100
+ result = async(function () nothrow {
101
+ auto client = juptuneEventLoopGetContext! TcpSocket ;
102
+ scope (exit) if (client.isOpen)
103
+ auto _ = client.close();
104
+
105
+ Http1MessageSummary readSummary, writeSummary;
106
+ do
107
+ {
108
+ if (! client.isOpen)
109
+ return ;
110
+
111
+ // Read & Write primitives
112
+ ubyte [HTTP_READ_BUFFER_BYTES ] readBuffer;
113
+ ubyte [HTTP_WRITE_BUFFER_BYTES ] writeBuffer;
114
+ auto reader = Http1Reader(client, readBuffer, HTTP_CONFIG );
115
+ auto writer = Http1Writer(client, writeBuffer, HTTP_CONFIG );
116
+
117
+ // Routing state
118
+ Route route;
119
+ Method method;
120
+ RouteInput input;
121
+
122
+ // Error handling
123
+ uint errorCode;
124
+ string errorMsg;
125
+ void setError (uint code, string msg)
126
+ {
127
+ if (errorMsg ! is null )
128
+ return ;
129
+ errorCode = code;
130
+ errorMsg = msg;
131
+ }
132
+
133
+ // Parse request line
134
+ {
135
+ Http1RequestLine requestLine;
136
+ auto result = reader.readRequestLine(requestLine);
137
+ if (result.isError)
138
+ {
139
+ log(" readRequestLine() failed: " , result.error, " : " , result.context.slice);
140
+ return ;
141
+ }
142
+
143
+ requestLine.access((scope methodString, scope uri){
144
+ switch (methodString)
145
+ {
146
+ case " GET" :
147
+ method = Method.get ;
148
+ break ;
149
+
150
+ default :
151
+ setError(405 , " Unexpected method" );
152
+ break ;
153
+ }
154
+
155
+ switch (uri.path)
156
+ {
157
+ case " /plaintext" :
158
+ route = Route.plaintext;
159
+ break ;
160
+
161
+ default :
162
+ setError(404 , " Not found" );
163
+ break ;
164
+ }
165
+ });
166
+ }
167
+
168
+ // Read headers
169
+ bool foundEndOfHeaders;
170
+ while (! foundEndOfHeaders)
171
+ {
172
+ auto result = reader.checkEndOfHeaders(foundEndOfHeaders);
173
+ if (result.isError)
174
+ {
175
+ log(" checkEndOfHeaders() failed: " , result);
176
+ return ;
177
+ }
178
+ else if (foundEndOfHeaders)
179
+ break ;
180
+
181
+ Http1Header header;
182
+ result = reader.readHeader(header);
183
+ if (result.isError)
184
+ {
185
+ log(" readHeader() failed: " , result);
186
+ return ;
187
+ }
188
+
189
+ // Since we're using a custom router, we have the luxury of handling/ignoring headers during routing rather
190
+ // than stuffing them all into a hashmap, and doing the processing post-routing.
191
+ header.access((scope name, scope value){
192
+ final switch (route) with (Route)
193
+ {
194
+ case FAILSAFE : break ;
195
+
196
+ case plaintext:
197
+ break ;
198
+ }
199
+ });
200
+ }
201
+
202
+ // Read body
203
+ Http1BodyChunk chunk;
204
+ do {
205
+ chunk = Http1BodyChunk();
206
+ auto result = reader.readBody(chunk);
207
+ if (result.isError)
208
+ {
209
+ log(" readBody() failed: " , result);
210
+ return ;
211
+ }
212
+
213
+ // Likewise, we only need to deal with body data in certain routes, so we can ignore them in others.
214
+ chunk.access((scope data){
215
+ final switch (route) with (Route)
216
+ {
217
+ case FAILSAFE : break ;
218
+
219
+ case plaintext:
220
+ break ;
221
+ }
222
+ });
223
+ } while (chunk.hasDataLeft);
224
+
225
+ // Finish reading the message, and either dispatch it to a handler, or report an error back.
226
+ auto result = reader.finishMessage(readSummary);
227
+ if (result.isError)
228
+ {
229
+ log(" finishMessage() failed: " , result);
230
+ return ;
231
+ }
232
+
233
+ if (errorMsg ! is null )
234
+ {
235
+ import tests.common : putServerAndDate;
236
+ result = writer.putResponseLine(Http1Version.http11, errorCode, errorMsg).then! (
237
+ () => writer.putServerAndDate(),
238
+ () => writer.finishHeaders(),
239
+ () => writer.finishBody(),
240
+ () => writer.finishTrailers(),
241
+ () => writer.finishMessage(writeSummary),
242
+ );
243
+ if (result.isError)
244
+ {
245
+ log(" finishing a message [error variant] failed: " , result);
246
+ return ;
247
+ }
248
+ continue ;
249
+ }
250
+
251
+ final switch (route) with (Route)
252
+ {
253
+ case FAILSAFE : break ;
254
+
255
+ case plaintext:
256
+ handlePlainText(input.plaintext, writer, writeSummary);
257
+ break ;
258
+ }
259
+ } while (! readSummary.connectionClosed && ! writeSummary.connectionClosed);
260
+ }, client, &asyncMoveSetter! TcpSocket );
261
+ if (result.isError)
262
+ {
263
+ log(" error calling async(): " , result);
264
+ continue ;
265
+ }
266
+ }
267
+ }
268
+ catch (Throwable ex) // @suppress(dscanner.suspicious.catch_em_all)
269
+ {
270
+ import std.exception : assumeWontThrow;
271
+ log(" uncaught exception: " , ex.msg).assumeWontThrow;
272
+ debug log(ex.info).assumeWontThrow;
273
+ }
274
+ }
0 commit comments