Skip to content

Commit 1f91a34

Browse files
committed
Rafactor InMemoryAgent Process repo to use HierarchyAwareEvictionPolicy
1 parent 90e03b3 commit 1f91a34

File tree

3 files changed

+292
-41
lines changed

3 files changed

+292
-41
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2024-2026 Embabel Pty Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.embabel.agent.spi.support
17+
18+
import com.embabel.agent.core.AgentProcess
19+
import java.util.*
20+
21+
/**
22+
* Eviction policy for agent process hierarchies in memory-bounded storage.
23+
*
24+
* NOTE: This policy is applicable only to in-memory stores where eviction is necessary
25+
* to prevent memory overflow. Disk or cloud-based storage implementations typically
26+
* do not require eviction as storage is scalable; cleanup is instead handled via
27+
* TTL policies, scheduled jobs, or explicit deletion based on retention requirements.
28+
*
29+
* Eviction rules:
30+
* 1. Only evict entire hierarchies (root + all descendants), never partial
31+
* 2. Only evict if the entire hierarchy is finished (no running processes)
32+
*
33+
* This ensures findByParentId always finds active children for kill propagation.
34+
*
35+
* @param windowSize Maximum number of root processes to retain
36+
*/
37+
internal class HierarchyAwareEvictionPolicy(private val windowSize: Int) {
38+
39+
/**
40+
* Evict oldest finished hierarchies if accessOrder exceeds windowSize.
41+
*
42+
* @param accessOrder Queue of root process IDs in access order (oldest first)
43+
* @param map The process map to evict from
44+
*/
45+
fun evictIfNeeded(accessOrder: Queue<String>, map: MutableMap<String, AgentProcess>) {
46+
while (accessOrder.size > windowSize) {
47+
val oldestRootId = accessOrder.peek() ?: break
48+
if (isHierarchyFinished(oldestRootId, map)) {
49+
accessOrder.poll()
50+
evictHierarchy(oldestRootId, map, accessOrder)
51+
} else {
52+
// Oldest hierarchy still running - stop eviction attempts
53+
// to preserve FIFO order and prevent skipping
54+
break
55+
}
56+
}
57+
}
58+
59+
/**
60+
* Check if the entire process hierarchy (process + all descendants) is finished.
61+
* Returns true only if the process AND all its children recursively are finished.
62+
*/
63+
private fun isHierarchyFinished(processId: String, map: Map<String, AgentProcess>): Boolean {
64+
val process = map[processId] ?: return true
65+
if (!process.finished) return false
66+
return findChildrenOf(processId, map).all { isHierarchyFinished(it.id, map) }
67+
}
68+
69+
/**
70+
* Evict an entire process hierarchy (process + all descendants).
71+
* Must only be called when isHierarchyFinished returns true.
72+
*/
73+
private fun evictHierarchy(
74+
processId: String,
75+
map: MutableMap<String, AgentProcess>,
76+
accessOrder: Queue<String>,
77+
) {
78+
findChildrenOf(processId, map).forEach { child ->
79+
evictHierarchy(child.id, map, accessOrder)
80+
}
81+
map.remove(processId)
82+
accessOrder.remove(processId)
83+
}
84+
85+
private fun findChildrenOf(parentId: String, map: Map<String, AgentProcess>): List<AgentProcess> =
86+
map.values.filter { it.parentId == parentId }
87+
}

