Skip to content

Commit b226760

Browse files
authored
simplify SubscriptionRef (#1752)
1 parent c4b8b0f commit b226760

File tree

6 files changed

+244
-296
lines changed

6 files changed

+244
-296
lines changed

.changeset/silver-kings-poke.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
simplify SubscriptionRef

packages/effect/src/Latch.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,17 @@ export interface Latch {
2626
/** open the latch, releasing all fibers waiting on it */
2727
readonly open: Effect.Effect<boolean>
2828
/** open the latch, releasing all fibers waiting on it */
29-
readonly openUnsafe: () => boolean
29+
openUnsafe(this: Latch): boolean
3030
/** release all fibers waiting on the latch, without opening it */
3131
readonly release: Effect.Effect<boolean>
3232
/** wait for the latch to be opened */
3333
readonly await: Effect.Effect<void>
3434
/** close the latch */
3535
readonly close: Effect.Effect<boolean>
3636
/** close the latch */
37-
readonly closeUnsafe: () => boolean
37+
closeUnsafe(this: Latch): boolean
3838
/** only run the given effect when the latch is open */
39-
readonly whenOpen: <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
39+
whenOpen<A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
4040
}
4141

4242
/**

packages/effect/src/Semaphore.ts

Lines changed: 125 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
* @since 2.0.0
33
*/
44
import type * as Effect from "./Effect.ts"
5+
import type { Fiber } from "./Fiber.ts"
56
import { dual } from "./Function.ts"
7+
import * as core from "./internal/core.ts"
68
import * as internal from "./internal/effect.ts"
79
import type * as Option from "./Option.ts"
8-
import * as PartitionedSemaphore from "./PartitionedSemaphore.ts"
910

1011
/**
1112
* @category models
@@ -28,7 +29,7 @@ export interface Semaphore {
2829
/**
2930
* Adjusts the number of permits available in the semaphore.
3031
*/
31-
resize(permits: number): Effect.Effect<void>
32+
resize(this: Semaphore, permits: number): Effect.Effect<void>
3233

3334
/**
3435
* Runs an effect with the given number of permits and releases the permits
@@ -41,7 +42,7 @@ export interface Semaphore {
4142
* If insufficient permits are available, the function will wait until they
4243
* are released by other tasks.
4344
*/
44-
withPermits(permits: number): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
45+
withPermits(this: Semaphore, permits: number): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
4546

4647
/**
4748
* Runs an effect with the given number of permits and releases the permits
@@ -54,7 +55,7 @@ export interface Semaphore {
5455
* If insufficient permits are available, the function will wait until they
5556
* are released by other tasks.
5657
*/
57-
withPermit<A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
58+
withPermit<A, E, R>(this: Semaphore, self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
5859

5960
/**
6061
* Runs an effect only if the specified number of permits are immediately
@@ -68,6 +69,7 @@ export interface Semaphore {
6869
* the result is `Option.none`.
6970
*/
7071
withPermitsIfAvailable(
72+
this: Semaphore,
7173
permits: number
7274
): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<Option.Option<A>, E, R>
7375

@@ -76,18 +78,18 @@ export interface Semaphore {
7678
* available permits, suspending the task if they are not yet available.
7779
* Concurrent pending `take` calls are processed in a first-in, first-out manner.
7880
*/
79-
take(permits: number): Effect.Effect<number>
81+
take(this: Semaphore, permits: number): Effect.Effect<number>
8082

8183
/**
8284
* Releases the specified number of permits and returns the resulting
8385
* available permits.
8486
*/
85-
release(permits: number): Effect.Effect<number>
87+
release(this: Semaphore, permits: number): Effect.Effect<number>
8688

8789
/**
8890
* Releases all permits held by this semaphore and returns the resulting available permits.
8991
*/
90-
releaseAll: Effect.Effect<number>
92+
readonly releaseAll: Effect.Effect<number>
9193
}
9294

9395
/**
@@ -127,7 +129,113 @@ export interface Semaphore {
127129
* @since 2.0.0
128130
* @category constructors
129131
*/
130-
export const makeUnsafe: (permits: number) => Semaphore = internal.makeSemaphoreUnsafe
132+
export const makeUnsafe = (permits: number): Semaphore => new SemaphoreImpl(permits)
133+
134+
class SemaphoreImpl implements Semaphore {
135+
public waiters = new Set<() => void>()
136+
public taken = 0
137+
public permits: number
138+
139+
constructor(permits: number) {
140+
this.permits = permits
141+
}
142+
143+
get free() {
144+
return this.permits - this.taken
145+
}
146+
147+
take(n: number): Effect.Effect<number> {
148+
const take: Effect.Effect<number> = internal.suspend(() => {
149+
if (this.free < n) {
150+
return internal.callback((resume) => {
151+
if (this.free >= n) return resume(take)
152+
const observer = () => {
153+
if (this.free < n) return
154+
this.waiters.delete(observer)
155+
resume(take)
156+
}
157+
this.waiters.add(observer)
158+
return internal.sync(() => {
159+
this.waiters.delete(observer)
160+
})
161+
})
162+
}
163+
this.taken += n
164+
return internal.succeed(n)
165+
})
166+
return take
167+
}
168+
169+
updateTakenUnsafe(fiber: Fiber<any, any>, f: (n: number) => number): number {
170+
this.taken = f(this.taken)
171+
if (this.waiters.size > 0) {
172+
fiber.currentDispatcher.scheduleTask(() => {
173+
const iter = this.waiters.values()
174+
let item = iter.next()
175+
while (item.done === false && this.free > 0) {
176+
item.value()
177+
item = iter.next()
178+
}
179+
}, 0)
180+
}
181+
return this.free
182+
}
183+
184+
updateTaken(f: (n: number) => number): Effect.Effect<number> {
185+
return core.withFiber((fiber) => internal.succeed(this.updateTakenUnsafe(fiber, f)))
186+
}
187+
188+
resize(permits: number) {
189+
return core.withFiber((fiber) => {
190+
this.permits = permits
191+
if (this.free < 0) return internal.void
192+
this.updateTakenUnsafe(fiber, (taken) => taken)
193+
return internal.void
194+
})
195+
}
196+
197+
release(n: number): Effect.Effect<number> {
198+
return this.updateTaken((taken) => taken - n)
199+
}
200+
201+
get releaseAll(): Effect.Effect<number> {
202+
return this.updateTaken((_) => 0)
203+
}
204+
205+
withPermits(n: number) {
206+
return <A, E, R>(self: Effect.Effect<A, E, R>) =>
207+
internal.uninterruptibleMask((restore) =>
208+
internal.flatMap(
209+
restore(this.take(n)),
210+
(permits) =>
211+
internal.onExitPrimitive(
212+
restore(self),
213+
() => {
214+
this.updateTakenUnsafe(internal.getCurrentFiber()!, (taken) => taken - permits)
215+
return undefined
216+
},
217+
true
218+
)
219+
)
220+
)
221+
}
222+
223+
get withPermit(): <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R> {
224+
return this.withPermits(1)
225+
}
226+
227+
withPermitsIfAvailable(n: number) {
228+
return <A, E, R>(self: Effect.Effect<A, E, R>) =>
229+
internal.uninterruptibleMask((restore) => {
230+
if (this.free < n) return internal.succeedNone
231+
this.taken += n
232+
return internal.onExitPrimitive(restore(internal.asSome(self)), () => {
233+
this.updateTakenUnsafe(internal.getCurrentFiber()!, (taken) => taken - n)
234+
return undefined
235+
}, true)
236+
})
237+
}
238+
}
131239

132240
/**
133241
* Creates a new Semaphore.
@@ -162,7 +270,7 @@ export const makeUnsafe: (permits: number) => Semaphore = internal.makeSemaphore
162270
* @since 2.0.0
163271
* @category constructors
164272
*/
165-
export const make: (permits: number) => Effect.Effect<Semaphore> = internal.makeSemaphore
273+
export const make = (permits: number): Effect.Effect<Semaphore> => internal.sync(() => new SemaphoreImpl(permits))
166274

167275
/**
168276
* Adjusts the number of permits available in the semaphore.
@@ -185,13 +293,9 @@ export const resize: {
185293
export const withPermits: {
186294
(self: Semaphore, permits: number): <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
187295
<A, E, R>(self: Semaphore, permits: number, effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
188-
} = ((...args: Array<any>) => {
189-
if (args.length === 2) {
190-
const [self, permits] = args
191-
return (effect: Effect.Effect<any, any, any>) => self.withPermits(permits)(effect)
192-
}
193-
const [self, permits, effect] = args
194-
return self.withPermits(permits)(effect)
296+
} = ((self: Semaphore, permits: number, effect?: Effect.Effect<any, any, any>) => {
297+
const withPermits = self.withPermits(permits)
298+
return effect ? withPermits(effect) : withPermits
195299
}) as any
196300

197301
/**
@@ -204,12 +308,8 @@ export const withPermits: {
204308
export const withPermit: {
205309
(self: Semaphore): <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
206310
<A, E, R>(self: Semaphore, effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R>
207-
} = ((...args: Array<any>) => {
208-
if (args.length === 1) {
209-
const [self] = args
210-
return (effect: Effect.Effect<any, any, any>) => self.withPermit(effect)
211-
}
212-
const [self, effect] = args
311+
} = ((self: Semaphore, effect?: Effect.Effect<any, any, any>) => {
312+
if (!effect) return self.withPermit
213313
return self.withPermit(effect)
214314
}) as any
215315

@@ -227,13 +327,9 @@ export const withPermitsIfAvailable: {
227327
permits: number,
228328
effect: Effect.Effect<A, E, R>
229329
): Effect.Effect<Option.Option<A>, E, R>
230-
} = ((...args: Array<any>) => {
231-
if (args.length === 2) {
232-
const [self, permits] = args
233-
return (effect: Effect.Effect<any, any, any>) => self.withPermitsIfAvailable(permits)(effect)
234-
}
235-
const [self, permits, effect] = args
236-
return self.withPermitsIfAvailable(permits)(effect)
330+
} = ((self: Semaphore, permits: number, effect?: Effect.Effect<any, any, any>) => {
331+
const withPermits = self.withPermitsIfAvailable(permits)
332+
return effect ? withPermits(effect) : withPermits
237333
}) as any
238334

239335
/**
@@ -268,33 +364,3 @@ export const release: {
268364
* @category combinators
269365
*/
270366
export const releaseAll = (self: Semaphore): Effect.Effect<number> => self.releaseAll
271-
272-
/**
273-
* @since 3.19.4
274-
* @category models
275-
*/
276-
export const PartitionedTypeId: PartitionedTypeId = PartitionedSemaphore.PartitionedTypeId
277-
278-
/**
279-
* @since 3.19.4
280-
* @category models
281-
*/
282-
export type PartitionedTypeId = PartitionedSemaphore.PartitionedTypeId
283-
284-
/**
285-
* @since 3.19.4
286-
* @category models
287-
*/
288-
export interface Partitioned<in K> extends PartitionedSemaphore.PartitionedSemaphore<K> {}
289-
290-
/**
291-
* @since 3.19.4
292-
* @category constructors
293-
*/
294-
export const makePartitionedUnsafe = PartitionedSemaphore.makeUnsafe
295-
296-
/**
297-
* @since 3.19.4
298-
* @category constructors
299-
*/
300-
export const makePartitioned = PartitionedSemaphore.make

0 commit comments

Comments
 (0)