Commit 8abe7b6

avoid re-embedding when files are renamed/moved or chunks move between files

1 parent df99cb0
3 files changed: +291 -102 lines changed

mimir-rag/src/cli/ingest.ts

Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@ function logStats(stats: IngestionPipelineStats): void {
   logger.info(`Processed documents: ${stats.processedDocuments}`);
   logger.info(`Skipped documents: ${stats.skippedDocuments}`);
   logger.info(`Chunks upserted: ${stats.upsertedChunks}`);
-  logger.info(`Chunks reordered: ${stats.reorderedChunks}`);
+  logger.info(`Chunks moved: ${stats.movedChunks}`);
   logger.info(`Chunks deleted: ${stats.deletedChunks}`);
 }

mimir-rag/src/ingest/pipeline.ts

Lines changed: 171 additions & 99 deletions

@@ -4,7 +4,7 @@ import type { LLMClientBundle } from "../llm/types";
 import { getLogger } from "../utils/logger";
 import { downloadGithubMdxFiles, GithubMdxDocument } from "./github";
 import { chunkMdxFile, enforceChunkTokenLimit, MdxChunk } from "./chunker";
-import type { SupabaseVectorStore, ExistingChunkInfo } from "../supabase/client";
+import type { SupabaseVectorStore } from "../supabase/client";
 import type { DocumentChunk } from "../supabase/types";
 import { resolveEmbeddingInputTokenLimit } from "../llm/modelLimits";
 import { resolveSourceLinks } from "../utils/sourceLinks";
@@ -18,17 +18,24 @@ interface PendingEmbeddingChunk {
   contextualText: string;
 }

-interface ChunkDiffResult {
-  reordered: Array<{ id: number; chunkId: number }>;
-  newOrUpdated: Array<{ chunk: MdxChunk; index: number }>;
-  deletedIds: number[];
+/** Target location for a chunk in the new state */
+interface TargetChunkLocation {
+  filepath: string;
+  chunkId: number;
+  chunk: MdxChunk;
 }

+/** Classification of how a chunk should be handled */
+type ChunkClassification =
+  | { type: "unchanged"; existingId: number }
+  | { type: "moved"; existingId: number; newFilepath: string; newChunkId: number }
+  | { type: "new"; chunk: MdxChunk; filepath: string; chunkId: number };
+
 export interface IngestionPipelineStats {
   processedDocuments: number;
   skippedDocuments: number;
   upsertedChunks: number;
-  reorderedChunks: number;
+  movedChunks: number;
   deletedChunks: number;
 }

@@ -55,17 +62,17 @@ export async function runIngestionPipeline(
     processedDocuments: 0,
     skippedDocuments: 0,
     upsertedChunks: 0,
-    reorderedChunks: 0,
+    movedChunks: 0,
     deletedChunks: 0,
   };

-  const pendingEmbeddings: PendingEmbeddingChunk[] = [];
-  const pendingReorders: Array<{ id: number; chunkId: number }> = [];
-  const pendingDeletes: number[] = [];
-  const contextTasks: Promise<void>[] = [];
-
   ingestionLogger.info(`Processing ${documents.length} MDX document${documents.length === 1 ? "" : "s"}.`);

+  // Collect target state from all documents: checksum -> target location
+  const targetState = new Map<string, TargetChunkLocation>();
+  const allChecksums: string[] = [];
+  const documentChunksMap = new Map<string, MdxChunk[]>(); // filepath -> chunks
+
   for (const document of documents) {
     const filepath = document.relativePath || document.path;
     const fileLogger = typeof ingestionLogger.child === "function"
@@ -92,45 +99,179 @@ export async function runIngestionPipeline(
     }

     stats.processedDocuments += 1;
+    documentChunksMap.set(filepath, preparedChunks);
+
+    preparedChunks.forEach((chunk, index) => {
+      // If same checksum appears in multiple files, the last one wins
+      // This is expected behavior - content deduplication
+      targetState.set(chunk.checksum, {
+        filepath,
+        chunkId: index,
+        chunk,
+      });
+      allChecksums.push(chunk.checksum);
+    });
+  }

-    const existing = await store.fetchExistingChunks(filepath);
-    const diff = diffChunks(preparedChunks, existing);
+  if (targetState.size === 0) {
+    ingestionLogger.info("No chunks to process. Ingestion pipeline complete.");
+    return { documents, stats };
+  }

-    if (diff.reordered.length > 0) {
-      pendingReorders.push(...diff.reordered);
-      stats.reorderedChunks += diff.reordered.length;
-    }
+  // Fetch existing chunks by checksums globally
+  const uniqueChecksums = [...new Set(allChecksums)];
+  const existingByChecksum = await store.fetchChunksByChecksums(uniqueChecksums);
+
+  ingestionLogger.info(
+    `Found ${existingByChecksum.size} existing checksum${existingByChecksum.size === 1 ? "" : "s"} in the database.`
+  );
+
+  // Classify each target chunk: can we reuse an existing DB row, or do we need a new one?
+  const classifications: ChunkClassification[] = [];
+
+  // When the same content (checksum) appears in multiple places, we might have multiple
+  // DB rows with that checksum. This set tracks which DB row IDs we've already decided
+  // to reuse, so we don't accidentally assign the same DB row to two different targets.
+  const alreadyAssignedDbIds = new Set<number>();
+
+  for (const [checksum, target] of targetState.entries()) {
+    const dbChunksWithSameChecksum = existingByChecksum.get(checksum);
+
+    if (dbChunksWithSameChecksum && dbChunksWithSameChecksum.length > 0) {
+      // We found existing DB rows with matching content. Try to reuse one.
+
+      // First, check if any DB row is already at the exact target location
+      const alreadyInPlace = dbChunksWithSameChecksum.find(
+        (dbChunk) =>
+          dbChunk.filepath === target.filepath &&
+          dbChunk.chunkId === target.chunkId &&
+          !alreadyAssignedDbIds.has(dbChunk.id)
+      );
+
+      if (alreadyInPlace) {
+        // This DB row is already where we want it - no changes needed
+        classifications.push({ type: "unchanged", existingId: alreadyInPlace.id });
+        alreadyAssignedDbIds.add(alreadyInPlace.id);
+      } else {
+        // No DB row at the target location. Find any unassigned DB row we can move there.
+        const reusableDbChunk = dbChunksWithSameChecksum.find(
+          (dbChunk) => !alreadyAssignedDbIds.has(dbChunk.id)
+        );

-    if (diff.deletedIds.length > 0) {
-      pendingDeletes.push(...diff.deletedIds);
-      stats.deletedChunks += diff.deletedIds.length;
+        if (reusableDbChunk) {
+          // Move this existing DB row to the new location
+          classifications.push({
+            type: "moved",
+            existingId: reusableDbChunk.id,
+            newFilepath: target.filepath,
+            newChunkId: target.chunkId,
+          });
+          alreadyAssignedDbIds.add(reusableDbChunk.id);
+        } else {
+          // All DB rows with this checksum are already assigned to other targets.
+          // We need to create a new row (and generate a new embedding).
+          classifications.push({
+            type: "new",
+            chunk: target.chunk,
+            filepath: target.filepath,
+            chunkId: target.chunkId,
+          });
+        }
+      }
+    } else {
+      // No existing DB row with this checksum - create new
+      classifications.push({
+        type: "new",
+        chunk: target.chunk,
+        filepath: target.filepath,
+        chunkId: target.chunkId,
+      });
     }
+  }
+
+  // Find orphaned chunks (checksums not in target state)
+  const activeChecksums = new Set(targetState.keys());
+  const orphanedIds = await store.findOrphanedChunkIds(activeChecksums);
+
+  // Move chunks to new locations (two-phase to avoid conflicts)
+  const movedChunks = classifications.filter(
+    (c): c is Extract<ChunkClassification, { type: "moved" }> => c.type === "moved"
+  );
+
+  if (movedChunks.length > 0) {
+    ingestionLogger.info(`Moving ${movedChunks.length} chunk${movedChunks.length === 1 ? "" : "s"} to new locations.`);
+    await store.moveChunksAtomic(
+      movedChunks.map((c) => ({
+        id: c.existingId,
+        filepath: c.newFilepath,
+        chunkId: c.newChunkId,
+      }))
+    );
+    stats.movedChunks = movedChunks.length;
+  }
+
+  // Delete orphaned chunks
+  if (orphanedIds.length > 0) {
+    ingestionLogger.info(`Deleting ${orphanedIds.length} orphaned chunk${orphanedIds.length === 1 ? "" : "s"}.`);
+    await store.deleteChunksByIds(orphanedIds);
+    stats.deletedChunks = orphanedIds.length;
+  }
+
+  // Embed and insert new chunks
+  const newChunks = classifications.filter(
+    (c): c is Extract<ChunkClassification, { type: "new" }> => c.type === "new"
+  );

-    if (diff.newOrUpdated.length === 0) {
-      fileLogger.debug("All chunks unchanged; no LLM work required.");
+  if (newChunks.length === 0) {
+    ingestionLogger.info("No new chunks required embeddings. Ingestion pipeline complete.");
+    return { documents, stats };
+  }
+
+  // Group new chunks by filepath for context generation
+  const newChunksByFilepath = new Map<string, Array<{ chunk: MdxChunk; chunkId: number }>>();
+  for (const c of newChunks) {
+    const existing = newChunksByFilepath.get(c.filepath) ?? [];
+    existing.push({ chunk: c.chunk, chunkId: c.chunkId });
+    newChunksByFilepath.set(c.filepath, existing);
+  }
+
+  const pendingEmbeddings: PendingEmbeddingChunk[] = [];
+  const contextTasks: Promise<void>[] = [];
+
+  for (const [filepath, chunks] of newChunksByFilepath.entries()) {
+    const document = documents.find(
+      (d) => (d.relativePath || d.path) === filepath
+    );
+
+    if (!document) {
+      ingestionLogger.warn(`Document for filepath ${filepath} not found. Skipping context generation.`);
       continue;
     }

+    const fileLogger = typeof ingestionLogger.child === "function"
+      ? ingestionLogger.child({ file: filepath })
+      : ingestionLogger;
+
     fileLogger.info(
-      `Generating context for ${diff.newOrUpdated.length} new or modified chunk${diff.newOrUpdated.length === 1 ? "" : "s"}.`
+      `Generating context for ${chunks.length} new chunk${chunks.length === 1 ? "" : "s"}.`
     );

-    const chunkContents = diff.newOrUpdated.map((entry) => entry.chunk.chunkContent);
+    const chunkContents = chunks.map((entry) => entry.chunk.chunkContent);
     const contextTask = llm.chat
       .generateFileChunkContexts(chunkContents, document.content)
       .then((contexts) => {
-        if (contexts.length !== diff.newOrUpdated.length) {
+        if (contexts.length !== chunks.length) {
           throw new Error(
-            `Context generation returned ${contexts.length} entries for ${diff.newOrUpdated.length} chunks in ${filepath}.`
+            `Context generation returned ${contexts.length} entries for ${chunks.length} chunks in ${filepath}.`
           );
         }

-        diff.newOrUpdated.forEach((entry, index) => {
+        chunks.forEach((entry, index) => {
           const contextHeader = contexts[index]?.trim() ?? "";
           const contextualText = `${contextHeader}---${entry.chunk.chunkContent}`;
           pendingEmbeddings.push({
             filepath,
-            chunkId: entry.index,
+            chunkId: entry.chunkId,
             chunkTitle: entry.chunk.chunkTitle,
             checksum: entry.chunk.checksum,
             content: entry.chunk.chunkContent,
@@ -147,30 +288,10 @@ export async function runIngestionPipeline(
       });

     contextTasks.push(contextTask);
-
-    stats.upsertedChunks += diff.newOrUpdated.length;
   }

   await Promise.all(contextTasks);

-  if (pendingReorders.length > 0) {
-    ingestionLogger.info(`Reordering ${pendingReorders.length} chunk${pendingReorders.length === 1 ? "" : "s"}.`);
-    await store.updateChunkOrders(pendingReorders);
-  }
-
-  if (pendingDeletes.length > 0) {
-    ingestionLogger.info(`Deleting ${pendingDeletes.length} stale chunk${pendingDeletes.length === 1 ? "" : "s"}.`);
-    await store.deleteChunksByIds(pendingDeletes);
-  }
-
-  if (pendingEmbeddings.length === 0) {
-    ingestionLogger.info("No new or updated chunks required embeddings. Ingestion pipeline complete.");
-    return {
-      documents,
-      stats,
-    };
-  }
-
   ingestionLogger.info(`Embedding ${pendingEmbeddings.length} chunk${pendingEmbeddings.length === 1 ? "" : "s"} using ${llm.embedding.config.provider}.`);

   const embeddings = await llm.embedding.embedDocuments(
@@ -200,6 +321,7 @@ export async function runIngestionPipeline(
   });

   await store.upsertChunks(upsertPayload);
+  stats.upsertedChunks = pendingEmbeddings.length;

   ingestionLogger.info("Ingestion pipeline completed successfully.");

@@ -209,53 +331,3 @@ export async function runIngestionPipeline(
   };
 }

-function diffChunks(chunks: MdxChunk[], existing: Map<number, ExistingChunkInfo>): ChunkDiffResult {
-  const checksumBuckets = new Map<string, Array<{ chunkId: number; info: ExistingChunkInfo }>>();
-  const matchedIds = new Set<number>();
-
-  for (const [chunkId, info] of existing.entries()) {
-    const bucket = checksumBuckets.get(info.checksum);
-    if (bucket) {
-      bucket.push({ chunkId, info });
-    } else {
-      checksumBuckets.set(info.checksum, [{ chunkId, info }]);
-    }
-  }
-
-  const reordered: Array<{ id: number; chunkId: number }> = [];
-  const newOrUpdated: Array<{ chunk: MdxChunk; index: number }> = [];
-
-  chunks.forEach((chunk, index) => {
-    const bucket = checksumBuckets.get(chunk.checksum);
-
-    if (bucket && bucket.length > 0) {
-      const match = bucket.shift()!;
-      if (bucket.length === 0) {
-        checksumBuckets.delete(chunk.checksum);
-      }
-
-      matchedIds.add(match.info.id);
-
-      if (match.chunkId !== index) {
-        reordered.push({ id: match.info.id, chunkId: index });
-      }
-
-      return;
-    }
-
-    newOrUpdated.push({ chunk, index });
-  });
-
-  const deletedIds: number[] = [];
-  existing.forEach((info) => {
-    if (!matchedIds.has(info.id)) {
-      deletedIds.push(info.id);
-    }
-  });
-
-  return {
-    reordered,
-    newOrUpdated,
-    deletedIds,
-  };
-}
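The new pipeline leans on three store helpers — fetchChunksByChecksums, findOrphanedChunkIds, and moveChunksAtomic — whose implementations live in the third changed file (the Supabase client), which is not shown in this diff. Below is a minimal sketch of what that surface could look like, with every shape inferred from the call sites above; the names SupabaseVectorStoreLike, ExistingChunkRow, and ChunkMove and the exact signatures are assumptions, not the repository's actual code.

// Sketch only: shapes inferred from how pipeline.ts calls the store.
// ExistingChunkRow, ChunkMove and SupabaseVectorStoreLike are illustrative names.
type DocumentChunk = Record<string, unknown>; // stand-in for the real type from ../supabase/types

interface ExistingChunkRow {
  id: number;
  filepath: string;
  chunkId: number;
  checksum: string;
}

interface ChunkMove {
  id: number;
  filepath: string;
  chunkId: number;
}

interface SupabaseVectorStoreLike {
  // Returns all stored rows whose checksum is in the given list,
  // grouped by checksum (duplicate rows of the same content map to one key).
  fetchChunksByChecksums(checksums: string[]): Promise<Map<string, ExistingChunkRow[]>>;

  // Returns ids of rows whose checksum no longer appears in the target state.
  findOrphanedChunkIds(activeChecksums: Set<string>): Promise<number[]>;

  // Re-points existing rows (and their stored embeddings) at new
  // filepath/chunkId positions without touching the embedding vectors.
  moveChunksAtomic(moves: ChunkMove[]): Promise<void>;

  deleteChunksByIds(ids: number[]): Promise<void>;
  upsertChunks(chunks: DocumentChunk[]): Promise<void>;
}

The grouped-by-checksum return shape is what lets the classification loop reuse one existing row per duplicate checksum instead of re-embedding it.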

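The "two-phase to avoid conflicts" comment suggests the move has to dodge a uniqueness constraint on a chunk's position. One possible two-phase implementation with @supabase/supabase-js is sketched here; the document_chunks table name, the filepath/chunk_id columns, and the unique index on them are assumptions about the schema, and the real client in this commit may do this differently (for example in a single SQL function or RPC).

import { createClient } from "@supabase/supabase-js";

// Illustrative only: assumes a "document_chunks" table with columns
// id, filepath, chunk_id and a unique index on (filepath, chunk_id).
const supabase = createClient(
  process.env.SUPABASE_URL!,
  process.env.SUPABASE_SERVICE_ROLE_KEY!
);

interface ChunkMove {
  id: number;
  filepath: string;
  chunkId: number;
}

async function moveChunksAtomicSketch(moves: ChunkMove[]): Promise<void> {
  // Phase 1: park every row at an impossible (negative) chunk_id so that
  // two rows swapping positions never trip the unique constraint mid-update.
  for (const move of moves) {
    const { error } = await supabase
      .from("document_chunks")
      .update({ chunk_id: -move.id })
      .eq("id", move.id);
    if (error) throw new Error(`Failed to stage chunk ${move.id}: ${error.message}`);
  }

  // Phase 2: write the final filepath/chunk_id for each row.
  for (const move of moves) {
    const { error } = await supabase
      .from("document_chunks")
      .update({ filepath: move.filepath, chunk_id: move.chunkId })
      .eq("id", move.id);
    if (error) throw new Error(`Failed to move chunk ${move.id}: ${error.message}`);
  }
}

Parking rows at negative chunk_id values first keeps the intermediate states conflict-free, but it is not atomic across failures; a genuinely atomic version would wrap both phases in a single database transaction or Postgres function.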