-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: ingest_chroma.py
More file actions
434 lines (372 loc) · 15 KB
/
ingest_chroma.py
File metadata and controls
434 lines (372 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
#!/usr/bin/env python3
"""
Create or update a Chroma DB from local documents using llama.cpp embeddings.
Improvements for stability:
- Adaptive micro-batching: halves batch size on embedding failure until it succeeds.
- Token-safe re-chunking: splits any oversized chunk by tokens using a tokenizer-only llama.cpp.
Supported inputs:
- .pdf (PyPDF)
- .docx (docx2txt)
- .txt (plain text)
- .md (UnstructuredMarkdownLoader if available, else plain text)
- .doc (optional, requires 'unstructured' extras)
Examples:
# Add/Update (files and/or directories; globs OK)
python ingest_chroma.py --db ./database/chroma_db1 \
--embed-model /models/nomic-embed-text-v1.5.f16.gguf \
docs/ file1.pdf notes/*.md
# Rebuild (wipe DB first, then ingest)
python ingest_chroma.py --rebuild --db ./database/chroma_db1 \
--embed-model /models/nomic-embed-text-v1.5.f16.gguf \
./docs
Install:
pip install -U "langchain>=0.2" "langchain-community>=0.2" langchain-chroma \
chromadb llama-cpp-python pypdf python-docx docx2txt
Optional (for .doc and richer .md parsing):
pip install "unstructured[local-inference,pdf]"
"""
import argparse
import glob
import hashlib
import os
import shutil
import sys
from pathlib import Path
from typing import List, Tuple
from langchain_chroma import Chroma
from langchain_community.embeddings import LlamaCppEmbeddings
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Loaders
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
Docx2txtLoader,
)
# Optional loaders via unstructured
HAS_UNSTRUCTURED = True
try:
from langchain_community.document_loaders import (
UnstructuredWordDocumentLoader,
UnstructuredMarkdownLoader,
)
except Exception:
HAS_UNSTRUCTURED = False
# Tokenizer-only llama.cpp for token counting / detokenization
try:
from llama_cpp import Llama as LlamaTokenizerOnly
except Exception:
LlamaTokenizerOnly = None
def parse_args() -> argparse.Namespace:
    """Build and parse the command-line interface for the ingester."""
    parser = argparse.ArgumentParser(
        description="Create/update a Chroma DB from PDFs, DOCX, TXT, and MD using llama.cpp embeddings."
    )
    # Positional inputs: files, directories, and shell-style globs.
    parser.add_argument(
        "inputs",
        nargs="+",
        help="Files and/or directories. Directories are scanned recursively. Globs OK.",
    )
    parser.add_argument(
        "--db",
        default="./database/chroma_db1",
        help="Chroma persist directory (default: ./database/chroma_db1)",
    )
    parser.add_argument(
        "--rebuild",
        action="store_true",
        help="Delete the existing DB at --db before ingesting (fresh rebuild).",
    )
    parser.add_argument(
        "--embed-model",
        default=os.environ.get("EMBED_MODEL_PATH", ""),
        help="Path to GGUF embedding model (e.g., nomic-embed-text-v1.5.f16.gguf). "
             "Can also set EMBED_MODEL_PATH env var.",
    )
    # Character-level chunking knobs.
    parser.add_argument("--chunk-size", type=int, default=800, help="Character chunk size (default: 800)")
    parser.add_argument("--chunk-overlap", type=int, default=120, help="Character overlap (default: 120)")
    # llama.cpp embedding params
    parser.add_argument("--threads", type=int, default=8, help="CPU threads for embeddings (default: 8)")
    parser.add_argument("--ctx", type=int, default=4096, help="n_ctx for embeddings (default: 4096)")
    parser.add_argument("--gpu-layers", type=int, default=0,
                        help="GPU layers for embeddings (usually 0; set >0 only if you want GPU offload).")
    parser.add_argument("--llama-n-batch", type=int, default=256,
                        help="llama.cpp token batch size per sequence for embeddings (default: 256).")
    # Chroma options
    parser.add_argument("--collection", default=None, help="Optional Chroma collection name.")
    # Stability knobs
    parser.add_argument("--embed-batch-size", type=int, default=16,
                        help="Max number of chunks per add() call (default: 16).")
    parser.add_argument("--embed-microbatch-min", type=int, default=1,
                        help="Minimum micro-batch size when adaptively splitting on failures (default: 1).")
    # Token-safe guard
    parser.add_argument("--max-embed-chunk-tokens", type=int, default=2048,
                        help="If a chunk exceeds this token count (embedding tokenizer), it will be sub-split.")
    parser.add_argument("--token-overlap", type=int, default=64,
                        help="Token overlap when sub-splitting oversized chunks (default: 64).")
    parser.add_argument("--disable-token-guard", action="store_true",
                        help="Disable token-based sub-splitting guard.")
    return parser.parse_args()
