Spaces:
Runtime error
Runtime error
| from mcp.server.fastmcp import FastMCP | |
| import httpx | |
| import os | |
| # SAP API Configuration | |
| SAP_API_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata4/sap/api_purchaserequisition_2/srvd_a2x/sap/purchaserequisition/0001/PurchaseReqn" | |
| SAP_API_KEY = os.getenv("SAP_API_KEY", "your_sap_api_key_here") | |
| # Initialize FastMCP server | |
| mcp = FastMCP("SAP_PurchaseRequisition") | |
| async def get_purchase_requisitions(top: int = 50) -> dict: | |
| """ | |
| Fetch purchase requisitions from SAP S/4HANA Cloud API. | |
| Args: | |
| top: Number of records to fetch (default 50, max 100) | |
| Returns: | |
| Dictionary containing purchase requisition data | |
| """ | |
| headers = { | |
| "APIKey": SAP_API_KEY, | |
| "Accept": "application/json" | |
| } | |
| params = { | |
| "$top": min(top, 100) # Limit to max 100 records | |
| } | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| try: | |
| response = await client.get( | |
| SAP_API_URL, | |
| headers=headers, | |
| params=params | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| return { | |
| "status": "success", | |
| "data": data, | |
| "count": len(data.get("value", [])) | |
| } | |
| except httpx.HTTPStatusError as e: | |
| return { | |
| "status": "error", | |
| "message": f"HTTP error occurred: {e.response.status_code}", | |
| "details": str(e) | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": "Failed to fetch purchase requisitions", | |
| "details": str(e) | |
| } | |
| async def get_purchase_requisition_summary(top: int = 10) -> str: | |
| """ | |
| Get a summary of recent purchase requisitions. | |
| Args: | |
| top: Number of records to summarize (default 10) | |
| Returns: | |
| Formatted summary string | |
| """ | |
| result = await get_purchase_requisitions(top) | |
| if result["status"] == "error": | |
| return f"Error: {result['message']}" | |
| items = result["data"].get("value", []) | |
| if not items: | |
| return "No purchase requisitions found." | |
| summary = f"Found {result['count']} purchase requisitions:\n\n" | |
| for idx, item in enumerate(items, 1): | |
| pr_number = item.get("PurchaseRequisition", "N/A") | |
| description = item.get("PurchaseRequisitionDescription", "No description") | |
| summary += f"{idx}. PR#{pr_number}: {description}\n" | |
| return summary | |
| if __name__ == "__main__": | |
| # Run with streamable HTTP transport for Hugging Face Spaces | |
| mcp.run(transport="streamable-http", port=7860) | |