|
63 | 63 | import java.util.Queue; |
64 | 64 | import java.util.Set; |
65 | 65 | import java.util.UUID; |
66 | | -import java.util.concurrent.ConcurrentHashMap; |
67 | 66 | import java.util.concurrent.ConcurrentLinkedQueue; |
68 | 67 | import java.util.concurrent.ExecutionException; |
69 | 68 | import java.util.concurrent.Executors; |
@@ -160,7 +159,6 @@ public class LocalNode extends Node implements Closeable { |
160 | 159 | private final Optional<Path> statusFilePath; |
161 | 160 | private final Optional<Path> sessionHistoryFilePath; |
162 | 161 | private final Queue<SessionHistoryEntry> sessionHistory = new ConcurrentLinkedQueue<>(); |
163 | | - private final Map<SessionId, Instant> sessionStartTimes = new ConcurrentHashMap<>(); |
164 | 162 |
|
165 | 163 | protected LocalNode( |
166 | 164 | Tracer tracer, |
@@ -308,6 +306,7 @@ protected LocalNode( |
308 | 306 |
|
309 | 307 | bus.addListener(SessionStartedEvent.listener(this::recordSessionStart)); |
310 | 308 | bus.addListener(SessionClosedEvent.listener(this::recordSessionStop)); |
| 309 | + bus.addListener(NodeHeartBeatEvent.listener(this::cleanupSessionHistory)); |
311 | 310 |
|
312 | 311 | shutdown = |
313 | 312 | () -> { |
@@ -1138,33 +1137,58 @@ private void recordSessionStart(SessionId sessionId) { |
1138 | 1137 | return; |
1139 | 1138 | } |
1140 | 1139 | Instant startTime = Instant.now(); |
1141 | | - sessionStartTimes.put(sessionId, startTime); |
1142 | 1140 | sessionHistory.add(new SessionHistoryEntry(sessionId, startTime, null)); |
1143 | 1141 | writeSessionHistoryToFile(); |
1144 | 1142 | } |
1145 | 1143 |
|
1146 | 1144 | private void recordSessionStop(SessionId sessionId) { |
1147 | | - if (!isSessionOwner(sessionId)) { |
1148 | | - return; |
1149 | | - } |
1150 | 1145 | Instant stopTime = Instant.now(); |
1151 | | - Instant startTime = sessionStartTimes.remove(sessionId); |
1152 | | - if (startTime != null) { |
1153 | | - // Find and update the existing history entry |
1154 | | - sessionHistory.stream() |
1155 | | - .filter(entry -> entry.getSessionId().equals(sessionId)) |
1156 | | - .findFirst() |
1157 | | - .ifPresent(entry -> entry.setStopTime(stopTime)); |
1158 | | - writeSessionHistoryToFile(); |
| 1146 | + // Find and update the existing history entry |
| 1147 | + sessionHistory.stream() |
| 1148 | + .filter(entry -> entry.getSessionId().equals(sessionId)) |
| 1149 | + .findFirst() |
| 1150 | + .ifPresent( |
| 1151 | + entry -> { |
| 1152 | + entry.setStopTime(stopTime); |
| 1153 | + writeSessionHistoryToFile(); |
| 1154 | + }); |
| 1155 | + } |
| 1156 | + |
| 1157 | + private void cleanupSessionHistory(NodeStatus status) { |
| 1158 | + int maxHistorySize = 100; |
| 1159 | + if (!status.getNodeId().equals(getId()) || sessionHistory.size() < maxHistorySize) { |
| 1160 | + return; |
1159 | 1161 | } |
| 1162 | + |
| 1163 | + // Keep only the last 100 completed sessions |
| 1164 | + List<SessionHistoryEntry> completedSessions = |
| 1165 | + sessionHistory.stream() |
| 1166 | + .filter(entry -> entry.getStopTime() != null) |
| 1167 | + .sorted( |
| 1168 | + (a, b) -> |
| 1169 | + b.getStartTime().compareTo(a.getStartTime())) // Sort by start time descending |
| 1170 | + .limit(100) |
| 1171 | + .collect(Collectors.toList()); |
| 1172 | + |
| 1173 | + // Keep all ongoing sessions |
| 1174 | + List<SessionHistoryEntry> ongoingSessions = |
| 1175 | + sessionHistory.stream() |
| 1176 | + .filter(entry -> entry.getStopTime() == null) |
| 1177 | + .collect(Collectors.toList()); |
| 1178 | + |
| 1179 | + // Clear and rebuild the history queue |
| 1180 | + sessionHistory.clear(); |
| 1181 | + sessionHistory.addAll(completedSessions); |
| 1182 | + sessionHistory.addAll(ongoingSessions); |
| 1183 | + |
| 1184 | + // Write the cleaned history to file |
| 1185 | + writeSessionHistoryToFile(); |
1160 | 1186 | } |
1161 | 1187 |
|
1162 | 1188 | private void writeSessionHistoryToFile() { |
1163 | 1189 | if (sessionHistoryFilePath.isPresent()) { |
1164 | 1190 | try { |
1165 | 1191 | List<SessionHistoryEntry> sortedHistory = new ArrayList<>(sessionHistory); |
1166 | | - sortedHistory.sort((a, b) -> a.getStartTime().compareTo(b.getStartTime())); |
1167 | | - |
1168 | 1192 | String historyJson = JSON.toJson(sortedHistory); |
1169 | 1193 | Files.write( |
1170 | 1194 | sessionHistoryFilePath.get(), |
|
0 commit comments