# Source: SOP / app_backup.py (uploaded by PD03)
# Commit cfb873c (verified): "Rename app.py to app_backup.py"
import os, json, tempfile, logging
import gradio as gr
import pandas as pd
import numpy as np
# Quiet noisy logs: set the "cmdstanpy" and "prophet" loggers to WARNING so
# records below that level are not emitted by them.
logging.getLogger("cmdstanpy").setLevel(logging.WARNING)
logging.getLogger("prophet").setLevel(logging.WARNING)
# ==== Tools (your @tool template) ============================================
from smolagents import tool, CodeAgent, OpenAIServerModel
@tool
def forecast_tool(horizon_months: int = 1, use_demo: bool = True, history_csv_path: str = "") -> str:
    """
    Forecast monthly demand for finished goods using Prophet (demo-friendly).

    Args:
        horizon_months (int): Number of future months to forecast (>=1). Defaults to 1.
        use_demo (bool): If True, generate synthetic history for FG100/FG200. Defaults to True.
        history_csv_path (str): Optional CSV path with columns [product_id,date,qty] to override demo.

    Returns:
        str: JSON string list of {"product_id": str, "period_start": "YYYY-MM-01", "forecast_qty": float}.

    Raises:
        ValueError: If the CSV lacks one of the required columns.
    """
    # 1) History: synthetic demo data unless demo is off AND a CSV path is given.
    if use_demo or not history_csv_path:
        rng = pd.date_range("2023-01-01", periods=24, freq="MS")
        # Kept from the original; nothing in this function draws random numbers.
        # NOTE(review): presumably here for reproducibility downstream - confirm.
        np.random.seed(0)
        # The seasonal profile is the same for every product, so compute it once.
        season = 1 + 0.15 * np.sin(2 * np.pi * (np.arange(len(rng)) / 12.0))
        rows = []
        for pid, base in [("FG100", 1800), ("FG200", 900)]:
            qty = (base * season).astype(float)
            rows.extend({"product_id": pid, "date": d, "qty": float(q)} for d, q in zip(rng, qty))
        df = pd.DataFrame(rows)
    else:
        df = pd.read_csv(history_csv_path)
        # Explicit raise instead of `assert`: asserts are stripped under `python -O`.
        missing = {"product_id", "date", "qty"} - set(df.columns)
        if missing:
            raise ValueError(
                f"CSV must have product_id,date,qty (missing: {', '.join(sorted(missing))})"
            )
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df = df.dropna(subset=["date"])  # rows with unparseable dates are discarded
        df["qty"] = pd.to_numeric(df["qty"], errors="coerce").fillna(0.0)

    H = max(1, int(horizon_months))

    # No usable history means nothing to forecast; skip the Prophet import.
    if df.empty:
        return json.dumps([])

    # Imported only once the input is known to be valid and non-empty.
    from prophet import Prophet

    # 2) Forecast per product
    out = []
    for pid, g in df.groupby("product_id"):
        # Month-start series; months without rows count as zero demand.
        s = g.set_index("date")["qty"].resample("MS").sum().asfreq("MS").fillna(0.0)
        m = Prophet(yearly_seasonality=True, weekly_seasonality=False,
                    daily_seasonality=False, n_changepoints=10)
        m.fit(pd.DataFrame({"ds": s.index, "y": s.values}))
        future = m.make_future_dataframe(periods=H, freq="MS", include_history=False)
        pred = m.predict(future)[["ds", "yhat"]]
        for ds, yhat in zip(pred["ds"], pred["yhat"]):
            out.append({
                "product_id": str(pid),
                "period_start": ds.strftime("%Y-%m-%d"),
                "forecast_qty": float(yhat),
            })
    return json.dumps(out)
