import os
import warnings
from typing import List

import numpy as np
import pandas as pd
import streamlit as st
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

warnings.filterwarnings("ignore")

# Configuration for Sentence Transformers
EMBEDDING_CONFIG = {
    "model": "google/embeddinggemma-300m",
    "similarity_threshold": 0.70,
    "high_similarity_threshold": 0.85,
    "normalize_embeddings": True,
}


class BlogTitleAnalyzer:
    def __init__(self, config=EMBEDDING_CONFIG):
        self.config = config
        self.model_name = config["model"]
        self.normalize = config.get("normalize_embeddings", True)
        self.existing_titles = []
        self.existing_metadata = []
        self.existing_embeddings = None

        # Initialize the SentenceTransformer model. Gated Google models need a
        # Hugging Face token, read from Streamlit secrets or the environment.
        if self.model_name.startswith("google/"):
            hf_token = st.secrets.get(
                "HUGGINGFACE_TOKEN", os.getenv("HUGGINGFACE_TOKEN")
            )
            if hf_token:
                self.model = SentenceTransformer(
                    self.model_name, use_auth_token=hf_token
                )
            else:
                raise ValueError(
                    "Hugging Face token required for gated model. "
                    "Please add it to Streamlit secrets."
                )
        else:
            self.model = SentenceTransformer(self.model_name)

    # ---- Embedding helpers ----
    def _embed(self, texts):
        # Accept str or list[str]; always return list[list[float]].
        if isinstance(texts, str):
            inputs = [texts]
        else:
            inputs = list(texts)
        embeddings = self.model.encode(inputs, convert_to_numpy=True)
        if self.normalize:
            embeddings = self._l2_normalize_rows(embeddings)
        return embeddings.tolist()

    @staticmethod
    def _l2_normalize_rows(arr: np.ndarray) -> np.ndarray:
        # Guard against zero-norm rows to avoid division by zero.
        norms = np.linalg.norm(arr, axis=1, keepdims=True)
        norms[norms == 0] = 1.0
        return arr / norms

    # ---- Public API ----
    def generate_embedding(self, title: str) -> np.ndarray:
        vec = np.array(self._embed(title)[0], dtype=np.float32)
        return vec

    def generate_embeddings_batch(self, titles) -> np.ndarray:
        vecs = np.array(self._embed(titles), dtype=np.float32)
        return vecs

    def load_existing_titles(self, titles, metadata=None):
        self.existing_titles = list(titles)
        self.existing_metadata = (
            list(metadata) if metadata is not None else [{} for _ in titles]
        )
        self.existing_embeddings = self.generate_embeddings_batch(self.existing_titles)

    def check_similarity_against_existing(self, new_title: str, top_k: int = 5):
        if self.existing_embeddings is None or len(self.existing_titles) == 0:
            return []
        new_vec = self.generate_embedding(new_title).reshape(1, -1)
        sims = cosine_similarity(new_vec, self.existing_embeddings)[0]
        idx_sorted = np.argsort(-sims)[:top_k]
        results = []
        for idx in idx_sorted:
            results.append(
                {
                    "title": self.existing_titles[idx],
                    "metadata": self.existing_metadata[idx],
                    "similarity": float(sims[idx]),
                }
            )
        return results

    def batch_check_similarity_against_existing(self, new_titles, top_k: int = 5):
        if self.existing_embeddings is None or len(self.existing_titles) == 0:
            return {t: [] for t in new_titles}
        new_vecs = self.generate_embeddings_batch(new_titles)
        sims = cosine_similarity(new_vecs, self.existing_embeddings)  # [M, N]
        results = {}
        for i, t in enumerate(new_titles):
            row = sims[i]
            idx_sorted = np.argsort(-row)[:top_k]
            hits = []
            for idx in idx_sorted:
                hits.append(
                    {
                        "title": self.existing_titles[idx],
                        "metadata": self.existing_metadata[idx],
                        "similarity": float(row[idx]),
                    }
                )
            results[t] = hits
        return results
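
# --- Optional: cache the model across Streamlit reruns (sketch) --------------
# Streamlit re-executes this script on every interaction, so each click of
# "Analyze Titles" rebuilds BlogTitleAnalyzer and reloads the embedding model.
# A minimal sketch, assuming the default EMBEDDING_CONFIG; this helper is not
# wired into run_title_similarity_analysis below, and because the cached
# instance is shared across sessions, load_existing_titles should still be
# called per analysis.
@st.cache_resource(show_spinner=False)
def get_cached_analyzer() -> BlogTitleAnalyzer:
    """Build the analyzer (and load the model) once per process."""
    return BlogTitleAnalyzer()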

def simple_deduplication(analyzer, new_titles, threshold=0.7):
    """
    Simple deduplication of new titles via pairwise cosine similarity.

    For each pair of new titles whose similarity exceeds `threshold`, the one
    that overlaps more with the existing catalog is removed.
    """
    embeddings = analyzer.generate_embeddings_batch(new_titles)
    similarity_matrix = cosine_similarity(embeddings)
    n_titles = len(new_titles)

    to_remove = set()
    decisions = []

    for i in range(n_titles):
        for j in range(i + 1, n_titles):
            if similarity_matrix[i][j] > threshold:
                if i in to_remove or j in to_remove:
                    continue
                title_A = new_titles[i]
                title_B = new_titles[j]

                # Break the tie by checking which title is more similar to the
                # existing catalog; drop the more redundant one.
                sims_A = analyzer.check_similarity_against_existing(title_A)
                sims_B = analyzer.check_similarity_against_existing(title_B)
                max_sim_A = max([s["similarity"] for s in sims_A], default=0.0)
                max_sim_B = max([s["similarity"] for s in sims_B], default=0.0)

                if max_sim_A > max_sim_B:
                    to_remove.add(i)
                    decisions.append(
                        {
                            "remove": title_A,
                            "keep": title_B,
                            "reason": "Title A is more similar to existing content",
                            "pairwise_similarity": float(similarity_matrix[i][j]),
                        }
                    )
                else:
                    to_remove.add(j)
                    decisions.append(
                        {
                            "remove": title_B,
                            "keep": title_A,
                            "reason": "Title B is more similar to existing content",
                            "pairwise_similarity": float(similarity_matrix[i][j]),
                        }
                    )

    # Final report
    report_rows = []
    for d in decisions:
        report_rows.append(
            {
                "Action": "REMOVE",
                "Title": d["remove"],
                "Reason": d["reason"],
                "Pairwise_Similarity": round(d["pairwise_similarity"], 3),
                "Keep_Instead": d["keep"],
            }
        )
    for i, t in enumerate(new_titles):
        if i not in to_remove:
            report_rows.append(
                {
                    "Action": "KEEP",
                    "Title": t,
                    "Reason": "No high similarity conflicts",
                    "Pairwise_Similarity": "N/A",
                    "Keep_Instead": "N/A",
                }
            )
    return pd.DataFrame(report_rows)


def create_new_vs_existing_table(
    analyzer: BlogTitleAnalyzer, new_titles: List[str]
) -> pd.DataFrame:
    """Create a table comparing every new title against every existing title."""
    # One batched encode, then a single matrix of cosine similarities.
    new_embeddings = analyzer.generate_embeddings_batch(new_titles)
    similarities = cosine_similarity(new_embeddings, analyzer.existing_embeddings)

    table_data = []
    for i, new_title in enumerate(new_titles):
        for idx, existing_title in enumerate(analyzer.existing_titles):
            table_data.append(
                {
                    "New Title": new_title,
                    "Existing Title": existing_title,
                    "Similarity Score": round(float(similarities[i][idx]), 3),
                }
            )
    return pd.DataFrame(table_data)


def create_new_vs_new_table(
    analyzer: BlogTitleAnalyzer, new_titles: List[str]
) -> pd.DataFrame:
    """Create a table comparing the new titles against each other."""
    new_embeddings = analyzer.generate_embeddings_batch(new_titles)
    # Compute the full pairwise matrix once instead of one call per pair.
    similarity_matrix = cosine_similarity(new_embeddings)

    table_data = []
    for i, title1 in enumerate(new_titles):
        for j, title2 in enumerate(new_titles):
            if i != j:  # Don't compare a title with itself
                table_data.append(
                    {
                        "Title 1": title1,
                        "Title 2": title2,
                        "Similarity Score": round(float(similarity_matrix[i][j]), 3),
                    }
                )
    return pd.DataFrame(table_data)


def run_title_similarity_analysis(existing_titles: List[str], new_titles: List[str]):
    """Run the full title similarity analysis and return all report tables."""
    analyzer = BlogTitleAnalyzer()
    analyzer.load_existing_titles(existing_titles)

    new_vs_existing_table = create_new_vs_existing_table(analyzer, new_titles)
    new_vs_new_table = create_new_vs_new_table(analyzer, new_titles)
    dedup_report = simple_deduplication(
        analyzer, new_titles, threshold=analyzer.config["similarity_threshold"]
    )

    return analyzer, new_vs_existing_table, new_vs_new_table, dedup_report
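
# --- Usage sketch (comments only, not executed) -------------------------------
# Driving the pipeline outside the Streamlit UI, e.g. for a quick smoke test;
# the titles below are illustrative placeholders.
#
#   existing = ["The True Cost of Blown-In Insulation in 2025"]
#   new = ["Blown-In Insulation Costs in 2025", "Do Attics Need Ventilation?"]
#   _, vs_existing, vs_new, report = run_title_similarity_analysis(existing, new)
#   print(report[["Action", "Title", "Reason"]])
#
# `report` has columns Action, Title, Reason, Pairwise_Similarity, Keep_Instead;
# the two comparison tables hold one row per title pair with a rounded
# cosine-similarity score.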

# Streamlit UI
st.set_page_config(
    page_title="Blog Title Checker",
    page_icon="📝",
    layout="wide",
    initial_sidebar_state="expanded",
)

st.title("📝 Blog Title Similarity Checker")
st.markdown(
    "Analyze and deduplicate blog titles using AI-powered similarity detection."
)

# Sidebar with instructions
st.sidebar.header("Instructions")
st.sidebar.markdown(
    """
1. **Existing Titles**: Enter your current blog titles (one per line)
2. **New Titles**: Enter new title ideas to check (one per line)
3. **Analysis**: Click "Analyze Titles" to run the similarity check
4. **Results**: View deduplication recommendations and download detailed reports

**Note**: The app uses Sentence Transformers for embedding and compares titles
using cosine similarity.
"""
)

# Create two columns for input
col1, col2 = st.columns(2)

with col1:
    st.subheader("Existing Blog Titles")
    st.markdown("Enter your current blog titles (one per line):")
    existing_titles_input = st.text_area(
        "Existing Titles",
        height=300,
        placeholder="The True Cost of Blown-In Insulation in 2025: A Detailed Breakdown\nCalculating the ROI of Your Attic Insulation Upgrade: A Step-by-Step Guide\nUnlocking Savings: Are There Government Rebates or Tax Credits for Blown-In Insulation?",
    )

with col2:
    st.subheader("New Titles to Check")
    st.markdown("Enter new title ideas to analyze (one per line):")
    new_titles_input = st.text_area(
        "New Titles",
        height=300,
        placeholder="Can Your Walls Be Insulated for Better Indoor Air Quality?\nHow Does Your Home's Insulation Impact Seasonal Allergies?\nIs Cold Air Leaking into Your Living Spaces, and What Can You Do?",
    )

# Parse the text areas into clean, non-empty title lists
existing_titles = [
    title.strip() for title in existing_titles_input.split("\n") if title.strip()
]
new_titles = [title.strip() for title in new_titles_input.split("\n") if title.strip()]
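
# --- Optional: user-tunable dedup threshold (sketch) --------------------------
# A sidebar slider could replace the fixed similarity_threshold; uncomment and
# pass the value through run_title_similarity_analysis to simple_deduplication
# (that plumbing is an assumption, not implemented above).
# dedup_threshold = st.sidebar.slider(
#     "Similarity threshold", min_value=0.50, max_value=0.95, value=0.70, step=0.01
# )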

# Analysis button
if st.button("🔍 Analyze Titles", type="primary", use_container_width=True):
    if not existing_titles:
        st.error("Please enter at least one existing blog title.")
    elif not new_titles:
        st.error("Please enter at least one new title to analyze.")
    else:
        with st.spinner("Analyzing titles... This may take a moment."):
            try:
                analyzer, new_vs_existing_df, new_vs_new_df, dedup_report = (
                    run_title_similarity_analysis(existing_titles, new_titles)
                )
                # Store results in session state so they survive reruns
                st.session_state["analysis_results"] = {
                    "analyzer": analyzer,
                    "new_vs_existing_df": new_vs_existing_df,
                    "new_vs_new_df": new_vs_new_df,
                    "dedup_report": dedup_report,
                }
                st.success("✅ Analysis completed successfully!")
            except Exception as e:
                st.error(f"An error occurred during analysis: {str(e)}")

# Display results if available
if "analysis_results" in st.session_state:
    results = st.session_state["analysis_results"]

    # Summary statistics
    st.header("📊 Summary Statistics")
    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric("Total Existing Titles", len(existing_titles))
    with col2:
        st.metric("Total New Titles", len(new_titles))
    with col3:
        duplicates_found = len(
            results["dedup_report"][results["dedup_report"]["Action"] == "REMOVE"]
        )
        st.metric("Duplicates Found", duplicates_found)
    with col4:
        unique_titles = len(
            results["dedup_report"][results["dedup_report"]["Action"] == "KEEP"]
        )
        st.metric("Unique Titles to Keep", unique_titles)

    # Deduplication results
    st.header("🎯 Deduplication Recommendations")

    duplicates_df = results["dedup_report"][
        results["dedup_report"]["Action"] == "REMOVE"
    ]
    keep_df = results["dedup_report"][results["dedup_report"]["Action"] == "KEEP"]

    if not duplicates_df.empty:
        st.subheader("🗑️ Titles to Remove (Duplicates)")
        st.dataframe(
            duplicates_df[["Title", "Reason", "Pairwise_Similarity", "Keep_Instead"]],
            use_container_width=True,
            hide_index=True,
        )

    st.subheader("✅ Titles to Keep")
    st.dataframe(
        keep_df[["Title", "Reason"]], use_container_width=True, hide_index=True
    )

    # Download section
    st.header("💾 Download Analysis Reports")
    download_col1, download_col2, download_col3 = st.columns(3)

    with download_col1:
        # New vs Existing comparison
        csv1 = results["new_vs_existing_df"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 New vs Existing Titles",
            data=csv1,
            file_name="new_vs_existing_titles.csv",
            mime="text/csv",
            use_container_width=True,
        )

    with download_col2:
        # New vs New comparison
        csv2 = results["new_vs_new_df"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 New vs New Titles",
            data=csv2,
            file_name="new_vs_new_titles.csv",
            mime="text/csv",
            use_container_width=True,
        )

    with download_col3:
        # Deduplication report
        csv3 = results["dedup_report"].to_csv(index=False).encode("utf-8")
        st.download_button(
            label="📥 Deduplication Report",
            data=csv3,
            file_name="deduplication_recommendations.csv",
            mime="text/csv",
            use_container_width=True,
        )

    # Detailed analysis section (expandable)
    with st.expander("🔬 Detailed Analysis"):
        tab1, tab2 = st.tabs(["New vs Existing Comparisons", "New vs New Comparisons"])
        with tab1:
            st.dataframe(
                results["new_vs_existing_df"],
                use_container_width=True,
                hide_index=True,
            )
        with tab2:
            st.dataframe(
                results["new_vs_new_df"], use_container_width=True, hide_index=True
            )

# Footer
st.markdown("---")
st.markdown(
    "💡 **Tip**: For best results, ensure your titles are well-written and "
    "descriptive. The similarity analysis works best with titles that have "
    "clear semantic meaning."
)
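
# --- Running the app (notes) ---------------------------------------------------
# Launch with `streamlit run <this_file>.py` (the filename is yours to choose).
# For the gated google/embeddinggemma-300m model, the constructor reads a token
# from st.secrets or the HUGGINGFACE_TOKEN environment variable; with Streamlit
# Cloud or a local .streamlit/secrets.toml, an entry like
#   HUGGINGFACE_TOKEN = "hf_..."
# provides it.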