-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path_scoring.py
More file actions
189 lines (153 loc) · 5.84 KB
/
Copy path_scoring.py
File metadata and controls
189 lines (153 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Shared BM25 + substring hybrid scoring used by in-process backends.
Memory and Kuzu backends share this scoring so search ranking stays
identical across them (parity guarantee). Any backend that can provide a
`list[Node]` candidate set may reuse these functions.
The BM25 hybrid is lifted from the v0.10.0 tuning in ``MemoryBackend``
and kept here verbatim so that swapping backends does not change IR
metrics.
"""
from __future__ import annotations
import math
from collections.abc import Iterable
from difflib import SequenceMatcher
from synaptic.models import Node
def bm25_hybrid_score(
nodes: Iterable[Node],
query: str,
*,
limit: int = 20,
) -> list[Node]:
"""Rank ``nodes`` against ``query`` using BM25 + substring hybrid.
This mirrors ``MemoryBackend.search_fts``:
- Okapi BM25 with k1=1.5, b=0.75 and a 3x title boost
- Substring component (corpus-size independent)
- Bigram bonus, tag bonus, ``_search_keywords`` bonus
- Query term coverage bonus
- Adaptive BM25/substring weight based on corpus size
"""
query_lower = query.lower()
terms = [t for t in query_lower.split() if len(t) >= 1]
if not terms:
return []
node_list = list(nodes)
n_docs = len(node_list)
if n_docs == 0:
return []
k1 = 1.5
b = 0.75
title_boost = 3.0
doc_texts: dict[str, str] = {}
doc_lengths: dict[str, int] = {}
for node in node_list:
text = f"{node.title.lower()} {node.content.lower()}"
if node.tags:
text += " " + " ".join(node.tags).lower()
if node.properties:
kw = node.properties.get("_search_keywords", "")
if kw:
text += " " + kw.lower()
doc_texts[node.id] = text
doc_lengths[node.id] = len(text.split())
avgdl = sum(doc_lengths.values()) / n_docs if n_docs > 0 else 1.0
doc_freq: dict[str, int] = {}
for t in terms:
count = 0
for text in doc_texts.values():
if t in text:
count += 1
doc_freq[t] = count
bigrams: list[str] = []
if len(terms) >= 2:
for i in range(len(terms) - 1):
bigrams.append(f"{terms[i]} {terms[i + 1]}")
scored: list[tuple[Node, float]] = []
for node in node_list:
title_lower = node.title.lower()
content_lower = node.content.lower()
full_text = doc_texts[node.id]
dl = doc_lengths[node.id]
bm25_score = 0.0
substr_score = 0.0
matched_terms = 0
if query_lower in title_lower:
substr_score += len(terms) * 3.0
for t in terms:
tf_content = content_lower.count(t)
tf_title = title_lower.count(t)
if tf_content == 0 and tf_title == 0:
continue
df = doc_freq.get(t, 0)
idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1.0)
if tf_content > 0:
numerator = tf_content * (k1 + 1)
denominator = tf_content + k1 * (1 - b + b * dl / avgdl)
bm25_score += idf * numerator / denominator
if tf_title > 0:
bm25_score += idf * title_boost
if tf_title > 0:
substr_score += 2.0
if tf_content > 0:
substr_score += 1.0
matched_terms += 1
for bg in bigrams:
if bg in full_text:
bm25_score += 1.5
substr_score += 1.5
if node.tags:
tag_text = " ".join(node.tags).lower()
for t in terms:
if t in tag_text:
substr_score += 1.0
if node.properties:
search_kw = node.properties.get("_search_keywords", "").lower()
if search_kw:
for t in terms:
if t in search_kw:
substr_score += 1.5
if len(terms) >= 2 and matched_terms > 0:
coverage = matched_terms / len(terms)
if coverage >= 0.8:
substr_score += len(terms) * 1.5
elif coverage >= 0.5:
substr_score += len(terms) * 0.5
bm25_weight = min(0.8, max(0.1, (n_docs - 500) / 5000))
score = bm25_score * bm25_weight + substr_score * (1 - bm25_weight)
if score > 0:
scored.append((node, score))
scored.sort(key=lambda x: x[1], reverse=True)
return [n for n, _ in scored[:limit]]
def fuzzy_score(
nodes: Iterable[Node],
query: str,
*,
limit: int = 20,
threshold: float = 0.4,
) -> list[Node]:
"""Fuzzy string matching across title/content/tags (``MemoryBackend`` parity)."""
query_lower = query.lower()
query_terms = list(dict.fromkeys(query_lower.split()))[:10]
scored: list[tuple[Node, float]] = []
for node in nodes:
title_lower = node.title.lower()
title_ratio = SequenceMatcher(None, query_lower[:200], title_lower).ratio()
best = title_ratio
if query_terms:
title_words = title_lower.split()
content_words = node.content.lower().split()[:100]
tag_words = [t.lower() for t in (node.tags or [])]
text_words = title_words + content_words + tag_words
term_scores: list[float] = []
for qt in query_terms:
term_best = 0.0
for tw in text_words:
r = SequenceMatcher(None, qt, tw).ratio()
if r > term_best:
term_best = r
term_scores.append(term_best)
avg_term = sum(term_scores) / len(term_scores)
title_boost = sum(0.1 for qt in query_terms if qt in title_lower)
best = max(best, avg_term) + title_boost
if best >= threshold:
scored.append((node, best))
scored.sort(key=lambda x: x[1], reverse=True)
return [n for n, _ in scored[:limit]]