@tool
def optimize_supply_tool(forecast_json: str) -> str:
    """
    Optimize a single-month supply plan (LP) using the forecast.

    Only the first forecast row of each product is used as its demand. Negative
    demand is treated as zero and an empty forecast falls back to a zero-demand
    plan for FG100/FG200. Products without master data, or a solver failure,
    produce a JSON object with status FAILED and a message instead of a plan.

    Args:
        forecast_json (str): JSON string returned by forecast_tool.

    Returns:
        str: JSON with summary + readable tables (not raw solver output).
    """
    from scipy.optimize import linprog

    rows = json.loads(forecast_json)

    # Use first month per product. Demand becomes the upper bound on sales, so
    # clamp it at zero: a negative forecast would put the upper bound below the
    # lower bound and make the LP infeasible.
    demand = {}
    for r in rows:
        p = r["product_id"]
        if p not in demand:
            demand[p] = max(0.0, float(r["forecast_qty"]))
    if not demand:
        # Empty forecast: plan the demo products with zero demand.
        demand = {"FG100": 0.0, "FG200": 0.0}
    P = sorted(demand)

    # Demo master data (hard-coded).
    price = {"FG100": 98.0, "FG200": 120.0}
    conv = {"FG100": 12.5, "FG200": 15.0}
    r1 = {"FG100": 0.03, "FG200": 0.05}
    r2 = {"FG100": 0.02, "FG200": 0.01}
    RMs = ["RM_A", "RM_B"]
    rm_cost = {"RM_A": 20.0, "RM_B": 30.0}
    rm_start = {"RM_A": 1000.0, "RM_B": 100.0}
    rm_cap = {"RM_A": 5000.0, "RM_B": 5000.0}
    bom = {
        "FG100": {"RM_A": 0.8, "RM_B": 0.2 * 1.02},
        "FG200": {"RM_A": 1.0, "RM_B": 0.1},
    }
    r1_cap, r2_cap = 320.0, 480.0

    # Report products missing from the master data as a failure rather than
    # crashing with a bare KeyError further down.
    unknown = [p for p in P if not all(p in table for table in (price, conv, r1, r2, bom))]
    if unknown:
        return json.dumps({
            "status": "FAILED",
            "message": "No master data for product(s): " + ", ".join(map(str, unknown)),
        })

    # Variable layout: [produce | sell | end FG inventory | RM purchase | end RM inventory]
    nP, nR = len(P), len(RMs)
    pidx = {p: i for i, p in enumerate(P)}
    ridx = {r: i for i, r in enumerate(RMs)}

    def i_prod(p): return pidx[p]
    def i_sell(p): return nP + pidx[p]
    def i_einv(p): return 2 * nP + pidx[p]
    def i_pur(r): return 3 * nP + ridx[r]
    def i_einr(r): return 3 * nP + nR + ridx[r]

    n_vars = 3 * nP + 2 * nR
    # linprog minimizes, so costs are positive and revenue is negative.
    # Ending inventories carry zero cost.
    c = np.zeros(n_vars)
    bounds = [(0, None)] * n_vars
    for p in P:
        c[i_prod(p)] = conv[p]
        c[i_sell(p)] = -price[p]
        bounds[i_sell(p)] = (0, demand[p])  # demand as an upper bound (no backorders)
    for r in RMs:
        c[i_pur(r)] = rm_cost[r]
        bounds[i_pur(r)] = (0, rm_cap[r])

    # Equalities
    Aeq, beq = [], []
    for p in P:
        # produce - sell - end_inv = 0 (start_inv=0 in this demo)
        row = np.zeros(n_vars)
        row[i_prod(p)] = 1
        row[i_sell(p)] = -1
        row[i_einv(p)] = -1
        Aeq.append(row)
        beq.append(0.0)
    for r in RMs:
        # purchase - end_inv - consumption = -start_inv
        row = np.zeros(n_vars)
        row[i_pur(r)] = 1
        row[i_einr(r)] = -1
        for p in P:
            row[i_prod(p)] -= bom[p].get(r, 0.0)
        Aeq.append(row)
        beq.append(-rm_start[r])

    # Inequalities (resources): usage summed over products must fit each capacity.
    Aub, bub = [], []
    for usage, cap in ((r1, r1_cap), (r2, r2_cap)):
        row = np.zeros(n_vars)
        for p in P:
            row[i_prod(p)] = usage[p]
        Aub.append(row)
        bub.append(cap)

    res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub),
                  A_eq=np.array(Aeq), b_eq=np.array(beq),
                  bounds=bounds, method="highs")
    if not res.success:
        return json.dumps({"status": "FAILED", "message": res.message})

    x = res.x

    def v(idx): return float(x[idx])

    # Compose human-friendly tables
    prod_tbl = []
    revenue = 0.0
    conv_cost = 0.0
    for p in P:
        produce = v(i_prod(p))
        sell = v(i_sell(p))
        prod_tbl.append({"Product": p, "Produce": produce, "Sell": sell,
                         "Unit Price": price[p], "Conv. Cost/u": conv[p]})
        revenue += sell * price[p]
        conv_cost += produce * conv[p]

    raw_tbl = []
    rm_purch_cost = 0.0
    for r in RMs:
        purchase = v(i_pur(r))
        cons = float(sum(bom[p].get(r, 0.0) * v(i_prod(p)) for p in P))
        cost = purchase * rm_cost[r]
        rm_purch_cost += cost
        raw_tbl.append({"Raw": r, "Purchase": purchase, "Consume": cons,
                        "Cost/u": rm_cost[r], "Total Cost": cost})

    r1_used = float(sum(r1[p] * v(i_prod(p)) for p in P))
    r2_used = float(sum(r2[p] * v(i_prod(p)) for p in P))
    res_tbl = [
        {"Resource": "R1", "Used": r1_used, "Cap": r1_cap, "Slack": r1_cap - r1_used},
        {"Resource": "R2", "Used": r2_used, "Cap": r2_cap, "Slack": r2_cap - r2_used},
    ]

    profit = revenue - conv_cost - rm_purch_cost
    out = {
        "status": "OPTIMAL",
        "kpis": {"Profit": profit, "Revenue": revenue, "Conv. Cost": conv_cost,
                 "RM Purchase Cost": rm_purch_cost},
        "products": prod_tbl,
        "raw_materials": raw_tbl,
        "resources": res_tbl,
    }
    return json.dumps(out)
