Skip to content

Commit 00e2e0b

Browse files
committed
refactor session persistence to async debounced writes
Reduce runtime blocking by replacing sync session writes with a 5s async debounce queue and avoid persisting verbose tool traces to disk.
1 parent cd7a2e5 commit 00e2e0b

File tree

2 files changed

+102
-16
lines changed

2 files changed

+102
-16
lines changed

packages/config/local/sessions.ts

Lines changed: 102 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import * as fs from "fs";
22
import * as path from "path";
33
import * as os from "os";
4+
import { writeFile } from "fs/promises";
45
import { log } from "@/utils";
56

67
const readFileSync = fs.readFileSync;
7-
const writeFileSync = fs.writeFileSync;
88
const existsSync = fs.existsSync;
99
const mkdirSync = fs.mkdirSync;
1010
const readdirSync = fs.readdirSync;
@@ -14,6 +14,7 @@ const homedir = typeof os.homedir === "function" ? os.homedir : () => "";
1414

1515
const ODE_CONFIG_DIR = join(homedir(), ".config", "ode");
1616
const SESSIONS_DIR = join(ODE_CONFIG_DIR, "sessions");
17+
const SESSION_SAVE_DEBOUNCE_MS = 5000;
1718

1819
export interface TrackedTool {
1920
id: string;
@@ -39,7 +40,7 @@ export interface ActiveRequest {
3940
startedAt: number;
4041
lastUpdatedAt: number;
4142
currentText: string;
42-
tools: TrackedTool[];
43+
tools?: TrackedTool[];
4344
todos: TrackedTodo[];
4445
statusFrozen?: boolean;
4546
state: "processing" | "completed" | "failed";
@@ -79,6 +80,10 @@ export interface PersistedSession {
7980
// In-memory cache
8081
const activeSessions = new Map<string, PersistedSession>();
8182
const processedMessages = new Set<string>();
83+
const pendingWriteTimers = new Map<string, ReturnType<typeof setTimeout>>();
84+
const pendingWriteSnapshots = new Map<string, PersistedSession>();
85+
const writeChains = new Map<string, Promise<void>>();
86+
const deletedSessionKeys = new Set<string>();
8287

8388
function ensureSessionsDir(): void {
8489
if (!existsSync(SESSIONS_DIR)) {
@@ -96,6 +101,58 @@ function getSessionFilePath(sessionKey: string): string {
96101
return join(SESSIONS_DIR, `${safeKey}.json`);
97102
}
98103

104+
function sanitizeSessionForStorage(session: PersistedSession): PersistedSession {
105+
const snapshot = structuredClone(session);
106+
if (snapshot.activeRequest) {
107+
delete (snapshot.activeRequest as Partial<ActiveRequest>).tools;
108+
}
109+
return snapshot;
110+
}
111+
112+
function enqueueSessionWrite(sessionKey: string, immediate = false): void {
113+
const existingTimer = pendingWriteTimers.get(sessionKey);
114+
if (existingTimer) {
115+
clearTimeout(existingTimer);
116+
pendingWriteTimers.delete(sessionKey);
117+
}
118+
119+
const flush = () => {
120+
pendingWriteTimers.delete(sessionKey);
121+
if (deletedSessionKeys.has(sessionKey)) {
122+
pendingWriteSnapshots.delete(sessionKey);
123+
return;
124+
}
125+
const snapshot = pendingWriteSnapshots.get(sessionKey);
126+
if (!snapshot) return;
127+
pendingWriteSnapshots.delete(sessionKey);
128+
const filePath = getSessionFilePath(sessionKey);
129+
const payload = JSON.stringify(snapshot, null, 2);
130+
const previous = writeChains.get(sessionKey) ?? Promise.resolve();
131+
const next = previous
132+
.catch(() => undefined)
133+
.then(async () => {
134+
await writeFile(filePath, payload, "utf-8");
135+
})
136+
.catch((err) => {
137+
log.error("Failed to save session", { sessionKey, error: String(err) });
138+
})
139+
.finally(() => {
140+
if (writeChains.get(sessionKey) === next) {
141+
writeChains.delete(sessionKey);
142+
}
143+
});
144+
writeChains.set(sessionKey, next);
145+
};
146+
147+
if (immediate) {
148+
flush();
149+
return;
150+
}
151+
152+
const timer = setTimeout(flush, SESSION_SAVE_DEBOUNCE_MS);
153+
pendingWriteTimers.set(sessionKey, timer);
154+
}
155+
99156
export function loadSession(channelId: string, threadId: string): PersistedSession | null {
100157
const sessionKey = getSessionKey(channelId, threadId);
101158

@@ -119,6 +176,7 @@ export function loadSession(channelId: string, threadId: string): PersistedSessi
119176
};
120177
active.channelId = active.settingsChannelId || active.channelId || session.channelId;
121178
active.replyThreadId = active.replyThreadId || active.replyChannelId || session.threadId;
179+
active.tools = Array.isArray(active.tools) ? active.tools : [];
122180
}
123181
activeSessions.set(sessionKey, session);
124182
return session;
@@ -128,23 +186,42 @@ export function loadSession(channelId: string, threadId: string): PersistedSessi
128186
}
129187
}
130188

131-
export function saveSession(session: PersistedSession): void {
189+
export function saveSession(session: PersistedSession, options?: { immediate?: boolean }): void {
132190
ensureSessionsDir();
133191
const sessionKey = getSessionKey(session.channelId, session.threadId);
192+
deletedSessionKeys.delete(sessionKey);
134193
session.lastActivityAt = Date.now();
135194
activeSessions.set(sessionKey, session);
136195

137-
const filePath = getSessionFilePath(sessionKey);
138-
try {
139-
writeFileSync(filePath, JSON.stringify(session, null, 2));
140-
} catch (err) {
141-
log.error("Failed to save session", { sessionKey, error: String(err) });
142-
}
196+
pendingWriteSnapshots.set(sessionKey, sanitizeSessionForStorage(session));
197+
enqueueSessionWrite(sessionKey, options?.immediate ?? false);
143198
}
144199

145200
export function deleteSession(channelId: string, threadId: string): void {
146201
const sessionKey = getSessionKey(channelId, threadId);
147202
activeSessions.delete(sessionKey);
203+
deletedSessionKeys.add(sessionKey);
204+
205+
const timer = pendingWriteTimers.get(sessionKey);
206+
if (timer) {
207+
clearTimeout(timer);
208+
pendingWriteTimers.delete(sessionKey);
209+
}
210+
pendingWriteSnapshots.delete(sessionKey);
211+
const inFlight = writeChains.get(sessionKey);
212+
if (inFlight) {
213+
void inFlight.finally(() => {
214+
if (!deletedSessionKeys.has(sessionKey)) return;
215+
const pathAfterWrite = getSessionFilePath(sessionKey);
216+
if (existsSync(pathAfterWrite)) {
217+
try {
218+
unlinkSync(pathAfterWrite);
219+
} catch {
220+
// Ignore delete errors
221+
}
222+
}
223+
});
224+
}
148225

149226
const filePath = getSessionFilePath(sessionKey);
150227
if (existsSync(filePath)) {
@@ -189,8 +266,10 @@ export function updateActiveRequest(
189266
const session = loadSession(channelId, threadId);
190267
if (!session?.activeRequest) return;
191268

192-
Object.assign(session.activeRequest, updates, { lastUpdatedAt: Date.now() });
193-
saveSession(session);
269+
const sanitized = { ...updates } as Partial<ActiveRequest>;
270+
delete sanitized.tools;
271+
Object.assign(session.activeRequest, sanitized, { lastUpdatedAt: Date.now() });
272+
saveSession(session, { immediate: false });
194273
}
195274

196275
export function completeActiveRequest(
@@ -259,7 +338,11 @@ export function getActiveRequest(channelId: string, threadId: string): ActiveReq
259338

260339
export function loadAllSessions(): PersistedSession[] {
261340
ensureSessionsDir();
262-
const sessions: PersistedSession[] = [];
341+
const sessionsByKey = new Map<string, PersistedSession>();
342+
343+
for (const [sessionKey, session] of activeSessions.entries()) {
344+
sessionsByKey.set(sessionKey, session);
345+
}
263346

264347
try {
265348
const files = readdirSync(SESSIONS_DIR).filter(f => f.endsWith(".json"));
@@ -268,9 +351,13 @@ export function loadAllSessions(): PersistedSession[] {
268351
try {
269352
const data = readFileSync(filePath, "utf-8");
270353
const session = JSON.parse(data) as PersistedSession;
271-
sessions.push(session);
272354
const sessionKey = getSessionKey(session.channelId, session.threadId);
273-
activeSessions.set(sessionKey, session);
355+
if (!sessionsByKey.has(sessionKey)) {
356+
sessionsByKey.set(sessionKey, session);
357+
}
358+
if (!activeSessions.has(sessionKey)) {
359+
activeSessions.set(sessionKey, session);
360+
}
274361
} catch {
275362
// Skip invalid session files
276363
}
@@ -279,7 +366,7 @@ export function loadAllSessions(): PersistedSession[] {
279366
// Sessions dir doesn't exist yet
280367
}
281368

282-
return sessions;
369+
return Array.from(sessionsByKey.values());
283370
}
284371

285372
export function getSessionsWithPendingRequests(platform?: "slack" | "discord" | "lark"): PersistedSession[] {

packages/core/runtime/open-request.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ export async function runOpenRequest(params: {
134134
updateActiveRequest(context.channelId, context.threadId, {
135135
statusMessageTs: request.statusMessageTs,
136136
currentText: request.currentText,
137-
tools: request.tools,
138137
todos: request.todos,
139138
statusFrozen: request.statusFrozen,
140139
});

0 commit comments

Comments
 (0)