Skip to content

Commit 1521d02

Browse files
authored
add graceful shutdown to http servers (#1728)
1 parent 2ad6c1b commit 1521d02

File tree

4 files changed

+94
-51
lines changed

4 files changed

+94
-51
lines changed

.changeset/sweet-donuts-bet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
add graceful shutdown to http servers

packages/effect/src/unstable/http/HttpServer.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class HttpServer extends ServiceMap.Service<HttpServer, {
3636
Exclude<R, HttpServerRequest> | Scope.Scope
3737
>
3838
}
39+
3940
readonly address: Address
4041
}>()("effect/http/HttpServer") {}
4142

@@ -115,10 +116,7 @@ export const serve: {
115116
HttpServer | Exclude<Effect.Services<App>, HttpServerRequest | Scope.Scope>
116117
> =>
117118
Layer.effectDiscard(
118-
Effect.flatMap(
119-
HttpServer.asEffect(),
120-
(server) => server.serve(effect, middleware!)
121-
)
119+
HttpServer.use((server) => server.serve(effect, middleware!))
122120
) as any)
123121

124122
/**
@@ -156,11 +154,7 @@ export const serveEffect: {
156154
void,
157155
never,
158156
Scope.Scope | HttpServer | Exclude<Effect.Services<App>, HttpServerRequest>
159-
> =>
160-
Effect.flatMap(
161-
HttpServer.asEffect(),
162-
(server) => server.serve(effect, middleware!)
163-
) as any)
157+
> => HttpServer.use((server) => server.serve(effect, middleware!)) as any)
164158

165159
/**
166160
* @since 4.0.0

packages/platform-bun/src/BunHttpServer.ts

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { Server as BunServer, ServerWebSocket } from "bun"
55
import * as Config from "effect/Config"
66
import type { ConfigError } from "effect/Config"
77
import * as Deferred from "effect/Deferred"
8+
import * as Duration from "effect/Duration"
89
import * as Effect from "effect/Effect"
910
import * as Exit from "effect/Exit"
1011
import * as Fiber from "effect/Fiber"
@@ -17,7 +18,7 @@ import * as Option from "effect/Option"
1718
import type * as Path from "effect/Path"
1819
import type * as Record from "effect/Record"
1920
import type * as Schema from "effect/Schema"
20-
import type * as Scope from "effect/Scope"
21+
import * as Scope from "effect/Scope"
2122
import * as Semaphore from "effect/Semaphore"
2223
import * as ServiceMap from "effect/ServiceMap"
2324
import * as Stream from "effect/Stream"
@@ -59,8 +60,12 @@ export type ServeOptions<R extends string> =
5960
*/
6061
export const make = Effect.fnUntraced(
6162
function*<R extends string>(
62-
options: ServeOptions<R>
63+
options: ServeOptions<R> & {
64+
readonly disablePreemptiveShutdown?: boolean | undefined
65+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
66+
}
6367
) {
68+
const scope = yield* Effect.scope
6469
const handlerStack: Array<(request: Request, server: BunServer<WebSocketContext>) => Response | Promise<Response>> =
6570
[
6671
function(_request, _server) {
@@ -92,14 +97,24 @@ export const make = Effect.fnUntraced(
9297
}
9398
})
9499

95-
yield* Effect.addFinalizer(() => Effect.promise(() => server.stop()))
100+
const shutdown = yield* Effect.promise(() => server.stop()).pipe(
101+
Effect.cached
102+
)
103+
const preemptiveShutdown = options.disablePreemptiveShutdown ? Effect.void : Effect.timeoutOrElse(shutdown, {
104+
duration: options.gracefulShutdownTimeout ?? Duration.seconds(20),
105+
onTimeout: () => Effect.void
106+
})
107+
108+
yield* Scope.addFinalizer(scope, shutdown)
96109

97110
return Server.make({
98111
address: { _tag: "TcpAddress", port: server.port!, hostname: server.hostname! },
99112
serve: Effect.fnUntraced(function*(httpApp, middleware) {
100113
const parent = yield* Effect.fiber
101-
const scope = yield* Effect.scope
102114
const services = parent.services
115+
const serveScope = ServiceMap.getUnsafe(services, Scope.Scope)
116+
const scope = Scope.forkUnsafe(serveScope, "parallel")
117+
103118
const httpEffect = HttpEffect.toHandled(httpApp, (request, response) =>
104119
Effect.sync(() => {
105120
;(request as BunServerRequest).resolve(makeResponse(request, response, services, scope))
@@ -119,17 +134,13 @@ export const make = Effect.fnUntraced(
119134
})
120135
}
121136

122-
yield* Effect.acquireRelease(
123-
Effect.sync(() => {
124-
handlerStack.push(handler)
125-
server.reload({ fetch: handler })
126-
}),
127-
() =>
128-
Effect.sync(() => {
129-
handlerStack.pop()
130-
server.reload({ fetch: handlerStack[handlerStack.length - 1] })
131-
})
132-
)
137+
yield* Scope.addFinalizerExit(serveScope, () => {
138+
handlerStack.pop()
139+
server.reload({ fetch: handlerStack[handlerStack.length - 1] })
140+
return preemptiveShutdown
141+
})
142+
handlerStack.push(handler)
143+
server.reload({ fetch: handler })
133144
})
134145
})
135146
}
@@ -202,7 +213,10 @@ const makeResponse = (
202213
* @category Layers
203214
*/
204215
export const layerServer: <R extends string>(
205-
options: ServeOptions<R>
216+
options: ServeOptions<R> & {
217+
readonly disablePreemptiveShutdown?: boolean | undefined
218+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
219+
}
206220
) => Layer.Layer<Server.HttpServer> = flow(make, Layer.effect(Server.HttpServer)) as any
207221

208222
/**
@@ -224,7 +238,10 @@ export const layerHttpServices: Layer.Layer<
224238
* @category Layers
225239
*/
226240
export const layer = <R extends string>(
227-
options: ServeOptions<R>
241+
options: ServeOptions<R> & {
242+
readonly disablePreemptiveShutdown?: boolean | undefined
243+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
244+
}
228245
): Layer.Layer<
229246
| Server.HttpServer
230247
| HttpPlatform
@@ -250,7 +267,12 @@ export const layerTest: Layer.Layer<
250267
* @category Layers
251268
*/
252269
export const layerConfig = <R extends string>(
253-
options: Config.Wrap<ServeOptions<R>>
270+
options: Config.Wrap<
271+
ServeOptions<R> & {
272+
readonly disablePreemptiveShutdown?: boolean | undefined
273+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
274+
}
275+
>
254276
): Layer.Layer<
255277
Server.HttpServer | HttpPlatform | FileSystem.FileSystem | Etag.Generator | Path.Path,
256278
ConfigError

packages/platform-node/src/NodeHttpServer.ts

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
import * as Cause from "effect/Cause"
55
import * as Config from "effect/Config"
6+
import * as Duration from "effect/Duration"
67
import * as Effect from "effect/Effect"
78
import * as Fiber from "effect/Fiber"
89
import type * as FileSystem from "effect/FileSystem"
@@ -56,25 +57,35 @@ import { NodeWS } from "./NodeSocket.ts"
5657
*/
5758
export const make = Effect.fnUntraced(function*(
5859
evaluate: LazyArg<Http.Server>,
59-
options: Net.ListenOptions
60+
options: Net.ListenOptions & {
61+
readonly disablePreemptiveShutdown?: boolean | undefined
62+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
63+
}
6064
) {
6165
const scope = yield* Effect.scope
6266
const server = evaluate()
63-
yield* Scope.addFinalizer(
64-
scope,
65-
Effect.callback<void>((resume) => {
66-
if (!server.listening) {
67-
return resume(Effect.void)
67+
68+
const shutdown = yield* Effect.callback<void>((resume) => {
69+
if (!server.listening) {
70+
return resume(Effect.void)
71+
}
72+
server.close((error) => {
73+
if (error) {
74+
resume(Effect.die(error))
75+
} else {
76+
resume(Effect.void)
6877
}
69-
server.close((error) => {
70-
if (error) {
71-
resume(Effect.die(error))
72-
} else {
73-
resume(Effect.void)
74-
}
75-
})
7678
})
77-
)
79+
}).pipe(Effect.cached)
80+
81+
const preemptiveShutdown = options.disablePreemptiveShutdown ?
82+
Effect.void :
83+
Effect.timeoutOrElse(shutdown, {
84+
duration: options.gracefulShutdownTimeout ?? Duration.seconds(20),
85+
onTimeout: () => Effect.void
86+
})
87+
88+
yield* Scope.addFinalizer(scope, shutdown)
7889

7990
yield* Effect.callback<void, ServeError>((resume) => {
8091
function onError(cause: Error) {
@@ -112,7 +123,8 @@ export const make = Effect.fnUntraced(function*(
112123
port: address.port
113124
},
114125
serve: Effect.fnUntraced(function*(httpApp, middleware) {
115-
const scope = yield* Effect.scope
126+
const serveScope = yield* Effect.scope
127+
const scope = Scope.forkUnsafe(serveScope, "parallel")
116128
const handler = yield* (makeHandler(httpApp, {
117129
middleware: middleware as any,
118130
scope
@@ -121,12 +133,11 @@ export const make = Effect.fnUntraced(function*(
121133
middleware: middleware as any,
122134
scope
123135
})
124-
yield* Effect.addFinalizer(() =>
125-
Effect.sync(() => {
126-
server.off("request", handler)
127-
server.off("upgrade", upgradeHandler)
128-
})
129-
)
136+
yield* Scope.addFinalizerExit(serveScope, () => {
137+
server.off("request", handler)
138+
server.off("upgrade", upgradeHandler)
139+
return preemptiveShutdown
140+
})
130141
server.on("request", handler)
131142
server.on("upgrade", upgradeHandler)
132143
})
@@ -360,7 +371,10 @@ class ServerRequestImpl extends NodeHttpIncomingMessage<HttpServerError> impleme
360371
*/
361372
export const layerServer: (
362373
evaluate: LazyArg<Http.Server<typeof Http.IncomingMessage, typeof Http.ServerResponse>>,
363-
options: Net.ListenOptions
374+
options: Net.ListenOptions & {
375+
readonly disablePreemptiveShutdown?: boolean | undefined
376+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
377+
}
364378
) => Layer.Layer<HttpServer.HttpServer, ServeError> = flow(make, Layer.effect(HttpServer.HttpServer))
365379

366380
/**
@@ -381,7 +395,10 @@ export const layerHttpServices: Layer.Layer<
381395
*/
382396
export const layer = (
383397
evaluate: LazyArg<Http.Server>,
384-
options: Net.ListenOptions
398+
options: Net.ListenOptions & {
399+
readonly disablePreemptiveShutdown?: boolean | undefined
400+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
401+
}
385402
): Layer.Layer<
386403
HttpServer.HttpServer | NodeServices.NodeServices | HttpPlatform.HttpPlatform | Etag.Generator,
387404
ServeError
@@ -397,7 +414,12 @@ export const layer = (
397414
*/
398415
export const layerConfig = (
399416
evaluate: LazyArg<Http.Server>,
400-
options: Config.Wrap<Net.ListenOptions>
417+
options: Config.Wrap<
418+
Net.ListenOptions & {
419+
readonly disablePreemptiveShutdown?: boolean | undefined
420+
readonly gracefulShutdownTimeout?: Duration.Input | undefined
421+
}
422+
>
401423
): Layer.Layer<
402424
HttpServer.HttpServer | FileSystem.FileSystem | Path.Path | HttpPlatform.HttpPlatform | Etag.Generator,
403425
ServeError | Config.ConfigError

0 commit comments

Comments
 (0)