@tool
def update_sap_md61_tool(forecast_json: str, plant: str = "PLANT01", uom: str = "EA", mrp_area: str = "") -> str:
    """
    Prepare an MD61-style demand upload (SIMULATION ONLY).

    Nothing is sent to SAP: the rows are written to a temporary CSV file that is
    left on disk so the caller can offer it for download.

    Args:
        forecast_json (str): JSON string returned by forecast_tool.
        plant (str): SAP plant (WERKS). Defaults to 'PLANT01'.
        uom (str): Unit of measure. Defaults to 'EA'.
        mrp_area (str): Optional MRP area.

    Returns:
        str: JSON with {"status":"SIMULATED","csv_path":"...","preview":[...]}.
    """
    rows = json.loads(forecast_json)
    md61 = [{
        "Material": r["product_id"], "Plant": plant, "MRP_Area": mrp_area,
        "Req_Date": r["period_start"], "Req_Qty": float(r["forecast_qty"]),
        "UoM": uom, "Version": "00",
    } for r in rows]

    # Fixing the column list keeps the CSV header present for an empty forecast.
    columns = ["Material", "Plant", "MRP_Area", "Req_Date", "Req_Qty", "UoM", "Version"]
    df = pd.DataFrame(md61, columns=columns)

    # Write through the handle and close it via the context manager: the original
    # left the handle open (descriptor leak) and re-opened the file by name,
    # which is not possible on Windows while the first handle is open.
    # delete=False keeps the file for the caller; newline="" avoids newline
    # translation on top of the line terminator pandas writes.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False,
                                     newline="", encoding="utf-8") as tmp:
        df.to_csv(tmp, index=False)
        csv_path = tmp.name

    # The preview is limited to the first five rows.
    return json.dumps({"status": "SIMULATED", "csv_path": csv_path, "preview": md61[:5]})
# ==== Agent (end-to-end) ======================================================
def make_agent():
    """Build the CodeAgent that drives the three pipeline tools.

    The OpenAI key is read from the OPENAI_API_KEY environment variable.

    Raises:
        RuntimeError: If OPENAI_API_KEY is unset or empty.
    """
    key = os.getenv("OPENAI_API_KEY")
    if not key:
        raise RuntimeError("OPENAI_API_KEY not set. Add it as a Space secret.")

    llm = OpenAIServerModel(model_id="gpt-4o-mini", api_key=key, temperature=0)
    pipeline_tools = [forecast_tool, optimize_supply_tool, update_sap_md61_tool]
    return CodeAgent(
        tools=pipeline_tools,
        model=llm,
        add_base_tools=False,
        stream_outputs=False,
    )
