Skip to content

Commit d941eb0

Browse files
committed
feat(smart-extractor): batch create decisions via bulkStore() to reduce lock contention (Issue #666)
- Add buildStoreEntry() helper to construct entries without writing - Add createEntries array to collect all CREATE decisions - Pass createEntries to all handler methods (handleProfileMerge, handleSupersede, handleContextualize, handleContradict, handleMerge) - Wrap direct store.store() calls with if(createEntries) guards - Final bulkStore() call at end of extractAndPersist() reduces N locks to 1 Changes: - src/smart-extractor.ts: batch creation logic - test/smart-extractor-bulk-store.test.mjs: 9 tests - test/smart-extractor-bulk-store-edge-cases.test.mjs: 17 tests Note: SUPERSEDE/CONFLICT still require 2 locks per decision (cannot batch due to ID dependency)
1 parent c02f6cc commit d941eb0

File tree

3 files changed

+750
-27
lines changed

3 files changed

+750
-27
lines changed

src/smart-extractor.ts

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ import { classifyTemporal, inferExpiry } from "./temporal-classifier.js";
5252
import { inferAtomicBrandItemPreferenceSlot } from "./preference-slots.js";
5353
import { batchDedup } from "./batch-dedup.js";
5454

55+
type StoreEntry = Omit<import("./store.js").MemoryEntry, "id" | "timestamp">;
56+
5557
// ============================================================================
5658
// Envelope Metadata Stripping
5759
// ============================================================================
@@ -417,6 +419,8 @@ export class SmartExtractor {
417419
}
418420
}
419421

