Skip to content

Commit f38c215

Browse files
authored
add entityRegistrationTimeout to ShardingConfig (#5765)
1 parent 52753e2 commit f38c215

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

.changeset/chubby-wolves-scream.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/cluster": patch
3+
---
4+
5+
add entityRegistrationTimeout to ShardingConfig

packages/cluster/src/Sharding.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ interface EntityManagerState {
199199

200200
const make = Effect.gen(function*() {
201201
const config = yield* ShardingConfig
202+
const clock = yield* Effect.clock
202203

203204
const runnersService = yield* Runners
204205
const runnerHealth = yield* RunnerHealth.RunnerHealth
@@ -444,6 +445,8 @@ const make = Effect.gen(function*() {
444445

445446
if (storageEnabled && Option.isSome(config.runnerAddress)) {
446447
const selfAddress = config.runnerAddress.value
448+
const entityRegistrationTimeoutMillis = Duration.toMillis(config.entityRegistrationTimeout)
449+
const storageStartMillis = clock.unsafeCurrentTimeMillis()
447450

448451
yield* Effect.gen(function*() {
449452
yield* Effect.logDebug("Starting")
@@ -469,9 +472,14 @@ const make = Effect.gen(function*() {
469472
}
470473
const state = entityManagers.get(address.entityType)
471474
if (!state) {
472-
// reset address in the case that the entity is slow to register
473-
MutableHashSet.add(resetAddresses, address)
474-
return Effect.void
475+
const sinceStart = clock.unsafeCurrentTimeMillis() - storageStartMillis
476+
if (sinceStart < entityRegistrationTimeoutMillis) {
477+
// reset address in the case that the entity is slow to register
478+
MutableHashSet.add(resetAddresses, address)
479+
return Effect.void
480+
}
481+
// if the entity did not register in time, we save a defect reply
482+
return Effect.die(new Error(`Entity type '${address.entityType}' not registered`))
475483
} else if (state.status === "closed") {
476484
return Effect.void
477485
}

packages/cluster/src/ShardingConfig.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ export class ShardingConfig extends Context.Tag("@effect/cluster/ShardingConfig"
7878
* after which an entity will be interrupted.
7979
*/
8080
readonly entityMaxIdleTime: DurationInput
81+
/**
82+
* If an entity does not register itself within this time after a message is
83+
* sent to it, the message will be marked as failed.
84+
*
85+
* Defaults to 1 minute.
86+
*/
87+
readonly entityRegistrationTimeout: DurationInput
8188
/**
8289
* The maximum duration of time to wait for an entity to terminate.
8390
*
@@ -129,6 +136,7 @@ export const defaults: ShardingConfig["Type"] = {
129136
shardLockExpiration: Duration.seconds(35),
130137
entityMailboxCapacity: 4096,
131138
entityMaxIdleTime: Duration.minutes(1),
139+
entityRegistrationTimeout: Duration.minutes(1),
132140
entityTerminationTimeout: Duration.seconds(15),
133141
entityMessagePollInterval: Duration.seconds(10),
134142
entityReplyPollInterval: Duration.millis(200),
@@ -208,6 +216,12 @@ export const config: Config.Config<ShardingConfig["Type"]> = Config.all({
208216
"The maximum duration of inactivity (i.e. without receiving a message) after which an entity will be interrupted."
209217
)
210218
),
219+
entityRegistrationTimeout: Config.duration("entityRegistrationTimeout").pipe(
220+
Config.withDefault(defaults.entityRegistrationTimeout),
221+
Config.withDescription(
222+
"If an entity does not register itself within this time after a message is sent to it, the message will be marked as failed."
223+
)
224+
),
211225
entityTerminationTimeout: Config.duration("entityTerminationTimeout").pipe(
212226
Config.withDefault(defaults.entityTerminationTimeout),
213227
Config.withDescription("The maximum duration of time to wait for an entity to terminate.")

0 commit comments

Comments
 (0)