""" Advanced Data Processor & QR Generator Enhanced version with better error handling, performance improvements, and cleaner architecture. """ import json import os import re import time import logging import mimetypes import zipfile import tempfile import chardet import tarfile import copy import hashlib from datetime import datetime from typing import List, Dict, Optional, Union, Tuple, Any, Set from pathlib import Path from urllib.parse import urlparse, urljoin from dataclasses import dataclass, asdict from contextlib import contextmanager import requests import validators import gradio as gr from bs4 import BeautifulSoup, NavigableString, Tag from fake_useragent import UserAgent from cleantext import clean import qrcode from PIL import Image, ImageDraw, ImageFont import numpy as np # Conditional imports with better error handling PLAYWRIGHT_AVAILABLE = False try: from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError PLAYWRIGHT_AVAILABLE = True except ImportError: logger = logging.getLogger(__name__) logger.warning("Playwright not installed. Install with: pip install playwright && playwright install") # Setup enhanced logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('app.log', encoding='utf-8', mode='a') ] ) logger = logging.getLogger(__name__) # Constants OUTPUTS_DIR = Path('output') QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes' SNAPSHOTS_DIR = OUTPUTS_DIR / 'snapshots' MEDIA_DIR = OUTPUTS_DIR / 'media' TEMP_DIR = OUTPUTS_DIR / 'temp' MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB default max DEFAULT_TIMEOUT = 30 # Ensure directories exist for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR, SNAPSHOTS_DIR, MEDIA_DIR]: directory.mkdir(parents=True, exist_ok=True) # Data classes for better type safety @dataclass class URLValidationResult: is_valid: bool message: str details: Dict[str, Any] @dataclass class FetchResult: structured: Dict[str, Any] raw_content: str metadata: Dict[str, Any] @dataclass class ProcessedItem: source: str url: Optional[str] = None filename: Optional[str] = None structured: Dict[str, Any] = None metadata: Dict[str, Any] = None timestamp: str = None snapshot_path: Optional[str] = None def __post_init__(self): if self.timestamp is None: self.timestamp = datetime.now().isoformat() if self.structured is None: self.structured = {} if self.metadata is None: self.metadata = {} # Media Downloader with better caching and error handling class MediaDownloader: """Handles downloading and saving media files with caching.""" def __init__(self, cache_dir: Path = TEMP_DIR / 'media_cache'): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Accept': 'image/webp,image/*,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', }) self.cache_dir = cache_dir self.cache_dir.mkdir(exist_ok=True) self.downloaded_files = {} # {url_hash: local_path} def _get_url_hash(self, url: str) -> str: """Generate consistent hash for URL.""" return hashlib.md5(url.encode()).hexdigest() def download_media(self, url: str, timeout: int = 10) -> Optional[str]: """Download media file with caching.""" url_hash = self._get_url_hash(url) # Check cache first cache_file = self.cache_dir / f"{url_hash}.cache" if cache_file.exists(): try: with open(cache_file, 'r') as f: cached_path = f.read().strip() if Path(cached_path).exists(): return cached_path except Exception: pass 
# Media Downloader with better caching and error handling
class MediaDownloader:
    """Handles downloading and saving media files with caching."""

    def __init__(self, cache_dir: Path = TEMP_DIR / 'media_cache'):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'image/webp,image/*,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        })
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.downloaded_files = {}  # {url: local_path}

    def _get_url_hash(self, url: str) -> str:
        """Generate a consistent hash for a URL."""
        return hashlib.md5(url.encode()).hexdigest()

    def download_media(self, url: str, timeout: int = 10) -> Optional[str]:
        """Download a media file with caching."""
        url_hash = self._get_url_hash(url)

        # Check cache first
        cache_file = self.cache_dir / f"{url_hash}.cache"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    cached_path = f.read().strip()
                if Path(cached_path).exists():
                    return cached_path
            except Exception:
                pass

        # Download the file
        try:
            response = self.session.get(url, timeout=timeout, stream=True)
            response.raise_for_status()

            # Determine file extension
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
            ext = mimetypes.guess_extension(content_type)
            if not ext:
                # Try to get extension from URL
                parsed = urlparse(url)
                ext = Path(parsed.path).suffix or '.bin'

            # Create safe filename
            safe_filename = f"{url_hash}{ext}"
            local_path = MEDIA_DIR / safe_filename

            # Save file
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            # Update cache
            with open(cache_file, 'w') as f:
                f.write(str(local_path))

            self.downloaded_files[url] = str(local_path)
            logger.info(f"Downloaded media: {url} -> {local_path}")
            return str(local_path)

        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to download media {url}: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error downloading {url}: {e}")
            return None

    def batch_download(self, urls: List[str], max_workers: int = 5) -> Dict[str, Optional[str]]:
        """Download multiple files sequentially (could be enhanced with threading)."""
        results = {}
        for url in urls:
            results[url] = self.download_media(url)
        return results
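
# Usage sketch for MediaDownloader (illustrative only; the example URLs are made up).
# Repeated calls with the same URL hit the on-disk cache in TEMP_DIR / 'media_cache'
# and return the previously saved path under MEDIA_DIR instead of re-downloading:
#
#   downloader = MediaDownloader()
#   local_path = downloader.download_media('https://example.com/logo.png')   # downloads
#   local_path = downloader.download_media('https://example.com/logo.png')   # served from cache
#   results = downloader.batch_download(['https://example.com/a.jpg',
#                                        'https://example.com/b.jpg'])        # {url: path or None}
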
# Enhanced URL Processor
class EnhancedURLProcessor:
    """Advanced URL processing with complete content extraction."""

    def __init__(self, timeout: int = DEFAULT_TIMEOUT, max_retries: int = 3):
        self.session = requests.Session()
        self.timeout = timeout
        self.max_retries = max_retries
        self.user_agent = UserAgent()
        self.session.headers.update({
            'User-Agent': self.user_agent.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'DNT': '1',
        })

    def validate_url(self, url: str) -> URLValidationResult:
        """Enhanced URL validation with detailed feedback."""
        try:
            # Basic URL validation
            if not url or not isinstance(url, str):
                return URLValidationResult(
                    is_valid=False,
                    message='Invalid URL',
                    details={'error': 'URL must be a non-empty string'}
                )

            # Default to https:// when no scheme is given
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url

            # Validate with validators
            if not validators.url(url):
                return URLValidationResult(
                    is_valid=False,
                    message='Invalid URL format',
                    details={'error': 'URL must be properly formatted'}
                )

            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                return URLValidationResult(
                    is_valid=False,
                    message='Incomplete URL',
                    details={'error': 'Missing scheme or domain'}
                )

            # Try to connect: prefer a cheap HEAD request, fall back to GET if HEAD fails
            try:
                check_response = self.session.head(url, timeout=5, allow_redirects=True)
                check_response.raise_for_status()
            except requests.exceptions.RequestException:
                check_response = self.session.get(url, timeout=5, stream=True)
                check_response.raise_for_status()

            return URLValidationResult(
                is_valid=True,
                message='URL is valid and accessible',
                details={
                    'final_url': check_response.url,
                    'content_type': check_response.headers.get('Content-Type', 'unknown'),
                    'server': check_response.headers.get('Server', 'unknown'),
                    'size': check_response.headers.get('Content-Length', 'unknown')
                }
            )

        except Exception as e:
            return URLValidationResult(
                is_valid=False,
                message=f'URL validation failed: {str(e)}',
                details={'error': str(e)}
            )

    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[FetchResult]:
        """Enhanced content fetcher with retries and encoding detection."""
        try:
            logger.info(f"Fetching content from: {url} (Attempt {retry_count + 1}/{self.max_retries})")

            # Rotate the user agent on every request
            self.session.headers.update({'User-Agent': self.user_agent.random})

            response = self.session.get(
                url,
                timeout=self.timeout,
                allow_redirects=True,
                stream=True
            )
            response.raise_for_status()

            # Encoding detection
            encoding = response.encoding
            if encoding is None or encoding == 'ISO-8859-1':
                # Sample the first 10KB for encoding detection
                sample = response.content[:10240]
                detected = chardet.detect(sample)
                encoding = detected['encoding'] or 'utf-8'

            # Decode content
            try:
                raw_content = response.content.decode(encoding, errors='replace')
            except (UnicodeDecodeError, LookupError):
                raw_content = response.content.decode('utf-8', errors='replace')
                encoding = 'utf-8 (fallback)'

            # Prepare metadata
            metadata = {
                'url': url,
                'final_url': response.url,
                'timestamp': datetime.now().isoformat(),
                'encoding': encoding,
                'content_type': response.headers.get('Content-Type', '').split(';')[0].strip(),
                'content_length': len(response.content),
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'elapsed': response.elapsed.total_seconds(),
            }

            # Process based on content type
            content_type = metadata['content_type'].lower()
            structured = {}

            if 'text/html' in content_type:
                structured = self._process_html_content(raw_content, response.url)
            elif 'application/json' in content_type or url.endswith('.json'):
                try:
                    structured = json.loads(raw_content)
                except json.JSONDecodeError as e:
                    structured = {
                        'text': raw_content[:100000],
                        'parse_error': str(e),
                        'json_fragment': raw_content[:1000]
                    }
            elif 'image/' in content_type:
                structured = {
                    'media_type': 'image',
                    'direct_url': response.url,
                    'format': content_type.split('/')[-1],
                    'size_bytes': len(response.content),
                    'filename': Path(urlparse(url).path).name or 'unknown'
                }
            else:
                # Generic content
                structured = {'text': raw_content[:100000]}

            return FetchResult(
                structured=structured,
                raw_content=raw_content,
                metadata=metadata
            )

        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries - 1:
                sleep_time = 2 ** retry_count  # exponential backoff: 1s, 2s, 4s, ...
                logger.info(f"Retrying {url} after {sleep_time}s...")
                time.sleep(sleep_time)
                return self.fetch_content(url, retry_count + 1)
            else:
                logger.error(f"Failed to fetch {url} after {self.max_retries} attempts: {e}")
                return None
        except Exception as e:
            logger.error(f"Unexpected error fetching {url}: {e}")
            return None
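
    # Note on the relative-URL fix-up below (illustrative; the paths are made up).
    # urljoin from the standard library resolves relative references against the page URL:
    #   urljoin('https://example.com/blog/post', '../img/logo.png')  -> 'https://example.com/img/logo.png'
    #   urljoin('https://example.com/blog/', 'img/logo.png')         -> 'https://example.com/blog/img/logo.png'
    # Attributes that already carry a scheme are left untouched.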
"data:image/svg+xml;charset=UTF-8,%3Csvg%20width%3D%22200%22%20height%3D%22100%22%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%3Crect%20width%3D%22200%22%20height%3D%22100%22%20fill%3D%22%23777%22%3E%3C%2Frect%3E%3Ctext%20x%3D%2270%22%20y%3D%2255%22%3E200x100%3C%2Ftext%3E%3C%2Fsvg%3E" # Replace text content text_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'td', 'th', 'label', 'title', 'div'] for tag in soup.find_all(text_tags): if tag.string and len(tag.get_text(strip=True)) > 5: tag.string.replace_with(PLACEHOLDER_TEXT) # Replace images for img in soup.find_all('img'): img['src'] = PLACEHOLDER_IMG if 'srcset' in img.attrs: del img['srcset'] # Remove external links for a in soup.find_all('a'): if 'href' in a.attrs: a['href'] = '#' # Remove sensitive data for script in soup.find_all('script', type='application/ld+json'): script.decompose() # Remove comments for comment in soup.find_all(string=lambda text: isinstance(text, NavigableString) and '