import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from markdownify import markdownify as md
import tempfile
import zipfile
import re
from typing import Optional, Tuple
import os
import gradio as gr
from collections import deque


# ===========================================================
# UTILITIES — Recursive crawler
# ===========================================================
def crawl_site_for_links(start_url: str, max_pages: int = 50, max_depth: int = 2):
    """Crawl the given site breadth-first (up to max_depth) and return sets of HTML and PDF URLs."""
    visited = set()
    html_links = set()
    pdf_links = set()

    parsed_base = urlparse(start_url)
    domain = parsed_base.netloc

    queue = deque([(start_url, 0)])  # (url, depth)

    session = requests.Session()
    session.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/91.0 Safari/537.36"
        )
    })

    while queue and len(visited) < max_pages:
        current_url, depth = queue.popleft()
        if current_url in visited or depth > max_depth:
            continue
        visited.add(current_url)

        try:
            response = session.get(current_url, timeout=10)
            # Only parse HTML responses; skip binaries, feeds, etc.
            if "text/html" not in response.headers.get("Content-Type", ""):
                continue

            soup = BeautifulSoup(response.content, "html.parser")
            for a in soup.find_all("a", href=True):
                href = a["href"]
                full_url = urljoin(current_url, href)
                parsed = urlparse(full_url)

                # Stay in the same domain
                if parsed.netloc != domain:
                    continue

                if full_url.lower().endswith(".pdf"):
                    pdf_links.add(full_url)
                elif not href.startswith(("#", "javascript:", "mailto:", "tel:")):
                    html_links.add(full_url)
                    # Only follow HTML links that are still within the depth budget
                    if full_url not in visited and depth + 1 <= max_depth:
                        queue.append((full_url, depth + 1))
        except Exception:
            # A failure on one page should not abort the whole crawl
            continue

    return html_links, pdf_links


# ===========================================================
# MAIN FUNCTION — Extract text & PDFs into ZIP
# ===========================================================
def extract_all_content_as_zip(
    url: str, max_links: Optional[int] = None, max_depth: int = 2
) -> Tuple[str, Optional[str]]:
    """
    Extract text content and PDFs from all internal links found on a website recursively.
""" try: if not url.startswith(("http://", "https://")): url = "https://" + url html_links, pdf_links = crawl_site_for_links(url, max_pages=(max_links or 50), max_depth=max_depth) if not html_links and not pdf_links: return "No internal links or PDFs found to extract.", None # Limit HTML pages if requested total_html = len(html_links) if max_links is not None: html_links = list(html_links)[:max_links] limited_message = f" (limited to {max_links} out of {total_html})" else: limited_message = "" with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip: zip_path = temp_zip.name successful_html = 0 failed_html = 0 successful_pdfs = 0 failed_pdfs = 0 session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)" }) with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zip_file: # Process HTML pages for i, link_url in enumerate(html_links, 1): try: resp = session.get(link_url, timeout=10) resp.raise_for_status() soup = BeautifulSoup(resp.content, "html.parser") for tag in soup(["script", "style", "nav", "footer", "header", "aside"]): tag.decompose() main_content = ( soup.find("main") or soup.find("article") or soup.find("div", class_=re.compile(r"content|main|post|article")) or soup.find("body") ) if not main_content: failed_html += 1 continue markdown_text = md(str(main_content), heading_style="ATX") markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text) title = soup.find("title") if title: markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text.strip()}" filename = re.sub(r"[^\w\-_.]", "_", urlparse(link_url).path or f"page_{i}") + ".md" if filename in [".md", "index.md"]: filename = f"page_{i}.md" zip_file.writestr(filename, f"\n\n{markdown_text.strip()}") successful_html += 1 except Exception: failed_html += 1 continue # Process PDFs for j, pdf_url in enumerate(pdf_links, 1): try: resp = session.get(pdf_url, timeout=20) resp.raise_for_status() pdf_filename = os.path.basename(urlparse(pdf_url).path) if not pdf_filename.lower().endswith(".pdf"): pdf_filename = f"document_{j}.pdf" zip_file.writestr(f"pdfs/{pdf_filename}", resp.content) successful_pdfs += 1 except Exception: failed_pdfs += 1 continue status_message = ( f"Extracted {successful_html} HTML pages{limited_message} and " f"downloaded {successful_pdfs} PDFs successfully." ) if failed_html or failed_pdfs: status_message += f" ({failed_html} HTML and {failed_pdfs} PDF downloads failed.)" status_message += f" Created ZIP with {successful_html} markdown files and {successful_pdfs} PDFs." return status_message, zip_path except Exception as e: return f"Error: {str(e)}", None # =========================================================== # GRADIO UI # =========================================================== def gradio_extract(url, max_links, max_depth): message, zip_path = extract_all_content_as_zip(url, max_links, max_depth) if zip_path: return message, zip_path else: return message, None gr_interface = gr.Interface( fn=gradio_extract, inputs=[ gr.Textbox(label="Website URL", placeholder="https://example.com"), gr.Number(label="Max number of links (optional)", value=50), gr.Slider(label="Crawl depth", minimum=1, maximum=3, value=2, step=1) ], outputs=[ gr.Textbox(label="Status Message"), gr.File(label="Download ZIP") ], title="Recursive Website Content & PDF Extractor", description="Recursively crawls a website to extract text and PDF files from internal pages and bundles them into a ZIP file." 

# ===========================================================
# MCP SERVER STUB
# ===========================================================
class MCPServerApp:
    def __init__(self):
        self.name = "mcp_content_extractor"

    def launch(self, mcp_server=False):
        if mcp_server:
            print("🚀 MCP server running (stub mode) — ready for agent connections.")
        else:
            print("Launching Gradio UI...")
            gr_interface.launch(server_name="0.0.0.0", server_port=7860)


def create_mcp_interface():
    return MCPServerApp()


# ===========================================================
# ENTRY POINT
# ===========================================================
if __name__ == "__main__":
    app = create_mcp_interface()
    app.launch(mcp_server=False)
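
# -----------------------------------------------------------
# Note (assumption, not wired into this script): recent Gradio releases can
# expose an app's functions as MCP tools directly, which would make the stub
# above unnecessary. With the optional dependency installed
# (pip install "gradio[mcp]"), launch() is documented to accept an mcp_server
# flag, e.g.:
#
#   gr_interface.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)
#
# Verify against the installed Gradio version before relying on this.
# -----------------------------------------------------------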