#!/usr/bin/env python3
"""
MCP File Processor Tool - Gradio Implementation

This MCP server provides file processing capabilities including:
- Text extraction from various file formats
- CSV data analysis
- File format conversion
- Document processing

Supports MCP protocol via Gradio interface.
"""

import csv
import itertools
import json
import logging
import os
from io import StringIO
from typing import Optional

import gradio as gr

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pandas is optional: when present, CSV files get dtype/null/statistics
# analysis; otherwise we fall back to the stdlib csv module.
try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    logger.warning("Pandas not available, limited CSV analysis")


class FileProcessor:
    """File processing service with text extraction and analysis."""

    def __init__(self) -> None:
        """Build the dispatch table mapping file extensions to handlers."""
        self.supported_formats = {
            "txt": self._process_text_file,
            "csv": self._process_csv_file,
            "json": self._process_json_file,
            "md": self._process_markdown_file,
        }

    def process_file(self, file_path: str, file_content: Optional[bytes] = None) -> str:
        """Process an uploaded file and extract information.

        Args:
            file_path: Path to the uploaded file (may be empty/None when
                raw content is supplied directly).
            file_content: Raw file content bytes; read from ``file_path``
                when None.

        Returns:
            Human-readable report string, or an "Error: ..." message on
            failure (this method never raises to the caller).
        """
        try:
            if not file_path and not file_content:
                return "Error: No file provided"

            # Determine file extension and display name.
            if file_path:
                file_ext = os.path.splitext(file_path)[1].lower().lstrip(".")
                file_name = os.path.basename(file_path)
            else:
                file_ext = "unknown"
                file_name = "uploaded_file"

            # Read file content from disk if not provided by the caller.
            if file_content is None and file_path:
                try:
                    with open(file_path, "rb") as f:
                        file_content = f.read()
                except Exception as e:
                    return f"Error reading file: {e!s}"

            if not file_content:
                return "Error: Empty file or could not read content"

            file_size = len(file_content)

            # Dispatch on extension; unrecognized types get a generic
            # text-or-binary analysis.
            handler = self.supported_formats.get(file_ext, self._process_unknown_file)
            content_analysis = handler(file_content)

            # Build the report.
            result = []
            result.append("📁 FILE PROCESSING RESULTS")
            result.append("=" * 40)
            result.append(f"File: {file_name}")
            result.append(f"Type: {file_ext.upper() if file_ext else 'Unknown'}")
            result.append(f"Size: {file_size:,} bytes ({self._format_size(file_size)})")
            result.append("")
            result.append("📄 CONTENT ANALYSIS:")
            result.append("-" * 25)
            result.append(content_analysis)

            # Lazy %-style args: no formatting cost when INFO is disabled.
            logger.info("Successfully processed file: %s (%d bytes)", file_name, file_size)
            return "\n".join(result)

        except Exception as e:
            error_msg = f"Error processing file: {e!s}"
            logger.error(error_msg)
            return error_msg

    def _process_text_file(self, content: bytes) -> str:
        """Analyze a plain text file: line/word/char counts and a preview."""
        try:
            text = content.decode("utf-8", errors="ignore")
            lines = text.split("\n")

            # A paragraph is a maximal run of consecutive non-empty lines.
            # BUGFIX: previously every non-empty line was counted as a
            # separate "paragraph".
            paragraphs = sum(
                1
                for non_empty, _ in itertools.groupby(
                    lines, key=lambda line: bool(line.strip())
                )
                if non_empty
            )

            analysis = []
            analysis.append(f"Lines: {len(lines)}")
            analysis.append(f"Characters: {len(text)}")
            analysis.append(f"Words: {len(text.split())}")
            analysis.append(f"Paragraphs: {paragraphs}")
            analysis.append("")
            analysis.append("📖 CONTENT PREVIEW:")
            analysis.append(text[:500] + ("..." if len(text) > 500 else ""))
            return "\n".join(analysis)
        except Exception as e:
            return f"Error processing text file: {e!s}"

    def _process_csv_file(self, content: bytes) -> str:
        """Analyze a CSV file, preferring pandas when it is available."""
        try:
            text = content.decode("utf-8", errors="ignore")
            if PANDAS_AVAILABLE:
                return self._process_csv_with_pandas(text)
            return self._process_csv_basic(text)
        except Exception as e:
            return f"Error processing CSV file: {e!s}"

    def _process_csv_with_pandas(self, text: str) -> str:
        """Analyze CSV text with pandas: shape, dtypes, nulls, statistics."""
        try:
            df = pd.read_csv(StringIO(text))

            analysis = []
            analysis.append(f"Rows: {len(df)}")
            analysis.append(f"Columns: {len(df.columns)}")
            analysis.append("")
            analysis.append("📊 COLUMN INFORMATION:")
            for col in df.columns:
                dtype = str(df[col].dtype)
                null_count = df[col].isnull().sum()
                analysis.append(f" • {col}: {dtype} ({null_count} nulls)")

            analysis.append("")
            analysis.append("📈 DATA PREVIEW:")
            analysis.append(df.head().to_string())

            # Basic statistics for numeric columns only.
            numeric_cols = df.select_dtypes(include=["number"]).columns
            if len(numeric_cols) > 0:
                analysis.append("")
                analysis.append("📊 NUMERIC STATISTICS:")
                analysis.append(df[numeric_cols].describe().to_string())

            return "\n".join(analysis)
        except Exception as e:
            return f"Error in pandas CSV processing: {e!s}"

    def _process_csv_basic(self, text: str) -> str:
        """Analyze CSV text using only the stdlib csv module."""
        try:
            reader = csv.reader(StringIO(text))
            rows = list(reader)

            if not rows:
                return "Empty CSV file"

            # First row is treated as the header.
            headers = rows[0]
            data_rows = rows[1:]

            analysis = []
            analysis.append(f"Rows: {len(data_rows)} (+ 1 header)")
            analysis.append(f"Columns: {len(headers)}")
            analysis.append("")
            analysis.append("📋 COLUMNS:")
            for i, header in enumerate(headers):
                analysis.append(f" {i+1}. {header}")

            analysis.append("")
            analysis.append("📄 SAMPLE DATA:")
            for i, row in enumerate(data_rows[:5]):
                analysis.append(f"Row {i+1}: {row}")
            if len(data_rows) > 5:
                analysis.append(f"... and {len(data_rows) - 5} more rows")

            return "\n".join(analysis)
        except Exception as e:
            return f"Error in basic CSV processing: {e!s}"

    def _process_json_file(self, content: bytes) -> str:
        """Analyze a JSON file: top-level structure, keys, and a preview."""
        try:
            text = content.decode("utf-8", errors="ignore")
            data = json.loads(text)

            analysis = []
            analysis.append(f"Type: {type(data).__name__}")

            if isinstance(data, dict):
                analysis.append(f"Keys: {len(data.keys())}")
                analysis.append("")
                analysis.append("🗝️ TOP-LEVEL KEYS:")
                for key in list(data.keys())[:10]:
                    value_type = type(data[key]).__name__
                    analysis.append(f" • {key}: {value_type}")
                if len(data.keys()) > 10:
                    analysis.append(f" ... and {len(data.keys()) - 10} more keys")
            elif isinstance(data, list):
                analysis.append(f"Items: {len(data)}")
                if data:
                    first_item_type = type(data[0]).__name__
                    analysis.append(f"First item type: {first_item_type}")

            analysis.append("")
            analysis.append("📄 CONTENT PREVIEW:")
            # BUGFIX: compare against the full dump length so a preview of
            # exactly 800 chars is not falsely marked as truncated.
            full_dump = json.dumps(data, indent=2)
            preview = full_dump[:800]
            analysis.append(preview + ("..." if len(full_dump) > 800 else ""))

            return "\n".join(analysis)
        except json.JSONDecodeError as e:
            return f"Invalid JSON format: {e!s}"
        except Exception as e:
            return f"Error processing JSON: {e!s}"

    def _process_markdown_file(self, content: bytes) -> str:
        """Analyze a Markdown file: headers, links, code blocks, preview."""
        try:
            text = content.decode("utf-8", errors="ignore")
            lines = text.split("\n")

            # Count markdown elements.
            headers = [line for line in lines if line.strip().startswith("#")]
            links = len([line for line in lines if "[" in line and "](" in line])
            code_blocks = text.count("```")

            analysis = []
            analysis.append(f"Lines: {len(lines)}")
            analysis.append(f"Characters: {len(text)}")
            analysis.append(f"Headers: {len(headers)}")
            analysis.append(f"Links: {links}")
            analysis.append(f"Code blocks: {code_blocks // 2}")  # Divide by 2 (start + end)
            analysis.append("")

            if headers:
                analysis.append("📑 DOCUMENT STRUCTURE:")
                for header in headers[:10]:
                    # Nesting level = number of leading '#' characters.
                    level = len(header) - len(header.lstrip("#"))
                    title = header.lstrip("# ").strip()
                    indent = " " * (level - 1)
                    analysis.append(f"{indent}• {title}")
                if len(headers) > 10:
                    analysis.append(f" ... and {len(headers) - 10} more headers")
                analysis.append("")

            analysis.append("📖 CONTENT PREVIEW:")
            analysis.append(text[:500] + ("..." if len(text) > 500 else ""))

            return "\n".join(analysis)
        except Exception as e:
            return f"Error processing Markdown: {e!s}"

    def _process_unknown_file(self, content: bytes) -> str:
        """Analyze an unrecognized file type: text preview or binary summary."""
        try:
            # decode(errors="ignore") cannot raise, so no bare except needed;
            # whitespace-only output means the bytes were not usable text.
            text = content.decode("utf-8", errors="ignore")

            analysis = []
            if text.strip():
                analysis.append("Type: Text-based file")
                # BUGFIX: count lines, not whitespace-separated words
                # (was len(text.split()) under a "Lines:" label).
                analysis.append(f"Lines: {len(text.splitlines())}")
                analysis.append(f"Characters: {len(text)}")
                analysis.append("")
                analysis.append("📄 CONTENT PREVIEW:")
                analysis.append(text[:300] + ("..." if len(text) > 300 else ""))
            else:
                analysis.append("Type: Binary file")
                analysis.append("Content: Binary data (not text-readable)")
                analysis.append(f"First 32 bytes: {content[:32].hex()}")

            return "\n".join(analysis)
        except Exception as e:
            return f"Error processing unknown file: {e!s}"

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count as a human-readable string (B/KB/MB/GB/TB)."""
        size = float(size_bytes)
        for unit in ["B", "KB", "MB", "GB"]:
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"


# Initialize the file processor (module-level singleton used by the MCP entry point).
file_processor = FileProcessor()


def process_file_mcp(file) -> str:
    """
    MCP-compatible file processing function.

    Args:
        file: Gradio File object (expected to expose ``.name`` — the temp
            path Gradio writes the upload to; verify against the Gradio
            version in use).

    Returns:
        String with file analysis results, or an "Error: ..." message.
    """
    try:
        if file is None:
            return "Error: No file uploaded"

        # Gradio file inputs carry the on-disk path in .name.
        file_path = file.name if hasattr(file, "name") else None
        return file_processor.process_file(file_path)

    except Exception as e:
        error_msg = f"Error processing file: {e!s}"
        logger.error(error_msg)
        return error_msg


def create_gradio_interface():
    """Create and configure the Gradio interface around process_file_mcp."""
    interface = gr.Interface(
        fn=process_file_mcp,
        inputs=[
            gr.File(
                label="Upload File",
                file_types=[".txt", ".csv", ".json", ".md", ".py", ".js", ".html", ".xml"]
            )
        ],
        outputs=[
            gr.Textbox(
                label="File Analysis Results",
                lines=20,
                show_copy_button=True
            )
        ],
        title="📁 MCP File Processor",
        description="""
    **File Processing MCP Server**
    
    Upload and analyze various file types:
    - **Text files**: Word count, content preview
    - **CSV files**: Data analysis, column info, statistics
    - **JSON files**: Structure analysis, key exploration
    - **Markdown**: Document structure, headers, links
    
    Supports: TXT, CSV, JSON, MD, PY, JS, HTML, XML files
    """,
        examples=[],
        allow_flagging="never",
        analytics_enabled=False
    )
    return interface


def main():
    """Run the Gradio app, honoring GRADIO_SERVER_PORT/NAME env overrides."""
    port = int(os.getenv("GRADIO_SERVER_PORT", 7860))
    host = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")

    logger.info("Starting MCP File Processor on %s:%d", host, port)

    interface = create_gradio_interface()
    interface.launch(
        server_name=host,
        server_port=port,
        share=False,
        debug=False,
        quiet=False,
        show_error=True
    )


if __name__ == "__main__":
    main()