Coverage for stackone_ai/utils/tfidf_index.py: 94% (105 statements)
1"""
2Lightweight TF-IDF vector index for offline vector search.
3No external dependencies; tokenizes ASCII/latin text, lowercases,
4strips punctuation, removes a small stopword set, and builds a sparse index.
5"""
7from __future__ import annotations
9import math
10import re
11from typing import NamedTuple


class TfidfDocument(NamedTuple):
    """Document for TF-IDF indexing"""

    id: str
    text: str


class TfidfResult(NamedTuple):
    """Search result from TF-IDF index"""

    id: str
    score: float  # cosine similarity (0..1)


# Common English stopwords
STOPWORDS = {
    "a",
    "an",
    "the",
    "and",
    "or",
    "but",
    "if",
    "then",
    "else",
    "for",
    "of",
    "in",
    "on",
    "to",
    "from",
    "by",
    "with",
    "as",
    "at",
    "is",
    "are",
    "was",
    "were",
    "be",
    "been",
    "it",
    "this",
    "that",
    "these",
    "those",
    "not",
    "no",
    "can",
    "could",
    "should",
    "would",
    "may",
    "might",
    "do",
    "does",
    "did",
    "have",
    "has",
    "had",
    "you",
    "your",
}


def tokenize(text: str) -> list[str]:
    """Tokenize text by lowercasing, removing punctuation, and filtering stopwords

    Args:
        text: Input text to tokenize

    Returns:
        List of tokens
    """
    # Lowercase and replace non-alphanumeric (except underscore) with space
    text = text.lower()
    text = re.sub(r"[^a-z0-9_\s]", " ", text)

    # Split and filter
    tokens = [t for t in text.split() if t and t not in STOPWORDS]
    return tokens
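
# Illustrative example of the tokenizer's behavior: punctuation is replaced with
# spaces and the stopword "the" is dropped, so
#     tokenize("The Quick, Brown Fox!")  ->  ["quick", "brown", "fox"]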


class TfidfIndex:
    """TF-IDF vector index for document search using sparse vectors"""

    def __init__(self) -> None:
        self.vocab: dict[str, int] = {}
        self.idf: list[float] = []
        self.docs: list[dict[str, str | dict[int, float] | float]] = []
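        # Each entry that build() appends to self.docs has the form:
        #   {"id": <document id>, "vec": {term_id: tf-idf weight}, "norm": <L2 norm of vec>}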

    def build(self, corpus: list[TfidfDocument]) -> None:
        """Build index from a corpus of documents

        Args:
            corpus: List of documents to index
        """
        # Tokenize all documents
        docs_tokens = [tokenize(doc.text) for doc in corpus]

        # Build vocabulary
        for tokens in docs_tokens:
            for token in tokens:
                if token not in self.vocab:
                    self.vocab[token] = len(self.vocab)

        # Compute document frequency (df)
        df: dict[int, int] = {}
        for tokens in docs_tokens:
            seen: set[int] = set()
            for token in tokens:
                term_id = self.vocab.get(token)
                if term_id is not None and term_id not in seen:
                    seen.add(term_id)
                    df[term_id] = df.get(term_id, 0) + 1

        # Compute IDF (inverse document frequency)
        n_docs = len(corpus)
        self.idf = []
        for term_id in range(len(self.vocab)):
            dfi = df.get(term_id, 0)
            # Smoothed IDF
            idf_value = math.log((n_docs + 1) / (dfi + 1)) + 1
            self.idf.append(idf_value)
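
        # Worked example (illustrative numbers, natural log): with n_docs = 3, a term
        # that appears in one document gets log(4 / 2) + 1 ≈ 1.69, while a term that
        # appears in all three gets log(4 / 4) + 1 = 1.0, so rarer terms weigh more.
        # The +1 terms smooth the ratio and keep every IDF positive.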

        # Build document vectors
        self.docs = []
        for doc, tokens in zip(corpus, docs_tokens):
            # Compute term frequency (TF)
            tf: dict[int, int] = {}
            for token in tokens:
                term_id = self.vocab.get(token)
                if term_id is not None:
                    tf[term_id] = tf.get(term_id, 0) + 1

            # Build weighted vector (TF-IDF)
            vec: dict[int, float] = {}
            norm_sq = 0.0
            n_tokens = len(tokens)

            for term_id, freq in tf.items():
                if term_id >= len(self.idf) or n_tokens == 0:
                    continue
                idf_val = self.idf[term_id]
                weight = (freq / n_tokens) * idf_val
                if weight > 0:
                    vec[term_id] = weight
                    norm_sq += weight * weight

            norm = math.sqrt(norm_sq) if norm_sq > 0 else 1.0

            self.docs.append(
                {
                    "id": doc.id,
                    "vec": vec,
                    "norm": norm,
                }
            )

    def search(self, query: str, k: int = 10) -> list[TfidfResult]:
        """Search for documents similar to the query

        Args:
            query: Search query
            k: Maximum number of results to return

        Returns:
            List of results sorted by score (descending)
        """
        tokens = tokenize(query)
        if not tokens or not self.vocab:
            return []

        # Compute query term frequency
        tf: dict[int, int] = {}
        for token in tokens:
            term_id = self.vocab.get(token)
            if term_id is not None:
                tf[term_id] = tf.get(term_id, 0) + 1

        if not tf:
            return []

        # Build query vector
        q_vec: dict[int, float] = {}
        q_norm_sq = 0.0
        n_tokens = len(tokens)

        for term_id, freq in tf.items():
            if term_id >= len(self.idf):
                continue
            idf_val = self.idf[term_id]
            weight = (freq / n_tokens) * idf_val if n_tokens > 0 else 0
            if weight > 0:
                q_vec[term_id] = weight
                q_norm_sq += weight * weight

        q_norm = math.sqrt(q_norm_sq) if q_norm_sq > 0 else 1.0

        # Compute cosine similarity with each document
        scores: list[TfidfResult] = []
        for doc in self.docs:
            doc_vec = doc["vec"]
            doc_norm = doc["norm"]
            if not isinstance(doc_vec, dict) or not isinstance(doc_norm, (int, float)):
                continue

            # Compute dot product (iterate over smaller map for efficiency)
            dot = 0.0
            small_vec = q_vec if len(q_vec) <= len(doc_vec) else doc_vec
            big_vec = doc_vec if len(q_vec) <= len(doc_vec) else q_vec

            for term_id, weight in small_vec.items():
                other_weight = big_vec.get(term_id)
                if other_weight is not None:
                    dot += weight * other_weight

            # Cosine similarity
            similarity = dot / (q_norm * doc_norm)
            if similarity > 0:
                doc_id = doc["id"]
                if isinstance(doc_id, str):
                    # Clamp to [0, 1]
                    clamped_score = max(0.0, min(1.0, similarity))
                    scores.append(TfidfResult(id=doc_id, score=clamped_score))

        # Sort by score descending and return top k
        scores.sort(key=lambda x: x.score, reverse=True)
        return scores[:k]
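

if __name__ == "__main__":
    # Minimal usage sketch over a tiny, made-up corpus: build the index once,
    # then query it. Only documents sharing at least one token with the query
    # are returned, sorted by cosine similarity.
    corpus = [
        TfidfDocument(id="hris_list_employees", text="List employees in the HRIS system"),
        TfidfDocument(id="hris_update_employee", text="Update an employee record"),
        TfidfDocument(id="ats_list_jobs", text="List open jobs in the ATS"),
    ]

    index = TfidfIndex()
    index.build(corpus)

    for result in index.search("update employee", k=2):
        print(f"{result.id}: {result.score:.3f}")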