def expand_globs_or_literal(path_str: str) -> List[Path]:
    """Expand a glob pattern into sorted matching paths, or wrap a literal path.

    Fix: the previous ``Path().glob(path_str)`` raised on absolute patterns
    ("Non-relative patterns are unsupported") and silently ignored ``**``
    recursion on older Pythons. ``glob.glob`` handles both absolute and
    relative patterns, and ``recursive=True`` enables ``**``.

    Returns a single-element list for non-glob inputs (existence is checked
    by the caller), or a possibly-empty sorted list of matches for globs.
    """
    if any(c in path_str for c in "*?[]"):
        return [Path(p) for p in sorted(glob.glob(path_str, recursive=True))]
    return [Path(path_str)]
def discover_files(paths: List[str]) -> List[Path]:
    """Resolve inputs (files, directories, globs) into a sorted, de-duplicated
    list of supported files.

    Directories are scanned recursively; only files whose extension is in the
    supported set are kept. Nonexistent literal paths are silently dropped.

    Fix: removed the redundant ``map(Path, ...)`` — ``expand_globs_or_literal``
    already returns ``Path`` objects.
    """
    exts = {".pdf", ".docx", ".txt", ".doc", ".md"}
    found: List[Path] = []
    for raw in paths:
        for p in expand_globs_or_literal(raw):
            if p.is_dir():
                found.extend(
                    sub for sub in p.rglob("*")
                    if sub.is_file() and sub.suffix.lower() in exts
                )
            elif p.is_file() and p.suffix.lower() in exts:
                found.append(p)
    # De-dup & sort for stable, reproducible ingestion order.
    return sorted({f.resolve() for f in found})
def load_one(path: Path) -> List[Document]:
    """Load a single file into LangChain Documents, stamping source metadata.

    Returns an empty list for unsupported extensions, and for ``.doc`` when
    the 'unstructured' extras are not installed (with a warning).
    """
    suffix = path.suffix.lower()
    path_str = str(path)
    if suffix == ".pdf":
        docs = PyPDFLoader(path_str).load()
    elif suffix == ".docx":
        docs = Docx2txtLoader(path_str).load()
    elif suffix == ".txt":
        docs = TextLoader(path_str, encoding="utf-8").load()
    elif suffix == ".md":
        # Prefer the richer unstructured parser; fall back to plain text.
        if HAS_UNSTRUCTURED:
            docs = UnstructuredMarkdownLoader(path_str, mode="elements").load()
        else:
            docs = TextLoader(path_str, encoding="utf-8").load()
    elif suffix == ".doc":
        if not HAS_UNSTRUCTURED:
            print(f"[WARN] .doc requires unstructured; skipping: {path}", file=sys.stderr)
            return []
        docs = UnstructuredWordDocumentLoader(path_str).load()
    else:
        return []
    # Stamp every page with both the as-given and the absolute source path.
    for doc in docs:
        doc.metadata = {
            **doc.metadata,
            "source": path_str,
            "abs_source": str(path.resolve()),
        }
    return docs
def load_all(files: List[Path]) -> List[Document]:
    """Load every file in *files*, logging per-file results.

    Individual load failures are logged to stderr and skipped so one bad
    file cannot abort the whole ingestion run.
    """
    loaded: List[Document] = []
    for path in files:
        try:
            docs = load_one(path)
            if docs:
                loaded.extend(docs)
                print(f"[OK] Loaded {len(docs):>3} page(s) from {path}")
            else:
                print(f"[SKIP] No docs from {path}")
        except Exception as e:
            print(f"[ERROR] Loading failed for {path}: {e}", file=sys.stderr)
    return loaded
