Skip to content

Commit af7916a

Browse files
authored
add EntityResource module (#5778)
1 parent bf883f4 commit af7916a

File tree

12 files changed

+261
-29
lines changed

12 files changed

+261
-29
lines changed

.changeset/shiny-views-admire.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
fix postgres unprocessed message ordering

.changeset/smart-pumas-dig.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": patch
3+
---
4+
5+
add RcRef.invalidate api

.changeset/wise-books-lose.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
add @effect/cluster EntityResource module
6+
7+
A `EntityResource` is a resource that can be acquired inside a cluster
8+
entity, which will keep the entity alive even across restarts.
9+
10+
The resource will only be fully released when the idle time to live is
11+
reached, or when the `close` effect is called.
12+
13+
By default, the `idleTimeToLive` is infinite, meaning the resource will only
14+
be released when `close` is called.
15+
16+
```ts
17+
import { Entity, EntityResource } from "@effect/cluster"
18+
import { Rpc } from "@effect/rpc"
19+
import { Effect } from "effect"
20+
21+
const EntityA = Entity.make("EntityA", [Rpc.make("method")])
22+
23+
export const EntityALayer = EntityA.toLayer(
24+
Effect.gen(function* () {
25+
// When the entity receives a message, it will first acquire the resource
26+
//
27+
// If the entity restarts, the resource will be re-acquired in the new
28+
// instance.
29+
//
30+
// It will only be released when the idle TTL is reached, or when the
31+
// `close` effect is called.
32+
const resource = yield* EntityResource.make({
33+
acquire: Effect.acquireRelease(
34+
Effect.logInfo("Acquiring Entity resource"),
35+
() => Effect.logInfo("Releasing Entity resource")
36+
),
37+
// If the resource is not used for 10 minutes, it will be released and the
38+
// entity will be allowed to shut down.
39+
idleTimeToLive: "10 minutes"
40+
})
41+
42+
return EntityA.of({
43+
method: Effect.fnUntraced(function* () {
44+
yield* Effect.logInfo("EntityA.method called")
45+
// To access the resource, use `resource.get` inside an Effect.scoped
46+
yield* resource.get
47+
}, Effect.scoped)
48+
})
49+
}),
50+
{
51+
// After the resource is released, if the entity is not used for 1 minute,
52+
// the entity will be shut down.
53+
maxIdleTime: "1 minute"
54+
}
55+
)
56+
```

packages/cluster/src/Entity.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ export const keepAlive: (
609609
const address = yield* CurrentAddress
610610
const requestId = yield* sharding.getSnowflake
611611
const span = yield* Effect.orDie(Effect.currentSpan)
612+
olatch.value.unsafeClose()
612613
yield* Effect.orDie(sharding.sendOutgoing(
613614
new Message.OutgoingRequest({
614615
rpc: KeepAliveRpc,
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* @since 1.0.0
3+
*/
4+
import * as Duration from "effect/Duration"
5+
import * as Effect from "effect/Effect"
6+
import { identity } from "effect/Function"
7+
import * as RcRef from "effect/RcRef"
8+
import * as Scope from "effect/Scope"
9+
import * as Entity from "./Entity.js"
10+
import type { Sharding } from "./Sharding.js"
11+
12+
/**
13+
* @since 1.0.0
14+
* @category Type ids
15+
*/
16+
export const TypeId: TypeId = "~@effect/cluster/EntityResource"
17+
18+
/**
19+
* @since 1.0.0
20+
* @category Type ids
21+
*/
22+
export type TypeId = "~@effect/cluster/EntityResource"
23+
24+
/**
25+
* @since 1.0.0
26+
* @category Models
27+
*/
28+
export interface EntityResource<out A, out E = never> {
29+
readonly [TypeId]: TypeId
30+
readonly get: Effect.Effect<A, E, Scope.Scope>
31+
readonly close: Effect.Effect<void>
32+
}
33+
34+
/**
35+
* A `EntityResource` is a resource that can be acquired inside a cluster
36+
* entity, which will keep the entity alive even across restarts.
37+
*
38+
* The resource will only be fully released when the idle time to live is
39+
* reached, or when the `close` effect is called.
40+
*
41+
* By default, the `idleTimeToLive` is infinite, meaning the resource will only
42+
* be released when `close` is called.
43+
*
44+
* @since 1.0.0
45+
* @category Constructors
46+
*/
47+
export const make: <A, E, R>(options: {
48+
readonly acquire: Effect.Effect<A, E, R>
49+
readonly idleTimeToLive?: Duration.DurationInput | undefined
50+
/**
51+
* When to close the resource Scope.
52+
*
53+
* If set to "explicit", the resource will only be cleaned up when either the
54+
* `idleTimeToLive` is reached, or the .close effect is called.
55+
*
56+
* Defaults to "always", which means the resource will be cleaned up when the
57+
* the parent Scope is closed.
58+
*/
59+
readonly shutdownMode?: "explicit" | "always" | undefined
60+
}) => Effect.Effect<
61+
EntityResource<A, E>,
62+
E,
63+
Scope.Scope | R | Sharding | Entity.CurrentAddress
64+
> = Effect.fnUntraced(function*<A, E, R>(options: {
65+
readonly acquire: Effect.Effect<A, E, R>
66+
readonly idleTimeToLive?: Duration.DurationInput | undefined
67+
readonly shutdownMode?: "explicit" | "always" | undefined
68+
}) {
69+
const shutdownMode = options.shutdownMode ?? "always"
70+
let shuttingDown = false
71+
72+
const ref = yield* RcRef.make({
73+
acquire: Effect.gen(function*() {
74+
let scope = yield* Effect.scope
75+
76+
if (shutdownMode === "explicit") {
77+
const closeable = yield* Scope.make()
78+
const context = yield* Effect.context<Sharding | Entity.CurrentAddress>()
79+
yield* Scope.addFinalizerExit(
80+
scope,
81+
Effect.fnUntraced(function*(exit) {
82+
if (shuttingDown) return
83+
yield* Scope.close(closeable, exit)
84+
yield* Entity.keepAlive(false)
85+
}, Effect.provide(context))
86+
)
87+
scope = closeable
88+
} else {
89+
yield* Effect.addFinalizer(() => {
90+
if (shuttingDown) return Effect.void
91+
return Entity.keepAlive(false)
92+
})
93+
}
94+
95+
yield* Entity.keepAlive(true)
96+
97+
return yield* options.acquire.pipe(
98+
Scope.extend(scope)
99+
)
100+
}),
101+
idleTimeToLive: options.idleTimeToLive ?? Duration.infinity
102+
})
103+
104+
yield* Effect.addFinalizer(() => {
105+
shuttingDown = true
106+
return Effect.void
107+
})
108+
109+
// Initialize the resource
110+
yield* Effect.scoped(RcRef.get(ref))
111+
112+
return identity<EntityResource<A, E>>({
113+
[TypeId]: TypeId,
114+
get: RcRef.get(ref),
115+
close: RcRef.invalidate(ref)
116+
})
117+
})

packages/cluster/src/SqlMessageStorage.ts

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -302,30 +302,36 @@ export const make = Effect.fnUntraced(function*(options?: {
302302
mssql: () => (s: string) => `N'${s}'`,
303303
orElse: () => (s: string) => `'${s}'`
304304
})
305+
const forUpdate = sql.onDialectOrElse({
306+
sqlite: () => sql.literal(""),
307+
orElse: () => sql.literal("FOR UPDATE")
308+
})
305309

306310
const getUnprocessedMessages = sql.onDialectOrElse({
307311
pg: () => (shardIds: ReadonlyArray<string>, now: number) =>
308312
sql<MessageJoinRow>`
309-
UPDATE ${messagesTableSql} m
310-
SET last_read = ${sqlNow}
311-
FROM (
312-
SELECT m.*
313-
FROM ${messagesTableSql} m
314-
WHERE m.shard_id IN (${sql.literal(shardIds.map(wrapString).join(","))})
315-
AND NOT EXISTS (
316-
SELECT 1 FROM ${repliesTableSql}
317-
WHERE request_id = m.request_id
318-
AND (kind = ${replyKindWithExit} OR acked = ${sqlFalse})
319-
)
320-
AND m.processed = ${sqlFalse}
321-
AND (m.last_read IS NULL OR m.last_read < ${tenMinutesAgo})
322-
AND (m.deliver_at IS NULL OR m.deliver_at <= ${sql.literal(String(now))})
323-
ORDER BY m.rowid ASC
324-
FOR UPDATE
325-
) AS ids
326-
LEFT JOIN ${repliesTableSql} r ON r.id = ids.last_reply_id
327-
WHERE m.id = ids.id
328-
RETURNING ids.*, r.id as reply_reply_id, r.kind as reply_kind, r.payload as reply_payload, r.sequence as reply_sequence
313+
WITH messages AS (
314+
UPDATE ${messagesTableSql} m
315+
SET last_read = ${sqlNow}
316+
FROM (
317+
SELECT m.*
318+
FROM ${messagesTableSql} m
319+
WHERE m.shard_id IN (${sql.literal(shardIds.map(wrapString).join(","))})
320+
AND NOT EXISTS (
321+
SELECT 1 FROM ${repliesTableSql}
322+
WHERE request_id = m.request_id
323+
AND (kind = ${replyKindWithExit} OR acked = ${sqlFalse})
324+
)
325+
AND m.processed = ${sqlFalse}
326+
AND (m.last_read IS NULL OR m.last_read < ${tenMinutesAgo})
327+
AND (m.deliver_at IS NULL OR m.deliver_at <= ${sql.literal(String(now))})
328+
FOR UPDATE
329+
) AS ids
330+
LEFT JOIN ${repliesTableSql} r ON r.id = ids.last_reply_id
331+
WHERE m.id = ids.id
332+
RETURNING ids.*, r.id as reply_reply_id, r.kind as reply_kind, r.payload as reply_payload, r.sequence as reply_sequence
333+
)
334+
SELECT * FROM messages ORDER BY rowid ASC
329335
`,
330336
orElse: () => (shardIds: ReadonlyArray<string>, now: number) =>
331337
sql<MessageJoinRow>`
@@ -342,6 +348,7 @@ export const make = Effect.fnUntraced(function*(options?: {
342348
AND (m.last_read IS NULL OR m.last_read < ${tenMinutesAgo})
343349
AND (m.deliver_at IS NULL OR m.deliver_at <= ${sql.literal(String(now))})
344350
ORDER BY m.rowid ASC
351+
${forUpdate}
345352
`.unprepared.pipe(
346353
Effect.tap((rows) => {
347354
if (rows.length === 0) {

packages/cluster/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ export * as EntityProxy from "./EntityProxy.js"
5353
*/
5454
export * as EntityProxyServer from "./EntityProxyServer.js"
5555

56+
/**
57+
* @since 1.0.0
58+
*/
59+
export * as EntityResource from "./EntityResource.js"
60+
5661
/**
5762
* @since 1.0.0
5863
*/

packages/cluster/src/internal/entityManager.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ export const make = Effect.fnUntraced(function*<
131131

132132
const scope = yield* Effect.scope
133133
const endLatch = Effect.unsafeMakeLatch()
134-
const keepAliveLatch = Effect.unsafeMakeLatch()
134+
const keepAliveLatch = Effect.unsafeMakeLatch(false)
135135

136136
// on shutdown, reset the storage for the entity
137137
yield* Scope.addFinalizerExit(
@@ -420,7 +420,6 @@ export const make = Effect.fnUntraced(function*<
420420

421421
if (server.keepAliveEnabled) return reply
422422
server.keepAliveEnabled = true
423-
server.keepAliveLatch.unsafeClose()
424423
return server.keepAliveLatch.whenOpen(Effect.suspend(() => {
425424
server.keepAliveEnabled = false
426425
return reply
@@ -537,7 +536,7 @@ export const make = Effect.fnUntraced(function*<
537536
requestId: message.envelope.requestId,
538537
exit: Exit.die(new MalformedMessage({ cause }))
539538
}),
540-
rpc: entity.protocol.requests.get(message.envelope.tag)!,
539+
rpc: entityRpcs.get(message.envelope.tag)!,
541540
context
542541
})
543542
))
@@ -549,7 +548,7 @@ export const make = Effect.fnUntraced(function*<
549548
)
550549
}
551550
const request = message as Message.IncomingRequest<any>
552-
const rpc = entity.protocol.requests.get(decoded.envelope.tag)!
551+
const rpc = entityRpcs.get(decoded.envelope.tag)!
553552
return sendLocal(
554553
new Message.IncomingRequestLocal({
555554
envelope: decoded.envelope,

packages/cluster/src/internal/entityReaper.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export class EntityReaper extends Effect.Service<EntityReaper>()("@effect/cluste
3535
for (const { entities, maxIdleTime, servers } of registered) {
3636
for (const state of servers.values()) {
3737
const duration = now - state.lastActiveCheck
38-
if (state.activeRequests.size > 0 || duration < maxIdleTime) {
38+
if (state.keepAliveEnabled || state.activeRequests.size > 0 || duration < maxIdleTime) {
3939
continue
4040
}
4141
yield* Effect.fork(entities.removeIgnore(state.address))

packages/cluster/test/SqlMessageStorage.test.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,26 @@ describe("SqlMessageStorage", () => {
4141
it.effect("saveRequest", () =>
4242
Effect.gen(function*() {
4343
const storage = yield* MessageStorage.MessageStorage
44-
const request = yield* makeRequest()
44+
const request = yield* makeRequest({ payload: { id: 1 } })
4545
const result = yield* storage.saveRequest(request)
4646
expect(result._tag).toEqual("Success")
4747

48-
const messages = yield* storage.unprocessedMessages([request.envelope.address.shardId])
49-
expect(messages).toHaveLength(1)
48+
for (let i = 2; i <= 5; i++) {
49+
yield* storage.saveRequest(yield* makeRequest({ payload: { id: i } }))
50+
}
51+
52+
yield* storage.saveReply(yield* makeReply(request))
53+
54+
let messages = yield* storage.unprocessedMessages([request.envelope.address.shardId])
55+
expect(messages).toHaveLength(4)
56+
expect(messages.map((m: any) => m.envelope.payload.id)).toEqual([2, 3, 4, 5])
57+
58+
for (let i = 6; i <= 10; i++) {
59+
yield* storage.saveRequest(yield* makeRequest({ payload: { id: i } }))
60+
}
61+
messages = yield* storage.unprocessedMessages([request.envelope.address.shardId])
62+
expect(messages).toHaveLength(5)
63+
expect(messages.map((m: any) => m.envelope.payload.id)).toEqual([6, 7, 8, 9, 10])
5064
}))
5165

5266
it.effect("saveReply + saveRequest duplicate", () =>

0 commit comments

Comments
 (0)