fix: refresh buffered chunk tokens before OM activation #13955
TylerBarnes merged 13 commits into main
Conversation
Keep no-marker cleanup from clearing live input/response queues before the save finishes. This avoids a brief under-inclusion window where fresh turn context can disappear and the model can fall back to stale planning state. Co-Authored-By: Mastra Code (openai/gpt-5.4) <noreply@mastra.ai>
- Add refreshBufferedChunkMessageTokens helper to recount per-chunk messageTokens from the live message list before activation math
- Pass refreshed chunks to swapBufferedToActive so stale token weights don't cause over/under-activation
- Add removalOrder backoff restoration loop in activation cleanup
- Preserve unobserved message tails during activation cleanup
- Fire-and-forget setPendingMessageTokens to avoid blocking
- Simplify repro capture to use freshRecord directly

Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
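The recounting idea behind `refreshBufferedChunkMessageTokens` can be sketched in isolation. This is a minimal illustration, not the Mastra implementation: the chunk shape and the rough 4-characters-per-token counter are assumptions. It also reflects the constraint stated in the PR description that a chunk is only recounted when all of its messages are still present, so a partial recount can never undercount.

```typescript
interface LiveMessage {
  id: string;
  text: string;
}

interface BufferedChunk {
  messageIds: string[];
  messageTokens: number;
}

// Hypothetical token counter: roughly 4 characters per token.
function countTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

// Recount each chunk's messageTokens from the live message list.
// Chunks whose messages are only partially present keep their stored
// count, so a partial recount never undercounts the chunk.
function refreshBufferedChunkMessageTokens(
  chunks: BufferedChunk[],
  liveMessages: LiveMessage[],
): BufferedChunk[] {
  const byId = new Map(liveMessages.map(m => [m.id, m]));
  return chunks.map(chunk => {
    const live = chunk.messageIds.map(id => byId.get(id));
    if (live.some(m => m === undefined)) return chunk; // partial: skip recount
    const messageTokens = live.reduce((sum, m) => sum + countTokens(m!.text), 0);
    return { ...chunk, messageTokens };
  });
}
```

The refreshed array is what would then feed activation boundary math, leaving the persisted buffer untouched (the point later raised in the inmemory.ts review comment).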
Add getLastObservedMessageCursor and isMessageAtOrBeforeCursor to ObservationalMemory. These are not called yet — just adding the code to isolate whether their presence causes issues. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
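The two helpers named above are not shown in this thread. The sketch below illustrates, under assumed shapes, what derivation and comparison could look like: a `{ createdAt, id }` cursor (the shape the later type-validation comment references) with an id tie-break so same-timestamp siblings that come after the cursor survive pruning.

```typescript
interface MessageCursor {
  createdAt: string; // ISO timestamp of the last activated message
  id: string;        // its id, to disambiguate same-timestamp siblings
}

interface StoredMessage {
  id: string;
  createdAt: Date;
}

// Derive a cursor from the most recently created activated message.
function getLastObservedMessageCursor(messages: StoredMessage[]): MessageCursor | undefined {
  let last: StoredMessage | undefined;
  for (const msg of messages) {
    if (!last || msg.createdAt.getTime() > last.createdAt.getTime()) last = msg;
  }
  return last ? { createdAt: last.createdAt.toISOString(), id: last.id } : undefined;
}

// A message is "at or before" the cursor if it is strictly older, or if it
// shares the cursor's timestamp and is the cursor message itself.
// Same-timestamp siblings that are not the cursor message are preserved.
function isMessageAtOrBeforeCursor(msg: StoredMessage, cursor: MessageCursor): boolean {
  const msgTime = msg.createdAt.getTime();
  const cursorTime = new Date(cursor.createdAt).getTime();
  if (msgTime < cursorTime) return true;
  if (msgTime === cursorTime) return msg.id === cursor.id;
  return false;
}
```

An id-aware tie-break is the property that a pure `createdAt <= lastObservedAt` comparison cannot provide, which is exactly the gap the first outside-diff review comment flags.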
After step-0 buffered activation, persist a cursor pointing at the latest activated message into thread metadata. This provides a stable fallback for replay pruning when observation-end markers are absent. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
…ation Persist cursor at all three observation metadata write sites (step-0 activation was already done). Still not read anywhere — next hunk will wire the cursor into filterAlreadyObservedMessages. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
…prefix pruning Make filterAlreadyObservedMessages async with options. In the record fallback branch, derive a cursor from thread metadata (or observed message IDs) and use it for sealed-prefix part trimming and cursor-based removal. Adds useMarkerBoundaryPruning option for future step>0 usage. Still called only at step 0. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
…eanup guard Step 4 now runs on step > 0 too (record-fallback only, no marker pruning). Skipped when threshold cleanup already ran to avoid double-pruning. This is the last behavioral hunk from the original branch. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Cherry-pick test coverage from fix/om-activation-replay-pruning. Adjust mid-turn blockAfter threshold for recounting fix and remove T4-E partial-tail test (not applicable without reminted-tail logic). Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Remove sealedAt-based part trimming in filterAlreadyObservedMessages and related T2-A test. This logic is incomplete without the remintObservedPrefixPartCount changes in MessageMerger/message-list. Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
🦋 Changeset detected. Latest commit: da55da3. The changes in this PR will be included in the next version bump. This PR includes changesets to release 24 packages.
Walkthrough: Adds a changelog entry for a patch release. Introduces optional cursor tracking for the last observed message in thread metadata, accepts refreshed buffered observation chunks at storage activation time, recalculates buffered chunk token counts from current in-memory messages, expands pruning logic with cursor-based fallback, and adds a broad set of single-thread replay tests.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes. Pre-merge checks: ✅ 2 passed, ❌ 1 failed (1 warning).
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
packages/memory/src/processors/observational-memory/observational-memory.ts (2)
2789-2816: ⚠️ Potential issue | 🟠 Major: Don't run the timestamp fallback once a cursor exists.

Buffered chunks store `lastObservedAt` as an exclusive cursor (max message time + 1ms). With the current flow, a same-timestamp sibling that `derivedCursor` intentionally keeps is still removed by `msgDate <= lastObservedAt`, so the new cursor-based replay pruning never actually protects those messages.

Suggested fix
```diff
-      if (derivedCursor && this.isMessageAtOrBeforeCursor(msg, derivedCursor)) {
-        messagesToRemove.push(msg.id);
-        continue;
-      }
-
-      // Remove if created before lastObservedAt (these messages' content is
-      // already captured in activeObservations via buffered activation)
-      if (lastObservedAt && msg.createdAt) {
+      if (derivedCursor) {
+        if (this.isMessageAtOrBeforeCursor(msg, derivedCursor)) {
+          messagesToRemove.push(msg.id);
+        }
+        continue;
+      }
+
+      // Remove if created before lastObservedAt (these messages' content is
+      // already captured in activeObservations via buffered activation)
+      if (lastObservedAt && msg.createdAt) {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/memory/src/processors/observational-memory/observational-memory.ts` around lines 2789 - 2816, The timestamp-based fallback should not run when a cursor has been derived: update the removal logic in observational-memory.ts so the block that checks lastObservedAt only executes when derivedCursor is falsy; specifically, guard the existing "if (lastObservedAt && msg.createdAt) { ... }" timestamp check (and its msgDate <= lastObservedAt removal) with a check for !derivedCursor (so derivedCursor produced by getLastObservedMessageCursor / fallbackCursor prevents the timestamp fallback from removing messages that isMessageAtOrBeforeCursor already preserved), leaving the earlier cursor and observedIds checks intact and continuing to push ids into messagesToRemove only when appropriate.
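To make the ordering concrete, here is a small standalone sketch of why gating the timestamp fallback on the absence of a derived cursor protects same-timestamp siblings. The shapes are hypothetical and the cursor comparator is passed in for illustration; this is the shape of the suggested fix, not the actual method.

```typescript
interface Msg { id: string; createdAt: Date; }
interface Cursor { createdAt: string; id: string; }

// Hypothetical pruning pass: when a cursor was derived, it is the sole
// authority; the timestamp fallback runs only when no cursor exists, so
// it cannot re-remove same-timestamp siblings the cursor chose to keep.
function selectMessagesToRemove(
  messages: Msg[],
  derivedCursor: Cursor | undefined,
  lastObservedAt: Date | undefined,
  isAtOrBeforeCursor: (m: Msg, c: Cursor) => boolean,
): string[] {
  const toRemove: string[] = [];
  for (const msg of messages) {
    if (derivedCursor) {
      if (isAtOrBeforeCursor(msg, derivedCursor)) toRemove.push(msg.id);
      continue; // cursor present: never fall through to the timestamp check
    }
    if (lastObservedAt && msg.createdAt.getTime() <= lastObservedAt.getTime()) {
      toRemove.push(msg.id);
    }
  }
  return toRemove;
}
```

With an exclusive `lastObservedAt` of max-time + 1ms, the fallback removes every same-timestamp message; the cursor path keeps the siblings that are not the cursor message itself.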
2970-3005: ⚠️ Potential issue | 🟠 Major: Persist the activation cursor before mutating `messageList`.

`activatedMessages` is collected after `removeByIds()`, so this step-0 path writes `lastObservedMessageCursor: undefined`. The step>0 buffered-activation path in `handleThresholdReached()` still never writes this field, so those activations also fall back to timestamp pruning on replay.

Suggested fix
```diff
 const activatedIds = activationResult.activatedMessageIds ?? [];
+const activatedSet = new Set(activatedIds);
+const allMsgs = messageList.get.all.db();
+const activatedMessages = allMsgs.filter(msg => msg?.id && activatedSet.has(msg.id));
+
 if (activatedIds.length > 0) {
-  const activatedSet = new Set(activatedIds);
-  const allMsgs = messageList.get.all.db();
-  const idsToRemove = allMsgs
+  const idsToRemove = activatedMessages
     .filter(msg => msg?.id && msg.id !== 'om-continuation' && activatedSet.has(msg.id))
     .map(msg => msg.id);
   if (idsToRemove.length > 0) {
     messageList.removeByIds(idsToRemove);
   }
 }
 ...
-const activatedSet = new Set(activationResult.activatedMessageIds ?? []);
-const activatedMessages = messageList.get.all.db().filter(msg => msg?.id && activatedSet.has(msg.id));
 const newMetadata = setThreadOMMetadata(thread.metadata, {
   suggestedResponse: activationResult.suggestedContinuation,
   currentTask: activationResult.currentTask,
   lastObservedMessageCursor: this.getLastObservedMessageCursor(activatedMessages),
 });
```

Apply the same metadata update in `handleThresholdReached()` so non-step-0 activations persist the cursor too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/memory/src/processors/observational-memory/observational-memory.ts` around lines 2970 - 3005, The code is persisting thread OM metadata (lastObservedMessageCursor) after messageList.removeByIds(), so activatedMessages becomes empty and cursor is lost; fix by computing/persisting the activation cursor before mutating messageList (i.e., build activatedMessages and call setThreadOMMetadata with lastObservedMessageCursor computed via getLastObservedMessageCursor(activatedMessages) prior to removeByIds), and apply the same metadata-write logic inside handleThresholdReached() so non-step-0 buffered activations also persist the cursor (update both the activation path around removeByIds and the handleThresholdReached method to call setThreadOMMetadata with suggestedContinuation/currentTask/lastObservedMessageCursor).
🧹 Nitpick comments (1)
packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts (1)
8346-9012: Add at least one replay case that drives `lastObservedMessageCursor`.

These regressions all prune through `lastObservedAt` and/or `observedMessageIds`, so they won't fail if the new cursor-based replay path regresses. Please add a same-timestamp case that persists `lastObservedMessageCursor` and proves only the post-cursor tail survives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts` around lines 8346 - 9012, Add a new replay test that exercises cursor-based replay by persisting lastObservedMessageCursor and verifying only content after that cursor survives; in the existing suite use createReplayFixture()/ObservationalMemory and MessageList to create two messages with identical createdAt timestamps where the first contains an observation boundary (or sealed marker) and the second is the fresh tail, save an observational record via storage.initializeObservationalMemory and storage.updateActiveObservations setting lastObservedMessageCursor to point at the first message (and lastObservedAt to that timestamp), then call (om as any).filterAlreadyObservedMessages(...) and assert via getModelVisibleText or messageList.get.all.db() that the pre-cursor content is removed and only the post-cursor tail remains; reference lastObservedMessageCursor, filterAlreadyObservedMessages, ObservationalMemory, MessageList and storage.updateActiveObservations when locating where to add the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.changeset/sour-parrots-love.md:
- Line 6: The changeset line "Fixed observational memory activation selecting
too many chunks by refreshing buffered chunk token counts from the current
message list before activation..." is implementation-focused; rewrite it to
state the user-visible outcome instead. Replace that sentence (in the
sour-parrots-love changeset) with a concise outcome-focused description such as:
"Observational memory now activates the correct buffered observations based on
the latest thread state, preventing incorrect or missing activations of past
observations." Ensure the wording emphasizes the behavior change (correct
activations) and removes references to token recounting or stale weights.
In `@packages/core/src/memory/types.ts`:
- Around line 60-61: getThreadOMMetadata currently casts mastra.om to
ThreadOMMetadata without validating nested fields; add a shape check for
lastObservedMessageCursor (ensure it's an object with string createdAt and id)
before including it in the returned ThreadOMMetadata or normalize it to
undefined. Update getThreadOMMetadata to verify
mastra.om.lastObservedMessageCursor exists and that both createdAt and id are
strings (or set lastObservedMessageCursor = undefined) so malformed persisted
cursors cannot be returned as typed values. Ensure the function still returns
ThreadOMMetadata but only with validated/normalized lastObservedMessageCursor.
In `@packages/core/src/storage/domains/memory/inmemory.ts`:
- Around line 938-944: Don't assign input.bufferedChunks directly into
record.bufferedObservationChunks; treat input.bufferedChunks as an
activation-time override only. Use a local variable (chunks) for all
activation/recount math (the existing
Array.isArray(record.bufferedObservationChunks) ? ... logic), compute which
items are consumed during activation, then persist back to
record.bufferedObservationChunks only the actual remainder that should remain
(and only when that remainder matches the same indexes/lengths expected from the
stored buffer to avoid deleting skipped items). In practice: remove the direct
assignment to record.bufferedObservationChunks = input.bufferedChunks, use
chunks for activation, compute remainder, and update
record.bufferedObservationChunks only when the remainder corresponds 1:1 with
the original stored buffer.
In
`@packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts`:
- Around line 8919-8971: The test T4-A-debug should not rely on setTimeout
sleeps; replace the artificial 25ms/5ms timing with an explicit barrier: in the
test, create two Promises (startedPromise with startedResolve and resumePromise
with resumeResolve), then patch saveMessagesWithSealedIdTracking so it calls
startedResolve() right before the artificial pause and awaits resumePromise
before delegating to originalSave; call cleanupAfterObservation, await
startedPromise to observe the intermediate state via
getModelVisibleText(messageList), assert it contains 'fresh-next-turn', then
call resumeResolve() and await cleanupPromise; update references to
saveMessagesWithSealedIdTracking, cleanupAfterObservation, getModelVisibleText,
messageList and the test name T4-A-debug accordingly.
In `@packages/memory/src/processors/observational-memory/observational-memory.ts`:
- Around line 3240-3244: The step>0 pruning pass currently calls
filterAlreadyObservedMessages when didThresholdCleanup is false, and because
useMarkerBoundaryPruning is false for stepNumber>0 the function falls back to
removing any message whose ID is in record.observedMessageIds, which can drop
live tail messages intentionally preserved by cleanupAfterObservation; fix by
restricting that fallback to replay-loaded messages or by making the fallback
source-aware: either (A) change the caller around that
filterAlreadyObservedMessages invocation to only call it for stepNumber>0 when
the MessageList/record indicates a replay load (e.g., record.source === 'replay'
or a new record.isReplay flag), or (B) add a parameter to
filterAlreadyObservedMessages (or adjust its logic) so when
useMarkerBoundaryPruning is false it will check record.origin/source before
removing IDs from record.observedMessageIds and skip deletion for live
MessageList sources; update references to didThresholdCleanup,
filterAlreadyObservedMessages, record.observedMessageIds,
cleanupAfterObservation, and stepNumber accordingly.
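For the `getThreadOMMetadata` validation point above, a standalone sketch of the normalization guard. The names and the `{ createdAt, id }` shape are taken from the comment; everything else is an assumption for illustration, not the actual `@mastra/core` implementation.

```typescript
interface MessageCursor {
  createdAt: string;
  id: string;
}

// Normalize a persisted value to a valid cursor, or undefined.
// Malformed metadata (wrong types, missing fields, non-objects) must
// never be returned as a typed cursor.
function normalizeCursor(value: unknown): MessageCursor | undefined {
  if (typeof value !== 'object' || value === null) return undefined;
  const v = value as Record<string, unknown>;
  if (typeof v.createdAt !== 'string' || typeof v.id !== 'string') return undefined;
  return { createdAt: v.createdAt, id: v.id };
}
```

A helper like this would let `getThreadOMMetadata` include `lastObservedMessageCursor` only after it has been validated, instead of trusting a cast on `mastra.om`.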
📒 Files selected for processing (6)

- .changeset/sour-parrots-love.md
- packages/core/src/memory/types.ts
- packages/core/src/storage/domains/memory/inmemory.ts
- packages/core/src/storage/types.ts
- packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts
- packages/memory/src/processors/observational-memory/observational-memory.ts
#13955 (comment) Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Don't overwrite the persisted bufferedObservationChunks with the caller-provided refreshed override. Use refreshed chunks only for activation boundary selection math. #13955 (comment) Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
T4-A-debug now uses a blocked promise instead of setTimeout(25ms/5ms) to observe intermediate state deterministically. #13955 (comment) Co-Authored-By: Mastra Code (anthropic/claude-opus-4-6) <noreply@mastra.ai>
Reese says: @coderabbitai Addressed 3 of 5 comments with commits (changeset wording, inmemory.ts local-only override, T4-A-debug barrier fix). Replied to the other 2 inline explaining why they're intentional. Please re-review when you get a chance! Warmly, Claude |
✅ Actions performed: review triggered.
♻️ Duplicate comments (1)
packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts (1)
8882-8901: ⚠️ Potential issue | 🟠 Major: Make both race-window tests synchronize on "save started," not on timing.

Line 8900 still relies on a 25ms/5ms sleep, and Line 8968 reads intermediate state without proving `cleanupAfterObservation()` has actually reached the blocked save. That means these tests can still pass without exercising the intended race window.

Suggested fix

```diff
-  const originalSave = (om as any).saveMessagesWithSealedIdTracking.bind(om);
-  const saveStarted: { value: boolean } = { value: false };
-  (om as any).saveMessagesWithSealedIdTracking = async (...args: any[]) => {
-    saveStarted.value = true;
-    await new Promise(resolve => setTimeout(resolve, 25));
-    return originalSave(...args);
-  };
+  const originalSave = (om as any).saveMessagesWithSealedIdTracking.bind(om);
+  let startedSave!: () => void;
+  const saveStarted = new Promise<void>(resolve => {
+    startedSave = resolve;
+  });
+  let releaseSave!: () => void;
+  const saveBlocked = new Promise<void>(resolve => {
+    releaseSave = resolve;
+  });
+  (om as any).saveMessagesWithSealedIdTracking = async (...args: any[]) => {
+    startedSave();
+    await saveBlocked;
+    return originalSave(...args);
+  };
   const cleanupPromise = (om as any).cleanupAfterObservation(
     messageList,
     new Set<string>(),
     threadId,
     resourceId,
     {},
     undefined,
     undefined,
   );
-  await new Promise(resolve => setTimeout(resolve, 5));
-  expect(saveStarted.value).toBe(true);
+  await saveStarted;
   await (om as any).filterAlreadyObservedMessages(
     messageList,
     {
       observedMessageIds: ['race-old'],
@@
   const duringRaceText = getModelVisibleText(messageList);
   expect(duringRaceText).not.toContain('already-observed-race');
+  releaseSave();
   await cleanupPromise;
```

Apply the same `saveStarted`/`releaseSave` pattern to `T4-A-debug` so the assertion always runs inside the actual blocked-save window.

Also applies to: 8948-8974
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts` around lines 8882 - 8901, The test currently uses fixed sleeps to hit the race window; modify the test to synchronize on the actual blocked save by stubbing om.saveMessagesWithSealedIdTracking to set saveStarted=true and await a releaseSave promise before proceeding, then start cleanupAfterObservation and wait for saveStarted before making assertions (and resolve releaseSave to let the save finish). Apply this same saveStarted/releaseSave pattern to the T4-A-debug test and the other occurrence (around saveMessagesWithSealedIdTracking and cleanupAfterObservation) so assertions read state only while the save is definitively blocked.
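The barrier pattern the review keeps suggesting can be distilled into a small self-contained sketch with no Mastra or test-framework APIs (`deferred` is a local helper introduced here for illustration): the stubbed save signals when it has started, then blocks until released, so assertions always run inside the blocked-save window instead of after a sleep.

```typescript
// Minimal "deferred" helper: a promise plus its resolver.
function deferred(): { promise: Promise<void>; resolve: () => void } {
  let resolve!: () => void;
  const promise = new Promise<void>(r => { resolve = r; });
  return { promise, resolve };
}

// Sketch of the barrier pattern used in the suggested test fixes.
async function demo(): Promise<string[]> {
  const events: string[] = [];
  const started = deferred();
  const release = deferred();

  const stubbedSave = async () => {
    started.resolve();     // "save started": the test may now assert
    await release.promise; // block until the test releases the save
    events.push('save finished');
  };

  const savePromise = stubbedSave(); // analogous to kicking off cleanupAfterObservation()
  await started.promise;             // deterministic: no setTimeout needed
  events.push('asserted during blocked save');
  release.resolve();
  await savePromise;
  return events;
}
```

Because the assertion awaits `started.promise` rather than a fixed delay, it cannot observe the wrong phase of the race, which is the flakiness both review rounds call out.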
📒 Files selected for processing (3)

- .changeset/sour-parrots-love.md
- packages/core/src/storage/domains/memory/inmemory.ts
- packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- .changeset/sour-parrots-love.md
- packages/core/src/storage/domains/memory/inmemory.ts
♻️ Duplicate comments (1)
packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts (1)
8882-8901: ⚠️ Potential issue | 🟠 Major: Synchronize the race assertion on the save hook actually blocking.

`T4-A` still uses sleeps, and `T4-A-debug` samples state immediately after kicking off `cleanupAfterObservation` without waiting for `saveMessagesWithSealedIdTracking` to reach its blocked point. Both variants can observe the wrong phase of the race and flap in CI.

Suggested direction

```diff
-  const originalSave = (om as any).saveMessagesWithSealedIdTracking.bind(om);
-  const saveStarted: { value: boolean } = { value: false };
-  (om as any).saveMessagesWithSealedIdTracking = async (...args: any[]) => {
-    saveStarted.value = true;
-    await new Promise(resolve => setTimeout(resolve, 25));
-    return originalSave(...args);
-  };
+  const originalSave = (om as any).saveMessagesWithSealedIdTracking.bind(om);
+  let releaseSave!: () => void;
+  let markSaveStarted!: () => void;
+  const saveStarted = new Promise<void>(resolve => {
+    markSaveStarted = resolve;
+  });
+  const saveBlocked = new Promise<void>(resolve => {
+    releaseSave = resolve;
+  });
+  (om as any).saveMessagesWithSealedIdTracking = async (...args: any[]) => {
+    markSaveStarted();
+    await saveBlocked;
+    return originalSave(...args);
+  };
   const cleanupPromise = (om as any).cleanupAfterObservation(
     messageList,
     new Set<string>(),
     threadId,
@@
-  await new Promise(resolve => setTimeout(resolve, 5));
-  expect(saveStarted.value).toBe(true);
+  await saveStarted;
   const duringRaceText = getModelVisibleText(messageList);
   expect(duringRaceText).toContain('fresh-next-turn');
+  releaseSave();
   await cleanupPromise;
```

Also applies to: 8948-8974
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts` around lines 8882 - 8901, The test's race check is unreliable because it uses setTimeout sleeps instead of synchronizing when the overridden saveMessagesWithSealedIdTracking actually reaches its blocked point; change the test to create a manual blocker Promise and a starter Promise: override saveMessagesWithSealedIdTracking to set saveStarted.value = true, resolve a "started" resolver so the test can await that deterministically, then await the blocker Promise inside the override to actually block until the test resolves it; kick off cleanupAfterObservation, await the "started" Promise (not a sleep) to assert saveStarted is true, then resolve the blocker so the override continues and finally await cleanupPromise. Reference: saveMessagesWithSealedIdTracking, saveStarted, cleanupAfterObservation, and cleanupPromise.
📒 Files selected for processing (3)

- .changeset/sour-parrots-love.md
- packages/core/src/storage/domains/memory/inmemory.ts
- packages/memory/src/processors/observational-memory/__tests__/observational-memory.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/core/src/storage/domains/memory/inmemory.ts
Fixes observational memory activation selecting too many buffered chunks by refreshing chunk token counts from the current message list before activation math runs. Previously, stale token weights from buffering time could cause over- or under-activation.

Also adds cursor-based replay pruning so already-observed messages are filtered on all steps (not just step 0), with a `didThresholdCleanup` guard to avoid double-pruning when threshold cleanup already ran.

**Core fix**: `refreshBufferedChunkMessageTokens()` recomputes per-chunk `messageTokens` from live context before projected-removal math. Only recounts when the full chunk is present — partial recount is skipped to avoid undercounting.

**Testing**: 275/275 OM tests passing, including cherry-picked regression tests from the original branch with adjusted thresholds for the recounting fix.

Changes span `@mastra/memory` (main fix) and `@mastra/core` (storage plumbing for `bufferedChunks` override + `lastObservedMessageCursor` type).

## Summary by CodeRabbit

* **Bug Fixes**
  * Observational memory activation now uses up-to-date thread state and cursor-based pruning to prevent stale token counts and improve recall accuracy.
* **Tests**
  * Added extensive single-thread replay tests covering boundary pruning, activation timing, replay behavior, and related edge cases.
* **Chores**
  * Added a changelog entry documenting the patch release fix.

---------

Co-authored-by: Mastra Code (openai/gpt-5.4) <noreply@mastra.ai>