# Base instructions that run_agentic places at the start of every agent prompt:
# the three tools in call order, followed by the shape of the final answer.
SYSTEM_PLAN = (
    "Run the pipeline and return one JSON:\n"
    "1) forecast_tool(...)\n"
    "2) optimize_supply_tool(forecast_json)\n"
    "3) update_sap_md61_tool(forecast_json, ...)\n"
    "Return: {'forecast': <json>, 'plan': <json>, 'md61': <json>}"
)
def run_agentic(h, plant, demo_flag, file_obj):
    """Run forecast -> optimize -> MD61 end-to-end through the LLM agent.

    Args:
        h: Forecast horizon in months; None (an empty number field) is treated as 1.
        plant: SAP plant to put on the MD61 rows; a falsy value means 'PLANT01'.
        demo_flag: Whether to use the synthetic demo history when no file is given.
        file_obj: Uploaded history CSV, either a path string or an object with a
            `.name` path attribute, or None when nothing was uploaded.

    Returns:
        Whatever `agent.run` returns for the composed prompt.

    Raises:
        RuntimeError: Propagated from make_agent when OPENAI_API_KEY is missing.
    """
    agent = make_agent()
    months = int(h) if h is not None else 1
    plant_code = plant or "PLANT01"

    if file_obj is not None:
        # The upload may arrive as a plain path string or as a file wrapper.
        path = file_obj if isinstance(file_obj, str) else file_obj.name
        # !r yields a correctly escaped Python literal (backslashes, quotes).
        forecast_call = (f"forecast_tool(horizon_months={months}, use_demo=False, "
                         f"history_csv_path={path!r})")
    else:
        forecast_call = f"forecast_tool(horizon_months={months}, use_demo={bool(demo_flag)})"

    # The plant used to be dropped here, so MD61 always fell back to the tool
    # default; state it explicitly for step 3.
    prompt = (f"{SYSTEM_PLAN}\n"
              f"Use {forecast_call}. "
              f"Then run the other two steps as specified, calling "
              f"update_sap_md61_tool(forecast_json, plant={plant_code!r}) for step 3. "
              f"Return only the final JSON.")
    return agent.run(prompt)
# ==== UI Helpers (rounding + pretty) =========================================
def _round_df(df: pd.DataFrame, places: int = 2) -> pd.DataFrame:
if df is None or df.empty:
return df
out = df.copy()
num_cols = out.select_dtypes(include=["number"]).columns
out[num_cols] = out[num_cols].astype(float).round(places)
return out
def parse_forecast(json_str):
    """Turn forecast_tool's JSON into a display table.

    Args:
        json_str: JSON list of objects with keys product_id, period_start and
            forecast_qty.

    Returns:
        DataFrame with columns Product, Period Start and Forecast Qty, numeric
        values rounded by `_round_df`. An empty forecast gives an empty frame
        that still carries those three columns.
    """
    labels = {
        "product_id": "Product",
        "period_start": "Period Start",
        "forecast_qty": "Forecast Qty",
    }
    df = pd.DataFrame(json.loads(json_str))
    if df.empty:
        # A frame built from [] has no columns, so the selection below would
        # raise KeyError; return the expected (empty) table shape instead.
        return pd.DataFrame(columns=list(labels.values()))
    df = df[list(labels)].rename(columns=labels)
    return _round_df(df)
def parse_plan(json_str):
    """Split optimize_supply_tool's JSON into a status string and four tables.

    Args:
        json_str: JSON object produced by optimize_supply_tool, either a full
            plan or a failure payload with only "status" and "message".

    Returns:
        Tuple (status, kpis, products, raw_materials, resources). The tables are
        DataFrames rounded by `_round_df`; sections absent from the payload come
        back as empty frames. For a non-OPTIMAL payload carrying a message, the
        message is appended to the status text.
    """
    d = json.loads(json_str)

    status = d.get("status", "UNKNOWN")
    message = d.get("message")
    if message and status != "OPTIMAL":
        status = f"{status} — {message}"

    # A failure payload has no kpis/products/raw_materials/resources keys;
    # indexing them directly used to raise KeyError.
    kpi_row = d.get("kpis")
    kpis = pd.DataFrame([kpi_row] if kpi_row else []).rename(
        columns={"Conv. Cost": "Conversion Cost"}
    )
    prod = pd.DataFrame(d.get("products", []))
    raw = pd.DataFrame(d.get("raw_materials", []))
    res = pd.DataFrame(d.get("resources", []))
    return status, _round_df(kpis), _round_df(prod), _round_df(raw), _round_df(res)
