Coverage for stackone_ai/utils/tfidf_index.py: 94% (105 statements)
coverage.py v7.11.0, created at 2025-12-24 09:48 +0000

1""" 

2Lightweight TF-IDF vector index for offline vector search. 

3No external dependencies; tokenizes ASCII/latin text, lowercases, 

4strips punctuation, removes a small stopword set, and builds a sparse index. 

5""" 

6 

7from __future__ import annotations 

8 

9import math 

10import re 

11from typing import NamedTuple 

12 

13 

14class TfidfDocument(NamedTuple): 

15 """Document for TF-IDF indexing""" 

16 

17 id: str 

18 text: str 

19 

20 

21class TfidfResult(NamedTuple): 

22 """Search result from TF-IDF index""" 

23 

24 id: str 

25 score: float # cosine similarity (0..1) 

26 

27 

28# Common English stopwords 

29STOPWORDS = { 

30 "a", 

31 "an", 

32 "the", 

33 "and", 

34 "or", 

35 "but", 

36 "if", 

37 "then", 

38 "else", 

39 "for", 

40 "of", 

41 "in", 

42 "on", 

43 "to", 

44 "from", 

45 "by", 

46 "with", 

47 "as", 

48 "at", 

49 "is", 

50 "are", 

51 "was", 

52 "were", 

53 "be", 

54 "been", 

55 "it", 

56 "this", 

57 "that", 

58 "these", 

59 "those", 

60 "not", 

61 "no", 

62 "can", 

63 "could", 

64 "should", 

65 "would", 

66 "may", 

67 "might", 

68 "do", 

69 "does", 

70 "did", 

71 "have", 

72 "has", 

73 "had", 

74 "you", 

75 "your", 

76} 

77 

78 

79def tokenize(text: str) -> list[str]: 

80 """Tokenize text by lowercasing, removing punctuation, and filtering stopwords 

81 

82 Args: 

83 text: Input text to tokenize 

84 

85 Returns: 

86 List of tokens 

87 """ 

88 # Lowercase and replace non-alphanumeric (except underscore) with space 

89 text = text.lower() 

90 text = re.sub(r"[^a-z0-9_\s]", " ", text) 

91 

92 # Split and filter 

93 tokens = [t for t in text.split() if t and t not in STOPWORDS] 

94 return tokens 
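
# For example: tokenize("The quick brown fox!") -> ["quick", "brown", "fox"];
# the punctuation is stripped and "the" is removed as a stopword.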


class TfidfIndex:
    """TF-IDF vector index for document search using sparse vectors"""

    def __init__(self) -> None:
        self.vocab: dict[str, int] = {}
        self.idf: list[float] = []
        self.docs: list[dict[str, str | dict[int, float] | float]] = []

    def build(self, corpus: list[TfidfDocument]) -> None:
        """Build index from a corpus of documents

        Args:
            corpus: List of documents to index
        """
        # Tokenize all documents
        docs_tokens = [tokenize(doc.text) for doc in corpus]

        # Build vocabulary
        for tokens in docs_tokens:
            for token in tokens:
                if token not in self.vocab:
                    self.vocab[token] = len(self.vocab)

        # Compute document frequency (df)
        df: dict[int, int] = {}
        for tokens in docs_tokens:
            seen: set[int] = set()
            for token in tokens:
                term_id = self.vocab.get(token)
                if term_id is not None and term_id not in seen:
                    seen.add(term_id)
                    df[term_id] = df.get(term_id, 0) + 1

        # Compute IDF (inverse document frequency)
        n_docs = len(corpus)
        self.idf = []
        for term_id in range(len(self.vocab)):
            dfi = df.get(term_id, 0)
            # Smoothed IDF
            idf_value = math.log((n_docs + 1) / (dfi + 1)) + 1
            self.idf.append(idf_value)
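
        # Worked example of the smoothed IDF: with 3 documents, a term that
        # appears in 1 document gets log(4 / 2) + 1 ~= 1.69, while a term in
        # all 3 documents gets log(4 / 4) + 1 = 1.0. Rarer terms weigh more,
        # and no term's weight ever drops to zero.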

        # Build document vectors
        self.docs = []
        for doc, tokens in zip(corpus, docs_tokens):
            # Compute term frequency (TF)
            tf: dict[int, int] = {}
            for token in tokens:
                term_id = self.vocab.get(token)
                if term_id is not None:  # coverage: always true (vocab holds every token)
                    tf[term_id] = tf.get(term_id, 0) + 1

            # Build weighted vector (TF-IDF)
            vec: dict[int, float] = {}
            norm_sq = 0.0
            n_tokens = len(tokens)

            for term_id, freq in tf.items():
                if term_id >= len(self.idf) or n_tokens == 0:  # coverage: never true
                    continue
                idf_val = self.idf[term_id]
                weight = (freq / n_tokens) * idf_val
                if weight > 0:  # coverage: always true
                    vec[term_id] = weight
                    norm_sq += weight * weight

            norm = math.sqrt(norm_sq) if norm_sq > 0 else 1.0

            self.docs.append(
                {
                    "id": doc.id,
                    "vec": vec,
                    "norm": norm,
                }
            )
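
        # Illustrative shape of one stored record (ids and values depend on
        # the corpus): {"id": "doc-1", "vec": {0: 0.34, 5: 0.21}, "norm": 0.40}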

    def search(self, query: str, k: int = 10) -> list[TfidfResult]:
        """Search for documents similar to the query

        Args:
            query: Search query
            k: Maximum number of results to return

        Returns:
            List of results sorted by score (descending)
        """
        tokens = tokenize(query)
        if not tokens or not self.vocab:
            return []

        # Compute query term frequency
        tf: dict[int, int] = {}
        for token in tokens:
            term_id = self.vocab.get(token)
            if term_id is not None:
                tf[term_id] = tf.get(term_id, 0) + 1

        if not tf:
            return []

        # Build query vector
        q_vec: dict[int, float] = {}
        q_norm_sq = 0.0
        n_tokens = len(tokens)

        for term_id, freq in tf.items():
            if term_id >= len(self.idf):  # coverage: never true
                continue
            idf_val = self.idf[term_id]
            weight = (freq / n_tokens) * idf_val if n_tokens > 0 else 0
            if weight > 0:  # coverage: always true
                q_vec[term_id] = weight
                q_norm_sq += weight * weight

        q_norm = math.sqrt(q_norm_sq) if q_norm_sq > 0 else 1.0

        # Compute cosine similarity with each document
        scores: list[TfidfResult] = []
        for doc in self.docs:
            doc_vec = doc["vec"]
            doc_norm = doc["norm"]
            if not isinstance(doc_vec, dict) or not isinstance(doc_norm, (int, float)):  # coverage: never true
                continue

            # Compute dot product (iterate over smaller map for efficiency)
            dot = 0.0
            small_vec = q_vec if len(q_vec) <= len(doc_vec) else doc_vec
            big_vec = doc_vec if len(q_vec) <= len(doc_vec) else q_vec

            for term_id, weight in small_vec.items():
                other_weight = big_vec.get(term_id)
                if other_weight is not None:
                    dot += weight * other_weight

            # Cosine similarity
            similarity = dot / (q_norm * doc_norm)
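            # Both norms fall back to 1.0 for empty vectors, so the division
            # above is always safe; with non-negative TF-IDF weights the
            # result lies in [0, 1] up to float error, hence the clamp below.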

            if similarity > 0:
                doc_id = doc["id"]
                if isinstance(doc_id, str):  # coverage: always true
                    # Clamp to [0, 1]
                    clamped_score = max(0.0, min(1.0, similarity))
                    scores.append(TfidfResult(id=doc_id, score=clamped_score))

        # Sort by score descending and return top k
        scores.sort(key=lambda x: x.score, reverse=True)
        return scores[:k]
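

# Minimal usage sketch (illustrative only; the corpus and query below are
# hypothetical, not part of the module). Note that tokenize() does no
# stemming, so "employees" would not match "employee".
if __name__ == "__main__":
    corpus = [
        TfidfDocument(id="greet", text="Send a greeting email to a new employee"),
        TfidfDocument(id="payroll", text="Run the monthly payroll report"),
        TfidfDocument(id="onboard", text="Create an onboarding checklist for new hires"),
    ]
    index = TfidfIndex()
    index.build(corpus)
    # Matches "greet" via "employee" and "onboard" via "onboarding";
    # "payroll" shares no terms with the query and scores zero.
    for result in index.search("employee onboarding", k=2):
        print(f"{result.id}: {result.score:.3f}")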