#!/usr/bin/env python3
"""
MCP File Processor Tool - Gradio Implementation
This MCP server provides file processing capabilities including:
- Text extraction and statistics for plain-text files
- CSV data analysis (pandas-based when available)
- JSON structure inspection
- Markdown document structure analysis
Supports MCP protocol via Gradio interface.
"""
import csv
import json
import logging
import os
from io import StringIO
import gradio as gr
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
import pandas as pd
PANDAS_AVAILABLE = True
except ImportError:
PANDAS_AVAILABLE = False
    logger.warning("pandas not available; falling back to basic CSV analysis")
class FileProcessor:
"""File processing service with text extraction and analysis."""
def __init__(self):
"""Initialize the file processor."""
self.supported_formats = {
"txt": self._process_text_file,
"csv": self._process_csv_file,
"json": self._process_json_file,
"md": self._process_markdown_file,
}
    def process_file(self, file_path: str, file_content: bytes | None = None) -> str:
"""
Process uploaded file and extract information.
Args:
file_path: Path to the uploaded file
file_content: Raw file content bytes
Returns:
String with processed file information
"""
try:
if not file_path and not file_content:
return "Error: No file provided"
# Determine file extension
if file_path:
file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
file_name = os.path.basename(file_path)
else:
file_ext = "unknown"
file_name = "uploaded_file"
# Read file content if not provided
if file_content is None and file_path:
try:
with open(file_path, "rb") as f:
file_content = f.read()
except Exception as e:
return f"Error reading file: {e!s}"
if not file_content:
return "Error: Empty file or could not read content"
# Get file size
file_size = len(file_content)
# Process based on file type
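            # Extensions without a registered handler fall back to
            # _process_unknown_file, which applies a simple text-vs-binary heuristic.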
if file_ext in self.supported_formats:
content_analysis = self.supported_formats[file_ext](file_content)
else:
content_analysis = self._process_unknown_file(file_content)
# Build result
result = []
result.append("πŸ“ FILE PROCESSING RESULTS")
result.append("=" * 40)
result.append(f"File: {file_name}")
result.append(f"Type: {file_ext.upper() if file_ext else 'Unknown'}")
result.append(f"Size: {file_size:,} bytes ({self._format_size(file_size)})")
result.append("")
result.append("πŸ“„ CONTENT ANALYSIS:")
result.append("-" * 25)
result.append(content_analysis)
logger.info(f"Successfully processed file: {file_name} ({file_size} bytes)")
return "\n".join(result)
except Exception as e:
error_msg = f"Error processing file: {e!s}"
logger.error(error_msg)
return error_msg
def _process_text_file(self, content: bytes) -> str:
"""Process plain text file."""
try:
text = content.decode("utf-8", errors="ignore")
lines = text.split("\n")
analysis = []
analysis.append(f"Lines: {len(lines)}")
analysis.append(f"Characters: {len(text)}")
analysis.append(f"Words: {len(text.split())}")
analysis.append(f"Paragraphs: {len([line for line in lines if line.strip()])}")
analysis.append("")
analysis.append("πŸ“– CONTENT PREVIEW:")
analysis.append(text[:500] + ("..." if len(text) > 500 else ""))
return "\n".join(analysis)
except Exception as e:
return f"Error processing text file: {e!s}"
def _process_csv_file(self, content: bytes) -> str:
"""Process CSV file."""
try:
text = content.decode("utf-8", errors="ignore")
if PANDAS_AVAILABLE:
return self._process_csv_with_pandas(text)
return self._process_csv_basic(text)
except Exception as e:
return f"Error processing CSV file: {e!s}"
def _process_csv_with_pandas(self, text: str) -> str:
"""Process CSV using pandas for advanced analysis."""
try:
df = pd.read_csv(StringIO(text))
analysis = []
analysis.append(f"Rows: {len(df)}")
analysis.append(f"Columns: {len(df.columns)}")
analysis.append("")
analysis.append("πŸ“Š COLUMN INFORMATION:")
for col in df.columns:
dtype = str(df[col].dtype)
null_count = df[col].isnull().sum()
analysis.append(f" β€’ {col}: {dtype} ({null_count} nulls)")
analysis.append("")
analysis.append("πŸ“ˆ DATA PREVIEW:")
analysis.append(df.head().to_string())
# Basic statistics for numeric columns
numeric_cols = df.select_dtypes(include=["number"]).columns
if len(numeric_cols) > 0:
analysis.append("")
analysis.append("πŸ“Š NUMERIC STATISTICS:")
analysis.append(df[numeric_cols].describe().to_string())
return "\n".join(analysis)
except Exception as e:
return f"Error in pandas CSV processing: {e!s}"
def _process_csv_basic(self, text: str) -> str:
"""Process CSV using basic Python CSV module."""
try:
reader = csv.reader(StringIO(text))
rows = list(reader)
if not rows:
return "Empty CSV file"
headers = rows[0] if rows else []
data_rows = rows[1:] if len(rows) > 1 else []
analysis = []
analysis.append(f"Rows: {len(data_rows)} (+ 1 header)")
analysis.append(f"Columns: {len(headers)}")
analysis.append("")
analysis.append("πŸ“‹ COLUMNS:")
for i, header in enumerate(headers):
analysis.append(f" {i+1}. {header}")
analysis.append("")
analysis.append("πŸ“„ SAMPLE DATA:")
for i, row in enumerate(data_rows[:5]):
analysis.append(f"Row {i+1}: {row}")
if len(data_rows) > 5:
analysis.append(f"... and {len(data_rows) - 5} more rows")
return "\n".join(analysis)
except Exception as e:
return f"Error in basic CSV processing: {e!s}"
def _process_json_file(self, content: bytes) -> str:
"""Process JSON file."""
try:
text = content.decode("utf-8", errors="ignore")
data = json.loads(text)
analysis = []
analysis.append(f"Type: {type(data).__name__}")
if isinstance(data, dict):
analysis.append(f"Keys: {len(data.keys())}")
analysis.append("")
analysis.append("πŸ—οΈ TOP-LEVEL KEYS:")
for key in list(data.keys())[:10]:
value_type = type(data[key]).__name__
analysis.append(f" β€’ {key}: {value_type}")
if len(data.keys()) > 10:
analysis.append(f" ... and {len(data.keys()) - 10} more keys")
elif isinstance(data, list):
analysis.append(f"Items: {len(data)}")
if data:
first_item_type = type(data[0]).__name__
analysis.append(f"First item type: {first_item_type}")
analysis.append("")
analysis.append("πŸ“„ CONTENT PREVIEW:")
            full_dump = json.dumps(data, indent=2)
            analysis.append(full_dump[:800] + ("..." if len(full_dump) > 800 else ""))
return "\n".join(analysis)
except json.JSONDecodeError as e:
return f"Invalid JSON format: {e!s}"
except Exception as e:
return f"Error processing JSON: {e!s}"
def _process_markdown_file(self, content: bytes) -> str:
"""Process Markdown file."""
try:
text = content.decode("utf-8", errors="ignore")
lines = text.split("\n")
# Count markdown elements
headers = [line for line in lines if line.strip().startswith("#")]
            links = text.count("](")  # counts Markdown link/image targets
code_blocks = text.count("```")
analysis = []
analysis.append(f"Lines: {len(lines)}")
analysis.append(f"Characters: {len(text)}")
analysis.append(f"Headers: {len(headers)}")
analysis.append(f"Links: {links}")
analysis.append(f"Code blocks: {code_blocks // 2}") # Divide by 2 (start + end)
analysis.append("")
if headers:
analysis.append("πŸ“‘ DOCUMENT STRUCTURE:")
for header in headers[:10]:
level = len(header) - len(header.lstrip("#"))
title = header.lstrip("# ").strip()
indent = " " * (level - 1)
analysis.append(f"{indent}β€’ {title}")
if len(headers) > 10:
analysis.append(f" ... and {len(headers) - 10} more headers")
analysis.append("")
analysis.append("πŸ“– CONTENT PREVIEW:")
analysis.append(text[:500] + ("..." if len(text) > 500 else ""))
return "\n".join(analysis)
except Exception as e:
return f"Error processing Markdown: {e!s}"
def _process_unknown_file(self, content: bytes) -> str:
"""Process unknown file type."""
try:
            # Try to decode as text. decode() with errors="ignore" never raises,
            # so use a NUL-byte check as a simple text-vs-binary heuristic.
            text = content.decode("utf-8", errors="ignore")
            is_text = b"\x00" not in content
analysis = []
if is_text and len(text.strip()) > 0:
analysis.append("Type: Text-based file")
analysis.append(f"Lines: {len(text.split())}")
analysis.append(f"Characters: {len(text)}")
analysis.append("")
analysis.append("πŸ“„ CONTENT PREVIEW:")
analysis.append(text[:300] + ("..." if len(text) > 300 else ""))
else:
analysis.append("Type: Binary file")
analysis.append("Content: Binary data (not text-readable)")
analysis.append(f"First 32 bytes: {content[:32].hex()}")
return "\n".join(analysis)
except Exception as e:
return f"Error processing unknown file: {e!s}"
def _format_size(self, size_bytes: int) -> str:
"""Format file size in human-readable format."""
for unit in ["B", "KB", "MB", "GB"]:
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} TB"
# Initialize the file processor
file_processor = FileProcessor()
def process_file_mcp(file) -> str:
"""
MCP-compatible file processing function.
Args:
file: Gradio File object
Returns:
String with file analysis results
"""
try:
if file is None:
return "Error: No file uploaded"
# Handle Gradio file input
        # Newer Gradio versions pass a filepath string; older ones pass an object with .name
        file_path = file if isinstance(file, str) else getattr(file, "name", None)
result = file_processor.process_file(file_path)
return result
except Exception as e:
error_msg = f"Error processing file: {e!s}"
logger.error(error_msg)
return error_msg
def create_gradio_interface():
"""Create and configure the Gradio interface."""
interface = gr.Interface(
fn=process_file_mcp,
inputs=[
gr.File(
label="Upload File",
file_types=[".txt", ".csv", ".json", ".md", ".py", ".js", ".html", ".xml"]
)
],
outputs=[
gr.Textbox(
label="File Analysis Results",
lines=20,
show_copy_button=True
)
],
title="πŸ“ MCP File Processor",
description="""
**File Processing MCP Server**
Upload and analyze various file types:
- **Text files**: Word count, content preview
- **CSV files**: Data analysis, column info, statistics
- **JSON files**: Structure analysis, key exploration
- **Markdown**: Document structure, headers, links
Supports: TXT, CSV, JSON, MD, PY, JS, HTML, XML files
""",
examples=[],
allow_flagging="never",
analytics_enabled=False
)
return interface
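# Note: exposing these functions over the MCP protocol is assumed to rely on Gradio's
# built-in MCP support (recent Gradio 5.x with the gradio[mcp] extra accepts
# mcp_server=True in launch()); that flag is not enabled in the launch() call below.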
def main():
"""Main function to run the Gradio app."""
port = int(os.getenv("GRADIO_SERVER_PORT", 7860))
host = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")
logger.info(f"Starting MCP File Processor on {host}:{port}")
interface = create_gradio_interface()
interface.launch(
server_name=host,
server_port=port,
share=False,
debug=False,
quiet=False,
show_error=True
)
if __name__ == "__main__":
main()