# PD03's picture
# Update app.py
# db17f98 verified
# raw
# history blame
# 2.73 kB
from mcp.server.fastmcp import FastMCP
import httpx
import os
# SAP API configuration: sandbox endpoint of the purchase requisition OData service.
SAP_API_URL = "https://sandbox.api.sap.com/s4hanacloud/sap/opu/odata4/sap/api_purchaserequisition_2/srvd_a2x/sap/purchaserequisition/0001/PurchaseReqn"
# The API key is read from the environment; a placeholder string is used when unset.
SAP_API_KEY = os.environ.get("SAP_API_KEY", "your_sap_api_key_here")

# FastMCP server instance on which the tools in this module are registered.
mcp = FastMCP("SAP_PurchaseRequisition")
@mcp.tool()
async def get_purchase_requisitions(top: int = 50) -> dict:
    """
    Fetch purchase requisitions from the SAP S/4HANA Cloud API.

    Args:
        top: Number of records to fetch (default 50). The value is clamped
            to the range 0-100 before being sent as the OData ``$top``
            query option.

    Returns:
        On success, a dictionary with ``status`` set to ``"success"``, the
        decoded JSON body under ``data`` and the number of entries in its
        ``value`` list under ``count``. On failure, a dictionary with
        ``status`` set to ``"error"`` plus ``message`` and ``details``;
        failures are reported through the return value rather than raised.
    """
    headers = {
        "APIKey": SAP_API_KEY,
        "Accept": "application/json",
    }
    # Clamp on both sides: the upper bound keeps responses small, the lower
    # bound stops a negative value from being sent as an invalid ``$top``.
    params = {
        "$top": max(0, min(top, 100)),
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(
                SAP_API_URL,
                headers=headers,
                params=params,
            )
            response.raise_for_status()
            data = response.json()
            return {
                "status": "success",
                "data": data,
                "count": len(data.get("value", [])),
            }
        except httpx.HTTPStatusError as e:
            # Non-2xx response: surface the status code to the caller.
            return {
                "status": "error",
                "message": f"HTTP error occurred: {e.response.status_code}",
                "details": str(e),
            }
        except Exception as e:
            # Tool boundary: transport failures, invalid JSON or an unexpected
            # payload shape are all reported as an error result.
            return {
                "status": "error",
                "message": "Failed to fetch purchase requisitions",
                "details": str(e),
            }
@mcp.tool()
async def get_purchase_requisition_summary(top: int = 10) -> str:
    """
    Get a summary of recent purchase requisitions.

    Args:
        top: Number of records to summarize (default 10). Passed through to
            ``get_purchase_requisitions``.

    Returns:
        A formatted summary string with one numbered line per requisition,
        ``"No purchase requisitions found."`` when the result is empty, or
        ``"Error: <message>"`` when the fetch reported an error.
    """
    result = await get_purchase_requisitions(top)
    if result["status"] == "error":
        return f"Error: {result['message']}"

    items = result["data"].get("value", [])
    if not items:
        return "No purchase requisitions found."

    header = f"Found {result['count']} purchase requisitions:\n\n"
    entries = []
    for idx, item in enumerate(items, 1):
        # ``or`` (rather than a ``get`` default) so that fields which are
        # present but blank/null also fall back to the placeholder text.
        pr_number = item.get("PurchaseRequisition") or "N/A"
        description = item.get("PurchaseRequisitionDescription") or "No description"
        entries.append(f"{idx}. PR#{pr_number}: {description}\n")
    return header + "".join(entries)
if __name__ == "__main__":
    # Run with streamable HTTP transport for Hugging Face Spaces.
    # In the 1.x `mcp` SDK, FastMCP.run() accepts only the transport (and an
    # optional mount path), so passing `port=` raises TypeError. The listen
    # address is taken from `mcp.settings` instead.
    # NOTE(review): confirm against the installed SDK version.
    mcp.settings.host = "0.0.0.0"  # listen on all interfaces inside the container
    mcp.settings.port = 7860
    mcp.run(transport="streamable-http")