"""
Advanced Data Processor & QR Generator
Enhanced version with better error handling, performance improvements, and cleaner architecture.
"""
import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import tarfile
import copy
import hashlib
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any, Set
from pathlib import Path
from urllib.parse import urlparse, urljoin
from dataclasses import dataclass, asdict
from contextlib import contextmanager
import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup, Comment, NavigableString, Tag
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
# Conditional imports with better error handling
PLAYWRIGHT_AVAILABLE = False
try:
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
PLAYWRIGHT_AVAILABLE = True
except ImportError:
logger = logging.getLogger(__name__)
logger.warning("Playwright not installed. Install with: pip install playwright && playwright install")
# Setup enhanced logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log', encoding='utf-8', mode='a')
]
)
logger = logging.getLogger(__name__)
# Constants
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
SNAPSHOTS_DIR = OUTPUTS_DIR / 'snapshots'
MEDIA_DIR = OUTPUTS_DIR / 'media'
TEMP_DIR = OUTPUTS_DIR / 'temp'
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB default max
DEFAULT_TIMEOUT = 30
# Ensure directories exist
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR, SNAPSHOTS_DIR, MEDIA_DIR]:
directory.mkdir(parents=True, exist_ok=True)
# Data classes for better type safety
@dataclass
class URLValidationResult:
is_valid: bool
message: str
details: Dict[str, Any]
@dataclass
class FetchResult:
structured: Dict[str, Any]
raw_content: str
metadata: Dict[str, Any]
@dataclass
class ProcessedItem:
source: str
url: Optional[str] = None
filename: Optional[str] = None
structured: Dict[str, Any] = None
metadata: Dict[str, Any] = None
timestamp: str = None
snapshot_path: Optional[str] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now().isoformat()
if self.structured is None:
self.structured = {}
if self.metadata is None:
self.metadata = {}
# Media Downloader with better caching and error handling
class MediaDownloader:
"""Handles downloading and saving media files with caching."""
def __init__(self, cache_dir: Path = TEMP_DIR / 'media_cache'):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'image/webp,image/*,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
})
self.cache_dir = cache_dir
self.cache_dir.mkdir(exist_ok=True)
        self.downloaded_files = {}  # {url: local_path}
def _get_url_hash(self, url: str) -> str:
"""Generate consistent hash for URL."""
return hashlib.md5(url.encode()).hexdigest()
def download_media(self, url: str, timeout: int = 10) -> Optional[str]:
"""Download media file with caching."""
url_hash = self._get_url_hash(url)
# Check cache first
cache_file = self.cache_dir / f"{url_hash}.cache"
if cache_file.exists():
try:
with open(cache_file, 'r') as f:
cached_path = f.read().strip()
if Path(cached_path).exists():
return cached_path
except Exception:
pass
# Download the file
try:
response = self.session.get(url, timeout=timeout, stream=True)
response.raise_for_status()
# Determine file extension
content_type = response.headers.get('Content-Type', '').split(';')[0].strip()
ext = mimetypes.guess_extension(content_type)
if not ext:
# Try to get extension from URL
parsed = urlparse(url)
ext = Path(parsed.path).suffix or '.bin'
# Create safe filename
safe_filename = f"{url_hash}{ext}"
local_path = MEDIA_DIR / safe_filename
# Save file
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Update cache
with open(cache_file, 'w') as f:
f.write(str(local_path))
self.downloaded_files[url] = str(local_path)
logger.info(f"Downloaded media: {url} -> {local_path}")
return str(local_path)
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to download media {url}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error downloading {url}: {e}")
return None
def batch_download(self, urls: List[str], max_workers: int = 5) -> Dict[str, Optional[str]]:
"""Download multiple files (could be enhanced with threading)."""
results = {}
for url in urls:
results[url] = self.download_media(url)
return results
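# Illustrative usage sketch (not wired into the app): how MediaDownloader is
# intended to be called. The URLs below are placeholders, not project assets.
def _example_media_download() -> None:
    downloader = MediaDownloader()
    # Single download returns a local path under MEDIA_DIR, or None on failure
    single = downloader.download_media("https://example.com/image.png")
    # Batch download returns {url: local_path_or_None}
    batch = downloader.batch_download([
        "https://example.com/a.jpg",
        "https://example.com/b.jpg",
    ])
    logger.info(f"Downloaded single={single}, batch={sum(1 for p in batch.values() if p)} of {len(batch)}")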
# Enhanced URL Processor
class EnhancedURLProcessor:
"""Advanced URL processing with complete content extraction."""
def __init__(self, timeout: int = DEFAULT_TIMEOUT, max_retries: int = 3):
self.session = requests.Session()
self.timeout = timeout
self.max_retries = max_retries
self.user_agent = UserAgent()
self.session.headers.update({
'User-Agent': self.user_agent.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'DNT': '1',
})
def validate_url(self, url: str) -> URLValidationResult:
"""Enhanced URL validation with detailed feedback."""
try:
# Basic URL validation
if not url or not isinstance(url, str):
return URLValidationResult(
is_valid=False,
message='Invalid URL',
details={'error': 'URL must be a non-empty string'}
)
# Check if URL starts with http(s)
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
# Validate with validators
if not validators.url(url):
return URLValidationResult(
is_valid=False,
message='Invalid URL format',
details={'error': 'URL must be properly formatted'}
)
parsed = urlparse(url)
if not all([parsed.scheme, parsed.netloc]):
return URLValidationResult(
is_valid=False,
message='Incomplete URL',
details={'error': 'Missing scheme or domain'}
)
            # Try to connect: prefer a lightweight HEAD request, fall back to GET
            try:
                probe = self.session.head(
                    url,
                    timeout=5,
                    allow_redirects=True
                )
                probe.raise_for_status()
            except requests.exceptions.RequestException:
                # Some servers reject HEAD requests; retry with a streamed GET
                probe = self.session.get(url, timeout=5, stream=True, allow_redirects=True)
                probe.raise_for_status()
            return URLValidationResult(
                is_valid=True,
                message='URL is valid and accessible',
                details={
                    'final_url': probe.url,
                    'content_type': probe.headers.get('Content-Type', 'unknown'),
                    'server': probe.headers.get('Server', 'unknown'),
                    'size': probe.headers.get('Content-Length', 'unknown')
                }
            )
        except Exception as e:
            return URLValidationResult(
                is_valid=False,
                message=f'URL validation failed: {str(e)}',
                details={'error': str(e), 'error_type': type(e).__name__}
            )
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[FetchResult]:
"""Enhanced content fetcher with retries and encoding detection."""
try:
logger.info(f"Fetching content from: {url} (Attempt {retry_count + 1}/{self.max_retries})")
# Update user agent
self.session.headers.update({'User-Agent': self.user_agent.random})
response = self.session.get(
url,
timeout=self.timeout,
allow_redirects=True,
stream=True
)
response.raise_for_status()
# Encoding detection
encoding = response.encoding
if encoding is None or encoding == 'ISO-8859-1':
# Sample first 10KB for encoding detection
sample = response.content[:10240]
detected = chardet.detect(sample)
encoding = detected['encoding'] or 'utf-8'
# Decode content
try:
raw_content = response.content.decode(encoding, errors='replace')
except (UnicodeDecodeError, LookupError):
raw_content = response.content.decode('utf-8', errors='replace')
encoding = 'utf-8 (fallback)'
# Prepare metadata
metadata = {
'url': url,
'final_url': response.url,
'timestamp': datetime.now().isoformat(),
'encoding': encoding,
'content_type': response.headers.get('Content-Type', '').split(';')[0].strip(),
'content_length': len(response.content),
'status_code': response.status_code,
'headers': dict(response.headers),
'elapsed': response.elapsed.total_seconds(),
}
# Process based on content type
content_type = metadata['content_type'].lower()
structured = {}
if 'text/html' in content_type:
structured = self._process_html_content(raw_content, response.url)
elif 'application/json' in content_type or url.endswith('.json'):
try:
structured = json.loads(raw_content)
except json.JSONDecodeError as e:
structured = {
'text': raw_content[:100000],
'parse_error': str(e),
'json_fragment': raw_content[:1000]
}
elif 'image/' in content_type:
structured = {
'media_type': 'image',
'direct_url': response.url,
'format': content_type.split('/')[-1],
'size_bytes': len(response.content),
'filename': Path(urlparse(url).path).name or 'unknown'
}
else:
# Generic content
structured = {'text': raw_content[:100000]}
return FetchResult(
structured=structured,
raw_content=raw_content,
metadata=metadata
)
except requests.exceptions.RequestException as e:
if retry_count < self.max_retries - 1:
sleep_time = 2 ** retry_count
logger.info(f"Retrying {url} after {sleep_time}s...")
time.sleep(sleep_time)
return self.fetch_content(url, retry_count + 1)
else:
logger.error(f"Failed to fetch {url} after {self.max_retries} attempts: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching {url}: {e}")
return None
def _process_html_content(self, raw_content: str, base_url: str) -> Dict[str, Any]:
"""Process HTML content and extract structured data."""
soup = BeautifulSoup(raw_content, 'html.parser')
# Fix relative URLs
for tag in soup.find_all(['a', 'img', 'link', 'script', 'video', 'audio', 'source']):
for attr in ['href', 'src', 'data-src', 'poster']:
if tag.get(attr) and not urlparse(tag[attr]).scheme:
try:
tag[attr] = urljoin(base_url, tag[attr])
except Exception as e:
logger.debug(f"Failed to join URL: {e}")
# Extract structured data
structured = self._extract_database_data(soup, base_url)
structured['raw_html'] = raw_content[:50000] # Store truncated HTML
structured['base_url'] = base_url
return structured
def _create_template_shell(self, raw_content: str, base_url: str) -> Dict[str, Any]:
"""Create a template shell from HTML content."""
soup = BeautifulSoup(raw_content, 'html.parser')
PLACEHOLDER_TEXT = "[LOREM IPSUM CONTENT]"
PLACEHOLDER_IMG = "data:image/svg+xml;charset=UTF-8,%3Csvg%20width%3D%22200%22%20height%3D%22100%22%20xmlns%3D%22http%3A%2F%2Fwww.w3.org%2F2000%2Fsvg%22%3E%3Crect%20width%3D%22200%22%20height%3D%22100%22%20fill%3D%22%23777%22%3E%3C%2Frect%3E%3Ctext%20x%3D%2270%22%20y%3D%2255%22%3E200x100%3C%2Ftext%3E%3C%2Fsvg%3E"
# Replace text content
text_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'td', 'th', 'label', 'title', 'div']
for tag in soup.find_all(text_tags):
if tag.string and len(tag.get_text(strip=True)) > 5:
tag.string.replace_with(PLACEHOLDER_TEXT)
# Replace images
for img in soup.find_all('img'):
img['src'] = PLACEHOLDER_IMG
if 'srcset' in img.attrs:
del img['srcset']
# Remove external links
for a in soup.find_all('a'):
if 'href' in a.attrs:
a['href'] = '#'
# Remove sensitive data
for script in soup.find_all('script', type='application/ld+json'):
script.decompose()
# Remove comments
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
return {
'template_type': 'html_shell',
'base_url': base_url,
'template_html': str(soup),
'timestamp': datetime.now().isoformat()
}
def _extract_database_data(self, soup: BeautifulSoup, base_url: str) -> Dict[str, Any]:
"""Extract structured data from HTML."""
structured = {
'title': soup.title.string.strip() if soup.title and soup.title.string else '',
'meta_description': '',
'core_text_content': '',
'images': [],
'videos': [],
'audios': [],
'structured_data': [],
'products': [],
'links': [],
'metadata': {}
}
# Extract meta description
meta_desc = soup.find('meta', attrs={'name': 'description'})
if meta_desc:
structured['meta_description'] = meta_desc.get('content', '')
# Extract JSON-LD structured data
for script in soup.find_all('script', type='application/ld+json'):
try:
ld_data = json.loads(script.string or '{}')
structured['structured_data'].append(ld_data)
# Extract products
if isinstance(ld_data, dict):
if ld_data.get('@type') == 'Product':
structured['products'].append(ld_data)
elif ld_data.get('@graph'):
for item in ld_data.get('@graph', []):
if isinstance(item, dict) and item.get('@type') == 'Product':
structured['products'].append(item)
except (json.JSONDecodeError, TypeError) as e:
logger.debug(f"Failed to parse JSON-LD: {e}")
# Extract media
for img in soup.find_all('img'):
src = img.get('src') or img.get('data-src')
if src:
structured['images'].append(urljoin(base_url, src))
for video in soup.find_all('video'):
src = video.get('src') or (video.find('source') and video.find('source').get('src'))
if src:
structured['videos'].append(urljoin(base_url, src))
for audio in soup.find_all('audio'):
src = audio.get('src') or (audio.find('source') and audio.find('source').get('src'))
if src:
structured['audios'].append(urljoin(base_url, src))
# Extract links
for a in soup.find_all('a', href=True):
href = a['href']
if href.startswith(('http://', 'https://')):
structured['links'].append(href)
# Extract main content
main_content_selectors = [
'main', 'article', '[role="main"]',
'.main-content', '.content', '#content',
'.article', '.post'
]
for selector in main_content_selectors:
main_tag = soup.select_one(selector)
if main_tag:
structured['core_text_content'] = clean(
main_tag.get_text('\n', strip=True),
lower=False,
no_line_breaks=False,
no_urls=True,
no_emails=True,
no_phone_numbers=True
)[:10000] # Limit size
break
if not structured['core_text_content']:
# Fallback: extract all text
structured['core_text_content'] = clean(
soup.get_text('\n', strip=True),
lower=False,
no_line_breaks=False,
no_urls=True,
no_emails=True,
no_phone_numbers=True
)[:5000]
# Remove duplicates
structured['images'] = list(dict.fromkeys(structured['images']))[:50] # Limit to 50 images
structured['videos'] = list(dict.fromkeys(structured['videos']))
structured['audios'] = list(dict.fromkeys(structured['audios']))
structured['links'] = list(dict.fromkeys(structured['links']))[:100] # Limit to 100 links
return structured
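# Illustrative usage sketch (not wired into the app): the typical validate-then-fetch
# flow for EnhancedURLProcessor. The URL is a placeholder.
def _example_url_processing() -> None:
    processor = EnhancedURLProcessor()
    validation = processor.validate_url("https://example.com")
    if not validation.is_valid:
        logger.warning(f"Validation failed: {validation.message}")
        return
    result = processor.fetch_content("https://example.com")
    if result:
        logger.info(
            f"Fetched {result.metadata['content_length']} bytes; "
            f"title={result.structured.get('title', '')!r}, "
            f"images={len(result.structured.get('images', []))}"
        )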
# Site Crawler with improved logic
class SiteCrawler:
"""Crawl website with configurable depth and limits."""
def __init__(self, processor: EnhancedURLProcessor, max_pages: int = 10, max_depth: int = 2):
self.processor = processor
self.max_pages = max_pages
self.max_depth = max_depth
self.crawled_urls = set()
self.results = []
self.snapshot_paths = []
def _normalize_url(self, url: str, base_url: str) -> str:
"""Normalize URL by removing fragments and query parameters for crawling."""
parsed = urlparse(url)
base_parsed = urlparse(base_url)
# Ensure same domain
if parsed.netloc and parsed.netloc != base_parsed.netloc:
return None
# Remove fragments and query params for crawling
normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
return normalized.rstrip('/')
def _get_internal_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
"""Extract internal links from page."""
parsed_base = urlparse(base_url)
internal_links = set()
for a in soup.find_all('a', href=True):
href = urljoin(base_url, a['href'])
parsed_href = urlparse(href)
# Check if same domain
if parsed_href.netloc == parsed_base.netloc:
# Filter out non-HTML resources
if any(href.lower().endswith(ext) for ext in [
'.pdf', '.zip', '.jpg', '.jpeg', '.png', '.gif',
'.css', '.js', '.mp4', '.mp3', '.avi', '.mov'
]):
continue
# Remove fragments
href = self._normalize_url(href, base_url)
if href:
internal_links.add(href)
return list(internal_links)
def crawl_site(self, start_url: str, mode: str = "Full Structured Data") -> Tuple[List[Dict], List[str]]:
"""Crawl website starting from given URL."""
logger.info(f"Starting crawl from {start_url} (max pages: {self.max_pages})")
queue = [(start_url, 0)] # (url, depth)
while queue and len(self.crawled_urls) < self.max_pages:
url, depth = queue.pop(0)
if url in self.crawled_urls or depth > self.max_depth:
continue
logger.info(f"Crawling: {url} (depth: {depth})")
self.crawled_urls.add(url)
# Fetch content
content_result = self.processor.fetch_content(url)
if not content_result:
continue
# Check if HTML
content_type = content_result.metadata.get('content_type', '').lower()
if 'text/html' not in content_type:
continue
# Capture snapshot if Playwright is available
snapshot_path = None
if PLAYWRIGHT_AVAILABLE:
try:
filename = f"snapshot_{len(self.crawled_urls)}_{hashlib.md5(url.encode()).hexdigest()[:8]}.png"
snapshot_path = capture_visual_snapshot(url, filename)
if snapshot_path:
self.snapshot_paths.append(snapshot_path)
except Exception as e:
logger.warning(f"Failed to capture snapshot for {url}: {e}")
# Process based on mode
raw_content = content_result.raw_content
base_url = content_result.metadata['final_url']
soup = BeautifulSoup(raw_content, 'html.parser')
if mode == "Extract for Template (Shell)":
structured = self.processor._create_template_shell(raw_content, base_url)
elif mode == "Extract for Database (Content Only)":
structured = self.processor._extract_database_data(soup, base_url)
else:
structured = self.processor._process_html_content(raw_content, base_url)
# Create result item
result_item = ProcessedItem(
source='crawl',
url=base_url,
structured=structured,
metadata=content_result.metadata,
snapshot_path=snapshot_path
)
self.results.append(asdict(result_item))
# Extract links for next level
if depth < self.max_depth:
new_links = self._get_internal_links(soup, base_url)
for link in new_links:
if link not in self.crawled_urls and len(self.crawled_urls) < self.max_pages:
queue.append((link, depth + 1))
# Be polite
time.sleep(0.5)
logger.info(f"Crawl completed. Found {len(self.results)} pages.")
return self.results, self.snapshot_paths
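# Illustrative usage sketch (not wired into the app): a small crawl capped at three
# pages and depth one. The start URL is a placeholder.
def _example_site_crawl() -> None:
    crawler = SiteCrawler(EnhancedURLProcessor(), max_pages=3, max_depth=1)
    pages, snapshots = crawler.crawl_site("https://example.com", mode="Full Structured Data")
    logger.info(f"Crawled {len(pages)} pages, captured {len(snapshots)} snapshots")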
# File Processor with better archive handling
class EnhancedFileProcessor:
"""Process various file types including archives."""
def __init__(self, max_file_size: int = MAX_FILE_SIZE):
self.max_file_size = max_file_size
self.supported_extensions = {
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.log',
'.yml', '.yaml', '.ini', '.conf', '.cfg', '.zip', '.tar', '.gz',
'.bz2', '.7z', '.rar', '.pdf', '.doc', '.docx', '.rtf', '.odt',
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'
}
def process_file(self, file_path: str) -> List[Dict]:
"""Process a single file or archive."""
if not file_path or not os.path.exists(file_path):
return []
try:
file_size = os.path.getsize(file_path)
if file_size > self.max_file_size:
logger.warning(f"File {file_path} exceeds size limit ({file_size} > {self.max_file_size})")
return []
if self._is_archive(file_path):
return self._process_archive(file_path)
else:
return self._process_single_file(file_path)
except Exception as e:
logger.error(f"Error processing file {file_path}: {e}")
return []
def _is_archive(self, filepath: str) -> bool:
"""Check if file is an archive."""
archive_extensions = ['.zip', '.tar', '.gz', '.bz2', '.7z', '.rar']
return any(filepath.lower().endswith(ext) for ext in archive_extensions)
def _process_single_file(self, file_path: str) -> List[Dict]:
"""Process a single file."""
try:
file_stat = os.stat(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
mime_type = mime_type or 'application/octet-stream'
structured = {}
if 'image/' in mime_type:
structured = {
'media_type': 'image',
'filename': os.path.basename(file_path),
'mime_type': mime_type,
'size_bytes': file_stat.st_size
}
else:
# Read file content
with open(file_path, 'rb') as f:
raw_bytes = f.read()
# Detect encoding
detected = chardet.detect(raw_bytes[:10000])
encoding = detected['encoding'] or 'utf-8'
try:
content = raw_bytes.decode(encoding, errors='replace')
except (UnicodeDecodeError, LookupError):
content = raw_bytes.decode('utf-8', errors='replace')
# Parse based on file type
if 'json' in mime_type or file_path.endswith('.json'):
try:
json_data = json.loads(content)
structured = json_data
except json.JSONDecodeError as e:
structured = {
'text': content[:50000],
'parse_error': str(e)
}
elif 'html' in mime_type or file_path.endswith(('.html', '.htm')):
processor = EnhancedURLProcessor()
soup = BeautifulSoup(content, 'html.parser')
structured = processor._extract_database_data(soup, f"file://{file_path}")
else:
structured = {'text': content[:100000]}
result_item = ProcessedItem(
source='file',
filename=os.path.basename(file_path),
structured=structured,
metadata={
'file_size': file_stat.st_size,
'mime_type': mime_type,
'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
'file_path': file_path
}
)
return [asdict(result_item)]
except Exception as e:
logger.error(f"Error processing single file {file_path}: {e}")
return []
def _process_archive(self, archive_path: str) -> List[Dict]:
"""Extract and process files from archive."""
dataset = []
temp_dir = tempfile.mkdtemp(prefix='archive_extract_')
try:
if zipfile.is_zipfile(archive_path):
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
for file_info in zip_ref.infolist():
if not file_info.is_dir():
file_path = os.path.join(temp_dir, file_info.filename)
if os.path.exists(file_path):
dataset.extend(self._process_single_file(file_path))
elif tarfile.is_tarfile(archive_path):
with tarfile.open(archive_path, 'r') as tar_ref:
tar_ref.extractall(temp_dir)
for member in tar_ref.getmembers():
if member.isfile():
file_path = os.path.join(temp_dir, member.name)
if os.path.exists(file_path):
dataset.extend(self._process_single_file(file_path))
else:
logger.warning(f"Unsupported archive format: {archive_path}")
except Exception as e:
logger.error(f"Error processing archive {archive_path}: {e}")
finally:
# Cleanup
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except:
pass
return dataset
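# Illustrative usage sketch (not wired into the app): processing a plain file and an
# archive with EnhancedFileProcessor. The paths are placeholders.
def _example_file_processing() -> None:
    processor = EnhancedFileProcessor()
    items = processor.process_file("notes.json")   # single file -> list of item dicts
    items += processor.process_file("bundle.zip")  # archive -> each extracted file processed
    logger.info(f"Processed {len(items)} items from disk")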
# Data Chunker with improved chunking logic
class DataChunker:
"""Chunk data for QR code generation."""
    def __init__(self, max_chunk_size: int = 1024):
        # QR codes in this app use ERROR_CORRECT_H, which caps capacity at 1273 bytes
        # (version 40), and each chunk is wrapped in JSON metadata, so the data
        # portion needs headroom below that limit.
        self.max_chunk_size = max_chunk_size
    def chunk_data(self, data: Any) -> List[Dict]:
        """Chunk data into smaller pieces for QR encoding."""
        try:
            # Serialize data
            if isinstance(data, (dict, list)):
                json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
            else:
                json_str = str(data)
            total_bytes = len(json_str.encode('utf-8'))
            chunk_size = self.max_chunk_size
            # Split on character boundaries, accumulating characters until a chunk
            # would exceed chunk_size bytes, so multi-byte UTF-8 sequences are never cut
            pieces: List[str] = []
            current: List[str] = []
            current_bytes = 0
            for char in json_str:
                char_bytes = len(char.encode('utf-8'))
                if current and current_bytes + char_bytes > chunk_size:
                    pieces.append(''.join(current))
                    current, current_bytes = [], 0
                current.append(char)
                current_bytes += char_bytes
            if current:
                pieces.append(''.join(current))
            # Wrap each piece with ordering and integrity metadata
            chunks = []
            for index, chunk_str in enumerate(pieces, start=1):
                chunks.append({
                    "chunk_index": index,
                    "total_chunks": len(pieces),
                    "total_length": total_bytes,
                    "chunk_hash": hashlib.md5(chunk_str.encode()).hexdigest()[:8],
                    "data": chunk_str,
                    "timestamp": datetime.now().isoformat()
                })
            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return [{"error": str(e), "data": str(data)[:100]}]
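# Illustrative sketch (not wired into the app): chunking a payload and reassembling it
# by concatenating the 'data' fields in chunk_index order. The payload is synthetic.
def _example_chunk_roundtrip() -> None:
    payload = {"items": [{"id": i, "name": f"Widget {i}"} for i in range(100)]}
    chunks = DataChunker(max_chunk_size=500).chunk_data(payload)
    reassembled = ''.join(c['data'] for c in sorted(chunks, key=lambda c: c['chunk_index']))
    assert json.loads(reassembled) == payload
    logger.info(f"Round-tripped payload through {len(chunks)} chunks")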
# QR Code Generator with styling options
class QRCodeGenerator:
"""Generate QR codes with various styling options."""
def __init__(self, output_dir: Path = QR_CODES_DIR):
self.output_dir = output_dir
self.output_dir.mkdir(exist_ok=True)
def generate_stylish_qr(self, data: Union[str, Dict], filename: str,
size: int = 10, border: int = 4,
fill_color: str = "#000000",
back_color: str = "#FFFFFF",
logo_path: Optional[str] = None) -> str:
"""Generate a stylish QR code."""
try:
# Prepare data
if isinstance(data, dict):
data_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
else:
data_str = str(data)
# Create QR code
qr = qrcode.QRCode(
version=None,
error_correction=qrcode.constants.ERROR_CORRECT_H, # High error correction
box_size=size,
border=border
)
qr.add_data(data_str)
qr.make(fit=True)
# Create image
qr_img = qr.make_image(fill_color=fill_color, back_color=back_color)
qr_img = qr_img.convert('RGBA')
# Add logo if provided
if logo_path and os.path.exists(logo_path):
try:
                    logo = Image.open(logo_path).convert('RGBA')
logo_size = qr_img.size[0] // 5
logo = logo.resize((logo_size, logo_size), Image.Resampling.LANCZOS)
# Calculate position
pos = ((qr_img.size[0] - logo.size[0]) // 2,
(qr_img.size[1] - logo.size[1]) // 2)
# Paste logo
qr_img.paste(logo, pos, logo)
except Exception as e:
logger.warning(f"Failed to add logo: {e}")
            # Save image (PNG is lossless, so no quality parameter is needed)
            output_path = self.output_dir / filename
            qr_img.save(output_path, 'PNG')
logger.info(f"QR code generated: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"QR generation error: {e}")
return ""
def generate_qr_sequence(self, data: Any, combined: bool = True,
prefix: str = "qr") -> List[str]:
"""Generate a sequence of QR codes for data."""
chunker = DataChunker()
paths = []
timestamp = int(time.time())
if combined:
# Generate QR codes for combined data
chunks = chunker.chunk_data(data)
for i, chunk in enumerate(chunks):
filename = f'{prefix}_{timestamp}_{i+1}_of_{len(chunks)}.png'
qr_path = self.generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#1a365d",
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
# Generate separate QR codes for each item
if isinstance(data, list):
for idx, item in enumerate(data):
chunks = chunker.chunk_data(item)
for chunk_idx, chunk in enumerate(chunks):
filename = f'{prefix}_item{idx+1}_{chunk_idx+1}_of_{len(chunks)}_{timestamp}.png'
qr_path = self.generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#2d3748",
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
chunks = chunker.chunk_data(data)
for i, chunk in enumerate(chunks):
filename = f'{prefix}_single_{i+1}_of_{len(chunks)}_{timestamp}.png'
qr_path = self.generate_stylish_qr(
data=chunk,
filename=filename,
fill_color="#1a365d",
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
return paths
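# Illustrative usage sketch (not wired into the app): generating a sequential QR set
# for a small record; images land in QR_CODES_DIR with the given prefix.
def _example_qr_generation() -> None:
    record = {"id": "123", "name": "Premium Widget", "price": 299.99}
    paths = QRCodeGenerator().generate_qr_sequence(record, combined=True, prefix="example")
    logger.info(f"Wrote {len(paths)} QR image(s)")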
# Main processing functions
def capture_visual_snapshot(url: str, filename: str) -> Optional[str]:
"""Capture webpage screenshot using Playwright."""
if not PLAYWRIGHT_AVAILABLE:
logger.warning("Playwright not available for screenshots")
return None
output_path = SNAPSHOTS_DIR / filename
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
viewport={'width': 1280, 'height': 720},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
)
page = context.new_page()
# Navigate with timeout
page.goto(url, wait_until='networkidle', timeout=30000)
# Take full page screenshot
page.screenshot(path=output_path, full_page=True)
browser.close()
logger.info(f"Snapshot captured: {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Failed to capture snapshot for {url}: {e}")
return None
def break_down_data(data: Union[Dict, List[Dict]]) -> Union[Dict, List[Dict]]:
"""Break down and restructure data for better organization."""
def process_item(item: Dict) -> Dict:
structured = item.get('structured', {})
# Handle template shells
if structured.get('template_type') == 'html_shell':
return item
# Ensure structured data exists
if not structured:
content = item.get('content') or item.get('raw_content', '')
if isinstance(content, str):
structured = {'text': content}
elif isinstance(content, dict):
structured = content
# Extract media
media = []
for img in structured.get('images', []):
media.append({'type': 'image', 'source': img, 'size': 'unknown'})
for vid in structured.get('videos', []):
media.append({'type': 'video', 'source': vid, 'size': 'unknown'})
for aud in structured.get('audios', []):
media.append({'type': 'audio', 'source': aud, 'size': 'unknown'})
structured['media'] = media
# Extract products
if 'products' not in structured:
structured['products'] = []
# Create template if products exist
if structured['products']:
structured['template'] = {
'type': 'product_catalog',
'item_count': len(structured['products']),
'items': structured['products'][:10], # Limit to 10
'metadata': item.get('metadata', {})
}
item['structured'] = structured
return item
if isinstance(data, list):
return [process_item(item) for item in data]
elif isinstance(data, dict):
return process_item(data)
return data
def package_database(results: List[Dict]) -> Optional[str]:
"""Package processed data and media into a ZIP file."""
if not results:
return None
try:
downloader = MediaDownloader()
updated_results = copy.deepcopy(results)
# Collect media URLs
media_urls = set()
for item in updated_results:
structured = item.get('structured', {})
media_urls.update(structured.get('images', []))
media_urls.update(structured.get('videos', []))
media_urls.update(structured.get('audios', []))
# Download media
media_mapping = downloader.batch_download(list(media_urls))
# Update results with local paths
for item in updated_results:
structured = item.get('structured', {})
for media_type in ['images', 'videos', 'audios']:
if media_type in structured:
new_paths = []
for url in structured[media_type]:
if url in media_mapping and media_mapping[url]:
local_path = Path(media_mapping[url])
new_paths.append(f"media/{local_path.name}")
else:
new_paths.append(url)
structured[media_type] = new_paths
# Create ZIP file
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_filename = OUTPUTS_DIR / f"database_export_{timestamp}.zip"
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zf:
# Add data
zf.writestr(
'data_export.json',
json.dumps(updated_results, indent=2, ensure_ascii=False)
)
# Add README
readme = f"""Database Export
Generated: {datetime.now().isoformat()}
Items: {len(updated_results)}
Media Files: {len(media_mapping)}
"""
zf.writestr('README.txt', readme)
# Add media files
for url, local_path in media_mapping.items():
if local_path and os.path.exists(local_path):
zf.write(local_path, arcname=f"media/{Path(local_path).name}")
logger.info(f"Database package created: {zip_filename}")
return str(zip_filename)
except Exception as e:
logger.error(f"Failed to create database package: {e}")
return None
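# Illustrative usage sketch (not wired into the app): packaging a minimal, hand-built
# result list. Real inputs come from process_inputs(); this record is a stand-in.
def _example_package_database() -> None:
    sample = [{
        'source': 'example',
        'structured': {'title': 'Sample page', 'images': [], 'videos': [], 'audios': []},
        'metadata': {'url': 'https://example.com'},
    }]
    zip_path = package_database(sample)
    logger.info(f"Database package written to: {zip_path}")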
# Gradio Interface
def create_modern_interface():
"""Create modern Gradio interface."""
css = """
:root {
--primary-color: #1a365d;
--secondary-color: #2d3748;
--accent-color: #4299e1;
--background-color: #f7fafc;
--success-color: #48bb78;
--error-color: #f56565;
--warning-color: #ed8936;
--border-radius: 0.5rem;
}
.gradio-container {
max-width: 1200px;
margin: 2rem auto;
padding: 2rem;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: var(--border-radius);
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
}
.container-inner {
background: white;
border-radius: var(--border-radius);
padding: 2rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
h1 {
background: linear-gradient(90deg, #667eea, #764ba2);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
margin-bottom: 1rem;
}
.primary-button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
padding: 0.75rem 1.5rem;
border-radius: var(--border-radius);
font-weight: 600;
cursor: pointer;
transition: transform 0.2s, box-shadow 0.2s;
}
.primary-button:hover {
transform: translateY(-2px);
box-shadow: 0 10px 20px rgba(102, 126, 234, 0.4);
}
.warning-box {
background: linear-gradient(135deg, #f6d365 0%, #fda085 100%);
padding: 1rem;
border-radius: var(--border-radius);
margin-bottom: 1rem;
border-left: 4px solid #ed8936;
}
.tab-nav {
background: linear-gradient(135deg, #f7fafc 0%, #edf2f7 100%);
border-radius: var(--border-radius);
padding: 0.5rem;
margin-bottom: 1rem;
}
"""
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", theme=gr.themes.Soft()) as interface:
gr.Markdown("""
<div class="container-inner">
<h1>πŸš€ Advanced Data Processor & QR Code Generator</h1>
<p>Process URLs, files, and JSON data. Generate QR codes and export databases.</p>
</div>
""")
# Warning if Playwright not available
if not PLAYWRIGHT_AVAILABLE:
gr.Markdown("""
<div class="warning-box">
⚠️ **Playwright not installed** - Screenshots and advanced rendering disabled.<br>
Install with: `pip install playwright && playwright install`
</div>
""")
with gr.Tabs() as tabs:
with gr.TabItem("🌐 URL Processing"):
url_input = gr.Textbox(
label="Enter URLs",
lines=5,
placeholder="Enter one URL per line:\nhttps://example.com\nhttps://example.org",
value=""
)
with gr.TabItem("πŸ“ File Input"):
file_input = gr.File(
label="Upload Files",
file_types=["*"],
file_count="multiple"
)
with gr.TabItem("πŸ“ JSON Input"):
text_input = gr.TextArea(
label="Direct JSON Input",
lines=15,
placeholder='{"data": "your json here"} or [{"item": 1}, {"item": 2}]',
value=""
)
# Options
with gr.Row():
extraction_mode = gr.Radio(
label="Extraction Mode",
choices=[
"Full Structured Data",
"Extract for Template (Shell)",
"Extract for Database (Content Only)"
],
value="Full Structured Data",
info="Template/Database mode with single URL triggers site crawl."
)
combine_data = gr.Checkbox(
label="Combine data for sequential QR codes",
value=True,
info="Recommended for large datasets"
)
# Buttons
with gr.Row():
example_btn = gr.Button("πŸ“‹ Load Example", variant="secondary")
clear_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary")
process_btn = gr.Button("⚑ Process & Generate", variant="primary", scale=2)
# Outputs
output_json = gr.JSON(label="Processed Data", visible=True)
with gr.Row():
output_gallery = gr.Gallery(
label="Generated QR Codes & Snapshots",
columns=3,
height=400,
show_label=True
)
output_database_zip = gr.File(
label="Database Export (.zip)",
interactive=False
)
output_text = gr.Textbox(
label="Processing Status",
interactive=False
)
# Example data
def load_example():
example = {
"name": "Example Product Catalog",
"type": "product_catalog",
"items": [
{"id": "123", "name": "Premium Widget", "price": 299.99, "category": "Electronics"},
{"id": "456", "name": "Basic Widget", "price": 149.99, "category": "Electronics"},
{"id": "789", "name": "Deluxe Widget", "price": 499.99, "category": "Electronics"}
],
"metadata": {
"timestamp": datetime.now().isoformat(),
"source": "example",
"version": "1.0"
}
}
return json.dumps(example, indent=2)
def clear_inputs():
return "", None, "", "Full Structured Data", True
def process_inputs(urls, files, text, mode, combine):
"""Main processing function."""
results = []
all_media_paths = []
database_zip_path = None
try:
# Process JSON input
if text and text.strip():
try:
json_data = json.loads(text)
if isinstance(json_data, list):
for item in json_data:
results.append(ProcessedItem(
source='json',
structured=item
))
else:
results.append(ProcessedItem(
source='json',
structured=json_data
))
except json.JSONDecodeError as e:
return None, [], f"Invalid JSON: {str(e)}", None
# Process files
if files:
file_processor = EnhancedFileProcessor()
for file in files:
                        # Gradio may hand back tempfile objects or plain path strings depending on version
                        file_path = file.name if hasattr(file, 'name') else str(file)
                        file_results = file_processor.process_file(file_path)
if file_results:
results.extend(file_results)
# Process URLs
if urls and urls.strip():
url_processor = EnhancedURLProcessor()
url_list = [url.strip() for url in re.split(r'[,\n]', urls) if url.strip()]
if len(url_list) == 1 and mode != "Full Structured Data":
# Site crawl
crawler = SiteCrawler(url_processor, max_pages=5)
crawl_results, snapshot_paths = crawler.crawl_site(url_list[0], mode)
results.extend(crawl_results)
all_media_paths.extend(snapshot_paths)
else:
# Single URL processing
for url in url_list:
validation = url_processor.validate_url(url)
if validation.is_valid:
content = url_processor.fetch_content(url)
if content:
# Capture snapshot
snapshot_path = None
if PLAYWRIGHT_AVAILABLE:
filename = f"snapshot_{hashlib.md5(url.encode()).hexdigest()[:8]}.png"
snapshot_path = capture_visual_snapshot(url, filename)
if snapshot_path:
all_media_paths.append(snapshot_path)
# Process based on mode
if mode == "Extract for Template (Shell)":
structured = url_processor._create_template_shell(
content.raw_content,
content.metadata['final_url']
)
elif mode == "Extract for Database (Content Only)":
soup = BeautifulSoup(content.raw_content, 'html.parser')
structured = url_processor._extract_database_data(
soup,
content.metadata['final_url']
)
                                    else:
                                        # fetch_content already parsed this response by content type,
                                        # so reuse its structured output instead of re-parsing as HTML
                                        structured = content.structured
results.append(ProcessedItem(
source='url',
url=content.metadata['final_url'],
structured=structured,
metadata=content.metadata,
snapshot_path=snapshot_path
))
# Process results
if results:
                    # File and crawl results are already dicts; only dataclass items need asdict()
                    results_dicts = [asdict(r) if isinstance(r, ProcessedItem) else r for r in results]
processed_results = break_down_data(results_dicts)
if mode == "Extract for Database (Content Only)":
# Create database package
database_zip_path = package_database(processed_results)
status_msg = f"βœ… Database package created with {len(results)} items"
else:
# Generate QR codes
qr_generator = QRCodeGenerator()
qr_paths = qr_generator.generate_qr_sequence(
processed_results,
combined=combine,
prefix="data_qr"
)
all_media_paths.extend(qr_paths)
status_msg = f"βœ… Processed {len(results)} items, generated {len(qr_paths)} QR codes"
return processed_results, all_media_paths, status_msg, database_zip_path
else:
return None, [], "❌ No valid content found in inputs", None
except Exception as e:
logger.error(f"Processing error: {e}")
return None, [], f"❌ Error: {str(e)}", None
# Connect events
example_btn.click(load_example, outputs=[text_input])
clear_btn.click(clear_inputs, outputs=[url_input, file_input, text_input, extraction_mode, combine_data])
process_btn.click(
process_inputs,
inputs=[url_input, file_input, text_input, extraction_mode, combine_data],
outputs=[output_json, output_gallery, output_text, output_database_zip]
)
# Footer
gr.Markdown("""
<div style="margin-top: 2rem; padding-top: 1rem; border-top: 1px solid #e2e8f0;">
<h3>πŸ“š Features</h3>
<ul>
<li><strong>URL Processing</strong>: Extract structured data from web pages</li>
<li><strong>File Support</strong>: Process various file formats including archives</li>
<li><strong>Site Crawling</strong>: Limited crawl for template/database extraction</li>
<li><strong>QR Generation</strong>: Create QR codes for data sharing</li>
<li><strong>Database Export</strong>: Package data and media for deployment</li>
</ul>
</div>
""")
return interface
def main():
"""Main entry point."""
try:
# Initialize mimetypes
mimetypes.init()
# Create and launch interface
interface = create_modern_interface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
debug=False,
show_error=True,
show_api=False,
favicon_path=None
)
except Exception as e:
logger.error(f"Application startup error: {e}")
raise
if __name__ == "__main__":
main()