def parse_md61(json_str):
    """Unpack update_sap_md61_tool's JSON for display.

    Returns:
        Tuple (status, preview, csv_path): the status text ("" if absent), the
        preview rows as a DataFrame rounded by `_round_df`, and the CSV path
        ("" if absent).
    """
    payload = json.loads(json_str)
    preview_rows = payload.get("preview", [])
    preview_df = _round_df(pd.DataFrame(preview_rows))
    csv_path = payload.get("csv_path", "")
    status = payload.get("status", "")
    return status, preview_df, csv_path
# ==== Gradio UI ==============================================================
# Two tabs over the same pipeline: "Manual" runs the three tools one button at
# a time, "Agentic" hands the whole sequence to the LLM agent.
with gr.Blocks(title="Forecast → Optimize → SAP MD61") as demo:
    gr.Markdown("## 🧭 Workflow\n"
                "### 1) **Forecast** → 2) **Optimize Supply** → 3) **Prepare MD61**\n"
                "Run them **manually** below, or use the **agent** to do end-to-end in one click.")

    with gr.Tab("Manual (Step-by-step)"):
        with gr.Row():
            horizon = gr.Number(label="Horizon (months)", value=1, precision=0)
            plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01")
        with gr.Row():
            use_demo = gr.Checkbox(label="Use demo synthetic history", value=True)
            file = gr.File(label="Or upload history.csv (product_id,date,qty)", file_types=[".csv"])

        # States to pass data between steps (raw JSON strings from each tool)
        forecast_state = gr.State("")
        plan_state = gr.State("")
        md61_state = gr.State("")

        gr.Markdown("### ➤ Step 1: Forecast")
        run_f = gr.Button("Run Step 1 — Forecast")
        forecast_tbl = gr.Dataframe(label="Forecast (first horizon month per SKU)", interactive=False)
        forecast_note = gr.Markdown("")

        gr.Markdown("### ➤ Step 2: Optimize Supply")
        run_o = gr.Button("Run Step 2 — Optimize")
        plan_status = gr.Markdown("")
        plan_kpis = gr.Dataframe(label="KPIs", interactive=False)
        plan_prod = gr.Dataframe(label="Products Plan", interactive=False)
        plan_raw = gr.Dataframe(label="Raw Materials", interactive=False)
        plan_res = gr.Dataframe(label="Resources", interactive=False)

        gr.Markdown("### ➤ Step 3: Prepare MD61 (Simulated)")
        run_m = gr.Button("Run Step 3 — MD61")
        md61_status = gr.Markdown("")
        md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False)
        md61_file = gr.File(label="Download CSV", interactive=False)

        # Handlers
        def do_forecast(h, demo_flag, f):
            """Step 1: run the forecast and fill the forecast state, table and note."""
            # The upload may arrive as a plain path string or as a file wrapper
            # exposing `.name`, depending on the Gradio version/configuration.
            if f is None:
                hist_path = ""
            else:
                hist_path = f if isinstance(f, str) else f.name
            # A cleared number field arrives as None; fall back to one month.
            months = int(h) if h is not None else 1
            try:
                fj = forecast_tool(horizon_months=months,
                                   use_demo=(f is None) and bool(demo_flag),
                                   history_csv_path=hist_path)
                df = parse_forecast(fj)
            except Exception as e:
                # UI boundary: show the problem in the note (as the agent tab
                # does) and clear the state so later steps ask for a re-run.
                return "", pd.DataFrame(), f"⚠️ Forecast failed: {e}"
            return fj, df, f"Forecast generated for {df['Product'].nunique()} product(s)."

        run_f.click(do_forecast, inputs=[horizon, use_demo, file],
                    outputs=[forecast_state, forecast_tbl, forecast_note])

        def do_optimize(fj):
            """Step 2: optimize against the stored forecast and fill the plan tables."""
            if not fj:
                return "", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), "⚠️ Run Step 1 first."
            pj = optimize_supply_tool(fj)
            plan = json.loads(pj)
            if plan.get("status") != "OPTIMAL":
                # A failure payload has no tables; report it instead of letting
                # the table parsing raise on the missing keys.
                note = f"⚠️ Optimization status: **{plan.get('status', 'UNKNOWN')}** — {plan.get('message', '')}"
                return pj, pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), note
            status, kpis, prod, raw, res = parse_plan(pj)
            return pj, kpis, prod, raw, res, f"Optimization status: **{status}**"

        run_o.click(do_optimize, inputs=[forecast_state],
                    outputs=[plan_state, plan_kpis, plan_prod, plan_raw, plan_res, plan_status])

        def do_md61(fj, plant):
            """Step 3: build the simulated MD61 CSV from the stored forecast."""
            if not fj:
                return "", pd.DataFrame(), None, "⚠️ Run Step 1 first."
            mj = update_sap_md61_tool(fj, plant=plant, uom="EA")
            status, prev, path = parse_md61(mj)
            # An empty path is not a file; pass None so the download stays empty.
            return mj, prev, (path or None), f"MD61 status: **{status}**"

        run_m.click(do_md61, inputs=[forecast_state, plant],
                    outputs=[md61_state, md61_prev, md61_file, md61_status])

    with gr.Tab("Agentic (End-to-end)"):
        gr.Markdown("One click: the agent runs all three steps with OpenAI.")
        with gr.Row():
            a_horizon = gr.Number(label="Horizon (months)", value=1, precision=0)
            a_plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01")
        with gr.Row():
            a_demo = gr.Checkbox(label="Use demo synthetic history", value=True)
            a_file = gr.File(label="Or upload history.csv", file_types=[".csv"])
        run_all = gr.Button("Run End-to-end (Agent)")
        out_json = gr.Textbox(label="Agent Raw JSON (for inspection)", lines=6)
        with gr.Accordion("Pretty Outputs", open=True):
            a_forecast_tbl = gr.Dataframe(label="Forecast", interactive=False)
            a_plan_kpis = gr.Dataframe(label="KPIs", interactive=False)
            a_plan_prod = gr.Dataframe(label="Products Plan", interactive=False)
            a_plan_raw = gr.Dataframe(label="Raw Materials", interactive=False)
            a_plan_res = gr.Dataframe(label="Resources", interactive=False)
            a_md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False)
            a_md61_file = gr.File(label="Download MD61 CSV", interactive=False)

        # Robust agent handler: accepts dict OR str and rounds outputs
        def do_agent(h, p, demo_flag, f):
            """Run the agent and spread its answer over the output widgets."""
            def to_obj(x):
                """Decode x to a dict/list; dicts and lists pass through."""
                if isinstance(x, (dict, list)):
                    return x
                try:
                    return json.loads(x)
                except json.JSONDecodeError:
                    # The prompt shows the answer with single quotes, so the
                    # agent may reply with a Python literal rather than JSON.
                    import ast
                    return ast.literal_eval(x)

            def to_json(x):
                """Normalize a section (str, dict or list) to a JSON string."""
                return json.dumps(to_obj(x))

            try:
                res = run_agentic(h, p, demo_flag, f)  # may be dict or str
                out = to_obj(res)
                forecast_json = to_json(out["forecast"])
                plan_json = to_json(out["plan"])
                md61_json = to_json(out["md61"])
                f_df = parse_forecast(forecast_json)
                _, kpis, prod, raw, res_tbl = parse_plan(plan_json)
                _, prev, csv_path = parse_md61(md61_json)
                pretty = {
                    "forecast": json.loads(forecast_json),
                    "plan": json.loads(plan_json),
                    "md61": json.loads(md61_json),
                }
                return (json.dumps(pretty, indent=2), f_df, kpis, prod, raw, res_tbl, prev,
                        csv_path or None)
            except Exception as e:
                # UI boundary: any failure (missing key, bad JSON, agent error)
                # is shown in the raw-output box with all tables cleared.
                return (f"Agent error: {e}", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(),
                        pd.DataFrame(), pd.DataFrame(), None)

        run_all.click(do_agent, inputs=[a_horizon, a_plant, a_demo, a_file],
                      outputs=[out_json, a_forecast_tbl, a_plan_kpis, a_plan_prod, a_plan_raw,
                               a_plan_res, a_md61_prev, a_md61_file])
if __name__ == "__main__":
    # The agent tab needs OPENAI_API_KEY in the environment; only warn when it
    # is missing, then launch the UI either way.
    has_api_key = bool(os.getenv("OPENAI_API_KEY"))
    if not has_api_key:
        print("⚠️ Set OPENAI_API_KEY (Space secret) to use Agentic tab.")
    demo.launch()