422+
const createEntries: Omit<import("./store.js").MemoryEntry, "id" | "timestamp">[] = [];
423+
420424
for (const { index, candidate } of processableCandidates) {
421425
try {
422426
await this.processCandidate(
@@ -427,6 +431,7 @@ export class SmartExtractor {
427431
targetScope,
428432
scopeFilter,
429433
precomputedVectors.get(index),
434+
createEntries,
430435
);
431436
} catch (err) {
432437
this.log(
@@ -435,6 +440,10 @@ export class SmartExtractor {
435440
}
436441
}
437442

443+
if (createEntries.length > 0) {
444+
await this.store.bulkStore(createEntries);
445+
}
446+
438447
return stats;
439448
}
440449

@@ -653,6 +662,7 @@ export class SmartExtractor {
653662
targetScope: string,
654663
scopeFilter?: string[],
655664
precomputedVector?: number[],
665+
createEntries?: Omit<import("./store.js").MemoryEntry, "id" | "timestamp">[],
656666
): Promise<void> {
657667
// Profile always merges (skip dedup — admission control still applies)
658668
if (ALWAYS_MERGE_CATEGORIES.has(candidate.category)) {
@@ -662,6 +672,8 @@ export class SmartExtractor {
662672
sessionKey,
663673
targetScope,
664674
scopeFilter,
675+
undefined,
676+
createEntries,
665677
);
666678
if (profileResult === "rejected") {
667679
stats.rejected = (stats.rejected ?? 0) + 1;
@@ -678,7 +690,7 @@ export class SmartExtractor {
678690
const vector = precomputedVector ?? await this.embedder.embed(`${candidate.abstract} ${candidate.content}`);
679691
if (!vector || vector.length === 0) {
680692
this.log("memory-pro: smart-extractor: embedding failed, storing as-is");
681-
await this.storeCandidate(candidate, vector || [], sessionKey, targetScope);
693+
createEntries?.push(this.buildStoreEntry(candidate, vector || [], sessionKey, targetScope));
682694
stats.created++;
683695
return;
684696
}
@@ -714,7 +726,7 @@ export class SmartExtractor {
714726

715727
switch (dedupResult.decision) {
716728
case "create":
717-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
729+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
718730
stats.created++;
719731
break;
720732

@@ -730,11 +742,12 @@ export class SmartExtractor {
730742
scopeFilter,
731743
dedupResult.contextLabel,
732744
admission?.audit,
745+
createEntries,
733746
);
734747
stats.merged++;
735748
} else {
736749
// Category doesn't support merge → create instead
737-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
750+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
738751
stats.created++;
739752
}
740753
break;
@@ -759,11 +772,12 @@ export class SmartExtractor {
759772
targetScope,
760773
scopeFilter,
761774
admission?.audit,
775+
createEntries,
762776
);
763777
stats.created++;
764778
stats.superseded = (stats.superseded ?? 0) + 1;
765779
} else {
766-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
780+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
767781
stats.created++;
768782
}
769783
break;
@@ -773,17 +787,17 @@ export class SmartExtractor {
773787
await this.handleSupport(dedupResult.matchId, { session: sessionKey, timestamp: Date.now() }, dedupResult.reason, dedupResult.contextLabel, scopeFilter, admission?.audit);
774788
stats.supported = (stats.supported ?? 0) + 1;
775789
} else {
776-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
790+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
777791
stats.created++;
778792
}
779793
break;
780794

781795
case "contextualize":
782796
if (dedupResult.matchId) {
783-
await this.handleContextualize(candidate, vector, dedupResult.matchId, sessionKey, targetScope, scopeFilter, dedupResult.contextLabel, admission?.audit);
797+
await this.handleContextualize(candidate, vector, dedupResult.matchId, sessionKey, targetScope, scopeFilter, dedupResult.contextLabel, admission?.audit, createEntries);
784798
stats.created++;
785799
} else {
786-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
800+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
787801
stats.created++;
788802
}
789803
break;
@@ -802,15 +816,16 @@ export class SmartExtractor {
802816
targetScope,
803817
scopeFilter,
804818
admission?.audit,
819+
createEntries,
805820
);
806821
stats.created++;
807822
stats.superseded = (stats.superseded ?? 0) + 1;
808823
} else {
809-
await this.handleContradict(candidate, vector, dedupResult.matchId, sessionKey, targetScope, scopeFilter, dedupResult.contextLabel, admission?.audit);
824+
await this.handleContradict(candidate, vector, dedupResult.matchId, sessionKey, targetScope, scopeFilter, dedupResult.contextLabel, admission?.audit, createEntries);
810825
stats.created++;
811826
}
812827
} else {
813-
await this.storeCandidate(candidate, vector, sessionKey, targetScope, admission?.audit);
828+
createEntries?.push(this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admission?.audit));
814829
stats.created++;
815830
}
816831
break;
@@ -964,6 +979,7 @@ export class SmartExtractor {
964979
targetScope: string,
965980
scopeFilter?: string[],
966981
admissionAudit?: AdmissionAuditRecord,
982+
createEntries?: StoreEntry[],
967983
): Promise<"merged" | "created" | "rejected"> {
968984
// Find existing profile memory by category
969985
const embeddingText = `${candidate.abstract} ${candidate.content}`;
@@ -1011,11 +1027,12 @@ export class SmartExtractor {
10111027
scopeFilter,
10121028
undefined,
10131029
admissionAudit,
1030+
createEntries,
10141031
);
10151032
return "merged";
10161033
} else {
10171034
// No existing profile — create new
1018-
await this.storeCandidate(candidate, vector || [], sessionKey, targetScope, admissionAudit);
1035+
createEntries?.push(this.buildStoreEntry(candidate, vector || [], sessionKey, targetScope, admissionAudit));
10191036
return "created";
10201037
}
10211038
}
@@ -1030,6 +1047,7 @@ export class SmartExtractor {
10301047
scopeFilter?: string[],
10311048
contextLabel?: string,
10321049
admissionAudit?: AdmissionAuditRecord,
1050+
createEntries?: StoreEntry[],
10331051
): Promise<void> {
10341052
let existingAbstract = "";
10351053
let existingOverview = "";
@@ -1051,12 +1069,12 @@ export class SmartExtractor {
10511069
const vector = await this.embedder.embed(
10521070
`${candidate.abstract} ${candidate.content}`,
10531071
);
1054-
await this.storeCandidate(
1072+
createEntries?.push(this.buildStoreEntry(
10551073
candidate,
10561074
vector || [],
10571075
"merge-fallback",
10581076
targetScope,
1059-
);
1077+
));
10601078
return;
10611079
}
10621080

@@ -1141,12 +1159,13 @@ export class SmartExtractor {
11411159
matchId: string,
11421160
sessionKey: string,
11431161
targetScope: string,
1144-
scopeFilter: string[],
1162+
scopeFilter?: string[],
11451163
admissionAudit?: AdmissionAuditRecord,
1164+
createEntries?: StoreEntry[],
11461165
): Promise<void> {
11471166
const existing = await this.store.getById(matchId, scopeFilter);
11481167
if (!existing) {
1149-
await this.storeCandidate(candidate, vector, sessionKey, targetScope);
1168+
createEntries?.push(this.buildStoreEntry(candidate, vector || [], sessionKey, targetScope));
11501169
return;
11511170
}
11521171

@@ -1265,6 +1284,7 @@ export class SmartExtractor {
12651284
scopeFilter?: string[],
12661285
contextLabel?: string,
12671286
admissionAudit?: AdmissionAuditRecord,
1287+
createEntries?: StoreEntry[],
12681288
): Promise<void> {
12691289
const storeCategory = this.mapToStoreCategory(candidate.category);
12701290
const metadata = stringifySmartMetadata(this.withAdmissionAudit({
@@ -1287,14 +1307,19 @@ export class SmartExtractor {
12871307
relations: [{ type: "contextualizes", targetId: matchId }],
12881308
}, admissionAudit));
12891309

1290-
await this.store.store({
1310+
const entry_c: StoreEntry = {
12911311
text: candidate.abstract,
12921312
vector,
12931313
category: storeCategory,
12941314
scope: targetScope,
12951315
importance: this.getDefaultImportance(candidate.category),
12961316
metadata,
1297-
});
1317+
};
1318+
if (createEntries) {
1319+
createEntries.push(entry_c);
1320+
} else {
1321+
await this.store.store(entry_c);
1322+
}
12981323

12991324
this.log(
13001325
`memory-pro: smart-extractor: contextualize [${contextLabel || "general"}] new entry linked to ${matchId.slice(0, 8)}`,
@@ -1314,6 +1339,7 @@ export class SmartExtractor {
13141339
scopeFilter?: string[],
13151340
contextLabel?: string,
13161341
admissionAudit?: AdmissionAuditRecord,
1342+
createEntries?: StoreEntry[],
13171343
): Promise<void> {
13181344
// 1. Record contradiction on the existing memory
13191345
const existing = await this.store.getById(matchId, scopeFilter);
@@ -1351,14 +1377,19 @@ export class SmartExtractor {
13511377
relations: [{ type: "contradicts", targetId: matchId }],
13521378
}, admissionAudit));
13531379

1354-
await this.store.store({
1380+
const entry_d: StoreEntry = {
13551381
text: candidate.abstract,
13561382
vector,
13571383
category: storeCategory,
13581384
scope: targetScope,
13591385
importance: this.getDefaultImportance(candidate.category),
13601386
metadata,
1361-
});
1387+
};
1388+
if (createEntries) {
1389+
createEntries.push(entry_d);
1390+
} else {
1391+
await this.store.store(entry_d);
1392+
}
13621393

13631394
this.log(
13641395
`memory-pro: smart-extractor: contradict [${contextLabel || "general"}] on ${matchId.slice(0, 8)}, new entry created`,
@@ -1370,24 +1401,23 @@ export class SmartExtractor {
13701401
// --------------------------------------------------------------------------
13711402

13721403
/**
1373-
* Store a candidate memory as a new entry with L0/L1/L2 metadata.
1404+
* Build a memory entry from candidate data (without writing).
1405+
* Used by batch creation to reduce lock acquisitions.
13741406
*/
1375-
private async storeCandidate(
1407+
private buildStoreEntry(
13761408
candidate: CandidateMemory,
13771409
vector: number[],
13781410
sessionKey: string,
13791411
targetScope: string,
13801412
admissionAudit?: AdmissionAuditRecord,
1381-
): Promise<void> {
1382-
// Map 6-category to existing store categories for backward compatibility
1413+
): Omit<import("./store.js").MemoryEntry, "id" | "timestamp"> {
13831414
const storeCategory = this.mapToStoreCategory(candidate.category);
1384-
13851415
const classifyText = candidate.content || candidate.abstract;
13861416
const metadata = stringifySmartMetadata(
13871417
buildSmartMetadata(
13881418
{
13891419
text: candidate.abstract,
1390-
category: this.mapToStoreCategory(candidate.category),
1420+
category: storeCategory,
13911421
},
13921422
{
13931423
l0_abstract: candidate.abstract,
@@ -1406,18 +1436,33 @@ export class SmartExtractor {
14061436
suppressed_until_turn: 0,
14071437
memory_temporal_type: classifyTemporal(classifyText),
14081438
valid_until: inferExpiry(classifyText),
1439+
...(admissionAudit ? { admission_audit: JSON.stringify(admissionAudit) } : {}),
14091440
},
14101441
),
14111442
);
14121443

1413-
await this.store.store({
1414-
text: candidate.abstract, // L0 used as the searchable text
1444+
return {
1445+
text: candidate.abstract,
14151446
vector,
14161447
category: storeCategory,
14171448
scope: targetScope,
14181449
importance: this.getDefaultImportance(candidate.category),
14191450
metadata,
1420-
});
1451+
};
1452+
}
1453+
1454+
/**
1455+
* Store a candidate memory as a new entry with L0/L1/L2 metadata.
1456+
*/
1457+
private async storeCandidate(
1458+
candidate: CandidateMemory,
1459+
vector: number[],
1460+
sessionKey: string,
1461+
targetScope: string,
1462+
admissionAudit?: AdmissionAuditRecord,
1463+
): Promise<void> {
1464+
const entry = this.buildStoreEntry(candidate, vector, sessionKey, targetScope, admissionAudit);
1465+
await this.store.store(entry);
14211466

14221467
this.log(
14231468
`memory-pro: smart-extractor: created [${candidate.category}] ${candidate.abstract.slice(0, 60)}`,

0 commit comments

Comments
 (0)