Skip to content

Commit a691520

Browse files
committed
add Atom tests
1 parent ff2a8fb commit a691520

File tree

11 files changed

+1672
-99
lines changed

11 files changed

+1672
-99
lines changed

packages/effect/src/Fiber.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
*
7373
* @since 2.0.0
7474
*/
75-
import type { NonEmptyArray } from "./Array.ts"
75+
import type * as Arr from "./Array.ts"
7676
import type { Effect } from "./Effect.ts"
7777
import type { Exit } from "./Exit.ts"
7878
import * as effect from "./internal/effect.ts"
@@ -258,10 +258,13 @@ export const join: <A, E>(self: Fiber<A, E>) => Effect<A, E> = effect.fiberJoin
258258
* @since 2.0.0
259259
* @category combinators
260260
*/
261-
export const joinAll: <A extends Fiber<any, any>>(
262-
self: Iterable<A>
261+
export const joinAll: <A extends Iterable<Fiber<any, any>>>(
262+
self: A
263263
) => Effect<
264-
NonEmptyArray<A extends Fiber<infer _A, infer _E> ? _A : never>,
264+
Arr.ReadonlyArray.With<
265+
A,
266+
A extends Iterable<Fiber<infer _A, infer _E>> ? _A : never
267+
>,
265268
A extends Fiber<infer _A, infer _E> ? _E : never
266269
> = effect.fiberJoinAll
267270

packages/effect/src/Layer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ export const sync: {
718718
* @category constructors
719719
*/
720720
export const syncServices = <A>(evaluate: LazyArg<ServiceMap.ServiceMap<A>>): Layer<A> =>
721-
fromBuildUnsafe(constant(internalEffect.sync(evaluate)))
721+
fromBuildMemo(constant(internalEffect.sync(evaluate)))
722722

723723
/**
724724
* Constructs a layer from the specified scoped effect.

packages/effect/src/ServiceMap.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export interface Service<in out Identifier, in out Shape>
5353
of(self: Shape): Shape
5454
serviceMap(self: Shape): ServiceMap<Identifier>
5555
use<A, E, R>(f: (service: Shape) => Effect<A, E, R>): Effect<A, E, R | Identifier>
56+
useSync<A>(f: (service: Shape) => A): Effect<A, never, Identifier>
5657

5758
readonly stack?: string | undefined
5859
readonly key: string
@@ -205,6 +206,9 @@ const ServiceProto: any = {
205206
},
206207
use<A, E, R>(this: Service<never, any>, f: (service: any) => Effect<A, E, R>): Effect<A, E, R> {
207208
return withFiber((fiber) => f(get(fiber.services, this)))
209+
},
210+
useSync<A>(this: Service<never, any>, f: (service: any) => A): Effect<A, never, never> {
211+
return withFiber((fiber) => exitSucceed(f(get(fiber.services, this))))
208212
}
209213
}
210214

packages/effect/src/Stream.ts

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,22 +1474,12 @@ export const tap: {
14741474
options?: {
14751475
readonly concurrency?: number | "unbounded" | undefined
14761476
} | undefined
1477-
): Stream<A, E | E2, R | R2> => {
1478-
const concurrency = options?.concurrency ?? 1
1479-
if (concurrency === 1 || concurrency === "unbounded") {
1480-
return self.channel.pipe(
1481-
Channel.tap(Effect.forEach(f, { discard: true, concurrency }), options),
1482-
fromChannel
1483-
)
1484-
}
1485-
return suspend(() => {
1486-
const withPermit = Effect.makeSemaphoreUnsafe(concurrency).withPermit
1487-
return self.channel.pipe(
1488-
Channel.tap(Effect.forEach((a) => withPermit(f(a)), { discard: true, concurrency }), options),
1489-
fromChannel
1490-
)
1491-
})
1492-
})
1477+
): Stream<A, E | E2, R | R2> =>
1478+
mapEffect(
1479+
self,
1480+
(a) => Effect.as(f(a), a),
1481+
options
1482+
))
14931483

14941484
/**
14951485
* @since 2.0.0

packages/effect/src/internal/effect.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,8 +758,8 @@ export const fiberJoin = <A, E>(self: Fiber.Fiber<A, E>): Effect.Effect<A, E> =>
758758
}
759759

760760
/** @internal */
761-
export const fiberJoinAll = <A extends Fiber.Fiber<any, any>>(self: Iterable<A>): Effect.Effect<
762-
Arr.NonEmptyArray<A extends Fiber.Fiber<infer _A, infer _E> ? _A : never>,
761+
export const fiberJoinAll = <A extends Iterable<Fiber.Fiber<any, any>>>(self: A): Effect.Effect<
762+
Arr.ReadonlyArray.With<A, A extends Iterable<Fiber.Fiber<infer _A, infer _E>> ? _A : never>,
763763
A extends Fiber.Fiber<infer _A, infer _E> ? _E : never
764764
> =>
765765
callback((resume) => {

packages/effect/src/unstable/http/HttpBody.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,15 +214,16 @@ export const text = (body: string, contentType?: string): Uint8Array =>
214214
* @since 4.0.0
215215
* @category constructors
216216
*/
217-
export const jsonUnsafe = (body: unknown): Uint8Array => text(JSON.stringify(body), "application/json")
217+
export const jsonUnsafe = (body: unknown, contentType?: string): Uint8Array =>
218+
text(JSON.stringify(body), contentType ?? "application/json")
218219

219220
/**
220221
* @since 4.0.0
221222
* @category constructors
222223
*/
223-
export const json = (body: unknown): Effect.Effect<Uint8Array, HttpBodyError> =>
224+
export const json = (body: unknown, contentType?: string): Effect.Effect<Uint8Array, HttpBodyError> =>
224225
Effect.try({
225-
try: () => text(JSON.stringify(body), "application/json"),
226+
try: () => text(JSON.stringify(body), contentType ?? "application/json"),
226227
catch: (cause) => new HttpBodyError({ reason: { _tag: "JsonError" }, cause })
227228
})
228229

@@ -235,19 +236,19 @@ export const jsonSchema = <S extends Schema.Schema<any>>(
235236
options?: ParseOptions | undefined
236237
) => {
237238
const encode = Parser.encodeUnknownEffect(Schema.toCodecJson(schema))
238-
return (body: S["Type"]): Effect.Effect<Uint8Array, HttpBodyError, S["EncodingServices"]> =>
239+
return (body: S["Type"], contentType?: string): Effect.Effect<Uint8Array, HttpBodyError, S["EncodingServices"]> =>
239240
encode(body, options).pipe(
240241
Effect.mapError((issue) => new HttpBodyError({ reason: { _tag: "SchemaError", issue }, cause: issue })),
241-
Effect.flatMap((body) => json(body))
242+
Effect.flatMap((body) => json(body, contentType))
242243
)
243244
}
244245

245246
/**
246247
* @since 4.0.0
247248
* @category constructors
248249
*/
249-
export const urlParams = (urlParams: UrlParams.UrlParams): Uint8Array =>
250-
text(UrlParams.toString(urlParams), "application/x-www-form-urlencoded")
250+
export const urlParams = (urlParams: UrlParams.UrlParams, contentType?: string): Uint8Array =>
251+
text(UrlParams.toString(urlParams), contentType ?? "application/x-www-form-urlencoded")
251252

252253
/**
253254
* @since 4.0.0

packages/effect/src/unstable/http/HttpServerResponse.ts

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ export const empty = (
9292
*/
9393
export const redirect = (
9494
location: string | URL,
95-
options?: Options.WithContentType | undefined
95+
options?: Options.WithContent | undefined
9696
): HttpServerResponse => {
9797
const headers = Headers.fromRecordUnsafe({ location: location.toString() })
9898
return makeResponse({
@@ -213,16 +213,18 @@ export const htmlStream = <
213213
*/
214214
export const json = (
215215
body: unknown,
216-
options?: Options.WithContent | undefined
217-
): Effect.Effect<HttpServerResponse, Body.HttpBodyError> =>
218-
Effect.map(Body.json(body), (body) =>
216+
options?: Options.WithContentType | undefined
217+
): Effect.Effect<HttpServerResponse, Body.HttpBodyError> => {
218+
const headers = options?.headers ? Headers.fromInput(options.headers) : Headers.empty
219+
return Effect.map(Body.json(body, getContentType(options, headers)), (body) =>
219220
makeResponse({
220221
status: options?.status ?? 200,
221222
statusText: options?.statusText,
222-
headers: options?.headers && Headers.fromInput(options.headers),
223+
headers,
223224
cookies: options?.cookies,
224225
body
225226
}))
227+
}
226228

227229
/**
228230
* @since 4.0.0
@@ -235,16 +237,18 @@ export const schemaJson = <A, I, RD, RE>(
235237
const encode = Body.jsonSchema(schema, options)
236238
return (
237239
body: A,
238-
options?: Options.WithContent | undefined
239-
): Effect.Effect<HttpServerResponse, Body.HttpBodyError, RE> =>
240-
Effect.map(encode(body), (body) =>
240+
options?: Options.WithContentType | undefined
241+
): Effect.Effect<HttpServerResponse, Body.HttpBodyError, RE> => {
242+
const headers = options?.headers ? Headers.fromInput(options.headers) : Headers.empty
243+
return Effect.map(encode(body, getContentType(options, headers)), (body) =>
241244
makeResponse({
242245
status: options?.status ?? 200,
243246
statusText: options?.statusText,
244-
headers: options?.headers && Headers.fromInput(options.headers),
247+
headers,
245248
cookies: options?.cookies,
246249
body
247250
}))
251+
}
248252
}
249253

250254
/**
@@ -253,34 +257,38 @@ export const schemaJson = <A, I, RD, RE>(
253257
*/
254258
export const jsonUnsafe = (
255259
body: unknown,
256-
options?: Options.WithContent | undefined
257-
): HttpServerResponse =>
258-
makeResponse({
260+
options?: Options.WithContentType | undefined
261+
): HttpServerResponse => {
262+
const headers = options?.headers ? Headers.fromInput(options.headers) : Headers.empty
263+
return makeResponse({
259264
status: options?.status ?? 200,
260265
statusText: options?.statusText,
261-
headers: options?.headers && Headers.fromInput(options.headers),
266+
headers,
262267
cookies: options?.cookies,
263-
body: Body.jsonUnsafe(body)
268+
body: Body.jsonUnsafe(body, getContentType(options, headers))
264269
})
270+
}
265271

266272
/**
267273
* @since 4.0.0
268274
* @category constructors
269275
*/
270276
export const urlParams = (
271277
body: UrlParams.Input,
272-
options?: Options.WithContent | undefined
273-
): HttpServerResponse =>
274-
makeResponse({
278+
options?: Options.WithContentType | undefined
279+
): HttpServerResponse => {
280+
const headers = options?.headers ? Headers.fromInput(options.headers) : Headers.empty
281+
return makeResponse({
275282
status: options?.status ?? 200,
276283
statusText: options?.statusText,
277-
headers: options?.headers && Headers.fromInput(options.headers),
284+
headers,
278285
cookies: options?.cookies,
279286
body: Body.text(
280287
UrlParams.toString(UrlParams.fromInput(body)),
281-
"application/x-www-form-urlencoded"
288+
getContentType(options, headers) ?? "application/x-www-form-urlencoded"
282289
)
283290
})
291+
}
284292

285293
/**
286294
* @since 4.0.0

packages/effect/src/unstable/httpapi/HttpApiBuilder.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -645,25 +645,33 @@ const responseTransformation = <A, I, RD, RE>(
645645
const encoding = HttpApiSchema.getEncoding(ast)
646646
switch (encoding.kind) {
647647
case "Json": {
648-
return Effect.mapError(
649-
Response.json(data, { status }),
650-
(error) => new Issue.InvalidType(ast, Option.some(error))
651-
)
648+
try {
649+
return Effect.succeed(Response.text(JSON.stringify(data), {
650+
status,
651+
contentType: encoding.contentType
652+
}))
653+
} catch (error) {
654+
return Effect.fail(new Issue.InvalidType(ast, Option.some(error)))
655+
}
652656
}
653657
case "Text": {
654-
return Effect.succeed(Response.text(data as any, {
658+
return Effect.succeed(Response.text(data as string, {
655659
status,
656660
contentType: encoding.contentType
657661
}))
658662
}
659663
case "Uint8Array": {
660-
return Effect.succeed(Response.uint8Array(data as any, {
664+
return Effect.succeed(Response.uint8Array(data as Uint8Array, {
661665
status,
662666
contentType: encoding.contentType
663667
}))
664668
}
665669
case "UrlParams": {
666-
return Effect.succeed(Response.urlParams(data as any, { status }))
670+
return Effect.succeed(
671+
Response.urlParams(data as any, { status }).pipe(
672+
Response.setHeader("content-type", encoding.contentType)
673+
)
674+
)
667675
}
668676
}
669677
}

packages/effect/src/unstable/reactivity/Atom.ts

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import * as Effect from "../../Effect.ts"
99
import * as Exit from "../../Exit.ts"
1010
import * as Fiber from "../../Fiber.ts"
1111
import type { LazyArg } from "../../Function.ts"
12-
import { constant, constVoid, dual, pipe } from "../../Function.ts"
12+
import { constant, constTrue, constVoid, dual, pipe } from "../../Function.ts"
1313
import type * as Inspectable from "../../Inspectable.ts"
1414
import { PipeInspectableProto } from "../../internal/core.ts"
1515
import * as Layer from "../../Layer.ts"
@@ -513,21 +513,20 @@ function runCallbackSync<R, A, E, ER = never>(
513513
}
514514
const runFork = Effect.runForkWith(services)
515515
const scheduler = ServiceMap.get(services, Scheduler.Scheduler)
516-
const canFlush = "flush" in scheduler
517-
const fiberRuntime = runFork(effect)
518-
if (canFlush) {
516+
const fiber = runFork(effect)
517+
if ("flush" in scheduler) {
519518
;(scheduler as Scheduler.MixedScheduler).flush()
520519
}
521-
const result = fiberRuntime.pollUnsafe()
520+
const result = fiber.pollUnsafe()
522521
if (result) {
523522
onExit(result)
524523
return undefined
525524
}
526-
const remove = fiberRuntime.addObserver(onExit)
525+
const remove = fiber.addObserver(onExit)
527526
function cancel() {
528527
remove()
529528
if (!uninterruptible) {
530-
fiberRuntime.interruptUnsafe()
529+
fiber.interruptUnsafe()
531530
}
532531
}
533532
return cancel
@@ -750,16 +749,16 @@ function makeStream<A, E>(
750749
services = ServiceMap.add(services, AtomRegistry, ctx.registry)
751750

752751
const run = Effect.scopedWith((scope) =>
753-
Effect.flatMap(Channel.toPullScoped(Stream.toChannel(stream), scope), (pull) =>
754-
pull.pipe(
755-
Effect.flatMap((arr) => {
752+
Effect.flatMap(Channel.toPullScoped(stream.channel, scope), (pull) =>
753+
Effect.whileLoop({
754+
while: constTrue,
755+
body: () => pull,
756+
step(arr) {
756757
ctx.setSelf(AsyncResult.success(Arr.lastNonEmpty(arr), {
757758
waiting: true
758759
}))
759-
return Effect.void
760-
}),
761-
Effect.forever({ autoYield: false })
762-
))
760+
}
761+
}))
763762
).pipe(
764763
Effect.catchCause((cause) => {
765764
if (Pull.isHaltCause(cause)) {
@@ -784,8 +783,16 @@ function makeStream<A, E>(
784783
return Effect.void
785784
})
786785
)
786+
const servicesMap = new Map(services.mapUnsafe)
787+
servicesMap.set(AtomRegistry.key, ctx.registry)
788+
servicesMap.set(Scheduler.Scheduler.key, ctx.registry.scheduler)
787789

788-
const cancel = runCallbackSync(ServiceMap.add(services, AtomRegistry, ctx.registry), run, constVoid, false)
790+
const cancel = runCallbackSync(
791+
ServiceMap.makeUnsafe<AtomRegistry>(servicesMap),
792+
run,
793+
constVoid,
794+
false
795+
)
789796
if (cancel !== undefined) {
790797
ctx.addFinalizer(cancel)
791798
}
@@ -1076,7 +1083,7 @@ function makeResultFn<Arg, E, A>(
10761083
(fiber) => {
10771084
fibers.add(fiber)
10781085
fiber.addObserver(() => fibers.delete(fiber))
1079-
return Effect.map(Fiber.joinAll(fibers), Arr.lastNonEmpty)
1086+
return Effect.map(Fiber.joinAll(fibers), (arr) => arr[0])
10801087
}
10811088
)
10821089
}

0 commit comments

Comments
 (0)