Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ode",
"version": "0.1.0",
"version": "0.1.1",
"description": "Coding anywhere with your coding agents connected",
"module": "packages/core/index.ts",
"type": "module",
Expand Down
152 changes: 136 additions & 16 deletions packages/config/local/sessions.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import * as fs from "fs";
import * as path from "path";
import * as os from "os";
import { writeFile } from "fs/promises";
import { log } from "@/utils";

const readFileSync = fs.readFileSync;
const writeFileSync = fs.writeFileSync;
const existsSync = fs.existsSync;
const mkdirSync = fs.mkdirSync;
const readdirSync = fs.readdirSync;
Expand All @@ -14,6 +14,8 @@ const homedir = typeof os.homedir === "function" ? os.homedir : () => "";

const ODE_CONFIG_DIR = join(homedir(), ".config", "ode");
const SESSIONS_DIR = join(ODE_CONFIG_DIR, "sessions");
const SESSION_SAVE_DEBOUNCE_MS = 5000;
const SESSION_RETENTION_MS = 7 * 24 * 60 * 60 * 1000;

export interface TrackedTool {
id: string;
Expand All @@ -39,7 +41,7 @@ export interface ActiveRequest {
startedAt: number;
lastUpdatedAt: number;
currentText: string;
tools: TrackedTool[];
tools?: TrackedTool[];
todos: TrackedTodo[];
statusFrozen?: boolean;
state: "processing" | "completed" | "failed";
Expand Down Expand Up @@ -79,6 +81,10 @@ export interface PersistedSession {
// In-memory cache
const activeSessions = new Map<string, PersistedSession>();
const processedMessages = new Set<string>();
const pendingWriteTimers = new Map<string, ReturnType<typeof setTimeout>>();
const pendingWriteSnapshots = new Map<string, PersistedSession>();
const writeChains = new Map<string, Promise<void>>();
const deletedSessionKeys = new Set<string>();

function ensureSessionsDir(): void {
if (!existsSync(SESSIONS_DIR)) {
Expand All @@ -96,12 +102,84 @@ function getSessionFilePath(sessionKey: string): string {
return join(SESSIONS_DIR, `${safeKey}.json`);
}

function getSessionLastActiveAt(session: PersistedSession): number {
if (Number.isFinite(session.lastActivityAt)) {
return session.lastActivityAt;
}
if (Number.isFinite(session.createdAt)) {
return session.createdAt;
}
return 0;
}

function isSessionExpired(session: PersistedSession, now = Date.now()): boolean {
const lastActiveAt = getSessionLastActiveAt(session);
return now - lastActiveAt >= SESSION_RETENTION_MS;
}

function sanitizeSessionForStorage(session: PersistedSession): PersistedSession {
const snapshot = structuredClone(session);
if (snapshot.activeRequest) {
delete (snapshot.activeRequest as Partial<ActiveRequest>).tools;
}
return snapshot;
}

function enqueueSessionWrite(sessionKey: string, immediate = false): void {
const existingTimer = pendingWriteTimers.get(sessionKey);
if (existingTimer) {
clearTimeout(existingTimer);
pendingWriteTimers.delete(sessionKey);
}

const flush = () => {
pendingWriteTimers.delete(sessionKey);
if (deletedSessionKeys.has(sessionKey)) {
pendingWriteSnapshots.delete(sessionKey);
return;
}
const snapshot = pendingWriteSnapshots.get(sessionKey);
if (!snapshot) return;
pendingWriteSnapshots.delete(sessionKey);
const filePath = getSessionFilePath(sessionKey);
const payload = JSON.stringify(snapshot, null, 2);
const previous = writeChains.get(sessionKey) ?? Promise.resolve();
const next = previous
.catch(() => undefined)
.then(async () => {
await writeFile(filePath, payload, "utf-8");
})
.catch((err) => {
log.error("Failed to save session", { sessionKey, error: String(err) });
})
.finally(() => {
if (writeChains.get(sessionKey) === next) {
writeChains.delete(sessionKey);
}
});
writeChains.set(sessionKey, next);
};

if (immediate) {
flush();
return;
}

const timer = setTimeout(flush, SESSION_SAVE_DEBOUNCE_MS);
pendingWriteTimers.set(sessionKey, timer);
}

export function loadSession(channelId: string, threadId: string): PersistedSession | null {
const sessionKey = getSessionKey(channelId, threadId);

// Check cache first
if (activeSessions.has(sessionKey)) {
return activeSessions.get(sessionKey)!;
const cached = activeSessions.get(sessionKey)!;
if (isSessionExpired(cached)) {
deleteSession(channelId, threadId);
return null;
}
return cached;
}

const filePath = getSessionFilePath(sessionKey);
Expand All @@ -112,13 +190,18 @@ export function loadSession(channelId: string, threadId: string): PersistedSessi
try {
const data = readFileSync(filePath, "utf-8");
const session = JSON.parse(data) as PersistedSession;
if (isSessionExpired(session)) {
deleteSession(channelId, threadId);
return null;
}
if (session.activeRequest) {
const active = session.activeRequest as ActiveRequest & {
settingsChannelId?: string;
replyChannelId?: string;
};
active.channelId = active.settingsChannelId || active.channelId || session.channelId;
active.replyThreadId = active.replyThreadId || active.replyChannelId || session.threadId;
active.tools = Array.isArray(active.tools) ? active.tools : [];
}
activeSessions.set(sessionKey, session);
return session;
Expand All @@ -128,23 +211,42 @@ export function loadSession(channelId: string, threadId: string): PersistedSessi
}
}

export function saveSession(session: PersistedSession): void {
export function saveSession(session: PersistedSession, options?: { immediate?: boolean }): void {
ensureSessionsDir();
const sessionKey = getSessionKey(session.channelId, session.threadId);
deletedSessionKeys.delete(sessionKey);
session.lastActivityAt = Date.now();
activeSessions.set(sessionKey, session);

const filePath = getSessionFilePath(sessionKey);
try {
writeFileSync(filePath, JSON.stringify(session, null, 2));
} catch (err) {
log.error("Failed to save session", { sessionKey, error: String(err) });
}
pendingWriteSnapshots.set(sessionKey, sanitizeSessionForStorage(session));
enqueueSessionWrite(sessionKey, options?.immediate ?? false);
}

export function deleteSession(channelId: string, threadId: string): void {
const sessionKey = getSessionKey(channelId, threadId);
activeSessions.delete(sessionKey);
deletedSessionKeys.add(sessionKey);

const timer = pendingWriteTimers.get(sessionKey);
if (timer) {
clearTimeout(timer);
pendingWriteTimers.delete(sessionKey);
}
pendingWriteSnapshots.delete(sessionKey);
const inFlight = writeChains.get(sessionKey);
if (inFlight) {
void inFlight.finally(() => {
if (!deletedSessionKeys.has(sessionKey)) return;
const pathAfterWrite = getSessionFilePath(sessionKey);
if (existsSync(pathAfterWrite)) {
try {
unlinkSync(pathAfterWrite);
} catch {
// Ignore delete errors
}
}
});
}

const filePath = getSessionFilePath(sessionKey);
if (existsSync(filePath)) {
Expand Down Expand Up @@ -189,8 +291,10 @@ export function updateActiveRequest(
const session = loadSession(channelId, threadId);
if (!session?.activeRequest) return;

Object.assign(session.activeRequest, updates, { lastUpdatedAt: Date.now() });
saveSession(session);
const sanitized = { ...updates } as Partial<ActiveRequest>;
delete sanitized.tools;
Object.assign(session.activeRequest, sanitized, { lastUpdatedAt: Date.now() });
saveSession(session, { immediate: false });
}

export function completeActiveRequest(
Expand Down Expand Up @@ -259,7 +363,15 @@ export function getActiveRequest(channelId: string, threadId: string): ActiveReq

export function loadAllSessions(): PersistedSession[] {
ensureSessionsDir();
const sessions: PersistedSession[] = [];
const sessionsByKey = new Map<string, PersistedSession>();

for (const [sessionKey, session] of Array.from(activeSessions.entries())) {
if (isSessionExpired(session)) {
deleteSession(session.channelId, session.threadId);
continue;
}
sessionsByKey.set(sessionKey, session);
}

try {
const files = readdirSync(SESSIONS_DIR).filter(f => f.endsWith(".json"));
Expand All @@ -268,9 +380,17 @@ export function loadAllSessions(): PersistedSession[] {
try {
const data = readFileSync(filePath, "utf-8");
const session = JSON.parse(data) as PersistedSession;
sessions.push(session);
if (isSessionExpired(session)) {
deleteSession(session.channelId, session.threadId);
continue;
}
const sessionKey = getSessionKey(session.channelId, session.threadId);
activeSessions.set(sessionKey, session);
if (!sessionsByKey.has(sessionKey)) {
sessionsByKey.set(sessionKey, session);
}
if (!activeSessions.has(sessionKey)) {
activeSessions.set(sessionKey, session);
}
} catch {
// Skip invalid session files
}
Expand All @@ -279,7 +399,7 @@ export function loadAllSessions(): PersistedSession[] {
// Sessions dir doesn't exist yet
}

return sessions;
return Array.from(sessionsByKey.values());
}

export function getSessionsWithPendingRequests(platform?: "slack" | "discord" | "lark"): PersistedSession[] {
Expand Down
26 changes: 25 additions & 1 deletion packages/config/local/settings.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as fs from "fs";
import * as path from "path";
import * as os from "os";
import { writeFile } from "fs/promises";
import {
setChannelCwd as setChannelCwdInConfig,
} from "./ode";
Expand All @@ -16,6 +17,7 @@ const homedir = typeof os.homedir === "function" ? os.homedir : () => "";
const ODE_CONFIG_DIR = join(homedir(), ".config", "ode");
const SETTINGS_FILE = join(ODE_CONFIG_DIR, "settings.json");
const AGENTS_DIR = join(ODE_CONFIG_DIR, "agents");
const SETTINGS_SAVE_DEBOUNCE_MS = 1000;

export interface ChannelSettings {
threadSessions: Record<string, string>; // threadId -> sessionId
Expand All @@ -34,6 +36,9 @@ export interface Settings {
}

let cachedSettings: Settings | null = null;
let pendingSettingsWriteTimer: ReturnType<typeof setTimeout> | null = null;
let pendingSettingsSnapshot: Settings | null = null;
let settingsWriteChain: Promise<void> = Promise.resolve();

function ensureDataDir(): void {
if (!existsSync(ODE_CONFIG_DIR)) {
Expand Down Expand Up @@ -95,7 +100,26 @@ export function saveSettings(settings: Settings): void {
...settings,
channels: normalizedChannels,
};
writeFileSync(SETTINGS_FILE, JSON.stringify(cachedSettings, null, 2));

pendingSettingsSnapshot = structuredClone(cachedSettings);
if (pendingSettingsWriteTimer) {
clearTimeout(pendingSettingsWriteTimer);
pendingSettingsWriteTimer = null;
}

pendingSettingsWriteTimer = setTimeout(() => {
pendingSettingsWriteTimer = null;
const snapshot = pendingSettingsSnapshot;
if (!snapshot) return;
pendingSettingsSnapshot = null;
const payload = JSON.stringify(snapshot, null, 2);
settingsWriteChain = settingsWriteChain
.catch(() => undefined)
.then(async () => {
await writeFile(SETTINGS_FILE, payload, "utf-8");
})
.catch(() => undefined);
}, SETTINGS_SAVE_DEBOUNCE_MS);
}

export function getPendingRestartMessages(): PendingRestartMessage[] {
Expand Down
1 change: 0 additions & 1 deletion packages/core/runtime/open-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ export async function runOpenRequest(params: {
updateActiveRequest(context.channelId, context.threadId, {
statusMessageTs: request.statusMessageTs,
currentText: request.currentText,
tools: request.tools,
todos: request.todos,
statusFrozen: request.statusFrozen,
});
Expand Down
73 changes: 73 additions & 0 deletions packages/core/test/session-retention.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { describe, expect, it } from "bun:test";
import * as fs from "fs";
import * as os from "os";
import * as path from "path";
import { deleteSession, loadSession } from "@/config/local/sessions";

const SESSIONS_DIR = path.join(os.homedir(), ".config", "ode", "sessions");

function getSessionFilePath(channelId: string, threadId: string): string {
const sessionKey = `${channelId}-${threadId}`;
const safeKey = sessionKey.replace(/[^a-zA-Z0-9-]/g, "_");
return path.join(SESSIONS_DIR, `${safeKey}.json`);
}

function writeSessionFixture(params: {
channelId: string;
threadId: string;
createdAt: number;
lastActivityAt: number;
}): string {
const { channelId, threadId, createdAt, lastActivityAt } = params;
const filePath = getSessionFilePath(channelId, threadId);
fs.mkdirSync(SESSIONS_DIR, { recursive: true });
fs.writeFileSync(filePath, JSON.stringify({
sessionId: `session-${threadId}`,
channelId,
threadId,
workingDirectory: "/tmp",
createdAt,
lastActivityAt,
}, null, 2));
return filePath;
}

describe("session retention", () => {
it("deletes session file when lastActivityAt is older than one week", () => {
const channelId = "RETENTION-C1";
const threadId = "RETENTION-T1";
const now = Date.now();
const staleAt = now - 8 * 24 * 60 * 60 * 1000;
const filePath = writeSessionFixture({
channelId,
threadId,
createdAt: staleAt,
lastActivityAt: staleAt,
});

expect(fs.existsSync(filePath)).toBeTrue();
expect(loadSession(channelId, threadId)).toBeNull();
expect(fs.existsSync(filePath)).toBeFalse();

deleteSession(channelId, threadId);
});

it("keeps session file when lastActivityAt is within one week", () => {
const channelId = "RETENTION-C2";
const threadId = "RETENTION-T2";
const now = Date.now();
const recentAt = now - 2 * 24 * 60 * 60 * 1000;
const filePath = writeSessionFixture({
channelId,
threadId,
createdAt: recentAt,
lastActivityAt: recentAt,
});

const loaded = loadSession(channelId, threadId);
expect(loaded).not.toBeNull();
expect(fs.existsSync(filePath)).toBeTrue();

deleteSession(channelId, threadId);
});
});