Spaces:
Build error
Build error
| # app.py | |
| import os | |
| import json | |
| import requests | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI | |
| from mcp.server import Server | |
| from mcp.server.http import make_http_app | |
| from mcp.types import Tool, TextContent | |
| # ββ 1) CONFIG βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| load_dotenv() | |
| SAP_KEY = os.getenv("SAP_API_KEY") | |
| SAP_BASE = os.getenv("SAP_API_BASE_URL", "").rstrip("/") | |
| if not SAP_KEY or not SAP_BASE: | |
| raise RuntimeError("Set SAP_API_KEY and SAP_API_BASE_URL in your env vars") | |
| # ββ 2) MCP SERVER SETUP βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| mcp = Server("sap-mcp") | |
| async def list_tools(): | |
| return [ | |
| # your existing generic GET tool | |
| Tool( | |
| name="sap_api_get", | |
| description="GET any SAP endpoint under your base URL", | |
| inputSchema={ | |
| "type": "object", | |
| "properties": { | |
| "endpoint": {"type": "string"}, | |
| "params": {"type": "object", "additionalProperties": {"type": "string"}} | |
| }, | |
| "required": ["endpoint"] | |
| } | |
| ), | |
| # OData queryβstyle tool | |
| Tool( | |
| name="sap_odata_query", | |
| description="Generic OData query ($filter, $select, $top, β¦)", | |
| inputSchema={ | |
| "type":"object", | |
| "properties":{ | |
| "service": {"type":"string"}, | |
| "select": {"type":"string"}, | |
| "filter": {"type":"string"}, | |
| "top": {"type":"integer"}, | |
| "skip": {"type":"integer"}, | |
| "orderby": {"type":"string"} | |
| }, | |
| "required":["service"] | |
| } | |
| ), | |
| # Business partner demo tool | |
| Tool( | |
| name="sap_business_partner", | |
| description="Query Business Partner data", | |
| inputSchema={ | |
| "type":"object", | |
| "properties":{ | |
| "partner_id": {"type":"string"}, | |
| "company_name": {"type":"string"}, | |
| "limit": {"type":"integer"} | |
| } | |
| } | |
| ), | |
| # Metadata tool | |
| Tool( | |
| name="sap_metadata", | |
| description="Fetch $metadata for a given service", | |
| inputSchema={ | |
| "type":"object", | |
| "properties":{ | |
| "service": {"type":"string"} | |
| } | |
| } | |
| ), | |
| # ββ NEW: Purchase Requisition tool | |
| Tool( | |
| name="purchase_requisition", | |
| description="List or filter Purchase Requisitions (CE_PURCHASEREQUISITION_0001)", | |
| inputSchema={ | |
| "type":"object", | |
| "properties":{ | |
| "filter": {"type":"string"}, | |
| "select": {"type":"string"}, | |
| "top": {"type":"integer"}, | |
| "skip": {"type":"integer"}, | |
| "orderby": {"type":"string"} | |
| } | |
| } | |
| ), | |
| # ββ NEW: Purchase Order tool | |
| Tool( | |
| name="purchase_order", | |
| description="List or filter Purchase Orders (CE_PURCHASEORDER_0001)", | |
| inputSchema={ | |
| "type":"object", | |
| "properties":{ | |
| "filter": {"type":"string"}, | |
| "select": {"type":"string"}, | |
| "top": {"type":"integer"}, | |
| "skip": {"type":"integer"}, | |
| "orderby": {"type":"string"} | |
| } | |
| } | |
| ), | |
| ] | |
| async def call_tool(name: str, arguments: dict): | |
| """ | |
| Routes the six tools above into HTTP GETs against your SAP_BASE. | |
| """ | |
| # helper to build OData params | |
| def odata_params(args): | |
| p = {} | |
| for k in ("select","filter","top","skip","orderby"): | |
| v = args.get(k) | |
| if v is not None: | |
| p[f"${k}"] = str(v) | |
| p["$format"] = "json" | |
| return p | |
| # 1) Generic GET | |
| if name == "sap_api_get": | |
| url = f"{SAP_BASE}/{arguments['endpoint']}" | |
| resp = requests.get(url, headers={"APIKey":SAP_KEY}, params=arguments.get("params",{})) | |
| resp.raise_for_status() | |
| return [TextContent(type="text", text=json.dumps(resp.json(), indent=2))] | |
| # 2) Generic OData | |
| if name == "sap_odata_query": | |
| url = f"{SAP_BASE}/{arguments['service']}" | |
| resp = requests.get( | |
| url, | |
| headers={"APIKey":SAP_KEY}, | |
| params=odata_params(arguments) | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| # unwrap OData v2/v4 | |
| if isinstance(data, dict) and "d" in data: | |
| data = data["d"].get("results", data["d"]) | |
| return [TextContent(type="text", text=json.dumps(data, indent=2))] | |
| # 3) Business Partner (mock/demo) | |
| if name == "sap_business_partner": | |
| # β¦your existing logicβ¦ | |
| return [TextContent(type="text", text="(business partner demo)β¦")] | |
| # 4) Metadata | |
| if name == "sap_metadata": | |
| svc = arguments.get("service") | |
| url = f"{SAP_BASE}/{svc}/$metadata" if svc else f"{SAP_BASE}/$metadata" | |
| resp = requests.get(url, headers={"APIKey":SAP_KEY}) | |
| resp.raise_for_status() | |
| return [TextContent(type="text", text=resp.text)] | |
| # ββ NEW TOOL: Purchase Requisition | |
| if name == "purchase_requisition": | |
| url = f"{SAP_BASE}/PurchaseRequisition" | |
| resp = requests.get(url, headers={"APIKey":SAP_KEY}, params=odata_params(arguments)) | |
| resp.raise_for_status() | |
| data = resp.json().get("d", {}).get("results", resp.json()) | |
| return [TextContent(type="text", text=json.dumps(data, indent=2))] | |
| # ββ NEW TOOL: Purchase Order | |
| if name == "purchase_order": | |
| url = f"{SAP_BASE}/PurchaseOrder" | |
| resp = requests.get(url, headers={"APIKey":SAP_KEY}, params=odata_params(arguments)) | |
| resp.raise_for_status() | |
| data = resp.json().get("d", {}).get("results", resp.json()) | |
| return [TextContent(type="text", text=json.dumps(data, indent=2))] | |
| raise ValueError(f"Unknown tool: {name}") | |
| # wrap MCP in FastAPI under /mcp | |
| mcp_app = make_http_app(mcp, prefix="/mcp") | |
| # ββ 3) GRADIO UI βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # list all six tools in the dropdown | |
| TOOLS = [ | |
| "sap_api_get", | |
| "sap_odata_query", | |
| "sap_business_partner", | |
| "sap_metadata", | |
| "purchase_requisition", | |
| "purchase_order", | |
| ] | |
| def execute_tool(tool, endpoint, params_json, | |
| service, select, filter_, top, skip, orderby, | |
| partner_id, company_name, limit): | |
| # build args exactly as the MCP server expects | |
| args = {} | |
| if tool == "sap_api_get": | |
| args["endpoint"] = endpoint | |
| try: | |
| args["params"] = json.loads(params_json or "{}") | |
| except: | |
| return "β Invalid JSON in params" | |
| elif tool == "sap_odata_query": | |
| args = {k:v for k,v in { | |
| "service": service, "select": select, "filter": filter_, | |
| "top": top, "skip": skip, "orderby": orderby | |
| }.items() if v not in (None,"")} | |
| elif tool == "sap_business_partner": | |
| args = {k:v for k,v in { | |
| "partner_id":partner_id, "company_name":company_name, "limit":limit | |
| }.items() if v not in (None,"")} | |
| elif tool == "sap_metadata": | |
| if service: args["service"] = service | |
| elif tool in ("purchase_requisition","purchase_order"): | |
| args = {k:v for k,v in { | |
| "select": select, "filter": filter_, "top": top, "skip": skip, "orderby": orderby | |
| }.items() if v not in (None,"")} | |
| # call your MCP HTTP endpoint | |
| payload = { | |
| "jsonrpc":"2.0", | |
| "method":"call_tool", | |
| "params":{"name":tool,"arguments":args}, | |
| "id":1 | |
| } | |
| r = requests.post("http://localhost:7860/mcp", json=payload) | |
| r.raise_for_status() | |
| texts = [c["text"] for c in r.json().get("result",[]) if c.get("type")=="text"] | |
| return "\n\n".join(texts) or "(no result)" | |
| def toggle_visibility(tool): | |
| # 11 widgets: endpoint,params, service,select,filter,top,skip,orderby, partner_id,company_name,limit | |
| if tool=="sap_api_get": return [True,True] + [False]*9 | |
| if tool=="sap_odata_query": return [False,False,True] + [True]*5 + [False]*3 | |
| if tool=="sap_business_partner": return [False]*8 + [True]*3 | |
| if tool=="sap_metadata": return [False,False,True] + [False]*8 | |
| if tool in ("purchase_requisition","purchase_order"): | |
| return [False,False,False] + [True]*5 + [False]*3 | |
| return [False]*11 | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## π SAP MCP + CE_PR & CE_PO Demo") | |
| tool = gr.Dropdown(TOOLS, label="Tool") | |
| # inputs row by row | |
| endpoint = gr.Textbox(label="Endpoint") | |
| params_json = gr.Textbox(label="Params (JSON)") | |
| service = gr.Textbox(label="Service") | |
| select = gr.Textbox(label="$select") | |
| filter_ = gr.Textbox(label="$filter") | |
| top = gr.Number(label="$top") | |
| skip = gr.Number(label="$skip") | |
| orderby = gr.Textbox(label="$orderby") | |
| partner_id = gr.Textbox(label="Business Partner ID") | |
| company_name = gr.Textbox(label="Company Name") | |
| limit = gr.Number(label="Limit") | |
| run = gr.Button("π Execute") | |
| output = gr.Textbox(label="Result", lines=12) | |
| tool.change(toggle_visibility, tool, | |
| [endpoint,params_json,service,select,filter_,top,skip,orderby, | |
| partner_id,company_name,limit]) | |
| run.click(execute_tool, | |
| [tool,endpoint,params_json,service,select,filter_,top,skip,orderby, | |
| partner_id,company_name,limit], | |
| output) | |
| # mount MCP under the same FastAPI app | |
| demo.app.mount("/mcp", mcp_app) | |
| # start on HF Spaces port | |
| if __name__=="__main__": | |
| demo.launch(server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT",7860))) |