|
|
|
|
|
""" |
|
|
MCP File Processor Tool - Gradio Implementation |
|
|
|
|
|
This MCP server provides file processing capabilities including: |
|
|
- Text extraction from various file formats |
|
|
- CSV data analysis |
|
|
- File format conversion |
|
|
- Document processing |
|
|
|
|
|
Supports MCP protocol via Gradio interface. |
|
|
""" |
|
|
|
|
|
import csv |
|
|
import json |
|
|
import logging |
|
|
import os |
|
|
from io import StringIO |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# Configure logging at import time so the optional-dependency probe below
# can emit its warning through the module logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# pandas is an optional dependency: when absent, CSV analysis degrades to a
# stdlib-only path (see FileProcessor._process_csv_basic).
try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    logger.warning("Pandas not available, limited CSV analysis")
|
|
|
|
|
|
|
|
class FileProcessor:
    """File processing service with text extraction and analysis.

    Dispatches on the (lowercased, dot-less) file extension to a
    format-specific handler — txt, csv, json, md — and falls back to a
    text-vs-binary heuristic for anything else. All entry points return a
    human-readable report string and never raise: failures come back as
    "Error ..." strings.
    """

    def __init__(self):
        """Initialize the extension -> handler dispatch table."""
        # Keys match how process_file() normalizes extensions:
        # lowercase, leading dot stripped.
        self.supported_formats = {
            "txt": self._process_text_file,
            "csv": self._process_csv_file,
            "json": self._process_json_file,
            "md": self._process_markdown_file,
        }

    def process_file(self, file_path: str, file_content: bytes = None) -> str:
        """
        Process an uploaded file and extract information.

        Args:
            file_path: Path to the uploaded file (may be None when raw
                content is supplied directly).
            file_content: Raw file content bytes; read from file_path
                when omitted.

        Returns:
            String with processed file information, or an "Error ..."
            string on failure (this method never raises).
        """
        try:
            if not file_path and not file_content:
                return "Error: No file provided"

            if file_path:
                file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
                file_name = os.path.basename(file_path)
            else:
                # No path available: "unknown" is not in the dispatch table,
                # so the generic fallback handler is used.
                file_ext = "unknown"
                file_name = "uploaded_file"

            # Read from disk only when the caller gave us a path but no bytes.
            if file_content is None and file_path:
                try:
                    with open(file_path, "rb") as f:
                        file_content = f.read()
                except Exception as e:
                    return f"Error reading file: {e!s}"

            if not file_content:
                return "Error: Empty file or could not read content"

            file_size = len(file_content)

            # Dispatch to the format-specific handler, or the fallback.
            if file_ext in self.supported_formats:
                content_analysis = self.supported_formats[file_ext](file_content)
            else:
                content_analysis = self._process_unknown_file(file_content)

            result = [
                "π FILE PROCESSING RESULTS",
                "=" * 40,
                f"File: {file_name}",
                f"Type: {file_ext.upper() if file_ext else 'Unknown'}",
                f"Size: {file_size:,} bytes ({self._format_size(file_size)})",
                "",
                "π CONTENT ANALYSIS:",
                "-" * 25,
                content_analysis,
            ]

            # Lazy %-style args avoid building the message when INFO is off.
            logger.info(
                "Successfully processed file: %s (%d bytes)", file_name, file_size
            )
            return "\n".join(result)

        except Exception as e:
            error_msg = f"Error processing file: {e!s}"
            logger.error(error_msg)
            return error_msg

    def _process_text_file(self, content: bytes) -> str:
        """Summarize a plain-text file: line/char/word counts and a preview."""
        try:
            # errors="ignore" drops undecodable bytes instead of raising.
            text = content.decode("utf-8", errors="ignore")
            lines = text.split("\n")

            analysis = []
            analysis.append(f"Lines: {len(lines)}")
            analysis.append(f"Characters: {len(text)}")
            analysis.append(f"Words: {len(text.split())}")
            # FIX: this value counts non-empty lines; the old "Paragraphs"
            # label misrepresented it.
            analysis.append(
                f"Non-empty lines: {len([line for line in lines if line.strip()])}"
            )
            analysis.append("")
            analysis.append("π CONTENT PREVIEW:")
            analysis.append(text[:500] + ("..." if len(text) > 500 else ""))

            return "\n".join(analysis)

        except Exception as e:
            return f"Error processing text file: {e!s}"

    def _process_csv_file(self, content: bytes) -> str:
        """Process a CSV file, preferring pandas when it is installed."""
        try:
            text = content.decode("utf-8", errors="ignore")

            if PANDAS_AVAILABLE:
                return self._process_csv_with_pandas(text)
            return self._process_csv_basic(text)

        except Exception as e:
            return f"Error processing CSV file: {e!s}"

    def _process_csv_with_pandas(self, text: str) -> str:
        """Process CSV using pandas: dtypes, null counts, head, describe()."""
        try:
            df = pd.read_csv(StringIO(text))

            analysis = []
            analysis.append(f"Rows: {len(df)}")
            analysis.append(f"Columns: {len(df.columns)}")
            analysis.append("")
            analysis.append("π COLUMN INFORMATION:")
            for col in df.columns:
                dtype = str(df[col].dtype)
                null_count = df[col].isnull().sum()
                analysis.append(f" β’ {col}: {dtype} ({null_count} nulls)")

            analysis.append("")
            analysis.append("π DATA PREVIEW:")
            analysis.append(df.head().to_string())

            # Only emit describe() when there is at least one numeric column.
            numeric_cols = df.select_dtypes(include=["number"]).columns
            if len(numeric_cols) > 0:
                analysis.append("")
                analysis.append("π NUMERIC STATISTICS:")
                analysis.append(df[numeric_cols].describe().to_string())

            return "\n".join(analysis)

        except Exception as e:
            return f"Error in pandas CSV processing: {e!s}"

    def _process_csv_basic(self, text: str) -> str:
        """Process CSV using the stdlib csv module (pandas-free fallback)."""
        try:
            reader = csv.reader(StringIO(text))
            rows = list(reader)

            if not rows:
                return "Empty CSV file"

            # First row is treated as the header.
            headers = rows[0] if rows else []
            data_rows = rows[1:] if len(rows) > 1 else []

            analysis = []
            analysis.append(f"Rows: {len(data_rows)} (+ 1 header)")
            analysis.append(f"Columns: {len(headers)}")
            analysis.append("")
            analysis.append("π COLUMNS:")
            for i, header in enumerate(headers):
                analysis.append(f" {i+1}. {header}")

            analysis.append("")
            analysis.append("π SAMPLE DATA:")
            for i, row in enumerate(data_rows[:5]):
                analysis.append(f"Row {i+1}: {row}")

            if len(data_rows) > 5:
                analysis.append(f"... and {len(data_rows) - 5} more rows")

            return "\n".join(analysis)

        except Exception as e:
            return f"Error in basic CSV processing: {e!s}"

    def _process_json_file(self, content: bytes) -> str:
        """Process a JSON file: top-level shape, key/item summary, preview."""
        try:
            text = content.decode("utf-8", errors="ignore")
            data = json.loads(text)

            analysis = []
            analysis.append(f"Type: {type(data).__name__}")

            if isinstance(data, dict):
                analysis.append(f"Keys: {len(data.keys())}")
                analysis.append("")
                analysis.append("ποΈ TOP-LEVEL KEYS:")
                for key in list(data.keys())[:10]:
                    value_type = type(data[key]).__name__
                    analysis.append(f" β’ {key}: {value_type}")
                if len(data.keys()) > 10:
                    analysis.append(f" ... and {len(data.keys()) - 10} more keys")

            elif isinstance(data, list):
                analysis.append(f"Items: {len(data)}")
                if data:
                    first_item_type = type(data[0]).__name__
                    analysis.append(f"First item type: {first_item_type}")

            analysis.append("")
            analysis.append("π CONTENT PREVIEW:")
            rendered = json.dumps(data, indent=2)
            # FIX: compare against the full rendering so a document of exactly
            # 800 characters no longer gets a spurious "..." suffix.
            analysis.append(rendered[:800] + ("..." if len(rendered) > 800 else ""))

            return "\n".join(analysis)

        except json.JSONDecodeError as e:
            return f"Invalid JSON format: {e!s}"
        except Exception as e:
            return f"Error processing JSON: {e!s}"

    def _process_markdown_file(self, content: bytes) -> str:
        """Process a Markdown file: structure outline plus basic counters."""
        try:
            text = content.decode("utf-8", errors="ignore")
            lines = text.split("\n")

            headers = [line for line in lines if line.strip().startswith("#")]
            # FIX: the old code counted *lines* that contain a link; counting
            # occurrences of the "](" separator counts the links themselves.
            links = text.count("](")
            code_blocks = text.count("```")

            analysis = []
            analysis.append(f"Lines: {len(lines)}")
            analysis.append(f"Characters: {len(text)}")
            analysis.append(f"Headers: {len(headers)}")
            analysis.append(f"Links: {links}")
            # Each fenced block has an opening and closing ``` marker.
            analysis.append(f"Code blocks: {code_blocks // 2}")
            analysis.append("")

            if headers:
                analysis.append("π DOCUMENT STRUCTURE:")
                for header in headers[:10]:
                    # Header level = number of leading '#' characters.
                    level = len(header) - len(header.lstrip("#"))
                    title = header.lstrip("# ").strip()
                    indent = " " * (level - 1)
                    analysis.append(f"{indent}β’ {title}")
                if len(headers) > 10:
                    analysis.append(f" ... and {len(headers) - 10} more headers")

            analysis.append("")
            analysis.append("π CONTENT PREVIEW:")
            analysis.append(text[:500] + ("..." if len(text) > 500 else ""))

            return "\n".join(analysis)

        except Exception as e:
            return f"Error processing Markdown: {e!s}"

    def _process_unknown_file(self, content: bytes) -> str:
        """Fallback handler: classify the payload as text or binary."""
        try:
            # decode(errors="ignore") cannot raise, so the old bare
            # try/except guarding it was dead code and has been removed.
            text = content.decode("utf-8", errors="ignore")

            analysis = []
            # Whitespace-only (or fully undecodable) payloads are reported
            # as binary, matching the original heuristic.
            if text.strip():
                analysis.append("Type: Text-based file")
                # FIX: the old code reported len(text.split()) — a *word*
                # count — under the "Lines" label.
                analysis.append(f"Lines: {len(text.split(chr(10)))}")
                analysis.append(f"Characters: {len(text)}")
                analysis.append("")
                analysis.append("π CONTENT PREVIEW:")
                analysis.append(text[:300] + ("..." if len(text) > 300 else ""))
            else:
                analysis.append("Type: Binary file")
                analysis.append("Content: Binary data (not text-readable)")
                analysis.append(f"First 32 bytes: {content[:32].hex()}")

            return "\n".join(analysis)

        except Exception as e:
            return f"Error processing unknown file: {e!s}"

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human-readable size (B/KB/MB/GB/TB)."""
        for unit in ["B", "KB", "MB", "GB"]:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level processor instance used by the Gradio request handler.
file_processor = FileProcessor()
|
|
|
|
|
|
|
|
def process_file_mcp(file) -> str:
    """
    MCP-compatible file processing function.

    Args:
        file: Gradio File object, or — depending on the Gradio version —
            a plain string path to the uploaded temp file.

    Returns:
        String with file analysis results (an "Error ..." string on failure;
        this function never raises).
    """
    try:
        if file is None:
            return "Error: No file uploaded"

        # FIX: some Gradio versions deliver the upload as a bare filepath
        # string; the old hasattr(file, "name") check dropped those and
        # reported "No file provided".
        if isinstance(file, str):
            file_path = file
        else:
            file_path = getattr(file, "name", None)

        return file_processor.process_file(file_path)

    except Exception as e:
        error_msg = f"Error processing file: {e!s}"
        logger.error(error_msg)
        return error_msg
|
|
|
|
|
|
|
|
def create_gradio_interface():
    """Create and configure the Gradio interface.

    Returns:
        A gr.Interface wiring process_file_mcp to a single file-upload
        input and a copyable textbox output.
    """
    interface = gr.Interface(
        fn=process_file_mcp,
        inputs=[
            gr.File(
                label="Upload File",
                # Extensions accepted by the upload widget; formats outside
                # FileProcessor.supported_formats go through the generic
                # unknown-file handler.
                file_types=[".txt", ".csv", ".json", ".md", ".py", ".js", ".html", ".xml"]
            )
        ],
        outputs=[
            gr.Textbox(
                label="File Analysis Results",
                lines=20,
                show_copy_button=True
            )
        ],
        title="π MCP File Processor",
        description="""
        **File Processing MCP Server**

        Upload and analyze various file types:
        - **Text files**: Word count, content preview
        - **CSV files**: Data analysis, column info, statistics
        - **JSON files**: Structure analysis, key exploration
        - **Markdown**: Document structure, headers, links

        Supports: TXT, CSV, JSON, MD, PY, JS, HTML, XML files
        """,
        examples=[],
        # NOTE(review): allow_flagging was deprecated (renamed flagging_mode)
        # in newer Gradio releases — confirm against the pinned version.
        allow_flagging="never",
        analytics_enabled=False
    )

    return interface
|
|
|
|
|
|
|
|
def main():
    """Entry point: read host/port from the environment and launch the app."""
    # Deployment overrides come from the standard Gradio env vars; the
    # defaults serve on all interfaces at port 7860.
    server_port = int(os.getenv("GRADIO_SERVER_PORT", 7860))
    server_host = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")

    logger.info(f"Starting MCP File Processor on {server_host}:{server_port}")

    app = create_gradio_interface()

    launch_options = {
        "server_name": server_host,
        "server_port": server_port,
        "share": False,
        "debug": False,
        "quiet": False,
        "show_error": True,
    }
    app.launch(**launch_options)
|
|
|
|
|
|
|
|
# Standard script guard: only start the server when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
|
|
|