"""
Advanced Data Processor & QR Generator
Enhanced version with better error handling, performance improvements, and cleaner architecture.
"""
import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import shutil
import chardet
import tarfile
import copy
import hashlib
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any, Set
from pathlib import Path
from urllib.parse import urlparse, urljoin
from dataclasses import dataclass, asdict
from contextlib import contextmanager
import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup, Comment, NavigableString, Tag
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# Conditional imports with better error handling
PLAYWRIGHT_AVAILABLE = False
try:
    from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    logger = logging.getLogger(__name__)
    logger.warning("Playwright not installed. Install with: pip install playwright && playwright install")
# Setup enhanced logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8', mode='a')
    ]
)
logger = logging.getLogger(__name__)

# Constants
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
SNAPSHOTS_DIR = OUTPUTS_DIR / 'snapshots'
MEDIA_DIR = OUTPUTS_DIR / 'media'
TEMP_DIR = OUTPUTS_DIR / 'temp'
MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB default max
DEFAULT_TIMEOUT = 30

# Ensure directories exist
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR, SNAPSHOTS_DIR, MEDIA_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
# Data classes for better type safety
@dataclass
class URLValidationResult:
    is_valid: bool
    message: str
    details: Dict[str, Any]


@dataclass
class FetchResult:
    structured: Dict[str, Any]
    raw_content: str
    metadata: Dict[str, Any]


@dataclass
class ProcessedItem:
    source: str
    url: Optional[str] = None
    filename: Optional[str] = None
    structured: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    timestamp: Optional[str] = None
    snapshot_path: Optional[str] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()
        if self.structured is None:
            self.structured = {}
        if self.metadata is None:
            self.metadata = {}
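
# Illustrative sketch (not called by the app): how the dataclasses above are meant to be
# used -- constructed with keyword arguments and flattened with asdict() before JSON
# serialization. The example values are made up.
def _example_processed_item_roundtrip() -> str:
    item = ProcessedItem(source='example', url='https://example.com',
                         structured={'title': 'Demo'})
    return json.dumps(asdict(item), indent=2)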
| # Media Downloader with better caching and error handling | |
| class MediaDownloader: | |
| """Handles downloading and saving media files with caching.""" | |
| def __init__(self, cache_dir: Path = TEMP_DIR / 'media_cache'): | |
| self.session = requests.Session() | |
| self.session.headers.update({ | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', | |
| 'Accept': 'image/webp,image/*,*/*;q=0.8', | |
| 'Accept-Language': 'en-US,en;q=0.5', | |
| }) | |
| self.cache_dir = cache_dir | |
| self.cache_dir.mkdir(exist_ok=True) | |
| self.downloaded_files = {} # {url_hash: local_path} | |
| def _get_url_hash(self, url: str) -> str: | |
| """Generate consistent hash for URL.""" | |
| return hashlib.md5(url.encode()).hexdigest() | |
| def download_media(self, url: str, timeout: int = 10) -> Optional[str]: | |
| """Download media file with caching.""" | |
| url_hash = self._get_url_hash(url) | |
| # Check cache first | |
| cache_file = self.cache_dir / f"{url_hash}.cache" | |
| if cache_file.exists(): | |
| try: | |
| with open(cache_file, 'r') as f: | |
| cached_path = f.read().strip() | |
| if Path(cached_path).exists(): | |
| return cached_path | |
| except Exception: | |
| pass | |
| # Download the file | |
| try: | |
| response = self.session.get(url, timeout=timeout, stream=True) | |
| response.raise_for_status() | |
| # Determine file extension | |
| content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
| ext = mimetypes.guess_extension(content_type) | |
| if not ext: | |
| # Try to get extension from URL | |
| parsed = urlparse(url) | |
| ext = Path(parsed.path).suffix or '.bin' | |
| # Create safe filename | |
| safe_filename = f"{url_hash}{ext}" | |
| local_path = MEDIA_DIR / safe_filename | |
| # Save file | |
| with open(local_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| if chunk: | |
| f.write(chunk) | |
| # Update cache | |
| with open(cache_file, 'w') as f: | |
| f.write(str(local_path)) | |
| self.downloaded_files[url] = str(local_path) | |
| logger.info(f"Downloaded media: {url} -> {local_path}") | |
| return str(local_path) | |
| except requests.exceptions.RequestException as e: | |
| logger.warning(f"Failed to download media {url}: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Unexpected error downloading {url}: {e}") | |
| return None | |
| def batch_download(self, urls: List[str], max_workers: int = 5) -> Dict[str, Optional[str]]: | |
| """Download multiple files (could be enhanced with threading).""" | |
| results = {} | |
| for url in urls: | |
| results[url] = self.download_media(url) | |
| return results | |
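
# Illustrative usage sketch (assumption: run manually, not part of the app flow). It shows
# MediaDownloader's cache behaviour, keyed by an MD5 of the URL; the URL is a placeholder.
def _example_media_download() -> Optional[str]:
    downloader = MediaDownloader()
    # Returns a local file path under MEDIA_DIR, or None if the request fails.
    return downloader.download_media("https://example.com/logo.png", timeout=10)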
| # Enhanced URL Processor | |
| class EnhancedURLProcessor: | |
| """Advanced URL processing with complete content extraction.""" | |
| def __init__(self, timeout: int = DEFAULT_TIMEOUT, max_retries: int = 3): | |
| self.session = requests.Session() | |
| self.timeout = timeout | |
| self.max_retries = max_retries | |
| self.user_agent = UserAgent() | |
| self.session.headers.update({ | |
| 'User-Agent': self.user_agent.random, | |
| 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', | |
| 'Accept-Language': 'en-US,en;q=0.5', | |
| 'Accept-Encoding': 'gzip, deflate, br', | |
| 'Connection': 'keep-alive', | |
| 'Upgrade-Insecure-Requests': '1', | |
| 'Sec-Fetch-Dest': 'document', | |
| 'Sec-Fetch-Mode': 'navigate', | |
| 'Sec-Fetch-Site': 'none', | |
| 'DNT': '1', | |
| }) | |
| def validate_url(self, url: str) -> URLValidationResult: | |
| """Enhanced URL validation with detailed feedback.""" | |
| try: | |
| # Basic URL validation | |
| if not url or not isinstance(url, str): | |
| return URLValidationResult( | |
| is_valid=False, | |
| message='Invalid URL', | |
| details={'error': 'URL must be a non-empty string'} | |
| ) | |
| # Check if URL starts with http(s) | |
| if not url.startswith(('http://', 'https://')): | |
| url = 'https://' + url | |
| # Validate with validators | |
| if not validators.url(url): | |
| return URLValidationResult( | |
| is_valid=False, | |
| message='Invalid URL format', | |
| details={'error': 'URL must be properly formatted'} | |
| ) | |
| parsed = urlparse(url) | |
| if not all([parsed.scheme, parsed.netloc]): | |
| return URLValidationResult( | |
| is_valid=False, | |
| message='Incomplete URL', | |
| details={'error': 'Missing scheme or domain'} | |
| ) | |
            # Try to connect; fall back to GET if HEAD is rejected or fails
            try:
                probe = self.session.head(url, timeout=5, allow_redirects=True)
                probe.raise_for_status()
            except requests.exceptions.RequestException:
                probe = self.session.get(url, timeout=5, stream=True)
                probe.raise_for_status()
            return URLValidationResult(
                is_valid=True,
                message='URL is valid and accessible',
                details={
                    'final_url': probe.url,
                    'content_type': probe.headers.get('Content-Type', 'unknown'),
                    'server': probe.headers.get('Server', 'unknown'),
                    'size': probe.headers.get('Content-Length', 'unknown')
                }
            )
| except Exception as e: | |
| return URLValidationResult( | |
| is_valid=False, | |
| message=f'URL validation failed: {str(e)}', | |
| details={'error': str(e), 'traceback': str(e.__traceback__)} | |
| ) | |
| def fetch_content(self, url: str, retry_count: int = 0) -> Optional[FetchResult]: | |
| """Enhanced content fetcher with retries and encoding detection.""" | |
| try: | |
| logger.info(f"Fetching content from: {url} (Attempt {retry_count + 1}/{self.max_retries})") | |
| # Update user agent | |
| self.session.headers.update({'User-Agent': self.user_agent.random}) | |
| response = self.session.get( | |
| url, | |
| timeout=self.timeout, | |
| allow_redirects=True, | |
| stream=True | |
| ) | |
| response.raise_for_status() | |
| # Encoding detection | |
| encoding = response.encoding | |
| if encoding is None or encoding == 'ISO-8859-1': | |
| # Sample first 10KB for encoding detection | |
| sample = response.content[:10240] | |
| detected = chardet.detect(sample) | |
| encoding = detected['encoding'] or 'utf-8' | |
| # Decode content | |
| try: | |
| raw_content = response.content.decode(encoding, errors='replace') | |
| except (UnicodeDecodeError, LookupError): | |
| raw_content = response.content.decode('utf-8', errors='replace') | |
| encoding = 'utf-8 (fallback)' | |
| # Prepare metadata | |
| metadata = { | |
| 'url': url, | |
| 'final_url': response.url, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'encoding': encoding, | |
| 'content_type': response.headers.get('Content-Type', '').split(';')[0].strip(), | |
| 'content_length': len(response.content), | |
| 'status_code': response.status_code, | |
| 'headers': dict(response.headers), | |
| 'elapsed': response.elapsed.total_seconds(), | |
| } | |
| # Process based on content type | |
| content_type = metadata['content_type'].lower() | |
| structured = {} | |
| if 'text/html' in content_type: | |
| structured = self._process_html_content(raw_content, response.url) | |
| elif 'application/json' in content_type or url.endswith('.json'): | |
| try: | |
| structured = json.loads(raw_content) | |
| except json.JSONDecodeError as e: | |
| structured = { | |
| 'text': raw_content[:100000], | |
| 'parse_error': str(e), | |
| 'json_fragment': raw_content[:1000] | |
| } | |
| elif 'image/' in content_type: | |
| structured = { | |
| 'media_type': 'image', | |
| 'direct_url': response.url, | |
| 'format': content_type.split('/')[-1], | |
| 'size_bytes': len(response.content), | |
| 'filename': Path(urlparse(url).path).name or 'unknown' | |
| } | |
| else: | |
| # Generic content | |
| structured = {'text': raw_content[:100000]} | |
| return FetchResult( | |
| structured=structured, | |
| raw_content=raw_content, | |
| metadata=metadata | |
| ) | |
| except requests.exceptions.RequestException as e: | |
| if retry_count < self.max_retries - 1: | |
| sleep_time = 2 ** retry_count | |
| logger.info(f"Retrying {url} after {sleep_time}s...") | |
| time.sleep(sleep_time) | |
| return self.fetch_content(url, retry_count + 1) | |
| else: | |
| logger.error(f"Failed to fetch {url} after {self.max_retries} attempts: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Unexpected error fetching {url}: {e}") | |
| return None | |
| def _process_html_content(self, raw_content: str, base_url: str) -> Dict[str, Any]: | |
| """Process HTML content and extract structured data.""" | |
| soup = BeautifulSoup(raw_content, 'html.parser') | |
| # Fix relative URLs | |
| for tag in soup.find_all(['a', 'img', 'link', 'script', 'video', 'audio', 'source']): | |
| for attr in ['href', 'src', 'data-src', 'poster']: | |
| if tag.get(attr) and not urlparse(tag[attr]).scheme: | |
| try: | |
| tag[attr] = urljoin(base_url, tag[attr]) | |
| except Exception as e: | |
| logger.debug(f"Failed to join URL: {e}") | |
| # Extract structured data | |
| structured = self._extract_database_data(soup, base_url) | |
| structured['raw_html'] = raw_content[:50000] # Store truncated HTML | |
| structured['base_url'] = base_url | |
| return structured | |
| def _create_template_shell(self, raw_content: str, base_url: str) -> Dict[str, Any]: | |
| """Create a template shell from HTML content.""" | |
| soup = BeautifulSoup(raw_content, 'html.parser') | |
| PLACEHOLDER_TEXT = "[LOREM IPSUM CONTENT]" | |
| PLACEHOLDER_IMG = "data:image/svg+xml;charset=UTF-8,%3Csvg%20width%3D%22200%22%20height%3D%22100%22%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%3Crect%20width%3D%22200%22%20height%3D%22100%22%20fill%3D%22%23777%22%3E%3C%2Frect%3E%3Ctext%20x%3D%2270%22%20y%3D%2255%22%3E200x100%3C%2Ftext%3E%3C%2Fsvg%3E" | |
| # Replace text content | |
| text_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'td', 'th', 'label', 'title', 'div'] | |
| for tag in soup.find_all(text_tags): | |
| if tag.string and len(tag.get_text(strip=True)) > 5: | |
| tag.string.replace_with(PLACEHOLDER_TEXT) | |
| # Replace images | |
| for img in soup.find_all('img'): | |
| img['src'] = PLACEHOLDER_IMG | |
| if 'srcset' in img.attrs: | |
| del img['srcset'] | |
| # Remove external links | |
| for a in soup.find_all('a'): | |
| if 'href' in a.attrs: | |
| a['href'] = '#' | |
| # Remove sensitive data | |
| for script in soup.find_all('script', type='application/ld+json'): | |
| script.decompose() | |
        # Remove HTML comments (BeautifulSoup exposes them as Comment nodes)
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()
| return { | |
| 'template_type': 'html_shell', | |
| 'base_url': base_url, | |
| 'template_html': str(soup), | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| def _extract_database_data(self, soup: BeautifulSoup, base_url: str) -> Dict[str, Any]: | |
| """Extract structured data from HTML.""" | |
| structured = { | |
| 'title': soup.title.string.strip() if soup.title and soup.title.string else '', | |
| 'meta_description': '', | |
| 'core_text_content': '', | |
| 'images': [], | |
| 'videos': [], | |
| 'audios': [], | |
| 'structured_data': [], | |
| 'products': [], | |
| 'links': [], | |
| 'metadata': {} | |
| } | |
| # Extract meta description | |
| meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
| if meta_desc: | |
| structured['meta_description'] = meta_desc.get('content', '') | |
| # Extract JSON-LD structured data | |
| for script in soup.find_all('script', type='application/ld+json'): | |
| try: | |
| ld_data = json.loads(script.string or '{}') | |
| structured['structured_data'].append(ld_data) | |
| # Extract products | |
| if isinstance(ld_data, dict): | |
| if ld_data.get('@type') == 'Product': | |
| structured['products'].append(ld_data) | |
| elif ld_data.get('@graph'): | |
| for item in ld_data.get('@graph', []): | |
| if isinstance(item, dict) and item.get('@type') == 'Product': | |
| structured['products'].append(item) | |
| except (json.JSONDecodeError, TypeError) as e: | |
| logger.debug(f"Failed to parse JSON-LD: {e}") | |
| # Extract media | |
| for img in soup.find_all('img'): | |
| src = img.get('src') or img.get('data-src') | |
| if src: | |
| structured['images'].append(urljoin(base_url, src)) | |
| for video in soup.find_all('video'): | |
| src = video.get('src') or (video.find('source') and video.find('source').get('src')) | |
| if src: | |
| structured['videos'].append(urljoin(base_url, src)) | |
| for audio in soup.find_all('audio'): | |
| src = audio.get('src') or (audio.find('source') and audio.find('source').get('src')) | |
| if src: | |
| structured['audios'].append(urljoin(base_url, src)) | |
| # Extract links | |
| for a in soup.find_all('a', href=True): | |
| href = a['href'] | |
| if href.startswith(('http://', 'https://')): | |
| structured['links'].append(href) | |
| # Extract main content | |
| main_content_selectors = [ | |
| 'main', 'article', '[role="main"]', | |
| '.main-content', '.content', '#content', | |
| '.article', '.post' | |
| ] | |
| for selector in main_content_selectors: | |
| main_tag = soup.select_one(selector) | |
| if main_tag: | |
| structured['core_text_content'] = clean( | |
| main_tag.get_text('\n', strip=True), | |
| lower=False, | |
| no_line_breaks=False, | |
| no_urls=True, | |
| no_emails=True, | |
| no_phone_numbers=True | |
| )[:10000] # Limit size | |
| break | |
| if not structured['core_text_content']: | |
| # Fallback: extract all text | |
| structured['core_text_content'] = clean( | |
| soup.get_text('\n', strip=True), | |
| lower=False, | |
| no_line_breaks=False, | |
| no_urls=True, | |
| no_emails=True, | |
| no_phone_numbers=True | |
| )[:5000] | |
| # Remove duplicates | |
| structured['images'] = list(dict.fromkeys(structured['images']))[:50] # Limit to 50 images | |
| structured['videos'] = list(dict.fromkeys(structured['videos'])) | |
| structured['audios'] = list(dict.fromkeys(structured['audios'])) | |
| structured['links'] = list(dict.fromkeys(structured['links']))[:100] # Limit to 100 links | |
| return structured | |
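
# Illustrative sketch of the typical EnhancedURLProcessor call sequence (validate, then
# fetch). Not executed by the app; the URL is a placeholder and network access is assumed.
def _example_fetch_structured(url: str = "https://example.com") -> Dict[str, Any]:
    processor = EnhancedURLProcessor()
    validation = processor.validate_url(url)
    if not validation.is_valid:
        return {'error': validation.message}
    result = processor.fetch_content(url)
    # FetchResult bundles parsed data, the raw body, and response metadata.
    return result.structured if result else {}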
| # Site Crawler with improved logic | |
| class SiteCrawler: | |
| """Crawl website with configurable depth and limits.""" | |
| def __init__(self, processor: EnhancedURLProcessor, max_pages: int = 10, max_depth: int = 2): | |
| self.processor = processor | |
| self.max_pages = max_pages | |
| self.max_depth = max_depth | |
| self.crawled_urls = set() | |
| self.results = [] | |
| self.snapshot_paths = [] | |
| def _normalize_url(self, url: str, base_url: str) -> str: | |
| """Normalize URL by removing fragments and query parameters for crawling.""" | |
| parsed = urlparse(url) | |
| base_parsed = urlparse(base_url) | |
| # Ensure same domain | |
| if parsed.netloc and parsed.netloc != base_parsed.netloc: | |
| return None | |
| # Remove fragments and query params for crawling | |
| normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" | |
| return normalized.rstrip('/') | |
| def _get_internal_links(self, soup: BeautifulSoup, base_url: str) -> List[str]: | |
| """Extract internal links from page.""" | |
| parsed_base = urlparse(base_url) | |
| internal_links = set() | |
| for a in soup.find_all('a', href=True): | |
| href = urljoin(base_url, a['href']) | |
| parsed_href = urlparse(href) | |
| # Check if same domain | |
| if parsed_href.netloc == parsed_base.netloc: | |
| # Filter out non-HTML resources | |
| if any(href.lower().endswith(ext) for ext in [ | |
| '.pdf', '.zip', '.jpg', '.jpeg', '.png', '.gif', | |
| '.css', '.js', '.mp4', '.mp3', '.avi', '.mov' | |
| ]): | |
| continue | |
| # Remove fragments | |
| href = self._normalize_url(href, base_url) | |
| if href: | |
| internal_links.add(href) | |
| return list(internal_links) | |
| def crawl_site(self, start_url: str, mode: str = "Full Structured Data") -> Tuple[List[Dict], List[str]]: | |
| """Crawl website starting from given URL.""" | |
| logger.info(f"Starting crawl from {start_url} (max pages: {self.max_pages})") | |
| queue = [(start_url, 0)] # (url, depth) | |
| while queue and len(self.crawled_urls) < self.max_pages: | |
| url, depth = queue.pop(0) | |
| if url in self.crawled_urls or depth > self.max_depth: | |
| continue | |
| logger.info(f"Crawling: {url} (depth: {depth})") | |
| self.crawled_urls.add(url) | |
| # Fetch content | |
| content_result = self.processor.fetch_content(url) | |
| if not content_result: | |
| continue | |
| # Check if HTML | |
| content_type = content_result.metadata.get('content_type', '').lower() | |
| if 'text/html' not in content_type: | |
| continue | |
| # Capture snapshot if Playwright is available | |
| snapshot_path = None | |
| if PLAYWRIGHT_AVAILABLE: | |
| try: | |
| filename = f"snapshot_{len(self.crawled_urls)}_{hashlib.md5(url.encode()).hexdigest()[:8]}.png" | |
| snapshot_path = capture_visual_snapshot(url, filename) | |
| if snapshot_path: | |
| self.snapshot_paths.append(snapshot_path) | |
| except Exception as e: | |
| logger.warning(f"Failed to capture snapshot for {url}: {e}") | |
| # Process based on mode | |
| raw_content = content_result.raw_content | |
| base_url = content_result.metadata['final_url'] | |
| soup = BeautifulSoup(raw_content, 'html.parser') | |
| if mode == "Extract for Template (Shell)": | |
| structured = self.processor._create_template_shell(raw_content, base_url) | |
| elif mode == "Extract for Database (Content Only)": | |
| structured = self.processor._extract_database_data(soup, base_url) | |
| else: | |
| structured = self.processor._process_html_content(raw_content, base_url) | |
| # Create result item | |
| result_item = ProcessedItem( | |
| source='crawl', | |
| url=base_url, | |
| structured=structured, | |
| metadata=content_result.metadata, | |
| snapshot_path=snapshot_path | |
| ) | |
| self.results.append(asdict(result_item)) | |
| # Extract links for next level | |
| if depth < self.max_depth: | |
| new_links = self._get_internal_links(soup, base_url) | |
| for link in new_links: | |
| if link not in self.crawled_urls and len(self.crawled_urls) < self.max_pages: | |
| queue.append((link, depth + 1)) | |
| # Be polite | |
| time.sleep(0.5) | |
| logger.info(f"Crawl completed. Found {len(self.results)} pages.") | |
| return self.results, self.snapshot_paths | |
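
# Illustrative sketch (not wired into the UI): a small crawl capped at a few pages.
# Assumes network access; the start URL is a placeholder.
def _example_small_crawl(start_url: str = "https://example.com") -> List[Dict]:
    crawler = SiteCrawler(EnhancedURLProcessor(), max_pages=3, max_depth=1)
    pages, _snapshots = crawler.crawl_site(start_url, mode="Full Structured Data")
    return pages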
| # File Processor with better archive handling | |
| class EnhancedFileProcessor: | |
| """Process various file types including archives.""" | |
| def __init__(self, max_file_size: int = MAX_FILE_SIZE): | |
| self.max_file_size = max_file_size | |
| self.supported_extensions = { | |
| '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.log', | |
| '.yml', '.yaml', '.ini', '.conf', '.cfg', '.zip', '.tar', '.gz', | |
| '.bz2', '.7z', '.rar', '.pdf', '.doc', '.docx', '.rtf', '.odt', | |
| '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp' | |
| } | |
| def process_file(self, file_path: str) -> List[Dict]: | |
| """Process a single file or archive.""" | |
| if not file_path or not os.path.exists(file_path): | |
| return [] | |
| try: | |
| file_size = os.path.getsize(file_path) | |
| if file_size > self.max_file_size: | |
| logger.warning(f"File {file_path} exceeds size limit ({file_size} > {self.max_file_size})") | |
| return [] | |
| if self._is_archive(file_path): | |
| return self._process_archive(file_path) | |
| else: | |
| return self._process_single_file(file_path) | |
| except Exception as e: | |
| logger.error(f"Error processing file {file_path}: {e}") | |
| return [] | |
| def _is_archive(self, filepath: str) -> bool: | |
| """Check if file is an archive.""" | |
| archive_extensions = ['.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'] | |
| return any(filepath.lower().endswith(ext) for ext in archive_extensions) | |
| def _process_single_file(self, file_path: str) -> List[Dict]: | |
| """Process a single file.""" | |
| try: | |
| file_stat = os.stat(file_path) | |
| mime_type, _ = mimetypes.guess_type(file_path) | |
| mime_type = mime_type or 'application/octet-stream' | |
| structured = {} | |
| if 'image/' in mime_type: | |
| structured = { | |
| 'media_type': 'image', | |
| 'filename': os.path.basename(file_path), | |
| 'mime_type': mime_type, | |
| 'size_bytes': file_stat.st_size | |
| } | |
| else: | |
| # Read file content | |
| with open(file_path, 'rb') as f: | |
| raw_bytes = f.read() | |
| # Detect encoding | |
| detected = chardet.detect(raw_bytes[:10000]) | |
| encoding = detected['encoding'] or 'utf-8' | |
| try: | |
| content = raw_bytes.decode(encoding, errors='replace') | |
| except (UnicodeDecodeError, LookupError): | |
| content = raw_bytes.decode('utf-8', errors='replace') | |
| # Parse based on file type | |
| if 'json' in mime_type or file_path.endswith('.json'): | |
| try: | |
| json_data = json.loads(content) | |
| structured = json_data | |
| except json.JSONDecodeError as e: | |
| structured = { | |
| 'text': content[:50000], | |
| 'parse_error': str(e) | |
| } | |
| elif 'html' in mime_type or file_path.endswith(('.html', '.htm')): | |
| processor = EnhancedURLProcessor() | |
| soup = BeautifulSoup(content, 'html.parser') | |
| structured = processor._extract_database_data(soup, f"file://{file_path}") | |
| else: | |
| structured = {'text': content[:100000]} | |
| result_item = ProcessedItem( | |
| source='file', | |
| filename=os.path.basename(file_path), | |
| structured=structured, | |
| metadata={ | |
| 'file_size': file_stat.st_size, | |
| 'mime_type': mime_type, | |
| 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), | |
| 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), | |
| 'file_path': file_path | |
| } | |
| ) | |
| return [asdict(result_item)] | |
| except Exception as e: | |
| logger.error(f"Error processing single file {file_path}: {e}") | |
| return [] | |
| def _process_archive(self, archive_path: str) -> List[Dict]: | |
| """Extract and process files from archive.""" | |
| dataset = [] | |
| temp_dir = tempfile.mkdtemp(prefix='archive_extract_') | |
| try: | |
| if zipfile.is_zipfile(archive_path): | |
| with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
| zip_ref.extractall(temp_dir) | |
| for file_info in zip_ref.infolist(): | |
| if not file_info.is_dir(): | |
| file_path = os.path.join(temp_dir, file_info.filename) | |
| if os.path.exists(file_path): | |
| dataset.extend(self._process_single_file(file_path)) | |
| elif tarfile.is_tarfile(archive_path): | |
| with tarfile.open(archive_path, 'r') as tar_ref: | |
| tar_ref.extractall(temp_dir) | |
| for member in tar_ref.getmembers(): | |
| if member.isfile(): | |
| file_path = os.path.join(temp_dir, member.name) | |
| if os.path.exists(file_path): | |
| dataset.extend(self._process_single_file(file_path)) | |
| else: | |
| logger.warning(f"Unsupported archive format: {archive_path}") | |
| except Exception as e: | |
| logger.error(f"Error processing archive {archive_path}: {e}") | |
        finally:
            # Clean up the temporary extraction directory
            shutil.rmtree(temp_dir, ignore_errors=True)
| return dataset | |
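
# Illustrative sketch: processing a local file or archive with EnhancedFileProcessor.
# The path is a placeholder; the processor returns one dict per processed file.
def _example_process_local_file(path: str = "example.zip") -> List[Dict]:
    return EnhancedFileProcessor().process_file(path)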
| # Data Chunker with improved chunking logic | |
| class DataChunker: | |
| """Chunk data for QR code generation.""" | |
    def __init__(self, max_chunk_size: int = 800):
        # The QR generator below uses ERROR_CORRECT_H, whose binary capacity (1273 bytes
        # at version 40) is far below the often-quoted 2953-byte level-L limit. Keeping
        # chunks around 800 characters leaves room for the chunk envelope and JSON escaping.
        self.max_chunk_size = max_chunk_size

    def chunk_data(self, data: Any) -> List[Dict]:
        """Chunk data into smaller pieces for QR encoding."""
        try:
            # Serialize data
            if isinstance(data, (dict, list)):
                json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
            else:
                json_str = str(data)
            # Chunk on characters rather than raw bytes so multi-byte UTF-8 sequences
            # are never split (and silently dropped) at chunk boundaries.
            chunk_size = self.max_chunk_size
            total_chars = len(json_str)
            total_chunks = max(1, (total_chars + chunk_size - 1) // chunk_size)
            chunks = []
            for i in range(0, total_chars, chunk_size):
                chunk_str = json_str[i:i + chunk_size]
                chunks.append({
                    "chunk_index": len(chunks) + 1,
                    "total_chunks": total_chunks,
                    "total_length": total_chars,
                    "chunk_hash": hashlib.md5(chunk_str.encode()).hexdigest()[:8],
                    "data": chunk_str,
                    "timestamp": datetime.now().isoformat()
                })
            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return [{"error": str(e), "data": str(data)[:100]}]
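
# Illustrative counterpart to chunk_data (an assumption about the intended decode path,
# not something the app currently performs): scanned chunks are ordered by chunk_index
# and their payloads concatenated back into the original JSON string.
def _example_reassemble_chunks(chunks: List[Dict]) -> Any:
    ordered = sorted(chunks, key=lambda c: c.get("chunk_index", 0))
    joined = "".join(c.get("data", "") for c in ordered)
    try:
        return json.loads(joined)
    except json.JSONDecodeError:
        return joined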
| # QR Code Generator with styling options | |
| class QRCodeGenerator: | |
| """Generate QR codes with various styling options.""" | |
| def __init__(self, output_dir: Path = QR_CODES_DIR): | |
| self.output_dir = output_dir | |
| self.output_dir.mkdir(exist_ok=True) | |
| def generate_stylish_qr(self, data: Union[str, Dict], filename: str, | |
| size: int = 10, border: int = 4, | |
| fill_color: str = "#000000", | |
| back_color: str = "#FFFFFF", | |
| logo_path: Optional[str] = None) -> str: | |
| """Generate a stylish QR code.""" | |
| try: | |
| # Prepare data | |
| if isinstance(data, dict): | |
| data_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) | |
| else: | |
| data_str = str(data) | |
| # Create QR code | |
| qr = qrcode.QRCode( | |
| version=None, | |
| error_correction=qrcode.constants.ERROR_CORRECT_H, # High error correction | |
| box_size=size, | |
| border=border | |
| ) | |
| qr.add_data(data_str) | |
| qr.make(fit=True) | |
| # Create image | |
| qr_img = qr.make_image(fill_color=fill_color, back_color=back_color) | |
| qr_img = qr_img.convert('RGBA') | |
| # Add logo if provided | |
| if logo_path and os.path.exists(logo_path): | |
| try: | |
| logo = Image.open(logo_path) | |
| logo_size = qr_img.size[0] // 5 | |
| logo = logo.resize((logo_size, logo_size), Image.Resampling.LANCZOS) | |
| # Calculate position | |
| pos = ((qr_img.size[0] - logo.size[0]) // 2, | |
| (qr_img.size[1] - logo.size[1]) // 2) | |
| # Paste logo | |
| qr_img.paste(logo, pos, logo) | |
| except Exception as e: | |
| logger.warning(f"Failed to add logo: {e}") | |
| # Save image | |
| output_path = self.output_dir / filename | |
| qr_img.save(output_path, 'PNG', quality=95) | |
| logger.info(f"QR code generated: {output_path}") | |
| return str(output_path) | |
| except Exception as e: | |
| logger.error(f"QR generation error: {e}") | |
| return "" | |
| def generate_qr_sequence(self, data: Any, combined: bool = True, | |
| prefix: str = "qr") -> List[str]: | |
| """Generate a sequence of QR codes for data.""" | |
| chunker = DataChunker() | |
| paths = [] | |
| timestamp = int(time.time()) | |
| if combined: | |
| # Generate QR codes for combined data | |
| chunks = chunker.chunk_data(data) | |
| for i, chunk in enumerate(chunks): | |
| filename = f'{prefix}_{timestamp}_{i+1}_of_{len(chunks)}.png' | |
| qr_path = self.generate_stylish_qr( | |
| data=chunk, | |
| filename=filename, | |
| fill_color="#1a365d", | |
| back_color="#ffffff" | |
| ) | |
| if qr_path: | |
| paths.append(qr_path) | |
| else: | |
| # Generate separate QR codes for each item | |
| if isinstance(data, list): | |
| for idx, item in enumerate(data): | |
| chunks = chunker.chunk_data(item) | |
| for chunk_idx, chunk in enumerate(chunks): | |
| filename = f'{prefix}_item{idx+1}_{chunk_idx+1}_of_{len(chunks)}_{timestamp}.png' | |
| qr_path = self.generate_stylish_qr( | |
| data=chunk, | |
| filename=filename, | |
| fill_color="#2d3748", | |
| back_color="#ffffff" | |
| ) | |
| if qr_path: | |
| paths.append(qr_path) | |
| else: | |
| chunks = chunker.chunk_data(data) | |
| for i, chunk in enumerate(chunks): | |
| filename = f'{prefix}_single_{i+1}_of_{len(chunks)}_{timestamp}.png' | |
| qr_path = self.generate_stylish_qr( | |
| data=chunk, | |
| filename=filename, | |
| fill_color="#1a365d", | |
| back_color="#ffffff" | |
| ) | |
| if qr_path: | |
| paths.append(qr_path) | |
| return paths | |
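
# Illustrative sketch of the QR pipeline on a small payload. Not called by the UI;
# the payload is made up and the PNGs land in QR_CODES_DIR.
def _example_generate_qr_codes() -> List[str]:
    generator = QRCodeGenerator()
    sample = {"name": "Sample item", "price": 9.99}
    return generator.generate_qr_sequence(sample, combined=True, prefix="example")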
| # Main processing functions | |
| def capture_visual_snapshot(url: str, filename: str) -> Optional[str]: | |
| """Capture webpage screenshot using Playwright.""" | |
| if not PLAYWRIGHT_AVAILABLE: | |
| logger.warning("Playwright not available for screenshots") | |
| return None | |
| output_path = SNAPSHOTS_DIR / filename | |
| try: | |
| with sync_playwright() as p: | |
| browser = p.chromium.launch(headless=True) | |
| context = browser.new_context( | |
| viewport={'width': 1280, 'height': 720}, | |
| user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' | |
| ) | |
| page = context.new_page() | |
| # Navigate with timeout | |
| page.goto(url, wait_until='networkidle', timeout=30000) | |
| # Take full page screenshot | |
| page.screenshot(path=output_path, full_page=True) | |
| browser.close() | |
| logger.info(f"Snapshot captured: {output_path}") | |
| return str(output_path) | |
| except Exception as e: | |
| logger.error(f"Failed to capture snapshot for {url}: {e}") | |
| return None | |
| def break_down_data(data: Union[Dict, List[Dict]]) -> Union[Dict, List[Dict]]: | |
| """Break down and restructure data for better organization.""" | |
| def process_item(item: Dict) -> Dict: | |
| structured = item.get('structured', {}) | |
| # Handle template shells | |
| if structured.get('template_type') == 'html_shell': | |
| return item | |
| # Ensure structured data exists | |
| if not structured: | |
| content = item.get('content') or item.get('raw_content', '') | |
| if isinstance(content, str): | |
| structured = {'text': content} | |
| elif isinstance(content, dict): | |
| structured = content | |
| # Extract media | |
| media = [] | |
| for img in structured.get('images', []): | |
| media.append({'type': 'image', 'source': img, 'size': 'unknown'}) | |
| for vid in structured.get('videos', []): | |
| media.append({'type': 'video', 'source': vid, 'size': 'unknown'}) | |
| for aud in structured.get('audios', []): | |
| media.append({'type': 'audio', 'source': aud, 'size': 'unknown'}) | |
| structured['media'] = media | |
| # Extract products | |
| if 'products' not in structured: | |
| structured['products'] = [] | |
| # Create template if products exist | |
| if structured['products']: | |
| structured['template'] = { | |
| 'type': 'product_catalog', | |
| 'item_count': len(structured['products']), | |
| 'items': structured['products'][:10], # Limit to 10 | |
| 'metadata': item.get('metadata', {}) | |
| } | |
| item['structured'] = structured | |
| return item | |
| if isinstance(data, list): | |
| return [process_item(item) for item in data] | |
| elif isinstance(data, dict): | |
| return process_item(data) | |
| return data | |
| def package_database(results: List[Dict]) -> Optional[str]: | |
| """Package processed data and media into a ZIP file.""" | |
| if not results: | |
| return None | |
| try: | |
| downloader = MediaDownloader() | |
| updated_results = copy.deepcopy(results) | |
| # Collect media URLs | |
| media_urls = set() | |
| for item in updated_results: | |
| structured = item.get('structured', {}) | |
| media_urls.update(structured.get('images', [])) | |
| media_urls.update(structured.get('videos', [])) | |
| media_urls.update(structured.get('audios', [])) | |
| # Download media | |
| media_mapping = downloader.batch_download(list(media_urls)) | |
| # Update results with local paths | |
| for item in updated_results: | |
| structured = item.get('structured', {}) | |
| for media_type in ['images', 'videos', 'audios']: | |
| if media_type in structured: | |
| new_paths = [] | |
| for url in structured[media_type]: | |
| if url in media_mapping and media_mapping[url]: | |
| local_path = Path(media_mapping[url]) | |
| new_paths.append(f"media/{local_path.name}") | |
| else: | |
| new_paths.append(url) | |
| structured[media_type] = new_paths | |
| # Create ZIP file | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| zip_filename = OUTPUTS_DIR / f"database_export_{timestamp}.zip" | |
| with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zf: | |
| # Add data | |
| zf.writestr( | |
| 'data_export.json', | |
| json.dumps(updated_results, indent=2, ensure_ascii=False) | |
| ) | |
| # Add README | |
| readme = f"""Database Export | |
| Generated: {datetime.now().isoformat()} | |
| Items: {len(updated_results)} | |
| Media Files: {len(media_mapping)} | |
| """ | |
| zf.writestr('README.txt', readme) | |
| # Add media files | |
| for url, local_path in media_mapping.items(): | |
| if local_path and os.path.exists(local_path): | |
| zf.write(local_path, arcname=f"media/{Path(local_path).name}") | |
| logger.info(f"Database package created: {zip_filename}") | |
| return str(zip_filename) | |
| except Exception as e: | |
| logger.error(f"Failed to create database package: {e}") | |
| return None | |
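
# Illustrative sketch of reading back an export produced by package_database() (an
# assumption about downstream use, not part of this app): the ZIP always contains
# 'data_export.json', plus any downloaded media under 'media/'.
def _example_read_export(zip_path: str) -> List[Dict]:
    with zipfile.ZipFile(zip_path, 'r') as zf:
        with zf.open('data_export.json') as f:
            return json.loads(f.read().decode('utf-8'))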
| # Gradio Interface | |
| def create_modern_interface(): | |
| """Create modern Gradio interface.""" | |
| css = """ | |
| :root { | |
| --primary-color: #1a365d; | |
| --secondary-color: #2d3748; | |
| --accent-color: #4299e1; | |
| --background-color: #f7fafc; | |
| --success-color: #48bb78; | |
| --error-color: #f56565; | |
| --warning-color: #ed8936; | |
| --border-radius: 0.5rem; | |
| } | |
| .gradio-container { | |
| max-width: 1200px; | |
| margin: 2rem auto; | |
| padding: 2rem; | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| border-radius: var(--border-radius); | |
| box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3); | |
| } | |
| .container-inner { | |
| background: white; | |
| border-radius: var(--border-radius); | |
| padding: 2rem; | |
| box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
| } | |
| h1 { | |
| background: linear-gradient(90deg, #667eea, #764ba2); | |
| -webkit-background-clip: text; | |
| -webkit-text-fill-color: transparent; | |
| margin-bottom: 1rem; | |
| } | |
| .primary-button { | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| border: none; | |
| padding: 0.75rem 1.5rem; | |
| border-radius: var(--border-radius); | |
| font-weight: 600; | |
| cursor: pointer; | |
| transition: transform 0.2s, box-shadow 0.2s; | |
| } | |
| .primary-button:hover { | |
| transform: translateY(-2px); | |
| box-shadow: 0 10px 20px rgba(102, 126, 234, 0.4); | |
| } | |
| .warning-box { | |
| background: linear-gradient(135deg, #f6d365 0%, #fda085 100%); | |
| padding: 1rem; | |
| border-radius: var(--border-radius); | |
| margin-bottom: 1rem; | |
| border-left: 4px solid #ed8936; | |
| } | |
| .tab-nav { | |
| background: linear-gradient(135deg, #f7fafc 0%, #edf2f7 100%); | |
| border-radius: var(--border-radius); | |
| padding: 0.5rem; | |
| margin-bottom: 1rem; | |
| } | |
| """ | |
| with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", theme=gr.themes.Soft()) as interface: | |
| gr.Markdown(""" | |
| <div class="container-inner"> | |
| <h1>π Advanced Data Processor & QR Code Generator</h1> | |
| <p>Process URLs, files, and JSON data. Generate QR codes and export databases.</p> | |
| </div> | |
| """) | |
| # Warning if Playwright not available | |
| if not PLAYWRIGHT_AVAILABLE: | |
| gr.Markdown(""" | |
| <div class="warning-box"> | |
| β οΈ **Playwright not installed** - Screenshots and advanced rendering disabled.<br> | |
| Install with: `pip install playwright && playwright install` | |
| </div> | |
| """) | |
| with gr.Tabs() as tabs: | |
| with gr.TabItem("π URL Processing"): | |
| url_input = gr.Textbox( | |
| label="Enter URLs", | |
| lines=5, | |
| placeholder="Enter one URL per line:\nhttps://example.com\nhttps://example.org", | |
| value="" | |
| ) | |
| with gr.TabItem("π File Input"): | |
| file_input = gr.File( | |
| label="Upload Files", | |
| file_types=["*"], | |
| file_count="multiple" | |
| ) | |
| with gr.TabItem("π JSON Input"): | |
| text_input = gr.TextArea( | |
| label="Direct JSON Input", | |
| lines=15, | |
| placeholder='{"data": "your json here"} or [{"item": 1}, {"item": 2}]', | |
| value="" | |
| ) | |
| # Options | |
| with gr.Row(): | |
| extraction_mode = gr.Radio( | |
| label="Extraction Mode", | |
| choices=[ | |
| "Full Structured Data", | |
| "Extract for Template (Shell)", | |
| "Extract for Database (Content Only)" | |
| ], | |
| value="Full Structured Data", | |
| info="Template/Database mode with single URL triggers site crawl." | |
| ) | |
| combine_data = gr.Checkbox( | |
| label="Combine data for sequential QR codes", | |
| value=True, | |
| info="Recommended for large datasets" | |
| ) | |
| # Buttons | |
| with gr.Row(): | |
| example_btn = gr.Button("π Load Example", variant="secondary") | |
| clear_btn = gr.Button("ποΈ Clear All", variant="secondary") | |
| process_btn = gr.Button("β‘ Process & Generate", variant="primary", scale=2) | |
| # Outputs | |
| output_json = gr.JSON(label="Processed Data", visible=True) | |
| with gr.Row(): | |
| output_gallery = gr.Gallery( | |
| label="Generated QR Codes & Snapshots", | |
| columns=3, | |
| height=400, | |
| show_label=True | |
| ) | |
| output_database_zip = gr.File( | |
| label="Database Export (.zip)", | |
| interactive=False | |
| ) | |
| output_text = gr.Textbox( | |
| label="Processing Status", | |
| interactive=False | |
| ) | |
| # Progress bar | |
| progress_bar = gr.Progress() | |
| # Example data | |
| def load_example(): | |
| example = { | |
| "name": "Example Product Catalog", | |
| "type": "product_catalog", | |
| "items": [ | |
| {"id": "123", "name": "Premium Widget", "price": 299.99, "category": "Electronics"}, | |
| {"id": "456", "name": "Basic Widget", "price": 149.99, "category": "Electronics"}, | |
| {"id": "789", "name": "Deluxe Widget", "price": 499.99, "category": "Electronics"} | |
| ], | |
| "metadata": { | |
| "timestamp": datetime.now().isoformat(), | |
| "source": "example", | |
| "version": "1.0" | |
| } | |
| } | |
| return json.dumps(example, indent=2) | |
| def clear_inputs(): | |
| return "", None, "", "Full Structured Data", True | |
| def process_inputs(urls, files, text, mode, combine): | |
| """Main processing function.""" | |
| results = [] | |
| all_media_paths = [] | |
| database_zip_path = None | |
| try: | |
| # Process JSON input | |
| if text and text.strip(): | |
| try: | |
| json_data = json.loads(text) | |
| if isinstance(json_data, list): | |
| for item in json_data: | |
| results.append(ProcessedItem( | |
| source='json', | |
| structured=item | |
| )) | |
| else: | |
| results.append(ProcessedItem( | |
| source='json', | |
| structured=json_data | |
| )) | |
| except json.JSONDecodeError as e: | |
| return None, [], f"Invalid JSON: {str(e)}", None | |
| # Process files | |
| if files: | |
| file_processor = EnhancedFileProcessor() | |
| for file in files: | |
| file_results = file_processor.process_file(file.name) | |
| if file_results: | |
| results.extend(file_results) | |
| # Process URLs | |
| if urls and urls.strip(): | |
| url_processor = EnhancedURLProcessor() | |
| url_list = [url.strip() for url in re.split(r'[,\n]', urls) if url.strip()] | |
| if len(url_list) == 1 and mode != "Full Structured Data": | |
| # Site crawl | |
| crawler = SiteCrawler(url_processor, max_pages=5) | |
| crawl_results, snapshot_paths = crawler.crawl_site(url_list[0], mode) | |
| results.extend(crawl_results) | |
| all_media_paths.extend(snapshot_paths) | |
| else: | |
| # Single URL processing | |
| for url in url_list: | |
| validation = url_processor.validate_url(url) | |
| if validation.is_valid: | |
| content = url_processor.fetch_content(url) | |
| if content: | |
| # Capture snapshot | |
| snapshot_path = None | |
| if PLAYWRIGHT_AVAILABLE: | |
| filename = f"snapshot_{hashlib.md5(url.encode()).hexdigest()[:8]}.png" | |
| snapshot_path = capture_visual_snapshot(url, filename) | |
| if snapshot_path: | |
| all_media_paths.append(snapshot_path) | |
| # Process based on mode | |
| if mode == "Extract for Template (Shell)": | |
| structured = url_processor._create_template_shell( | |
| content.raw_content, | |
| content.metadata['final_url'] | |
| ) | |
| elif mode == "Extract for Database (Content Only)": | |
| soup = BeautifulSoup(content.raw_content, 'html.parser') | |
| structured = url_processor._extract_database_data( | |
| soup, | |
| content.metadata['final_url'] | |
| ) | |
| else: | |
| structured = url_processor._process_html_content( | |
| content.raw_content, | |
| content.metadata['final_url'] | |
| ) | |
| results.append(ProcessedItem( | |
| source='url', | |
| url=content.metadata['final_url'], | |
| structured=structured, | |
| metadata=content.metadata, | |
| snapshot_path=snapshot_path | |
| )) | |
| # Process results | |
| if results: | |
                        # File and crawl results are already dicts; only dataclass items need asdict()
                        results_dicts = [asdict(r) if isinstance(r, ProcessedItem) else r for r in results]
| processed_results = break_down_data(results_dicts) | |
| if mode == "Extract for Database (Content Only)": | |
| # Create database package | |
| database_zip_path = package_database(processed_results) | |
| status_msg = f"β Database package created with {len(results)} items" | |
| else: | |
| # Generate QR codes | |
| qr_generator = QRCodeGenerator() | |
| qr_paths = qr_generator.generate_qr_sequence( | |
| processed_results, | |
| combined=combine, | |
| prefix="data_qr" | |
| ) | |
| all_media_paths.extend(qr_paths) | |
| status_msg = f"β Processed {len(results)} items, generated {len(qr_paths)} QR codes" | |
| return processed_results, all_media_paths, status_msg, database_zip_path | |
| else: | |
| return None, [], "β No valid content found in inputs", None | |
| except Exception as e: | |
| logger.error(f"Processing error: {e}") | |
| return None, [], f"β Error: {str(e)}", None | |
| # Connect events | |
| example_btn.click(load_example, outputs=[text_input]) | |
| clear_btn.click(clear_inputs, outputs=[url_input, file_input, text_input, extraction_mode, combine_data]) | |
| process_btn.click( | |
| process_inputs, | |
| inputs=[url_input, file_input, text_input, extraction_mode, combine_data], | |
| outputs=[output_json, output_gallery, output_text, output_database_zip] | |
| ) | |
| # Footer | |
| gr.Markdown(""" | |
| <div style="margin-top: 2rem; padding-top: 1rem; border-top: 1px solid #e2e8f0;"> | |
| <h3>π Features</h3> | |
| <ul> | |
| <li><strong>URL Processing</strong>: Extract structured data from web pages</li> | |
| <li><strong>File Support</strong>: Process various file formats including archives</li> | |
| <li><strong>Site Crawling</strong>: Limited crawl for template/database extraction</li> | |
| <li><strong>QR Generation</strong>: Create QR codes for data sharing</li> | |
| <li><strong>Database Export</strong>: Package data and media for deployment</li> | |
| </ul> | |
| </div> | |
| """) | |
| return interface | |
| def main(): | |
| """Main entry point.""" | |
| try: | |
| # Initialize mimetypes | |
| mimetypes.init() | |
| # Create and launch interface | |
| interface = create_modern_interface() | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| debug=False, | |
| show_error=True, | |
| show_api=False, | |
| favicon_path=None | |
| ) | |
| except Exception as e: | |
| logger.error(f"Application startup error: {e}") | |
| raise | |
| if __name__ == "__main__": | |
| main() |