def chunk_docs(docs: List[Document], chunk_size: int, chunk_overlap: int) -> List[Document]:
    """Split documents into overlapping character chunks.

    Separators are tried from coarsest (paragraph) to finest (character),
    so chunks break on natural boundaries where possible.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", " ", ""],
    ).split_documents(docs)
# ---- Token guard: split any overlong chunk by tokens (embedding tokenizer) ----
def build_tokenizer(model_path: str, n_ctx: int):
    """Create a vocab-only llama.cpp instance for tokenize/detokenize.

    Returns None when llama-cpp-python is not importable. ``vocab_only=True``
    skips loading the model weights, so construction is cheap.
    """
    if LlamaTokenizerOnly is None:
        return None
    kwargs = dict(
        model_path=model_path,
        n_ctx=n_ctx,
        vocab_only=True,  # tokenizer only — do NOT load full weights
        embedding=False,
        n_gpu_layers=0,
        logits_all=False,
    )
    return LlamaTokenizerOnly(**kwargs)
def count_tokens(tokenizer, text: str) -> int:
    """Return the embedding-model token count of *text*.

    With no tokenizer available, falls back to a ~4-characters-per-token
    heuristic and never returns less than 1.
    """
    if tokenizer is not None:
        return len(tokenizer.tokenize(text.encode("utf-8"), add_bos=False))
    # Heuristic: typical English text averages about four characters per token.
    return max(1, len(text) // 4)
def split_chunk_by_tokens(tokenizer, doc: Document, max_tokens: int, overlap_tokens: int) -> List[Document]:
    """Split one Document into token-bounded sub-Documents.

    Uses the embedding tokenizer when available; otherwise falls back to an
    approximate character window (~4 chars per token). Each produced sub-chunk
    carries a "subchunk" index in its metadata; a doc already within the token
    window is returned unchanged.

    Fix: removed a dead ``sub_idx`` counter in the fallback branch that was
    incremented but never read (the index comes from ``enumerate``).
    """
    text = doc.page_content or ""
    if tokenizer is None:
        # Crude character-window fallback when no tokenizer is available.
        approx = max_tokens * 4
        step = max(1, approx - (overlap_tokens * 4))
        return [
            Document(page_content=text[i:i + approx],
                     metadata={**doc.metadata, "subchunk": j})
            for j, i in enumerate(range(0, len(text), step))
        ]
    ids = tokenizer.tokenize(text.encode("utf-8"), add_bos=False)
    # Keep a small safety margin under the hard token limit.
    window = max(1, max_tokens - 32)
    if len(ids) <= window:
        return [doc]
    stride = max(1, window - overlap_tokens)
    out: List[Document] = []
    for sub_idx, start in enumerate(range(0, len(ids), stride)):
        sub_ids = ids[start:start + window]
        if not sub_ids:
            break
        sub_text = tokenizer.detokenize(sub_ids).decode("utf-8", errors="ignore")
        out.append(Document(page_content=sub_text,
                            metadata={**doc.metadata, "subchunk": sub_idx}))
        # Stop once the window has reached the end of the token stream.
        if start + window >= len(ids):
            break
    return out
def token_guard_rechunk(chunks: List[Document], tokenizer, max_tokens: int, overlap_tokens: int) -> List[Document]:
    """Replace any chunk exceeding *max_tokens* with token-safe sub-chunks.

    Chunks within the budget pass through untouched; a summary line is
    printed when any chunk had to be split.
    """
    result: List[Document] = []
    split_count = 0
    for chunk in chunks:
        if count_tokens(tokenizer, chunk.page_content or "") > max_tokens:
            split_count += 1
            result.extend(split_chunk_by_tokens(tokenizer, chunk, max_tokens, overlap_tokens))
        else:
            result.append(chunk)
    if split_count:
        print(f"[TokenGuard] Split {split_count} oversized chunk(s) into token-safe subchunks.")
    return result
def make_chunk_ids(chunks: List[Document]) -> Tuple[List[Document], List[str]]:
    """
    Deterministic IDs per chunk to allow safe upserts.
    ID = sha1(abs_path | file_mtime_ns | page_num | subchunk | idx | first_40_chars)
    """
    ids: List[str] = []
    for idx, chunk in enumerate(chunks):
        src = chunk.metadata.get("abs_source", chunk.metadata.get("source", ""))
        # mtime makes re-ingestion of a modified file produce fresh IDs.
        try:
            mtime_ns = Path(src).stat().st_mtime_ns if src else 0
        except Exception:
            mtime_ns = 0
        parts = (
            str(src),
            str(mtime_ns),
            str(chunk.metadata.get("page", -1)),
            str(chunk.metadata.get("subchunk", 0)),
            str(idx),
            (chunk.page_content or "")[:40],
        )
        digest = hashlib.sha1()
        for part in parts:
            digest.update(part.encode("utf-8"))
        ids.append(digest.hexdigest())
    return chunks, ids
def add_documents_adaptive(vectorstore: Chroma, docs: List[Document], ids: List[str],
                           init_bs: int, min_bs: int):
    """
    Add docs in batches. On failure, halve batch size until success.
    Skips a single offending doc after logging an error (won't crash the run).
    """
    total = len(docs)
    batch_size = max(1, init_bs)
    floor = max(1, min_bs)
    cursor = 0
    while cursor < total:
        upper = min(total, cursor + batch_size)
        try:
            vectorstore.add_documents(documents=docs[cursor:upper], ids=ids[cursor:upper])
        except Exception as e:
            print(f"[WARN] add_documents failed for batch size {batch_size}: {e}", file=sys.stderr)
            # First response to failure: shrink the batch and retry same window.
            if batch_size > floor:
                batch_size = max(floor, batch_size // 2)
                print(f"[INFO] Retrying with smaller batch size: {batch_size}")
                continue
            # Already at the floor: a one-doc window means this doc itself is bad.
            if upper - cursor == 1:
                bad_id = ids[cursor]
                bad_src = docs[cursor].metadata.get("source", "unknown")
                print(f"[ERROR] Skipping single offending doc id={bad_id} src={bad_src}", file=sys.stderr)
                cursor = upper  # skip just this one
                continue
            # Floor > 1: try each doc in this window individually, skipping failures.
            for i in range(cursor, upper):
                try:
                    vectorstore.add_documents(documents=[docs[i]], ids=[ids[i]])
                except Exception as ee:
                    print(f"[ERROR] Skipping singleton id={ids[i]}: {ee}", file=sys.stderr)
            cursor = upper
        else:
            cursor = upper
def main():
    """CLI entry point: discover, load, chunk, embed, and upsert documents."""
    args = parse_args()
    if not args.embed_model:
        print("ERROR: --embed-model is required (path to a GGUF embedding model).", file=sys.stderr)
        sys.exit(1)

    # Optional fresh rebuild: wipe the existing DB directory before ingesting.
    if args.rebuild and Path(args.db).exists():
        print(f"[REBUILD] Removing existing DB at {args.db} …")
        shutil.rmtree(args.db, ignore_errors=True)

    files = discover_files(args.inputs)
    if not files:
        print("No supported files found.", file=sys.stderr)
        sys.exit(1)
    print(f"Discovered {len(files)} files.")

    raw_docs = load_all(files)
    if not raw_docs:
        print("No documents could be loaded.", file=sys.stderr)
        sys.exit(1)

    # Fast, robust character-based chunking first.
    chunks = chunk_docs(raw_docs, args.chunk_size, args.chunk_overlap)
    print(f"Split into {len(chunks)} chunks (chars={args.chunk_size}, overlap={args.chunk_overlap}).")

    # Token guard: sub-split any chunk exceeding the embedding token budget.
    tokenizer = None
    if not args.disable_token_guard:
        if LlamaTokenizerOnly is None:
            print("[WARN] llama-cpp-python not available for token guard; falling back to char heuristic.",
                  file=sys.stderr)
        else:
            tokenizer = build_tokenizer(args.embed_model, args.ctx)
        chunks = token_guard_rechunk(
            chunks, tokenizer, args.max_embed_chunk_tokens, args.token_overlap
        )

    # llama.cpp embeddings with conservative, stability-oriented settings.
    embeddings = LlamaCppEmbeddings(
        model_path=args.embed_model,
        n_ctx=max(2048, args.ctx),  # 2048–4096 is plenty for embeddings
        n_threads=args.threads,
        n_gpu_layers=args.gpu_layers,  # keep 0 unless GPU offload is wanted
        n_batch=max(32, min(args.llama_n_batch, 1024)),  # clamp token batch
        verbose=False,
    )

    # Open (or create) the Chroma vector store.
    vs_kwargs = dict(persist_directory=args.db, embedding_function=embeddings)
    if args.collection:
        vs_kwargs["collection_name"] = args.collection
    vectorstore = Chroma(**vs_kwargs)

    # Deterministic IDs let re-ingestion upsert instead of duplicating.
    chunks, ids = make_chunk_ids(chunks)
    print("Embedding & upserting into Chroma…")
    add_documents_adaptive(
        vectorstore,
        docs=chunks,
        ids=ids,
        init_bs=args.embed_batch_size,
        min_bs=args.embed_microbatch_min,
    )

    # Chroma 0.5+ persists automatically; older versions still need .persist().
    if hasattr(vectorstore, "persist"):
        try:
            vectorstore.persist()
        except Exception:
            pass
    print(f"Done. DB at: {args.db}")
# Script entry point: run the ingestion pipeline when executed directly.
if __name__ == "__main__":
    main()