Skip to content

Commit 96c9537

Browse files
authored
ensure PersistedQueue memory driver removes items (#5847)
1 parent 92f4840 commit 96c9537

File tree

3 files changed

+100
-86
lines changed

3 files changed

+100
-86
lines changed

.changeset/orange-chefs-enter.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/experimental": patch
3+
---
4+
5+
ensure PersistedQueue memory driver removes items

packages/experimental/src/PersistedQueue.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ export const layerStoreMemory: Layer.Layer<
254254
while (true) {
255255
yield* queue.latch.await
256256
const item = Iterable.unsafeHead(queue.items)
257+
queue.items.delete(item)
257258
if (queue.items.size === 0) {
258259
queue.latch.unsafeClose()
259260
}

packages/experimental/test/PersistedQueue.test.ts

Lines changed: 94 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ import { assert, it } from "@effect/vitest"
44
import { Effect, Fiber, Layer, Schema, TestClock, TestServices } from "effect"
55
import { RedisContainer } from "./utils/redis.js"
66

7-
const layer = PersistedQueue.layer.pipe(
7+
const layerMemory = PersistedQueue.layer.pipe(
8+
Layer.provide(PersistedQueue.layerStoreMemory)
9+
)
10+
11+
const layerRedis = PersistedQueue.layer.pipe(
812
Layer.provide(Layer.unwrapEffect(Effect.gen(function*() {
913
const container = yield* RedisContainer
1014
return RedisPersistedQueue.layerStore({
@@ -14,97 +18,101 @@ const layer = PersistedQueue.layer.pipe(
1418
}))),
1519
Layer.provide(RedisContainer.layer)
1620
)
21+
;([
22+
["Memory", layerMemory],
23+
["Redis", layerRedis]
24+
] as const).forEach(([name, layer]) => {
25+
it.layer(layer, { timeout: "30 seconds" })(`PersistedQueue (${name})`, (it) => {
26+
it.effect("offer + take", () =>
27+
Effect.gen(function*() {
28+
const queue = yield* PersistedQueue.make({
29+
name: "test-queue-a",
30+
schema: Item
31+
})
32+
33+
yield* queue.offer({ n: 42n })
34+
yield* queue.take(Effect.fnUntraced(function*(value) {
35+
assert.strictEqual(value.n, 42n)
36+
}))
37+
}))
1738

18-
it.layer(layer, { timeout: "30 seconds" })("PersistedQueue", (it) => {
19-
it.effect("offer + take", () =>
20-
Effect.gen(function*() {
21-
const queue = yield* PersistedQueue.make({
22-
name: "test-queue-a",
23-
schema: Item
24-
})
39+
it.effect("interrupt", () =>
40+
Effect.gen(function*() {
41+
const queue = yield* PersistedQueue.make({
42+
name: "test-queue-b",
43+
schema: Item
44+
})
2545

26-
yield* queue.offer({ n: 42n })
27-
yield* queue.take(Effect.fnUntraced(function*(value) {
28-
assert.strictEqual(value.n, 42n)
46+
yield* queue.offer({ n: 42n })
47+
48+
const latch = Effect.unsafeMakeLatch()
49+
const fiber = yield* queue.take(Effect.fnUntraced(function*(_value) {
50+
yield* latch.open
51+
return yield* Effect.never
52+
})).pipe(Effect.fork)
53+
54+
const fiber2 = yield* queue.take((val) => Effect.succeed(val)).pipe(Effect.fork)
55+
56+
yield* latch.await
57+
58+
// allow some real time to pass to ensure the second take is really
59+
// waiting
60+
yield* TestClock.adjust(1000)
61+
yield* Effect.sleep(1000).pipe(
62+
TestServices.provideLive
63+
)
64+
assert.isNull(fiber2.unsafePoll())
65+
66+
yield* Fiber.interrupt(fiber)
67+
68+
yield* TestClock.adjust(1000)
69+
70+
assert.strictEqual((yield* Fiber.join(fiber2)).n, 42n)
2971
}))
30-
}))
31-
32-
it.effect("interrupt", () =>
33-
Effect.gen(function*() {
34-
const queue = yield* PersistedQueue.make({
35-
name: "test-queue-b",
36-
schema: Item
37-
})
38-
39-
yield* queue.offer({ n: 42n })
40-
41-
const latch = Effect.unsafeMakeLatch()
42-
const fiber = yield* queue.take(Effect.fnUntraced(function*(_value) {
43-
yield* latch.open
44-
return yield* Effect.never
45-
})).pipe(Effect.fork)
46-
47-
const fiber2 = yield* queue.take((val) => Effect.succeed(val)).pipe(Effect.fork)
48-
49-
yield* latch.await
50-
51-
// allow some real time to pass to ensure the second take is really
52-
// waiting
53-
yield* TestClock.adjust(1000)
54-
yield* Effect.sleep(1000).pipe(
55-
TestServices.provideLive
56-
)
57-
assert.isNull(fiber2.unsafePoll())
58-
59-
yield* Fiber.interrupt(fiber)
60-
61-
yield* TestClock.adjust(1000)
62-
63-
assert.strictEqual((yield* Fiber.join(fiber2)).n, 42n)
64-
}))
65-
66-
it.effect("failure", () =>
67-
Effect.gen(function*() {
68-
const queue = yield* PersistedQueue.make({
69-
name: "test-queue-c",
70-
schema: Item
71-
})
72-
73-
yield* queue.offer({ n: 42n })
74-
75-
const error = yield* queue.take(() => Effect.fail("boom")).pipe(Effect.flip)
76-
assert.strictEqual(error, "boom")
77-
78-
const value = yield* queue.take((val, { attempts }) => {
79-
assert.strictEqual(attempts, 1)
80-
return Effect.succeed(val)
81-
})
82-
assert.strictEqual(value.n, 42n)
83-
}))
84-
85-
it.effect("idempotent offer", () =>
86-
Effect.gen(function*() {
87-
const queue = yield* PersistedQueue.make({
88-
name: "idempotent-offer",
89-
schema: Item
90-
})
91-
92-
yield* queue.offer({ n: 42n }, { id: "custom-id" })
93-
yield* queue.offer({ n: 42n }, { id: "custom-id" })
94-
yield* queue.take(Effect.fnUntraced(function*(value) {
72+
73+
it.effect("failure", () =>
74+
Effect.gen(function*() {
75+
const queue = yield* PersistedQueue.make({
76+
name: "test-queue-c",
77+
schema: Item
78+
})
79+
80+
yield* queue.offer({ n: 42n })
81+
82+
const error = yield* queue.take(() => Effect.fail("boom")).pipe(Effect.flip)
83+
assert.strictEqual(error, "boom")
84+
85+
const value = yield* queue.take((val, { attempts }) => {
86+
assert.strictEqual(attempts, 1)
87+
return Effect.succeed(val)
88+
})
9589
assert.strictEqual(value.n, 42n)
9690
}))
97-
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
98-
assert.strictEqual(value.n, 42n)
99-
})).pipe(Effect.fork)
10091

101-
yield* TestClock.adjust(1000)
102-
yield* Effect.sleep(1000).pipe(
103-
TestServices.provideLive
104-
)
105-
106-
assert.isNull(fiber.unsafePoll())
107-
}))
92+
it.effect("idempotent offer", () =>
93+
Effect.gen(function*() {
94+
const queue = yield* PersistedQueue.make({
95+
name: "idempotent-offer",
96+
schema: Item
97+
})
98+
99+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
100+
yield* queue.offer({ n: 42n }, { id: "custom-id" })
101+
yield* queue.take(Effect.fnUntraced(function*(value) {
102+
assert.strictEqual(value.n, 42n)
103+
}))
104+
const fiber = yield* queue.take(Effect.fnUntraced(function*(value) {
105+
assert.strictEqual(value.n, 42n)
106+
})).pipe(Effect.fork)
107+
108+
yield* TestClock.adjust(1000)
109+
yield* Effect.sleep(1000).pipe(
110+
TestServices.provideLive
111+
)
112+
113+
assert.isNull(fiber.unsafePoll())
114+
}))
115+
})
108116
})
109117

110118
const Item = Schema.Struct({

0 commit comments

Comments
 (0)