From 6b2baffc57cf5a36d9f1ae2918e79a978b84b594 Mon Sep 17 00:00:00 2001 From: Rolando Santamaria Maso Date: Sat, 6 Jun 2026 14:12:45 +0200 Subject: [PATCH 1/2] =?UTF-8?q?feat(memory):=20episode=20lifecycle=20?= =?UTF-8?q?=E2=80=94=20dedup-on-write=20+=20eviction=20(cap/TTL)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit EpisodeStore had no dedup and no eviction: episodes accumulated forever and near-duplicate sessions piled up unbounded (disk growth, noisier recall). This adds bounded, de-duplicated episode storage, config-gated with safe defaults, no on-disk format change. - Dedup-on-write: a new episode whose cosine similarity to an existing one is >= episode_dedup_threshold (default 0.92) REPLACES it (newest-wins). Uses an ephemeral RP embedder (the NewRPRanker primitive) over full on-disk summaries — never the shared dirty-rebuild vector index, avoiding mid-write re-entrancy. Provenance-gated: an untrusted near-dup can never evict a trusted/approved episode (trustRank mirrors the recall filter). - Eviction: prune-on-write applies TTL (episode_ttl_days, default 0/off) then a count cap (max_episodes, default 500), deleting both the .md file and the index entry. Crash-safe order: files -> writeIndex -> markDirty. Also exposes EpisodeStore.Prune() for session-end/CLI use. - Locking: dedup + .md write + index update + prune + markDirty now happen under a single e.mu hold (new writeLocked). Fixes a latent bug where re-writing the same sessionID appended a duplicate index entry. - Config: EpisodeDedupThreshold / MaxEpisodes / EpisodeTTLDays wired through MemoryConfig, DefaultMemoryConfig, both overlay sites, and a new NewEpisodeStoreWithLifecycle (bare NewEpisodeStore keeps lifecycle off, so existing callers/tests are unaffected). Documented in docs/CONFIG.md. Fact supersession is deferred (facts lack per-entry metadata; already covered by merge-on-write + session-end LLM consolidation). 
Tests: dedup replace/threshold/disabled, provenance safety (both directions), eviction by count + TTL, TTL-disabled, self-overwrite regression, evicted-id absent from recall, -race concurrency (16 goroutines), config defaults + overlay-to-store wiring. Co-Authored-By: Claude Opus 4.8 --- docs/CONFIG.md | 8 +- internal/config/loader.go | 9 + internal/memory/episode_lifecycle_test.go | 339 ++++++++++++++++++++++ internal/memory/episodes.go | 201 +++++++++++-- internal/memory/memory.go | 30 +- 5 files changed, 566 insertions(+), 21 deletions(-) create mode 100644 internal/memory/episode_lifecycle_test.go diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 69bca85..8887f3f 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -203,7 +203,10 @@ The `memory` section controls the persistent memory system (see [docs/MEMORY.md] "llm_consolidate": true, "merge_threshold": 0.7, "add_threshold": 0.3, - "auto_approve_episodes": false + "auto_approve_episodes": false, + "episode_dedup_threshold": 0.92, + "max_episodes": 500, + "episode_ttl_days": 0 } } ``` @@ -225,6 +228,9 @@ The `memory` section controls the persistent memory system (see [docs/MEMORY.md] | `merge_threshold` | 0.7 | Cosine similarity above which two fact entries are **auto-merged** without an LLM call (0.0–1.0). Raise it to merge less aggressively; lower it to merge more. | | `add_threshold` | 0.3 | Cosine similarity below which a new fact entry is **auto-added** without an LLM call (0.0–1.0). Between `add_threshold` and `merge_threshold` the LLM decides. Keep `add_threshold` < `merge_threshold`. | | `auto_approve_episodes` | false | **Security trade-off.** When true, untrusted episodes (sessions that touched web/MCP/out-of-workspace content) are auto-approved at session end so they are recalled without a manual `odek memory promote`. Leaving it `false` keeps the human review gate (recommended). 
| +| `episode_dedup_threshold` | 0.92 | Cosine similarity at or above which a newly written episode is treated as a near-duplicate of an existing one and **replaces** it (newest wins). An untrusted episode never replaces a trusted/approved one. `0` (or omitting the key) keeps the default: the config overlay only applies values greater than `0`, so `0` does not disable dedup. | +| `max_episodes` | 500 | Maximum number of stored episodes. On each write, episodes beyond this count are evicted oldest-first (both the summary file and the index entry). `0` (or omitting the key) keeps the default: the config overlay only applies values greater than `0`, so `0` does not remove the cap. | +| `episode_ttl_days` | 0 | Evict episodes older than this many days. `0` (default) disables TTL-based eviction. | ### ⚠️ `extract_facts` — automatic fact learning (opt-in, off by default) diff --git a/internal/config/loader.go b/internal/config/loader.go index 827d7f9..77ccd84 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -859,6 +859,15 @@ func resolveMemory(cfg *memory.MemoryConfig) memory.MemoryConfig { if cfg.AutoApproveEpisodes != nil { def.AutoApproveEpisodes = cfg.AutoApproveEpisodes } + if cfg.EpisodeDedupThreshold > 0 { + def.EpisodeDedupThreshold = cfg.EpisodeDedupThreshold + } + if cfg.MaxEpisodes > 0 { + def.MaxEpisodes = cfg.MaxEpisodes + } + if cfg.EpisodeTTLDays > 0 { + def.EpisodeTTLDays = cfg.EpisodeTTLDays + } return def } diff --git a/internal/memory/episode_lifecycle_test.go b/internal/memory/episode_lifecycle_test.go new file mode 100644 index 0000000..18597d2 --- /dev/null +++ b/internal/memory/episode_lifecycle_test.go @@ -0,0 +1,339 @@ +package memory + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +// countMDFiles returns the number of *.md episode summary files in dir. +func countMDFiles(t *testing.T, dir string) int { + t.Helper() + matches, err := filepath.Glob(filepath.Join(dir, "*.md")) + if err != nil { + t.Fatalf("glob: %v", err) + } + return len(matches) +} + +// writeIndexDirect writes index.json + matching .md files straight to disk, +// bypassing WriteWithProvenance so tests can control CreatedAt (e.g.
backdate +// for TTL). Returns nothing; fatals on error. +func writeIndexDirect(t *testing.T, dir string, metas []EpisodeMeta) { + t.Helper() + if err := os.MkdirAll(dir, 0700); err != nil { + t.Fatal(err) + } + for _, m := range metas { + if err := os.WriteFile(filepath.Join(dir, m.SessionID+".md"), []byte(m.Summary), 0600); err != nil { + t.Fatal(err) + } + } + data, err := json.MarshalIndent(metas, "", " ") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dir, episodeIndexFile), data, 0600); err != nil { + t.Fatal(err) + } +} + +// ── Dedup ────────────────────────────────────────────────────────────── + +func TestEpisodeDedup_ReplaceNewestWins(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0.92, 0, 0) + + const summary = "refactored the postgres connection pool and added retry logic" + writeTestEpisode(t, es, "20260101-a", summary) + time.Sleep(2 * time.Millisecond) + writeTestEpisode(t, es, "20260102-b", summary) // identical → cosine 1.0 → replace + + idx, err := es.ReadIndex() + if err != nil { + t.Fatal(err) + } + if len(idx) != 1 { + t.Fatalf("expected 1 episode after dedup, got %d", len(idx)) + } + if idx[0].SessionID != "20260102-b" { + t.Errorf("expected newest to win (20260102-b), kept %q", idx[0].SessionID) + } + if n := countMDFiles(t, dir); n != 1 { + t.Errorf("expected 1 .md file after dedup, got %d", n) + } +} + +func TestEpisodeDedup_BelowThresholdKeepsBoth(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0.92, 0, 0) + + // Disjoint vocabulary → cosine well below 0.92. 
+ writeTestEpisode(t, es, "20260101-a", "refactored the postgres database schema migration") + writeTestEpisode(t, es, "20260102-b", "updated frontend button hover animation styling") + + idx, _ := es.ReadIndex() + if len(idx) != 2 { + t.Fatalf("expected 2 distinct episodes, got %d", len(idx)) + } +} + +func TestEpisodeDedup_Disabled(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 0, 0) // dedup off + + const summary = "identical episode summary content here" + writeTestEpisode(t, es, "20260101-a", summary) + writeTestEpisode(t, es, "20260102-b", summary) + + idx, _ := es.ReadIndex() + if len(idx) != 2 { + t.Fatalf("dedup disabled should keep both identical episodes, got %d", len(idx)) + } +} + +func TestEpisodeDedup_ProvenanceSafety(t *testing.T) { + const summary = "investigated the auth token refresh flow in detail" + + t.Run("untrusted does not evict trusted", func(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0.92, 0, 0) + + if err := es.WriteWithProvenance("20260101-a", summary, 5, EpisodeProvenance{}); err != nil { + t.Fatal(err) // trusted + } + if err := es.WriteWithProvenance("20260102-b", summary, 5, EpisodeProvenance{Untrusted: true}); err != nil { + t.Fatal(err) // untrusted near-dup + } + + idx, _ := es.ReadIndex() + if len(idx) != 2 { + t.Fatalf("untrusted near-dup must NOT replace trusted; expected 2, got %d", len(idx)) + } + // The trusted episode must still be present. 
+ var trustedPresent bool + for _, m := range idx { + if m.SessionID == "20260101-a" && !m.Provenance.Untrusted { + trustedPresent = true + } + } + if !trustedPresent { + t.Error("trusted episode 20260101-a was lost") + } + }) + + t.Run("trusted evicts untrusted", func(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0.92, 0, 0) + + if err := es.WriteWithProvenance("20260101-a", summary, 5, EpisodeProvenance{Untrusted: true}); err != nil { + t.Fatal(err) // untrusted + } + if err := es.WriteWithProvenance("20260102-b", summary, 5, EpisodeProvenance{}); err != nil { + t.Fatal(err) // trusted near-dup → may replace (upgrade) + } + + idx, _ := es.ReadIndex() + if len(idx) != 1 { + t.Fatalf("trusted near-dup should replace untrusted; expected 1, got %d", len(idx)) + } + if idx[0].SessionID != "20260102-b" || idx[0].Provenance.Untrusted { + t.Errorf("expected trusted 20260102-b to remain, got %+v", idx[0]) + } + }) +} + +// ── Eviction ─────────────────────────────────────────────────────────── + +func TestEpisodeEviction_ByCount(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 3, 0) // dedup off, cap 3 + + for i := 0; i < 5; i++ { + writeTestEpisode(t, es, fmt.Sprintf("20260101-%02d", i), + fmt.Sprintf("session number %d distinct work item alpha%d", i, i)) + time.Sleep(2 * time.Millisecond) // ensure strictly increasing CreatedAt + } + + idx, _ := es.ReadIndex() + if len(idx) != 3 { + t.Fatalf("expected cap of 3, got %d", len(idx)) + } + if n := countMDFiles(t, dir); n != 3 { + t.Errorf("expected 3 .md files after eviction, got %d", n) + } + // The 3 newest (02,03,04) must remain; oldest (00,01) evicted. 
+ for _, m := range idx { + if m.SessionID == "20260101-00" || m.SessionID == "20260101-01" { + t.Errorf("oldest episode %q should have been evicted", m.SessionID) + } + } +} + +func TestEpisodeEviction_ByTTL(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + now := time.Now().UTC() + + // Two fresh, one 10 days old. + writeIndexDirect(t, dir, []EpisodeMeta{ + {SessionID: "20260101-a", CreatedAt: now.Add(-1 * time.Hour), Summary: "fresh one"}, + {SessionID: "20260101-b", CreatedAt: now.Add(-2 * time.Hour), Summary: "fresh two"}, + {SessionID: "20260101-c", CreatedAt: now.Add(-10 * 24 * time.Hour), Summary: "stale one"}, + }) + + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 0, 7) // TTL 7 days + if err := es.Prune(); err != nil { + t.Fatalf("Prune: %v", err) + } + + idx, _ := es.ReadIndex() + if len(idx) != 2 { + t.Fatalf("expected 2 fresh episodes after TTL prune, got %d", len(idx)) + } + for _, m := range idx { + if m.SessionID == "20260101-c" { + t.Error("stale episode 20260101-c should have been evicted by TTL") + } + } + if _, err := os.Stat(filepath.Join(dir, "20260101-c.md")); !os.IsNotExist(err) { + t.Error("stale .md file should have been removed") + } +} + +func TestEpisodeEviction_TTLDisabled(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + now := time.Now().UTC() + writeIndexDirect(t, dir, []EpisodeMeta{ + {SessionID: "20260101-a", CreatedAt: now.Add(-365 * 24 * time.Hour), Summary: "ancient"}, + }) + + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 0, 0) // TTL disabled + if err := es.Prune(); err != nil { + t.Fatalf("Prune: %v", err) + } + idx, _ := es.ReadIndex() + if len(idx) != 1 { + t.Fatalf("TTL disabled must retain old episodes, got %d", len(idx)) + } +} + +// ── Self-overwrite ───────────────────────────────────────────────────── + +func TestEpisodeWrite_SelfOverwriteSingleEntry(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 0, 0) + + writeTestEpisode(t, es, "20260101-a", 
"first version of the summary") + writeTestEpisode(t, es, "20260101-a", "second version of the summary") + + idx, _ := es.ReadIndex() + if len(idx) != 1 { + t.Fatalf("re-writing same sessionID must not duplicate the index entry, got %d", len(idx)) + } + full, _ := es.Read("20260101-a") + if full != "second version of the summary" { + t.Errorf("expected the latest summary on disk, got %q", full) + } +} + +// ── Recall consistency ───────────────────────────────────────────────── + +func TestEpisodeLifecycle_EvictedAbsentFromRecall(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 2, 0) // cap 2 + + writeTestEpisode(t, es, "20260101-00", "postgres database connection pooling work") + time.Sleep(2 * time.Millisecond) + writeTestEpisode(t, es, "20260101-01", "redis caching layer latency tuning") + time.Sleep(2 * time.Millisecond) + writeTestEpisode(t, es, "20260101-02", "kafka consumer rebalance bugfix") // evicts -00 + + got, err := es.recallByVector("postgres database", 5) + if err != nil { + t.Fatalf("recallByVector: %v", err) + } + for _, m := range got { + if m.SessionID == "20260101-00" { + t.Error("evicted episode 20260101-00 must not appear in recall (index should have rebuilt)") + } + } +} + +// ── Concurrency ──────────────────────────────────────────────────────── + +func TestEpisodeLifecycle_ConcurrentSafety(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + const maxEp = 5 + es := NewEpisodeStoreWithLifecycle(dir, nil, 0.92, maxEp, 0) + + var wg sync.WaitGroup + for i := 0; i < 16; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + // Mix of distinct and overlapping (near-dup) summaries. 
+ id := fmt.Sprintf("20260101-%02d", i) + summary := fmt.Sprintf("worked on subsystem %d with shared common boilerplate text", i%4) + _ = es.WriteWithProvenance(id, summary, 5, EpisodeProvenance{}) + _, _ = es.recallByVector("subsystem common", 3) + }(i) + } + wg.Wait() + + idx, err := es.ReadIndex() + if err != nil { + t.Fatalf("ReadIndex: %v", err) + } + if len(idx) > maxEp { + t.Errorf("index length %d exceeds cap %d", len(idx), maxEp) + } +} + +// ── Config wiring ────────────────────────────────────────────────────── + +func TestMemoryConfig_EpisodeLifecycleDefaults(t *testing.T) { + d := DefaultMemoryConfig() + if d.EpisodeDedupThreshold != defaultEpisodeDedupThreshold { + t.Errorf("EpisodeDedupThreshold default = %v, want %v", d.EpisodeDedupThreshold, defaultEpisodeDedupThreshold) + } + if d.MaxEpisodes != defaultMaxEpisodes { + t.Errorf("MaxEpisodes default = %d, want %d", d.MaxEpisodes, defaultMaxEpisodes) + } + if d.EpisodeTTLDays != 0 { + t.Errorf("EpisodeTTLDays default = %d, want 0 (disabled)", d.EpisodeTTLDays) + } +} + +func TestMemoryConfig_EpisodeLifecycleOverlayWiredToStore(t *testing.T) { + resetEpIdxes() + dir := t.TempDir() + cfg := DefaultMemoryConfig() + cfg.EpisodeDedupThreshold = 0 // disable dedup to isolate the cap + cfg.MaxEpisodes = 2 + mm := NewMemoryManager(dir, nil, cfg) + + for i := 0; i < 4; i++ { + _ = mm.episodes.WriteWithProvenance(fmt.Sprintf("20260101-%02d", i), + fmt.Sprintf("distinct task %d zeta%d", i, i), 5, EpisodeProvenance{}) + time.Sleep(2 * time.Millisecond) + } + idx, _ := mm.episodes.ReadIndex() + if len(idx) != 2 { + t.Fatalf("NewMemoryManager should wire MaxEpisodes=2 into the store; got %d", len(idx)) + } +} diff --git a/internal/memory/episodes.go b/internal/memory/episodes.go index 6d969e6..d18cb72 100644 --- a/internal/memory/episodes.go +++ b/internal/memory/episodes.go @@ -52,6 +52,14 @@ type EpisodeStore struct { idxCache []EpisodeMeta // cached index, nil = not loaded muCache sync.RWMutex // fine-grained 
lock for cache reads + // Lifecycle policy (see NewEpisodeStoreWithLifecycle). Zero values disable + // each mechanism, so the bare NewEpisodeStore constructor keeps the legacy + // behavior (no dedup, no eviction); production wiring supplies defaults via + // MemoryConfig. + dedupThreshold float32 // cosine ≥ this → new episode replaces near-duplicate; 0 disables + maxEpisodes int // keep at most this many episodes; 0 disables the cap + ttlDays int // evict episodes older than this many days; 0 disables TTL + // queryCache caches the last Search query result to avoid // re-ranking identical queries on consecutive turns. // Protected by muQuery. @@ -60,15 +68,27 @@ type EpisodeStore struct { muQuery sync.RWMutex } -// NewEpisodeStore creates an EpisodeStore rooted at dir. If rankFn is nil, -// a default ranker is used (SimpleCall-based — requires LLM client). +// NewEpisodeStore creates an EpisodeStore rooted at dir with lifecycle +// management disabled (no dedup, no eviction). If rankFn is nil, a default +// ranker is used (SimpleCall-based — requires LLM client). func NewEpisodeStore(dir string, rankFn RankStrategy) *EpisodeStore { + return NewEpisodeStoreWithLifecycle(dir, rankFn, 0, 0, 0) +} + +// NewEpisodeStoreWithLifecycle creates an EpisodeStore with dedup + eviction +// policy. dedupThreshold is the cosine above which a new episode replaces an +// existing near-duplicate (0 disables); maxEpisodes caps the stored count +// (0 disables); ttlDays evicts episodes older than that many days (0 disables). 
+func NewEpisodeStoreWithLifecycle(dir string, rankFn RankStrategy, dedupThreshold float32, maxEpisodes, ttlDays int) *EpisodeStore { if rankFn == nil { rankFn = defaultRanker } return &EpisodeStore{ - dir: dir, - rankFn: rankFn, + dir: dir, + rankFn: rankFn, + dedupThreshold: dedupThreshold, + maxEpisodes: maxEpisodes, + ttlDays: ttlDays, } } @@ -98,24 +118,66 @@ func (e *EpisodeStore) WriteWithProvenance(sessionID, summary string, turns int, summary = truncateAtRune(summary, maxEpisodeSummaryBytes) + "..." } - // Write summary file + e.mu.Lock() + defer e.mu.Unlock() + return e.writeLocked(sessionID, summary, turns, prov) +} + +// writeLocked performs the full episode write under e.mu: dedup against +// existing episodes, write the summary file, update + prune the index, and +// mark the vector index dirty. File mutations happen before writeIndex, which +// happens before markDirty, so a crash mid-write leaves the old index intact: +// at worst a dangling index entry (rebuild/recall tolerate a missing .md) or +// a summary file that is not indexed yet. Caller must hold e.mu. +func (e *EpisodeStore) writeLocked(sessionID, summary string, turns int, prov EpisodeProvenance) error { + idx, err := e.ReadIndex() + if err != nil { + idx = []EpisodeMeta{} + } + + // Drop any existing entry for this same sessionID (re-running a session + // overwrites its episode rather than appending a duplicate index entry). + idx = removeBySessionID(idx, sessionID) + + // Dedup: if a near-duplicate exists, replace it with this newer episode — + // but never let an untrusted episode evict a trusted/approved one. + if e.dedupThreshold > 0 { + if dupIdx, sim := e.findDuplicate(summary, idx); dupIdx >= 0 && sim >= e.dedupThreshold { + if trustRank(prov) >= trustRank(idx[dupIdx].Provenance) { + _ = os.Remove(filepath.Join(e.dir, idx[dupIdx].SessionID+".md")) + idx = append(idx[:dupIdx], idx[dupIdx+1:]...) + } + } + } + + // Write the summary file.
path := filepath.Join(e.dir, sessionID+".md") if err := os.WriteFile(path, []byte(summary), 0600); err != nil { return fmt.Errorf("memory: write episode: %w", err) } - // Mark the episode vector index dirty so it rebuilds on the next recall, - // picking up this new episode. No embedding on the write path. - sharedEpisodeIndex(e.dir).markDirty() - - // Update index - return e.addToIndex(EpisodeMeta{ + idx = append(idx, EpisodeMeta{ SessionID: sessionID, Turns: turns, CreatedAt: time.Now().UTC(), Summary: truncateForIndex(summary), Provenance: prov, }) + + // Evict by TTL + count cap; remove the corresponding summary files. + idx, removed := e.pruneLocked(idx) + for _, sid := range removed { + _ = os.Remove(filepath.Join(e.dir, sid+".md")) + } + + if err := e.writeIndex(idx); err != nil { + return err + } + + // Mark the vector index dirty so it rebuilds on the next recall, picking up + // the new episode and dropping any evicted/replaced ids. + sharedEpisodeIndex(e.dir).markDirty() + return nil } // WriteIfEnough calls Write only if turns >= threshold. @@ -348,21 +410,122 @@ func (e *EpisodeStore) PendingReview() ([]EpisodeMeta, error) { return pending, nil } -// ── Index helpers ───────────────────────────────────────────────────── +// ── Lifecycle helpers ───────────────────────────────────────────────── -// addToIndex appends an entry to the index and writes it. -// Caller must hold e.mu (acquired by Write). -func (e *EpisodeStore) addToIndex(meta EpisodeMeta) error { +// Prune evicts episodes by TTL and count cap (see NewEpisodeStoreWithLifecycle) +// and removes their summary files. Safe to call at session end or from a CLI. +// No-op when both the cap and TTL are disabled. 
+func (e *EpisodeStore) Prune() error { e.mu.Lock() defer e.mu.Unlock() + if e.maxEpisodes <= 0 && e.ttlDays <= 0 { + return nil + } idx, err := e.ReadIndex() if err != nil { - // Index error means we start fresh - idx = []EpisodeMeta{} + return err } - idx = append(idx, meta) - return e.writeIndex(idx) + idx, removed := e.pruneLocked(idx) + if len(removed) == 0 { + return nil + } + for _, sid := range removed { + _ = os.Remove(filepath.Join(e.dir, sid+".md")) + } + if err := e.writeIndex(idx); err != nil { + return err + } + sharedEpisodeIndex(e.dir).markDirty() + return nil +} + +// pruneLocked applies TTL then count-cap eviction, returning the kept entries +// and the sessionIDs whose summary files should be removed. Trust-blind by +// design: eviction is a disk/recall budget, not a trust decision. Caller must +// hold e.mu. +func (e *EpisodeStore) pruneLocked(idx []EpisodeMeta) ([]EpisodeMeta, []string) { + var removed []string + + if e.ttlDays > 0 { + cutoff := time.Now().UTC().Add(-time.Duration(e.ttlDays) * 24 * time.Hour) + kept := make([]EpisodeMeta, 0, len(idx)) + for _, m := range idx { + if m.CreatedAt.Before(cutoff) { + removed = append(removed, m.SessionID) + } else { + kept = append(kept, m) + } + } + idx = kept + } + + if e.maxEpisodes > 0 && len(idx) > e.maxEpisodes { + // Newest-first so the oldest fall off the end. + sort.Slice(idx, func(i, j int) bool { + return idx[i].CreatedAt.After(idx[j].CreatedAt) + }) + for _, m := range idx[e.maxEpisodes:] { + removed = append(removed, m.SessionID) + } + idx = idx[:e.maxEpisodes] + } + + return idx, removed +} + +// findDuplicate returns the index of the existing episode most similar to +// newSummary and that similarity, comparing full on-disk summaries via an +// ephemeral RP embedder (the same primitive NewRPRanker uses). Returns +// (-1, 0) for an empty corpus. Caller must hold e.mu. 
+func (e *EpisodeStore) findDuplicate(newSummary string, idx []EpisodeMeta) (int, float32) { + if len(idx) == 0 { + return -1, 0 + } + corpus := make([]string, len(idx)) + for i, m := range idx { + if s, err := e.Read(m.SessionID); err == nil { + corpus[i] = s + } else { + corpus[i] = m.Summary // fallback to the index summary + } + } + + rp := vector.NewRandomProjections(64) + rp.Fit(append(append([]string{}, corpus...), newSummary)) + newVec, _ := rp.Embed(newSummary) + + best := -1 + var bestSim float32 + for i, s := range corpus { + vec, _ := rp.Embed(s) + if sim := cosineVector(newVec, vec); sim > bestSim { + bestSim = sim + best = i + } + } + return best, bestSim +} + +// removeBySessionID returns idx without any entry matching sessionID. +func removeBySessionID(idx []EpisodeMeta, sessionID string) []EpisodeMeta { + out := idx[:0] + for _, m := range idx { + if m.SessionID != sessionID { + out = append(out, m) + } + } + return out +} + +// trustRank maps provenance to a coarse trust level for dedup gating: an +// untrusted, unapproved episode ranks below any trusted/approved one, so it +// can never evict it. Mirrors the recall provenance filter. +func trustRank(p EpisodeProvenance) int { + if p.Untrusted && !p.UserApproved && !p.AutoApproved { + return 0 + } + return 1 } // writeIndex serializes the index to disk atomically (temp + rename). diff --git a/internal/memory/memory.go b/internal/memory/memory.go index c2bff53..c211676 100644 --- a/internal/memory/memory.go +++ b/internal/memory/memory.go @@ -51,6 +51,13 @@ const defaultBufferLines = 20 // Default minimum turns before episode extraction triggers. const defaultMinTurnsForExtraction = 3 +// Episode lifecycle defaults. Dedup on (high threshold so only genuine +// near-duplicates collapse); a generous count cap; TTL disabled. +const ( + defaultEpisodeDedupThreshold = 0.92 + defaultMaxEpisodes = 500 +) + // MemoryConfig holds configuration for the memory system. // Mirrors the JSON config section. 
// Bool fields use *bool so that JSON omitempty can distinguish @@ -72,6 +79,15 @@ type MemoryConfig struct { AddThreshold float32 `json:"add_threshold,omitempty"` MinTurnsForExtraction int `json:"min_turns_for_extraction,omitempty"` + // Episode lifecycle (see internal/memory/episodes.go). EpisodeDedupThreshold + // is the cosine at or above which a new episode replaces a near-duplicate; + // MaxEpisodes caps the stored count (oldest evicted first); EpisodeTTLDays + // evicts episodes older than that many days. Zero means "unset": only values + // > 0 override the defaults, so 0 disables TTL (default 0) but not dedup/cap. + EpisodeDedupThreshold float32 `json:"episode_dedup_threshold,omitempty"` + MaxEpisodes int `json:"max_episodes,omitempty"` + EpisodeTTLDays int `json:"episode_ttl_days,omitempty"` + // AutoApproveEpisodes, when true, stamps untrusted episodes as approved at // session-end so they are recalled without a manual `odek memory promote`. // SECURITY: this is the opt-in escape valve that trades the human review @@ -104,6 +120,9 @@ func DefaultMemoryConfig() MemoryConfig { AddThreshold: AddThreshold, MinTurnsForExtraction: defaultMinTurnsForExtraction, AutoApproveEpisodes: boolPtr(false), // secure default — human gate stays on + EpisodeDedupThreshold: defaultEpisodeDedupThreshold, + MaxEpisodes: defaultMaxEpisodes, + EpisodeTTLDays: 0, // TTL disabled by default } } @@ -187,6 +206,15 @@ func NewMemoryManager(memoryDir string, llc LLMClient, cfg MemoryConfig) *Memory if cfg.AutoApproveEpisodes != nil { def.AutoApproveEpisodes = cfg.AutoApproveEpisodes } + if cfg.EpisodeDedupThreshold > 0 { + def.EpisodeDedupThreshold = cfg.EpisodeDedupThreshold + } + if cfg.MaxEpisodes > 0 { + def.MaxEpisodes = cfg.MaxEpisodes + } + if cfg.EpisodeTTLDays > 0 { + def.EpisodeTTLDays = cfg.EpisodeTTLDays + } cfg = def factsDir := memoryDir @@ -201,7 +229,7 @@ func NewMemoryManager(memoryDir string, llc LLMClient, cfg MemoryConfig) *Memory } else { rankFn = NewRPRanker(64) } - episodeStore :=
NewEpisodeStore(episodesDir, rankFn) + episodeStore := NewEpisodeStoreWithLifecycle(episodesDir, rankFn, cfg.EpisodeDedupThreshold, cfg.MaxEpisodes, cfg.EpisodeTTLDays) mergeDetector := NewMergeDetectorWithThresholds(0, cfg.MergeThreshold, cfg.AddThreshold) return &MemoryManager{ From 844da80e038de9d85e14ed15fe6c80699b14cb48 Mon Sep 17 00:00:00 2001 From: Rolando Santamaria Maso Date: Sat, 6 Jun 2026 14:16:06 +0200 Subject: [PATCH 2/2] fix(memory): validate sessionID before deleting episode files Adversarial review (AI Verification Protocol) found a path-traversal defense-in-depth gap: eviction/dedup called os.Remove on sessionIDs read straight from index.json without validation, so a crafted/corrupted index entry (e.g. "../victim") could delete a .md file OUTSIDE the episodes dir. Every other file op in the package (Read/Write/Promote) already validates. Add removeEpisodeFile(sessionID) which calls session.ValidateSessionID before os.Remove, and route all three eviction/dedup deletions through it. Adds a traversal-safe regression test. Co-Authored-By: Claude Opus 4.8 --- internal/memory/episode_lifecycle_test.go | 31 +++++++++++++++++++++++ internal/memory/episodes.go | 17 ++++++++++--- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/internal/memory/episode_lifecycle_test.go b/internal/memory/episode_lifecycle_test.go index 18597d2..97f836a 100644 --- a/internal/memory/episode_lifecycle_test.go +++ b/internal/memory/episode_lifecycle_test.go @@ -304,6 +304,37 @@ func TestEpisodeLifecycle_ConcurrentSafety(t *testing.T) { } } +// ── Security: traversal-safe eviction ────────────────────────────────── + +// A crafted/corrupted index.json with a traversal sessionID must never make +// eviction delete a file outside the episodes dir. 
+func TestEpisodeEviction_TraversalSafe(t *testing.T) { + resetEpIdxes() + base := t.TempDir() + dir := filepath.Join(base, "mem") + if err := os.MkdirAll(dir, 0700); err != nil { + t.Fatal(err) + } + victim := filepath.Join(base, "victim.md") + if err := os.WriteFile(victim, []byte("do not delete"), 0600); err != nil { + t.Fatal(err) + } + + old := time.Now().UTC().Add(-100 * 24 * time.Hour) + writeIndexDirect(t, dir, []EpisodeMeta{ + {SessionID: "../victim", CreatedAt: old, Summary: "x"}, + }) + + es := NewEpisodeStoreWithLifecycle(dir, nil, 0, 0, 7) // TTL 7d → would evict the entry + if err := es.Prune(); err != nil { + t.Fatalf("Prune: %v", err) + } + + if _, err := os.Stat(victim); os.IsNotExist(err) { + t.Error("traversal sessionID caused deletion of a file OUTSIDE the episodes dir") + } +} + // ── Config wiring ────────────────────────────────────────────────────── func TestMemoryConfig_EpisodeLifecycleDefaults(t *testing.T) { diff --git a/internal/memory/episodes.go b/internal/memory/episodes.go index d18cb72..1d35f2c 100644 --- a/internal/memory/episodes.go +++ b/internal/memory/episodes.go @@ -144,7 +144,7 @@ func (e *EpisodeStore) writeLocked(sessionID, summary string, turns int, prov Ep if e.dedupThreshold > 0 { if dupIdx, sim := e.findDuplicate(summary, idx); dupIdx >= 0 && sim >= e.dedupThreshold { if trustRank(prov) >= trustRank(idx[dupIdx].Provenance) { - _ = os.Remove(filepath.Join(e.dir, idx[dupIdx].SessionID+".md")) + e.removeEpisodeFile(idx[dupIdx].SessionID) idx = append(idx[:dupIdx], idx[dupIdx+1:]...) } } @@ -167,7 +167,7 @@ func (e *EpisodeStore) writeLocked(sessionID, summary string, turns int, prov Ep // Evict by TTL + count cap; remove the corresponding summary files. 
idx, removed := e.pruneLocked(idx) for _, sid := range removed { - _ = os.Remove(filepath.Join(e.dir, sid+".md")) + e.removeEpisodeFile(sid) } if err := e.writeIndex(idx); err != nil { @@ -431,7 +431,7 @@ func (e *EpisodeStore) Prune() error { return nil } for _, sid := range removed { - _ = os.Remove(filepath.Join(e.dir, sid+".md")) + e.removeEpisodeFile(sid) } if err := e.writeIndex(idx); err != nil { return err @@ -507,6 +507,17 @@ func (e *EpisodeStore) findDuplicate(newSummary string, idx []EpisodeMeta) (int, return best, bestSim } +// removeEpisodeFile deletes a session's summary file, but ONLY after validating +// the sessionID — defense-in-depth so a crafted/corrupted index.json entry can +// never make eviction/dedup os.Remove a path outside the episodes dir. Mirrors +// the validation that Read/Write/Promote already apply. Best-effort. +func (e *EpisodeStore) removeEpisodeFile(sessionID string) { + if err := session.ValidateSessionID(sessionID); err != nil { + return + } + _ = os.Remove(filepath.Join(e.dir, sessionID+".md")) +} + // removeBySessionID returns idx without any entry matching sessionID. func removeBySessionID(idx []EpisodeMeta, sessionID string) []EpisodeMeta { out := idx[:0]