embabel-agent-api/src/main/kotlin/com/embabel/agent/spi/support/InMemoryAgentProcessRepository.kt

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,38 +35,14 @@ class InMemoryAgentProcessRepository(
3535
private val map: ConcurrentHashMap<String, AgentProcess> = ConcurrentHashMap()
3636
private val accessOrder: ConcurrentLinkedQueue<String> = ConcurrentLinkedQueue()
3737
private val lock = ReentrantReadWriteLock()
38+
private val evictionPolicy = HierarchyAwareEvictionPolicy(properties.windowSize)
3839

3940
override fun findById(id: String): AgentProcess? = lock.read {
4041
map[id]
4142
}
4243

4344
override fun findByParentId(parentId: String): List<AgentProcess> = lock.read {
44-
findByParentIdInternal(parentId)
45-
}
46-
47-
private fun findByParentIdInternal(parentId: String): List<AgentProcess> =
4845
map.values.filter { it.parentId == parentId }
49-
50-
/**
51-
* Check if the entire process hierarchy (process + all descendants) is finished.
52-
* Returns true only if the process AND all its children recursively are finished.
53-
*/
54-
private fun isHierarchyFinished(processId: String): Boolean {
55-
val process = map[processId] ?: return true
56-
if (!process.finished) return false
57-
return findByParentIdInternal(processId).all { isHierarchyFinished(it.id) }
58-
}
59-
60-
/**
61-
* Evict an entire process hierarchy (process + all descendants).
62-
* Must only be called when [isHierarchyFinished] returns true.
63-
*/
64-
private fun evictHierarchy(processId: String) {
65-
findByParentIdInternal(processId).forEach { child ->
66-
evictHierarchy(child.id)
67-
}
68-
map.remove(processId)
69-
accessOrder.remove(processId)
7046
}
7147

7248
override fun save(agentProcess: AgentProcess): AgentProcess = lock.write {
@@ -83,22 +59,7 @@ class InMemoryAgentProcessRepository(
8359
// Child processes are evicted together with their parent hierarchy.
8460
if (agentProcess.isRootProcess) {
8561
accessOrder.offer(processId)
86-
87-
// Eviction rules:
88-
// 1. Only evict entire hierarchies (root + all descendants), never partial
89-
// 2. Only evict if the entire hierarchy is finished (no running processes)
90-
// This ensures findByParentId always finds active children for kill propagation.
91-
while (accessOrder.size > properties.windowSize) {
92-
val oldestRootId = accessOrder.peek() ?: break
93-
if (isHierarchyFinished(oldestRootId)) {
94-
accessOrder.poll()
95-
evictHierarchy(oldestRootId)
96-
} else {
97-
// Oldest hierarchy still running - stop eviction attempts
98-
// to preserve FIFO order and prevent skipping
99-
break
100-
}
101-
}
62+
evictionPolicy.evictIfNeeded(accessOrder, map)
10263
}
10364

10465
agentProcess
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2024-2026 Embabel Pty Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.embabel.agent.spi.support
17+
18+
import com.embabel.agent.core.AgentProcess
19+
import io.mockk.every
20+
import io.mockk.mockk
21+
import org.junit.jupiter.api.Assertions.*
22+
import org.junit.jupiter.api.BeforeEach
23+
import org.junit.jupiter.api.Nested
24+
import org.junit.jupiter.api.Test
25+
import java.util.concurrent.ConcurrentLinkedQueue
26+
27+
class HierarchyAwareEvictionPolicyTest {
28+
29+
private lateinit var map: MutableMap<String, AgentProcess>
30+
private lateinit var accessOrder: ConcurrentLinkedQueue<String>
31+
32+
@BeforeEach
33+
fun setUp() {
34+
map = mutableMapOf()
35+
accessOrder = ConcurrentLinkedQueue()
36+
}
37+
38+
private fun mockRootProcess(id: String, finished: Boolean = true): AgentProcess = mockk {
39+
every { this@mockk.id } returns id
40+
every { parentId } returns null
41+
every { isRootProcess } returns true
42+
every { this@mockk.finished } returns finished
43+
}
44+
45+
private fun mockChildProcess(id: String, parentId: String, finished: Boolean = true): AgentProcess = mockk {
46+
every { this@mockk.id } returns id
47+
every { this@mockk.parentId } returns parentId
48+
every { isRootProcess } returns false
49+
every { this@mockk.finished } returns finished
50+
}
51+
52+
private fun addProcess(process: AgentProcess) {
53+
map[process.id] = process
54+
if (process.isRootProcess) {
55+
accessOrder.offer(process.id)
56+
}
57+
}
58+
59+
@Nested
60+
inner class BasicEvictionTests {
61+
62+
@Test
63+
fun `does not evict when under window size`() {
64+
val policy = HierarchyAwareEvictionPolicy(windowSize = 3)
65+
addProcess(mockRootProcess("root-1"))
66+
addProcess(mockRootProcess("root-2"))
67+
68+
policy.evictIfNeeded(accessOrder, map)
69+
70+
assertEquals(2, map.size)
71+
assertEquals(2, accessOrder.size)
72+
}
73+
74+
@Test
75+
fun `evicts oldest when window size exceeded`() {
76+
val policy = HierarchyAwareEvictionPolicy(windowSize = 2)
77+
addProcess(mockRootProcess("root-1"))
78+
addProcess(mockRootProcess("root-2"))
79+
addProcess(mockRootProcess("root-3"))
80+
81+
policy.evictIfNeeded(accessOrder, map)
82+
83+
assertEquals(2, map.size)
84+
assertNull(map["root-1"])
85+
assertNotNull(map["root-2"])
86+
assertNotNull(map["root-3"])
87+
}
88+
89+
@Test
90+
fun `evicts multiple when many over window size`() {
91+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
92+
addProcess(mockRootProcess("root-1"))
93+
addProcess(mockRootProcess("root-2"))
94+
addProcess(mockRootProcess("root-3"))
95+
96+
policy.evictIfNeeded(accessOrder, map)
97+
98+
assertEquals(1, map.size)
99+
assertNotNull(map["root-3"])
100+
}
101+
}
102+
103+
@Nested
104+
inner class RunningProcessTests {
105+
106+
@Test
107+
fun `does not evict running root process`() {
108+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
109+
addProcess(mockRootProcess("running-root", finished = false))
110+
addProcess(mockRootProcess("new-root"))
111+
112+
policy.evictIfNeeded(accessOrder, map)
113+
114+
// Both should exist - can't evict running process
115+
assertEquals(2, map.size)
116+
assertNotNull(map["running-root"])
117+
assertNotNull(map["new-root"])
118+
}
119+
120+
@Test
121+
fun `stops eviction at first running process`() {
122+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
123+
addProcess(mockRootProcess("running-root", finished = false))
124+
addProcess(mockRootProcess("finished-root"))
125+
addProcess(mockRootProcess("another-root"))
126+
127+
policy.evictIfNeeded(accessOrder, map)
128+
129+
// Cannot evict running-root, so stops there (preserves FIFO order)
130+
assertEquals(3, map.size)
131+
}
132+
}
133+
134+
@Nested
135+
inner class HierarchyTests {
136+
137+
@Test
138+
fun `evicts entire hierarchy when root evicted`() {
139+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
140+
addProcess(mockRootProcess("root-1"))
141+
addProcess(mockChildProcess("child-1", parentId = "root-1"))
142+
addProcess(mockChildProcess("child-2", parentId = "root-1"))
143+
addProcess(mockRootProcess("root-2"))
144+
145+
policy.evictIfNeeded(accessOrder, map)
146+
147+
// root-1 and all children should be evicted
148+
assertEquals(1, map.size)
149+
assertNull(map["root-1"])
150+
assertNull(map["child-1"])
151+
assertNull(map["child-2"])
152+
assertNotNull(map["root-2"])
153+
}
154+
155+
@Test
156+
fun `evicts nested children with grandparent`() {
157+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
158+
addProcess(mockRootProcess("root"))
159+
addProcess(mockChildProcess("child", parentId = "root"))
160+
addProcess(mockChildProcess("grandchild", parentId = "child"))
161+
addProcess(mockRootProcess("new-root"))
162+
163+
policy.evictIfNeeded(accessOrder, map)
164+
165+
// Entire hierarchy evicted
166+
assertEquals(1, map.size)
167+
assertNull(map["root"])
168+
assertNull(map["child"])
169+
assertNull(map["grandchild"])
170+
assertNotNull(map["new-root"])
171+
}
172+
173+
@Test
174+
fun `running child prevents hierarchy eviction`() {
175+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
176+
addProcess(mockRootProcess("root-1"))
177+
addProcess(mockChildProcess("running-child", parentId = "root-1", finished = false))
178+
addProcess(mockRootProcess("root-2"))
179+
180+
policy.evictIfNeeded(accessOrder, map)
181+
182+
// Hierarchy cannot be evicted due to running child
183+
assertEquals(3, map.size)
184+
assertNotNull(map["root-1"])
185+
assertNotNull(map["running-child"])
186+
assertNotNull(map["root-2"])
187+
}
188+
189+
@Test
190+
fun `running grandchild prevents hierarchy eviction`() {
191+
val policy = HierarchyAwareEvictionPolicy(windowSize = 1)
192+
addProcess(mockRootProcess("root"))
193+
addProcess(mockChildProcess("child", parentId = "root"))
194+
addProcess(mockChildProcess("running-grandchild", parentId = "child", finished = false))
195+
addProcess(mockRootProcess("new-root"))
196+
197+
policy.evictIfNeeded(accessOrder, map)
198+
199+
// Hierarchy cannot be evicted due to running grandchild
200+
assertEquals(4, map.size)
201+
}
202+
}
203+
}

0 commit comments

Comments
 (0)