Skip to content

Commit 8f4c1f9

Browse files
skoshxtim-smart
andauthored
fix: flush devtools queued messages on teardown (#1825)
Co-authored-by: Tim Smart <hello@timsmart.co>
1 parent b6b81a9 commit 8f4c1f9

File tree

3 files changed

+46
-19
lines changed

3 files changed

+46
-19
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
Fix DevToolsClient not flushing final span events on teardown.
6+
7+
The stream consumer was `forkScoped`, causing it to be interrupted before
8+
it could drain remaining queue items. Replaced with `forkChild` and
9+
`Fiber.await` in the finalizer so the stream drains naturally after the
10+
queue is failed.

packages/effect/src/unstable/devtools/DevToolsClient.ts

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import * as Cause from "../../Cause.ts"
55
import * as Deferred from "../../Deferred.ts"
66
import * as Effect from "../../Effect.ts"
7+
import * as Fiber from "../../Fiber.ts"
78
import * as Layer from "../../Layer.ts"
89
import * as Metric from "../../Metric.ts"
910
import * as Queue from "../../Queue.ts"
@@ -23,9 +24,14 @@ const ResponseSchema = Schema.toCodecJson(DevToolsSchema.Response)
2324
* @since 4.0.0
2425
* @category tags
2526
*/
26-
export class DevToolsClient extends ServiceMap.Service<DevToolsClient, {
27-
readonly sendUnsafe: (_: DevToolsSchema.Span | DevToolsSchema.SpanEvent) => void
28-
}>()("effect/devtools/DevToolsClient") {}
27+
export class DevToolsClient extends ServiceMap.Service<
28+
DevToolsClient,
29+
{
30+
readonly sendUnsafe: (
31+
_: DevToolsSchema.Span | DevToolsSchema.SpanEvent
32+
) => void
33+
}
34+
>()("effect/devtools/DevToolsClient") {}
2935

3036
const makeEffect = Effect.gen(function*() {
3137
const socket = yield* Socket.Socket
@@ -37,7 +43,9 @@ const makeEffect = Effect.gen(function*() {
3743
Queue.offerUnsafe(requests, toMetricsSnapshot(services))
3844
})
3945

40-
const handleResponse = (response: DevToolsSchema.Response): Effect.Effect<void> => {
46+
const handleResponse = (
47+
response: DevToolsSchema.Response
48+
): Effect.Effect<void> => {
4149
switch (response._tag) {
4250
case "MetricsRequest": {
4351
return offerMetricsSnapshot
@@ -48,7 +56,7 @@ const makeEffect = Effect.gen(function*() {
4856
}
4957
}
5058

51-
yield* Stream.fromQueue(requests).pipe(
59+
const fiber = yield* Stream.fromQueue(requests).pipe(
5260
Stream.pipeThroughChannel(
5361
Ndjson.duplexSchemaString(Socket.toChannelString(socket), {
5462
inputSchema: RequestSchema,
@@ -57,12 +65,15 @@ const makeEffect = Effect.gen(function*() {
5765
),
5866
Stream.onFirst(() => Deferred.completeWith(connected, Effect.void)),
5967
Stream.runForEach(handleResponse),
60-
Effect.forkScoped
68+
Effect.forkDetach
6169
)
6270

6371
yield* Effect.addFinalizer(() =>
6472
offerMetricsSnapshot.pipe(
65-
Effect.andThen(Effect.flatMap(Effect.fiberId, (id) => Queue.failCause(requests, Cause.interrupt(id))))
73+
Effect.andThen(
74+
Effect.flatMap(Effect.fiberId, (id) => Queue.failCause(requests, Cause.interrupt(id)))
75+
),
76+
Effect.andThen(Fiber.await(fiber))
6677
)
6778
)
6879

@@ -84,7 +95,9 @@ const makeEffect = Effect.gen(function*() {
8495
})
8596
})
8697

87-
const toMetricsSnapshot = (services: ServiceMap.ServiceMap<never>): DevToolsSchema.MetricsSnapshot => ({
98+
const toMetricsSnapshot = (
99+
services: ServiceMap.ServiceMap<never>
100+
): DevToolsSchema.MetricsSnapshot => ({
88101
_tag: "MetricsSnapshot",
89102
metrics: Metric.snapshotUnsafe(services)
90103
})
@@ -108,11 +121,7 @@ export const make: Effect.Effect<
108121
* @since 4.0.0
109122
* @category layers
110123
*/
111-
export const layer: Layer.Layer<
112-
DevToolsClient,
113-
never,
114-
Socket.Socket
115-
> = Layer.effect(DevToolsClient, make)
124+
export const layer: Layer.Layer<DevToolsClient, never, Socket.Socket> = Layer.effect(DevToolsClient, make)
116125

117126
const makeTracerEffect = Effect.gen(function*() {
118127
const client = yield* DevToolsClient
@@ -162,10 +171,6 @@ export const makeTracer: Effect.Effect<Tracer.Tracer, never, DevToolsClient> = m
162171
* @since 4.0.0
163172
* @category layers
164173
*/
165-
export const layerTracer: Layer.Layer<
166-
never,
167-
never,
168-
Socket.Socket
169-
> = Layer.effect(Tracer.Tracer, makeTracer).pipe(
174+
export const layerTracer: Layer.Layer<never, never, Socket.Socket> = Layer.effect(Tracer.Tracer, makeTracer).pipe(
170175
Layer.provide(layer)
171176
)

packages/effect/src/unstable/devtools/DevToolsSchema.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
/**
22
* @since 4.0.0
33
*/
4+
import * as Exit from "../../Exit.ts"
5+
import { identity } from "../../Function.ts"
46
import type * as Option from "../../Option.ts"
57
import * as Schema from "../../Schema.ts"
8+
import * as SchemaTransformation from "../../SchemaTransformation.ts"
69

710
/**
811
* @since 4.0.0
@@ -26,7 +29,16 @@ export type SpanStatusStarted = Schema.Schema.Type<typeof SpanStatusStarted>
2629
export const SpanStatusEnded = Schema.Struct({
2730
_tag: Schema.tag("Ended"),
2831
startTime: Schema.BigInt,
29-
endTime: Schema.BigInt
32+
endTime: Schema.BigInt,
33+
exit: Schema.Exit(Schema.Void, Schema.DefectWithStack, Schema.DefectWithStack).pipe(
34+
Schema.decodeTo(
35+
Schema.Exit(Schema.Unknown, Schema.Unknown, Schema.Unknown),
36+
SchemaTransformation.transform({
37+
decode: identity,
38+
encode: Exit.asVoid
39+
})
40+
)
41+
)
3042
})
3143

3244
/**

0 commit comments

Comments
 (0)