Skip to content

Commit 7ccd85f

Browse files
committed
Introduce terminate feature for actors, including handling of termination state, tests, and enhanced Actor lifecycle management.
1 parent e3deaf3 commit 7ccd85f

File tree

12 files changed

+520
-27
lines changed

12 files changed

+520
-27
lines changed

actor4k-cluster/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/cluster/ClusterActorRef.kt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,26 @@ class ClusterActorRef(
125125
}
126126
}
127127

128+
/**
129+
* Terminates the associated actor by sending a termination request to its service.
130+
*
131+
* This method interacts with the actor's service to initiate a termination operation.
132+
* It processes the service's response to determine whether the termination succeeded,
133+
* failed, or resulted in an unexpected outcome.
134+
*
135+
* @return A [Result] wrapping [Unit] if the termination operation is successful.
136+
* If the response indicates a failure, an exception is wrapped in the result.
137+
* An [IllegalStateException] is returned for unexpected response types.
138+
*/
139+
override suspend fun terminate(): Result<Unit> {
140+
val res = service.terminate(address).getOrElse { return Result.failure(it) }
141+
return when (res) {
142+
is RpcEnvelope.Response.Empty -> Result.success(Unit)
143+
is RpcEnvelope.Response.Failure -> Result.failure(res.exception())
144+
else -> Result.failure(IllegalStateException("Unexpected response $res for ask command."))
145+
}
146+
}
147+
128148
/**
129149
* Provides a string representation of the `ClusterActorRef` instance.
130150
*

actor4k-cluster/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/cluster/rpc/RpcEnvelope.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ interface RpcEnvelope {
3434

3535
@Serializable
3636
data class Shutdown(override val id: Long, val addr: Address) : Request
37+
38+
@Serializable
39+
data class Terminate(override val id: Long, val addr: Address) : Request
3740
}
3841

3942
/**

actor4k-cluster/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/cluster/rpc/RpcReceiveService.kt

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import io.github.smyrgeorge.actor4k.cluster.rpc.RpcEnvelope.Request
55
import io.github.smyrgeorge.actor4k.cluster.rpc.RpcEnvelope.Response
66
import io.github.smyrgeorge.actor4k.util.Logger
77
import io.github.smyrgeorge.actor4k.util.extentions.launch
8-
import io.ktor.websocket.*
8+
import io.ktor.websocket.Frame
9+
import io.ktor.websocket.WebSocketSession
10+
import io.ktor.websocket.send
911
import kotlinx.serialization.ExperimentalSerializationApi
1012
import kotlinx.serialization.protobuf.ProtoBuf
1113

@@ -42,14 +44,15 @@ class RpcReceiveService(
4244
log.warn("Received non-binary frame: $frame")
4345
return@launch
4446
}
45-
val msg = protoBuf.decodeFromByteArray(Request.serializer(), frame.data)
46-
val res = when (msg) {
47+
val res = when (val msg = protoBuf.decodeFromByteArray(Request.serializer(), frame.data)) {
4748
is Request.Echo -> echo(msg)
4849
is Request.Tell -> tell(msg)
4950
is Request.Ask -> ask(msg)
5051
is Request.Status -> status(msg)
5152
is Request.Stats -> stats(msg)
5253
is Request.Shutdown -> shutdown(msg)
54+
is Request.Terminate -> terminate(msg)
55+
5356
}
5457
session.send(protoBuf.encodeToByteArray(Response.serializer(), res))
5558
}
@@ -140,6 +143,22 @@ class RpcReceiveService(
140143
return Response.Empty(msg.id)
141144
}
142145

146+
/**
147+
* Processes a Terminate request by retrieving the actor associated with the specified address
148+
* and attempting to terminate it. Constructs a Response based on the outcome of the operation.
149+
*
150+
* @param msg The Terminate request containing the unique message ID and the address of the actor to terminate.
151+
* @return A `Response.Empty` if the termination is successfully completed, or a `Response.Failure`
152+
* if an error occurs during the operation.
153+
*/
154+
private suspend fun terminate(msg: Request.Terminate): Response {
155+
registry.get(msg.addr)
156+
.getOrElse { return it.failure(msg.id) }
157+
.terminate()
158+
.getOrElse { return it.failure(msg.id) }
159+
return Response.Empty(msg.id)
160+
}
161+
143162
companion object {
144163
private fun Throwable.failure(id: Long): Response.Failure =
145164
Response.Failure(id, message, cause?.message)

actor4k-cluster/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/cluster/rpc/RpcSendService.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,24 @@ class RpcSendService(
159159
res
160160
}
161161

162+
/**
163+
* Sends a termination request to the specified actor's address and waits for the corresponding response.
164+
* This method ensures that the response ID matches the request ID to maintain consistency.
165+
*
166+
* @param addr The address of the actor to be terminated.
167+
* @return A [Result] containing the [Response] received in response to the termination request,
168+
* or an error if the operation fails.
169+
* @throws IllegalStateException if the response ID does not match the request ID.
170+
*/
171+
suspend fun terminate(addr: Address): Result<Response> = runCatching {
172+
val req = Request.Terminate(nextId(), addr)
173+
val res = rpc.request<Response>(req.id) {
174+
session.send(req.serialize())
175+
}
176+
if (res.id != req.id) error("Sanity check failed :: req.id != res.id.")
177+
res
178+
}
179+
162180
/**
163181
* Closes the current session associated with the `RpcSendService`.
164182
*

actor4k/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/actor/Actor.kt

Lines changed: 83 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
4747
protected val log: Logger = ActorSystem.loggerFactory.getLogger(this::class)
4848

4949
private val stats: Stats = Stats()
50-
private var status = Status.CREATED
51-
private var initializationFailed: Exception? = null
50+
private var status: Status = Status.CREATED
51+
private var initializationError: Throwable? = null
52+
private var terminationError: Throwable? = null
5253
private val address: Address = Address.of(this::class, key)
5354
private val ref: LocalRef = LocalRef(address = address, actor = this)
5455

@@ -164,10 +165,10 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
164165
onBeforeActivate()
165166
// Set 'ACTIVATING' status.
166167
status = Status.ACTIVATING
167-
} catch (e: Exception) {
168+
} catch (e: Throwable) {
168169
// In case of an error, we need to close the [Actor] immediately.
169170
log.error("[$address::onBeforeActivate] Failed to activate, will shutdown (${e.message ?: ""})")
170-
initializationFailed = e
171+
initializationError = e
171172
shutdown()
172173
}
173174

@@ -178,11 +179,18 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
178179
// Case that activation flow failed and we still have messages to consume.
179180
// If we get a shutdown event and the actor never initialized successfully,
180181
// we need to reply with an error and to drop all the messages.
181-
if (initializationFailed != null) {
182+
if (initializationError != null) {
182183
replyActivationError(pattern)
183184
return@consumeEach
184185
}
185186

187+
// Case that termination flow was triggered.
188+
// In this case, we need to reply with an error and to drop all the messages.
189+
if (terminationError != null) {
190+
replyTerminationError(pattern)
191+
return@consumeEach
192+
}
193+
186194
val msg: Req = pattern.msg.apply {
187195
id = stats.receivedMessages
188196
kind = pattern.messageKind
@@ -195,10 +203,10 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
195203
// Set 'READY' status.
196204
status = Status.READY
197205
stats.initializedAt = Clock.System.now().toEpochMilliseconds()
198-
} catch (e: Exception) {
206+
} catch (e: Throwable) {
199207
// In case of an error, we need to close the [Actor] immediately.
200208
log.error("[$address::activate] Failed to activate, will shutdown (${e.message ?: ""})")
201-
initializationFailed = e
209+
initializationError = e
202210
replyActivationError(pattern)
203211
shutdown()
204212
return@consumeEach
@@ -234,7 +242,7 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
234242

235243
else -> r
236244
}
237-
} catch (e: Exception) {
245+
} catch (e: Throwable) {
238246
Behavior.Error(e)
239247
}
240248

@@ -262,7 +270,7 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
262270
// Handle afterReceive hook.
263271
try {
264272
afterReceive(msg, result)
265-
} catch (e: Exception) {
273+
} catch (e: Throwable) {
266274
log.warn("[$address::afterReceive] Failed to process afterReceive hook (${e.message ?: ""})")
267275
}
268276
}
@@ -272,12 +280,13 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
272280
// This allows Router workers (FIRST_AVAILABLE) to re-register availability, avoiding deadlocks.
273281
try {
274282
afterReceive(msg)
275-
} catch (e: Exception) {
283+
} catch (e: Throwable) {
276284
log.warn("[$address::afterReceive] Failed to process afterReceive hook for Behavior.None (${e.message ?: ""})")
277285
}
278286
}
279287

280288
is Behavior.Shutdown -> shutdown()
289+
is Behavior.Terminate -> terminate()
281290
}
282291
}
283292

@@ -323,7 +332,7 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
323332
mail.send(ask)
324333
ask.replyTo.receive()
325334
}
326-
} catch (e: Exception) {
335+
} catch (e: Throwable) {
327336
Result.failure(e)
328337
} finally {
329338
ask.replyTo.close()
@@ -391,6 +400,8 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
391400
// Drain the stash channel.
392401
@OptIn(ExperimentalCoroutinesApi::class)
393402
while (!stash.isEmpty) {
403+
// Skip processing if actor cannot handle messages (drain the stash).
404+
if (status == Status.SHUT_DOWN || status == Status.TERMINATED) continue
394405
val pattern = stash.receive()
395406
stats.stashedMessages -= 1
396407
process(pattern)
@@ -415,6 +426,24 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
415426
stash.close()
416427
}
417428

429+
/**
430+
* Initiates the termination process for the system or component.
431+
*
432+
* This method checks whether termination can be triggered based on the current status.
433+
* If termination is allowed, it updates the termination timestamp, changes the status
434+
* to `TERMINATING`, and closes associated resources such as mail and stash.
435+
*
436+
* The method has no effect if termination is not permitted.
437+
*/
438+
fun terminate(error: Throwable? = null) {
439+
if (!status.canTriggerTermination) return
440+
stats.triggeredTerminationAt = Clock.System.now().toEpochMilliseconds()
441+
terminationError = error ?: Exception("Actor terminated by user.")
442+
status = Status.TERMINATING
443+
mail.close()
444+
stash.close()
445+
}
446+
418447
/**
419448
* Represents message patterns used by the `Actor` for communication and message handling.
420449
*
@@ -479,17 +508,22 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
479508
* - `READY`: The actor is fully initialized and ready to process messages. Messages can be accepted during this state.
480509
* - `SHUTTING_DOWN`: The actor is in the process of shutting down. Messages cannot be accepted during this state.
481510
* - `SHUT_DOWN`: The actor has completed the shutdown process. Messages cannot be accepted during this state.
511+
* - `TERMINATING`: The actor is in the process of terminating. Messages cannot be accepted during this state.
512+
* - `TERMINATED`: The actor has terminated and is no longer available for use. Messages cannot be accepted during this state.
482513
*/
483514
@Serializable
484515
enum class Status(
485516
val canAcceptMessages: Boolean,
486517
val canTriggerShutdown: Boolean,
518+
val canTriggerTermination: Boolean,
487519
) {
488-
CREATED(true, true),
489-
ACTIVATING(true, true),
490-
READY(true, true),
491-
SHUTTING_DOWN(false, false),
492-
SHUT_DOWN(false, false)
520+
CREATED(true, true, true),
521+
ACTIVATING(true, true, true),
522+
READY(true, true, true),
523+
SHUTTING_DOWN(false, false, false),
524+
SHUT_DOWN(false, false, false),
525+
TERMINATING(false, false, false),
526+
TERMINATED(false, false, false);
493527
}
494528

495529
/**
@@ -513,6 +547,8 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
513547
var initializedAt: Long? = null,
514548
var triggeredShutDownAt: Long? = null,
515549
var shutDownAt: Long? = null,
550+
var triggeredTerminationAt: Long? = null,
551+
var terminatedAt: Long? = null,
516552
var lastMessageAt: Long = Clock.System.now().toEpochMilliseconds(),
517553
var receivedMessages: Long = 0,
518554
var stashedMessages: Long = 0
@@ -523,6 +559,8 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
523559
append("initializedAt=${instantFromEpochMilliseconds(initializedAt)}, ")
524560
append("triggeredShutDownAt=${instantFromEpochMilliseconds(triggeredShutDownAt)}, ")
525561
append("shutDownAt=${instantFromEpochMilliseconds(shutDownAt)}, ")
562+
append("triggeredTerminationAt=${instantFromEpochMilliseconds(triggeredTerminationAt)}, ")
563+
append("terminatedAt=${instantFromEpochMilliseconds(terminatedAt)}, ")
526564
append("lastMessageAt=${instantFromEpochMilliseconds(lastMessageAt)}, ")
527565
append("receivedMessages=${instantFromEpochMilliseconds(receivedMessages)}, ")
528566
append("stashedMessages=$stashedMessages")
@@ -544,7 +582,7 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
544582
for (e in this) {
545583
try {
546584
action(e)
547-
} catch (e: Exception) {
585+
} catch (e: Throwable) {
548586
log.warn("[$address::consume] An error occurred while processing. ${e.message ?: ""}")
549587
}
550588
}
@@ -561,12 +599,16 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
561599
}
562600
} catch (_: TimeoutCancellationException) {
563601
log.error("[$address::onShutdown] Shutdown hook timed out after ${ActorSystem.conf.actorShutdownHookTimeout}. Forcing shutdown.")
564-
} catch (e: Exception) {
602+
} catch (e: Throwable) {
565603
log.error("[$address::onShutdown] Error during shutdown hook: ${e.message ?: "Unknown error"}")
566604
} finally {
567-
// Unregister the actor even if the shutdown hook fails or times out
568-
status = Status.SHUT_DOWN
569-
stats.shutDownAt = Clock.System.now().toEpochMilliseconds()
605+
if (terminationError != null) {
606+
status = Status.TERMINATED
607+
stats.terminatedAt = Clock.System.now().toEpochMilliseconds()
608+
} else {
609+
status = Status.SHUT_DOWN
610+
stats.shutDownAt = Clock.System.now().toEpochMilliseconds()
611+
}
570612
ActorSystem.registry.unregister(this@Actor.ref)
571613
}
572614
}
@@ -591,7 +633,7 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
591633
log.warn("[$address::$operation] Could not reply in time (timeout after ${ActorSystem.conf.actorReplyTimeout}) (the message was processed successfully).")
592634
} catch (_: ClosedSendChannelException) {
593635
log.warn("[$address::$operation] Could not reply, the channel is closed (the message was processed successfully).")
594-
} catch (e: Exception) {
636+
} catch (e: Throwable) {
595637
val error = e.message ?: "Unknown error."
596638
log.warn("[$address::$operation] Could not reply (the message was processed successfully). {}", error)
597639
}
@@ -607,14 +649,32 @@ abstract class Actor<Req : ActorProtocol, Res : ActorProtocol.Response>(
607649
when (pattern) {
608650
is Patterns.Tell -> Unit
609651
is Patterns.Ask -> {
610-
val e = initializationFailed
652+
val e = initializationError
611653
?: IllegalStateException("Actor is prematurely closed (could not be initialized).")
612654
val r: Result<Res> = Result.failure(e)
613655
reply(operation = "activate", pattern = pattern, reply = r)
614656
}
615657
}
616658
}
617659

660+
/**
661+
* Handles the termination error scenario for the provided message pattern.
662+
*
663+
* @param pattern The message pattern that specifies the type of interaction (e.g., Tell or Ask)
664+
* and determines how the termination error is managed.
665+
*/
666+
private suspend fun replyTerminationError(pattern: Patterns<Req, Res>) {
667+
when (pattern) {
668+
is Patterns.Tell -> Unit
669+
is Patterns.Ask -> {
670+
val e = terminationError
671+
?: IllegalStateException("Actor is terminated, all pending messages are replied with errors.")
672+
val r: Result<Res> = Result.failure(e)
673+
reply(operation = "terminate", pattern = pattern, reply = r)
674+
}
675+
}
676+
}
677+
618678
companion object {
619679
/**
620680
* Generates a unique random key as a string, which includes a "key-" prefix followed by a hash code

actor4k/src/commonMain/kotlin/io/github/smyrgeorge/actor4k/actor/Behavior.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,15 @@ sealed interface Behavior<Res : Response> {
5858
* @param Res The type of `Response` this behavior operates on.
5959
*/
6060
class Shutdown<Res : Response> : Behavior<Res>
61+
62+
/**
63+
* Represents a termination behavior that extends the capabilities of a `Behavior`.
64+
*
65+
* The `Terminate` class models the end of a communication or processing process, providing
66+
* specialized features tailored for handling scenarios where a response of type `Res` is required.
67+
*
68+
* @param Res The type of the response that extends the `Response` class. This ensures the termination
69+
* behavior conforms to the expected structure and properties of a response defined by the protocol.
70+
*/
71+
class Terminate<Res : Response> : Behavior<Res>
6172
}

0 commit comments

Comments
 (0)