Different approaches to implementing semantic search with embeddings, from fully local to hybrid to cloud-based solutions. The walkthrough below builds the fully local variant end to end: it groups Discourse forum posts into reply subthreads, embeds each subthread, indexes the vectors with FAISS, and answers questions with a locally hosted seq2seq model.
First, load the exported Discourse posts, group them by topic, and sort each topic's posts:

```python
import json
from collections import defaultdict

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from tqdm import tqdm


def clean_text(text):
    """Collapse all whitespace so each post embeds as one clean string."""
    return " ".join(text.strip().split())


def normalize(v):
    """L2-normalize a vector so inner product equals cosine similarity."""
    return v / np.linalg.norm(v)


# === Load your data ===
filename = "discourse_posts.json"  # Change to your JSON file path
with open(filename, "r", encoding="utf-8") as f:
    posts_data = json.load(f)

# === Group posts by topic_id ===
topics = {}
for post in posts_data:
    topic_id = post["topic_id"]
    if topic_id not in topics:
        topics[topic_id] = {"topic_title": post.get("topic_title", ""), "posts": []}
    topics[topic_id]["posts"].append(post)

# Sort posts by post_number within each topic
for topic_id in topics:
    topics[topic_id]["posts"].sort(key=lambda p: p["post_number"])

print(f"Loaded {len(posts_data)} posts across {len(topics)} topics.")

# === Initialize embedding model ===
# GritLM-8x7B is very large; for quick experiments, a lighter model such as
# "sentence-transformers/all-MiniLM-L6-v2" is a drop-in replacement.
model = SentenceTransformer("GritLM/GritLM-8x7B")
```

Discourse posts form reply trees, so rather than embedding posts one by one, each subthread (a root post plus all of its transitive replies) is embedded as a single unit:

```python
# === Build reply tree and extract subthreads ===
def build_reply_map(posts):
    """Build a map: parent_post_number -> list of child posts."""
    reply_map = defaultdict(list)
    posts_by_number = {}
    for post in posts:
        posts_by_number[post["post_number"]] = post
        parent = post.get("reply_to_post_number")
        reply_map[parent].append(post)
    return reply_map, posts_by_number


def extract_subthread(root_post_number, reply_map, posts_by_number):
    """Recursively collect all posts in the subthread rooted at root_post_number."""
    collected = []

    def dfs(post_num):
        post = posts_by_number[post_num]
        collected.append(post)
        for child in reply_map.get(post_num, []):
            dfs(child["post_number"])

    dfs(root_post_number)
    return collected


# === Prepare embeddings for subthreads ===
embedding_data = []
embeddings = []

print("Building subthread embeddings...")
for topic_id, topic_data in tqdm(topics.items()):
    posts = topic_data["posts"]
    topic_title = topic_data["topic_title"]
    reply_map, posts_by_number = build_reply_map(posts)

    # Root posts have no parent, i.e. reply_to_post_number is None
    root_posts = reply_map[None]

    for root_post in root_posts:
        root_num = root_post["post_number"]
        subthread_posts = extract_subthread(root_num, reply_map, posts_by_number)

        # Combine the texts of all posts in the subthread
        combined_text = f"Topic title: {topic_title}\n\n"
        combined_text += "\n\n---\n\n".join(
            clean_text(p["content"]) for p in subthread_posts
        )

        # Embed and normalize the combined text
        emb = normalize(model.encode(combined_text, convert_to_numpy=True))

        embedding_data.append({
            "topic_id": topic_id,
            "topic_title": topic_title,
            "root_post_number": root_num,
            "post_numbers": [p["post_number"] for p in subthread_posts],
            "combined_text": combined_text,
        })
        embeddings.append(emb)

# Stack the embeddings into a float32 matrix for FAISS
embeddings = np.vstack(embeddings).astype("float32")

# Build a FAISS index (inner product over normalized vectors = cosine similarity)
dim = embeddings.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(embeddings)

print(f"Indexed {len(embedding_data)} subthreads.")
```

Retrieval embeds the query the same way and searches the index:

```python
# === Retrieval function ===
def retrieve(query, top_k=5):
    query_emb = normalize(model.encode(query, convert_to_numpy=True)).astype("float32")

    D, I = index.search(np.array([query_emb]), top_k)

    results = []
    for score, idx in zip(D[0], I[0]):
        entry = embedding_data[idx]
        results.append({
            "score": float(score),
            "topic_id": entry["topic_id"],
            "topic_title": entry["topic_title"],
            "root_post_number": entry["root_post_number"],
            "post_numbers": entry["post_numbers"],
            "combined_text": entry["combined_text"],
        })
    return results


query = "If a student scores 10/10 on GA4 as well as a bonus, how would it appear on the dashboard?"
results = retrieve(query, top_k=3)

print("\nTop retrieved subthreads:")
for i, res in enumerate(results, 1):
    print(f"\n[{i}] Score: {res['score']:.4f}")
    print(f"Topic ID: {res['topic_id']}, Root Post #: {res['root_post_number']}")
    print(f"Topic Title: {res['topic_title']}")
    print(f"Posts in subthread: {res['post_numbers']}")
    print("Content snippet:")
    print(res["combined_text"][:700], "...\n")  # print the first 700 characters
```

Finally, the top retrieved subthreads become context for a local seq2seq model that generates the answer:

```python
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# flan-t5-xxl is an 11B-parameter model; flan-t5-base or flan-t5-large are
# drop-in replacements for lighter hardware.
gen_model_name = "google/flan-t5-xxl"
tokenizer = AutoTokenizer.from_pretrained(gen_model_name)
# Use a distinct name here: rebinding `model` would clobber the embedding
# model and break retrieve().
gen_model = AutoModelForSeq2SeqLM.from_pretrained(gen_model_name)


def generate_answer(query, retrieved_texts, max_length=256):
    """
    Generate an answer given the query and retrieved context.
    retrieved_texts: list of strings (combined_text from retrieved results)
    """
    context = "\n\n".join(retrieved_texts)
    prompt = (
        "Answer the question based on the following forum discussion "
        f"excerpts:\n\nQuestion: {query}\nAnswer:"
        if not context
        else "Answer the question based on the following forum discussion "
        f"excerpts:\n\n{context}\n\nQuestion: {query}\nAnswer:"
    )
    inputs = tokenizer(prompt, return_tensors="pt", max_length=4096, truncation=True)
    outputs = gen_model.generate(
        **inputs, max_length=max_length, num_beams=5, early_stopping=True
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)


# Use the top-k retrieved subthreads to generate an answer
retrieved_texts = [res["combined_text"] for res in results]
answer = generate_answer(query, retrieved_texts)

print("\nGenerated Answer:\n", answer)
```
The fully local pipeline relies on `sentence-transformers` for embeddings, plus `faiss`, `numpy`, `tqdm`, and `transformers`, as imported above.
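To go fully cloud-based, generation can be hosted as well. This is a minimal sketch assuming OpenAI's chat completions API, reusing `client` from the embeddings sketch and `results` from `retrieve()`; `generate_answer_remote` and the model choice are illustrative, and the prompt simply mirrors the local `generate_answer`.

```python
# Minimal sketch: hosted generation instead of the local flan-t5 model.
# Reuses `client` from the embeddings sketch above.
def generate_answer_remote(query, retrieved_texts, model_name="gpt-4o-mini"):
    """Answer the query from retrieved context via a hosted chat model."""
    context = "\n\n".join(retrieved_texts)
    prompt = (
        "Answer the question based on the following forum discussion "
        f"excerpts:\n\n{context}\n\nQuestion: {query}\nAnswer:"
    )
    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content


# Usage, given the retrieval results from the local pipeline:
# answer = generate_answer_remote(query, [r["combined_text"] for r in results])
```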