# SOP / app.py — source of PD03's Hugging Face Space (revision 840ed4e, "Update app.py")
# app.py
import os, json, tempfile, logging
import gradio as gr
import pandas as pd
import numpy as np
# Silence chatty third-party forecasting loggers (Stan backend and Prophet).
for _noisy_logger in ("cmdstanpy", "prophet"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
# ==== Tools (your @tool template) ============================================
from smolagents import tool, CodeAgent, OpenAIServerModel
# ---------- Helper (rounding) ----------
def _round_df(df: pd.DataFrame, places: int = 2) -> pd.DataFrame:
    """Return a copy of *df* with every numeric column rounded to *places* decimals.

    None and empty frames are passed through unchanged; the input is never mutated.
    """
    if df is None or df.empty:
        return df
    rounded = df.copy()
    for col in rounded.select_dtypes(include=["number"]).columns:
        rounded[col] = rounded[col].astype(float).round(places)
    return rounded
# ---------- Tool 1: Forecast ----------
from smolagents import tool
import json, pandas as pd, numpy as np
@tool
def forecast_tool(horizon_months: int = 1, use_demo: bool = True, history_csv_path: str = "",
                  use_covariates: bool = False) -> str:
    """
    Forecast monthly demand using a GLOBAL N-HiTS model (fast & accurate).
    Args:
        horizon_months (int): Number of future months to forecast (>=1).
        use_demo (bool): If True, generates synthetic history for FG100/FG200.
        history_csv_path (str): Optional CSV with columns [product_id,date,qty,(optional extra covariates...)].
        use_covariates (bool): If True and extra numeric columns exist, use them as past covariates
            (for future effects you must provide future values too).
    Returns:
        str: JSON list of {"product_id","period_start","forecast_qty"} for the next horizon_months.
    """
    # --- 1) Load data in long form ---
    if use_demo or not history_csv_path:
        # Two synthetic SKUs, 24 months of smooth yearly seasonality (deterministic).
        rng = pd.date_range("2023-01-01", periods=24, freq="MS")
        rows = []
        np.random.seed(0)
        for pid, base in [("FG100", 1800), ("FG200", 900)]:
            season = 1 + 0.15 * np.sin(2 * np.pi * (np.arange(len(rng)) / 12.0))
            qty = (base * season).astype(float)
            for d, q in zip(rng, qty):
                rows.append({"product_id": pid, "date": d, "qty": float(q)})
        df = pd.DataFrame(rows)
    else:
        df = pd.read_csv(history_csv_path)
        assert {"product_id","date","qty"} <= set(df.columns), "CSV must have product_id,date,qty"
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df = df.dropna(subset=["date"])
        df["qty"] = pd.to_numeric(df["qty"], errors="coerce").fillna(0.0)
    # Ensure proper monthly frequency per SKU
    df = df.copy()
    df["product_id"] = df["product_id"].astype(str)
    # --- 2) Build Darts series (GLOBAL model across SKUs) ---
    from darts import TimeSeries
    # Record SKU order alongside the series: groupby iterates keys in sorted
    # order, which need NOT match df["product_id"].unique() (appearance order).
    # The original zipped against unique() and could mislabel forecasts for
    # CSVs whose products are not already sorted.
    pid_list = []
    series_list = []
    past_cov_list = []  # optional, parallel to series_list
    extra_cols = [c for c in df.columns if c not in ["product_id","date","qty"]]
    # keep only numeric covariates (categoricals must be pre-encoded)
    num_covs = [c for c in extra_cols if pd.api.types.is_numeric_dtype(df[c])]
    for pid, g in df.groupby("product_id"):
        pid_list.append(pid)
        g = (g.set_index("date")
               .sort_index()
               .resample("MS")
               .agg({**{"qty":"sum"}, **{c:"last" for c in num_covs}})
               .ffill()       # fillna(method="ffill") is deprecated in modern pandas
               .fillna(0.0))
        y = TimeSeries.from_dataframe(g.reset_index(), time_col="date", value_cols="qty", freq="MS")
        series_list.append(y)
        if use_covariates and num_covs:
            pc = TimeSeries.from_dataframe(g.reset_index(), time_col="date", value_cols=num_covs, freq="MS")
            past_cov_list.append(pc)
        else:
            past_cov_list.append(None)
    # --- 3) Train N-HiTS (fast settings) ---
    from darts.models import NHiTSModel
    H = max(1, int(horizon_months))
    # keep chunk length small for short histories; model is global
    input_chunk = max(6, min(12, min(len(s) for s in series_list) - 1)) if series_list else 12
    model = NHiTSModel(
        input_chunk_length=input_chunk,
        output_chunk_length=min(H, 3),  # can roll to reach H
        n_epochs=60,                    # keep fast; tune up if needed
        batch_size=32,
        random_state=0,
        dropout=0.0,
    )
    if use_covariates and any(pc is not None for pc in past_cov_list):
        model.fit(series=series_list, past_covariates=past_cov_list, verbose=False)
    else:
        model.fit(series=series_list, verbose=False)
    # --- 4) Predict per SKU and return JSON ---
    out = []
    for pid, s, pc in zip(pid_list, series_list, past_cov_list):
        if use_covariates and pc is not None:
            pred = model.predict(n=H, series=s, past_covariates=pc)
        else:
            pred = model.predict(n=H, series=s)
        for ts, val in zip(pred.time_index, pred.values().flatten()):
            out.append({
                "product_id": str(pid),
                "period_start": pd.Timestamp(ts).strftime("%Y-%m-%d"),
                "forecast_qty": float(val)
            })
    return json.dumps(out)
# ---------- Tool 2: Optimize (LP) ----------
@tool
def optimize_supply_tool(forecast_json: str) -> str:
    """
    Optimize a single-month supply plan (LP) using the forecast.

    Decision variables per product: produce, sell, ending inventory; per raw
    material: purchase, ending stock. The objective minimizes cost minus
    revenue (i.e., maximizes profit). Demo master data is hard-coded for
    FG100/FG200 — a forecast containing other SKUs would raise KeyError.
    Args:
        forecast_json (str): JSON string returned by forecast_tool.
    Returns:
        str: JSON with summary + readable tables (not raw solver output).
    """
    from scipy.optimize import linprog
    rows = json.loads(forecast_json)
    # Use first month per product
    demand = {}
    for r in rows:
        p = r["product_id"]
        if p not in demand:
            demand[p] = float(r["forecast_qty"])
    P = sorted(demand.keys()) or ["FG100", "FG200"]
    # --- Demo master data ---
    price = {"FG100": 98.0, "FG200": 120.0}
    conv = {"FG100": 12.5, "FG200": 15.0}      # conversion cost per unit
    r1 = {"FG100": 0.03, "FG200": 0.05}        # resource R1 usage per unit
    r2 = {"FG100": 0.02, "FG200": 0.01}        # resource R2 usage per unit
    RMs = ["RM_A", "RM_B"]
    rm_cost = {"RM_A": 20.0, "RM_B": 30.0}
    rm_start = {"RM_A": 1000.0, "RM_B": 100.0}
    rm_cap = {"RM_A": 5000.0, "RM_B": 5000.0}  # purchase caps
    bom = {
        "FG100": {"RM_A": 0.8, "RM_B": 0.2 * 1.02},  # 0.2 with a 2% uplift factor
        "FG200": {"RM_A": 1.0, "RM_B": 0.1},
    }
    r1_cap, r2_cap = 320.0, 480.0
    # --- Variable layout: [produce | sell | end_inv | purchase | end_rm] ---
    nP, nR = len(P), len(RMs)
    pidx = {p:i for i,p in enumerate(P)}
    ridx = {r:i for i,r in enumerate(RMs)}
    def i_prod(p): return pidx[p]
    def i_sell(p): return nP + pidx[p]
    def i_einv(p): return 2*nP + pidx[p]
    def i_pur(r): return 3*nP + ridx[r]
    def i_einr(r): return 3*nP + nR + ridx[r]
    n_vars = 3*nP + 2*nR
    c = np.zeros(n_vars)
    bounds = [None]*n_vars
    for p in P:
        c[i_prod(p)] += conv[p]
        c[i_sell(p)] -= price[p]
        c[i_einv(p)] += 0.0                 # no holding cost in this demo
        bounds[i_prod(p)] = (0, None)
        bounds[i_sell(p)] = (0, demand[p])  # demand as upper bound (no backorders)
        bounds[i_einv(p)] = (0, None)
    for r in RMs:
        c[i_pur(r)] += rm_cost[r]
        c[i_einr(r)] += 0.0
        bounds[i_pur(r)] = (0, rm_cap[r])
        bounds[i_einr(r)] = (0, None)
    # Equalities: product flow balance and RM stock balance
    Aeq, beq = [], []
    for p in P:
        row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1
        Aeq.append(row); beq.append(0.0)  # start_inv=0 in this demo
    for r in RMs:
        row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1
        for p in P: row[i_prod(p)] -= bom[p].get(r,0.0)
        Aeq.append(row); beq.append(-rm_start[r])
    # Inequalities (shared resource capacities)
    Aub, bub = [], []
    row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap)
    row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap)
    # linprog already imported above (the original re-imported it here)
    res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq),
                  bounds=bounds, method="highs")
    if not res.success:
        return json.dumps({"status": "FAILED", "message": res.message})
    x = res.x
    def v(idx): return float(x[idx])
    # Compose human-friendly tables
    prod_tbl = []
    revenue = 0.0; conv_cost = 0.0
    for p in P:
        produce = v(i_prod(p)); sell = v(i_sell(p))
        prod_tbl.append({"Product": p, "Produce": produce, "Sell": sell, "Unit Price": price[p], "Conv. Cost/u": conv[p]})
        revenue += sell*price[p]; conv_cost += produce*conv[p]
    raw_tbl = []
    rm_purch_cost = 0.0
    for r in RMs:
        purchase = v(i_pur(r))
        cons = float(sum(bom[p].get(r,0.0)*v(i_prod(p)) for p in P))
        cost = purchase*rm_cost[r]; rm_purch_cost += cost
        raw_tbl.append({"Raw": r, "Purchase": purchase, "Consume": cons, "Cost/u": rm_cost[r], "Total Cost": cost})
    r1_used = float(sum(r1[p]*v(i_prod(p)) for p in P))
    r2_used = float(sum(r2[p]*v(i_prod(p)) for p in P))
    res_tbl = [
        {"Resource": "R1", "Used": r1_used, "Cap": r1_cap, "Slack": r1_cap - r1_used},
        {"Resource": "R2", "Used": r2_used, "Cap": r2_cap, "Slack": r2_cap - r2_used},
    ]
    profit = revenue - conv_cost - rm_purch_cost
    out = {
        "status": "OPTIMAL",
        "kpis": {"Profit": profit, "Revenue": revenue, "Conv. Cost": conv_cost, "RM Purchase Cost": rm_purch_cost},
        "products": prod_tbl,
        "raw_materials": raw_tbl,
        "resources": res_tbl
    }
    return json.dumps(out)
# ---------- Tool 3: MD61 (simulate) ----------
@tool
def update_sap_md61_tool(forecast_json: str, plant: str = "PLANT01", uom: str = "EA", mrp_area: str = "") -> str:
    """
    Prepare an MD61-style demand upload (SIMULATION ONLY).
    Args:
        forecast_json (str): JSON string returned by forecast_tool.
        plant (str): SAP plant (WERKS). Defaults to 'PLANT01'.
        uom (str): Unit of measure. Defaults to 'EA'.
        mrp_area (str): Optional MRP area.
    Returns:
        str: JSON with {"status":"SIMULATED","csv_path":"...","preview":[...]}.
    """
    rows = json.loads(forecast_json)
    # One MD61 row per forecasted (SKU, month); planning version fixed at "00".
    md61 = [{
        "Material": r["product_id"], "Plant": plant, "MRP_Area": mrp_area,
        "Req_Date": r["period_start"], "Req_Qty": float(r["forecast_qty"]),
        "UoM": uom, "Version": "00"
    } for r in rows]
    df = pd.DataFrame(md61)
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
    # Close the handle before writing: to_csv reopens the file by path, which
    # fails on Windows while the NamedTemporaryFile handle is still open (the
    # original also leaked this file descriptor).
    tmp.close()
    df.to_csv(tmp.name, index=False)
    return json.dumps({"status": "SIMULATED", "csv_path": tmp.name, "preview": md61[:5]})
# ---------- Upgrade A: Data Quality Guard ----------
@tool
def data_quality_guard_tool(use_demo: bool = True, history_csv_path: str = "", z: float = 3.5) -> str:
    """
    Scan monthly history for outliers and likely stockout zeros. Demo-friendly.

    Outliers use a rolling robust z-score (6-month rolling median / MAD);
    zero months surrounded by non-zero months are flagged as likely stockouts
    with a neighbor-average imputation suggestion.
    Args:
        use_demo (bool): If True, use synthetic FG100/FG200 history.
        history_csv_path (str): Optional CSV path with columns [product_id,date,qty].
        z (float): Robust z-threshold (MAD-based) for outlier flags. Default 3.5.
    Returns:
        str: JSON {"status":"OK","issues":[{product_id,period_start,qty,flag,suggested_action,suggested_value?}]}
    """
    # Load history (same synthetic generator as forecast_tool when in demo mode)
    if use_demo or not history_csv_path:
        rng = pd.date_range("2023-01-01", periods=24, freq="MS")
        rows = []
        np.random.seed(0)
        for pid, base in [("FG100", 1800), ("FG200", 900)]:
            # Smooth yearly seasonality around the base level
            season = 1 + 0.15 * np.sin(2 * np.pi * (np.arange(len(rng)) / 12.0))
            qty = (base * season).astype(float)
            for d, q in zip(rng, qty):
                rows.append({"product_id": pid, "date": d, "qty": float(q)})
        df = pd.DataFrame(rows)
    else:
        df = pd.read_csv(history_csv_path)
        assert {"product_id","date","qty"} <= set(df.columns), "CSV must have product_id,date,qty"
        df["date"] = pd.to_datetime(df["date"], errors="coerce")
        df = df.dropna(subset=["date"])
        df["qty"] = pd.to_numeric(df["qty"], errors="coerce").fillna(0.0)
    issues = []
    for pid, g in df.groupby("product_id"):
        # Regularize to month-start frequency; missing months become 0 demand
        s = g.set_index("date")["qty"].resample("MS").sum().asfreq("MS").fillna(0.0)
        if len(s) < 6:
            continue  # not enough history for the 6-month rolling window
        med = s.rolling(6, min_periods=3).median()
        mad = (s - med).abs().rolling(6, min_periods=3).median()
        # 0.6745 rescales MAD to be comparable with a standard deviation under
        # normality; a zero MAD is mapped to NaN so flat windows never flag
        robust_z = (0.6745 * (s - med) / mad.replace(0, np.nan)).abs()
        for ts, y in s.items():
            flag = None; action = None; val = None
            rz = robust_z.get(ts, np.nan)
            # Outlier: robust z-score beyond the caller's threshold
            if not np.isnan(rz) and rz > z:
                flag = "OUTLIER"
                action = "cap_to_rolling_median"
                val = float(med.get(ts, np.nan)) if not np.isnan(med.get(ts, np.nan)) else None
            # Likely stockout zero in-between non-zeros (pandas label slices are
            # inclusive, so iloc drops the current month from each side)
            prev_nonzero = s[:ts].iloc[:-1].ne(0).any() if len(s[:ts])>1 else False
            next_nonzero = s[ts:].iloc[1:].ne(0).any() if len(s[ts:])>1 else False
            if y == 0 and prev_nonzero and next_nonzero:
                # keep an OUTLIER flag if already set; otherwise mark the zero
                flag = flag or "ZERO_BETWEEN_NONZERO"
                action = action or "impute_neighbor_avg"
                idx = s.index.get_loc(ts)
                neighbors = []
                if idx>0: neighbors.append(s.iloc[idx-1])
                if idx<len(s)-1: neighbors.append(s.iloc[idx+1])
                val = float(np.mean(neighbors)) if neighbors else None
            if flag:
                issues.append({
                    "product_id": pid,
                    "period_start": ts.strftime("%Y-%m-%d"),
                    "qty": float(y),
                    "flag": flag,
                    "suggested_action": action,
                    "suggested_value": None if val is None or np.isnan(val) else float(val)
                })
    return json.dumps({"status":"OK","issues":issues})
# ---------- Upgrade B: Scenario Explorer ----------
@tool
def scenario_explorer_tool(forecast_json: str) -> str:
    """
    Run a small set of what-if scenarios and summarize results.

    Each scenario re-solves the same single-month LP as optimize_supply_tool
    with one input perturbed (demand, capacity, RM cost, conversion cost, or
    starting stock).
    Args:
        forecast_json (str): JSON string from forecast_tool (uses first month per SKU).
    Returns:
        str: JSON {"status":"OK","scenarios":[{name,profit,r1_used,r1_slack,r2_used,r2_slack}]}
    """
    from scipy.optimize import linprog
    base_rows = json.loads(forecast_json)
    demand = {}
    for r in base_rows:
        p = r["product_id"]
        if p not in demand:  # keep only the first (earliest) month per SKU
            demand[p] = float(r["forecast_qty"])
    P = sorted(demand.keys()) or ["FG100","FG200"]
    # Base constants (demo master data; mirrors optimize_supply_tool)
    price = {"FG100": 98.0, "FG200": 120.0}
    conv = {"FG100": 12.5, "FG200": 15.0}
    r1 = {"FG100": 0.03, "FG200": 0.05}
    r2 = {"FG100": 0.02, "FG200": 0.01}
    RMs = ["RM_A","RM_B"]
    rm_cost = {"RM_A": 20.0, "RM_B": 30.0}
    rm_start = {"RM_A": 1000.0, "RM_B": 100.0}
    rm_cap = {"RM_A": 5000.0, "RM_B": 5000.0}
    bom = {
        "FG100": {"RM_A": 0.8, "RM_B": 0.2*1.02},
        "FG200": {"RM_A": 1.0, "RM_B": 0.1},
    }
    r1_cap0, r2_cap0 = 320.0, 480.0
    scenarios = [
        {"name":"Base", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0},
        {"name":"+10% Demand", "demand_mult":1.10, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0},
        {"name":"-10% R1 Cap", "mult":1.0, "r1_cap":0.9*r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0},
        {"name":"+β‚Ή5 RM_B", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":5.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0},
        {"name":"+10% FG100 ConvCost", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.10, "rmA_start_mult":1.0},
        {"name":"-20% RM_A Start", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":0.80},
    ]
    results = []
    for sc in scenarios:
        # "demand_mult" takes precedence over the generic "mult" key
        dem_mult = sc.get("demand_mult", sc.get("mult",1.0))
        d = {p: demand[p]*dem_mult for p in P}
        r1_cap = sc["r1_cap"]; r2_cap = sc["r2_cap"]
        rm_cost_local = rm_cost.copy(); rm_cost_local["RM_B"] += sc["rm_cost_B_add"]
        conv_local = conv.copy(); conv_local["FG100"] = conv_local["FG100"] * sc["conv_fg100_mult"]
        rm_start_local = rm_start.copy(); rm_start_local["RM_A"] = rm_start_local["RM_A"] * sc["rmA_start_mult"]
        # Build and solve LP (same variable layout as optimize_supply_tool)
        nP, nR = len(P), len(RMs)
        pidx = {p:i for i,p in enumerate(P)}; ridx = {r:i for i,r in enumerate(RMs)}
        def i_prod(p): return pidx[p]
        def i_sell(p): return nP + pidx[p]
        def i_einv(p): return 2*nP + pidx[p]
        def i_pur(r): return 3*nP + ridx[r]
        def i_einr(r): return 3*nP + nR + ridx[r]
        n_vars = 3*nP + 2*nR
        c = np.zeros(n_vars); bounds = [None]*n_vars
        for p in P:
            c[i_prod(p)] += conv_local[p]; c[i_sell(p)] -= price[p]; c[i_einv(p)] += 0.0
            bounds[i_prod(p)] = (0, None); bounds[i_sell(p)] = (0, d[p]); bounds[i_einv(p)] = (0, None)
        for r in RMs:
            c[i_pur(r)] += rm_cost_local[r]; c[i_einr(r)] += 0.0
            bounds[i_pur(r)] = (0, rm_cap[r]); bounds[i_einr(r)] = (0, None)
        Aeq, beq = [], []
        for p in P:
            row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1
            Aeq.append(row); beq.append(0.0)
        for r in RMs:
            row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1
            for p in P: row[i_prod(p)] -= bom[p].get(r,0.0)
            Aeq.append(row); beq.append(-rm_start_local[r])
        Aub, bub = [], []
        row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap)
        row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap)
        # linprog imported once above (the original re-imported it every iteration)
        res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq),
                      bounds=bounds, method="highs")
        if not res.success:
            results.append({"name": sc["name"], "status":"FAILED", "profit": None})
            continue
        x = res.x
        def v(idx): return float(x[idx])
        r1_used = float(sum(r1[p]*v(i_prod(p)) for p in P))
        r2_used = float(sum(r2[p]*v(i_prod(p)) for p in P))
        revenue = float(sum(price[p]*v(i_sell(p)) for p in P))
        conv_cost = float(sum(conv_local[p]*v(i_prod(p)) for p in P))
        rm_purch_cost = float(sum(v(i_pur(r))*rm_cost_local[r] for r in RMs))
        profit = revenue - conv_cost - rm_purch_cost
        results.append({
            "name": sc["name"],
            "status":"OPTIMAL",
            "profit": profit,
            "r1_used": r1_used, "r1_slack": r1_cap - r1_used,
            "r2_used": r2_used, "r2_slack": r2_cap - r2_used
        })
    return json.dumps({"status":"OK","scenarios":results})
# ---------- Upgrade C: Plan Explainer ----------
@tool
def plan_explainer_tool(plan_json: str) -> str:
    """
    Generate a plain-language explanation of the plan (demo assumptions).
    Args:
        plan_json (str): JSON from optimize_supply_tool.
    Returns:
        str: JSON {"status":"OK","summary": "... human-readable text ..."}
    """
    plan = json.loads(plan_json)
    k = plan.get("kpis", {})
    product_rows = plan.get("products", [])
    resource_rows = plan.get("resources", [])
    # Contribution margin per product: revenue minus conversion cost.
    contribs = [
        (row["Product"], row["Sell"] * row["Unit Price"] - row["Produce"] * row["Conv. Cost/u"])
        for row in product_rows
    ]
    top_prod = max(contribs, key=lambda pair: pair[1])[0] if contribs else "N/A"
    # The binding resource is the one with the least slack (None if no resources).
    bind_res = None
    if resource_rows:
        bind_res = min(resource_rows, key=lambda r: r.get("Slack", 0.0))["Resource"]
    summary = (
        f"Profit: β‚Ή{k.get('Profit',0):,.2f} (Revenue β‚Ή{k.get('Revenue',0):,.2f} βˆ’ "
        f"Conv. Cost β‚Ή{k.get('Conv. Cost',0):,.2f} βˆ’ RM Cost β‚Ή{k.get('RM Purchase Cost',0):,.2f}). "
        f"Top driver: {top_prod}. "
        f"Bottleneck check: {bind_res} has the least slack."
    )
    return json.dumps({"status":"OK","summary": summary})
# ---------- NEW: Bottleneck Search (finite-difference + policy) ----------
@tool
def bottleneck_search_tool(forecast_json: str, policy_json: str = "") -> str:
    """
    Find the best practical lever to relax (within ~1 month) via small scenario probes.
    Supports resource overtime, RM expedite (cap), and demand_lift (promo ROI-checked).

    For each lever the LP is re-solved at increasing deltas and the delta with
    the best net gain (profit lift minus relaxation cost) is reported.
    Args:
        forecast_json (str): JSON from forecast_tool (first month per SKU used).
        policy_json (str): Optional JSON with levers and costs.
    Returns:
        str: JSON {"status":"OK","diagnostics":{...},"recommendations":[...]}
    """
    import json, numpy as np
    from scipy.optimize import linprog
    # --- Defaults (edit to your reality) ---
    policy = {
        "R1_overtime": {"type":"resource","target":"R1","step":10.0,"max_delta":80.0,"unit_cost":600.0},
        "R2_overtime": {"type":"resource","target":"R2","step":10.0,"max_delta":40.0,"unit_cost":800.0},
        "RM_A_expedite": {"type":"rm_expedite","target":"RM_A","step":200.0,"max_delta":1500.0,"unit_premium":8.0},
        "RM_B_expedite": {"type":"rm_expedite","target":"RM_B","step":100.0,"max_delta":800.0,"unit_premium":12.0},
        "Factory_expansion": {"type":"blocked","reason":"Not relaxable within 1 month"}
        # You can add demand levers via policy_json:
        # "promo_FG100": {"type":"demand_lift","target":"FG100","step":100,"max_delta":600,"unit_cost":10}
    }
    if policy_json:
        # NOTE(review): malformed policy JSON is silently ignored and the
        # defaults above are kept unchanged.
        try: policy.update(json.loads(policy_json))
        except Exception: pass
    # --- Problem primitives (same as your optimizer) ---
    rows = json.loads(forecast_json)
    demand = {}
    for r in rows:
        p = r["product_id"]
        if p not in demand:  # keep only the first (earliest) month per SKU
            demand[p] = float(r["forecast_qty"])
    P = sorted(demand.keys()) or ["FG100","FG200"]
    price = {"FG100":98.0,"FG200":120.0}
    conv = {"FG100":12.5,"FG200":15.0}
    r1 = {"FG100":0.03,"FG200":0.05}
    r2 = {"FG100":0.02,"FG200":0.01}
    RMs = ["RM_A","RM_B"]
    rm_cost = {"RM_A":20.0,"RM_B":30.0}
    rm_start = {"RM_A":1000.0,"RM_B":100.0}
    rm_cap = {"RM_A":5000.0,"RM_B":5000.0}
    bom = {"FG100":{"RM_A":0.8,"RM_B":0.204},"FG200":{"RM_A":1.0,"RM_B":0.1}}
    r1_cap0, r2_cap0 = 320.0, 480.0
    # Helpful unit margin for demand_lift (approx variable cost)
    # NOTE(review): unit_var_cost/unit_margin are computed but not referenced
    # below — kept for policy authors / future use.
    unit_var_cost = {p: conv[p] + sum(bom[p].get(rm,0.0)*rm_cost[rm] for rm in RMs) for p in P}
    unit_margin = {p: price[p] - unit_var_cost[p] for p in P}
    # --- LP builder (inner solve) ---
    def solve(dem, r1_cap, r2_cap, rm_cap_adj):
        # Same variable layout as optimize_supply_tool:
        # [produce | sell | end_inv | purchase | end_rm]
        nP, nR = len(P), len(RMs)
        pidx = {p:i for i,p in enumerate(P)}; ridx = {r:i for i,r in enumerate(RMs)}
        def i_prod(p): return pidx[p]
        def i_sell(p): return nP + pidx[p]
        def i_einv(p): return 2*nP + pidx[p]
        def i_pur(r): return 3*nP + ridx[r]
        def i_einr(r): return 3*nP + nR + ridx[r]
        n_vars = 3*nP + 2*nR
        c = np.zeros(n_vars); bounds=[None]*n_vars
        for p in P:
            c[i_prod(p)] += conv[p]
            c[i_sell(p)] -= price[p]
            bounds[i_prod(p)] = (0,None)
            bounds[i_sell(p)] = (0, dem[p])
            bounds[i_einv(p)] = (0,None)
        for r in RMs:
            c[i_pur(r)] += rm_cost[r]
            bounds[i_pur(r)] = (0, rm_cap_adj[r])
            bounds[i_einr(r)] = (0,None)
        Aeq, beq = [], []
        for p in P:
            # Flow balance: produce - sell - ending inventory = 0 (start inv 0)
            row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1
            Aeq.append(row); beq.append(0.0)
        for r in RMs:
            # RM balance: purchase - ending stock - consumption = -start stock
            row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1
            for p in P: row[i_prod(p)] -= bom[p].get(r,0.0)
            Aeq.append(row); beq.append(-rm_start[r])
        Aub, bub = [], []
        row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap)
        row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap)
        res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq),
                      bounds=bounds, method="highs")
        if not res.success:
            return None
        x = res.x
        def v(idx): return float(x[idx])
        revenue = sum(price[p]*v(i_sell(p)) for p in P)
        conv_cost = sum(conv[p]*v(i_prod(p)) for p in P)
        rm_purch_cost = sum(v(i_pur(r))*rm_cost[r] for r in RMs)
        r1_used = sum(r1[p]*v(i_prod(p)) for p in P)
        r2_used = sum(r2[p]*v(i_prod(p)) for p in P)
        return {
            "profit": revenue - conv_cost - rm_purch_cost,
            "r1_used": r1_used, "r2_used": r2_used,
        }
    # Baseline
    base = solve(demand, r1_cap0, r2_cap0, rm_cap.copy())
    if base is None:
        return json.dumps({"status":"FAILED","message":"Baseline infeasible."})
    base_profit = float(base["profit"])
    diag = {
        "demand_met": True,  # sells hit demand ceiling in this model
        "r1_slack": round(r1_cap0 - base["r1_used"], 2),
        "r2_slack": round(r2_cap0 - base["r2_used"], 2),
        "rm_caps_hit": {rm: False for rm in RMs}
    }
    # If everything is slack, tell the user plainly
    all_slack = (diag["r1_slack"] > 1e-3 and diag["r2_slack"] > 1e-3)
    # --- Probe levers ---
    recs = []
    for name, cfg in policy.items():
        if cfg.get("type") == "blocked":
            # Lever declared non-actionable within the horizon: report as-is
            recs.append({"lever":name,"recommended":0,"est_profit_lift":0,"est_net_gain":0,
                         "rationale":cfg.get("reason","Not feasible in horizon")})
            continue
        best_gain = 0.0; best_delta = 0.0; best_lift = 0.0
        # NOTE(review): a lever with step 0 (or missing "step") and a positive
        # max_delta would loop forever here — validate policies upstream.
        step = float(cfg.get("step",0)); maxd = float(cfg.get("max_delta",0))
        d = 0.0
        while d <= maxd + 1e-6:
            # Fresh copies so each probe perturbs exactly one input
            dem = demand.copy()
            r1_cap = r1_cap0; r2_cap = r2_cap0
            rm_cap_adj = rm_cap.copy()
            relax_cost = 0.0
            t = cfg["type"]
            if t == "resource" and cfg["target"] == "R1":
                r1_cap += d; relax_cost = d * float(cfg["unit_cost"])
            elif t == "resource" and cfg["target"] == "R2":
                r2_cap += d; relax_cost = d * float(cfg["unit_cost"])
            elif t == "rm_expedite":
                key = "RM_A" if cfg["target"]=="RM_A" else "RM_B"
                rm_cap_adj[key] = rm_cap[key] + d
                relax_cost = d * float(cfg["unit_premium"])
            elif t == "demand_lift":
                sku = cfg["target"]
                if sku in dem:
                    dem[sku] = dem[sku] + d
                    relax_cost = d * float(cfg["unit_cost"])
                else:
                    d += step; continue  # unknown SKU: skip this probe
            else:
                d += step; continue  # unknown lever type: skip this probe
            res = solve(dem, r1_cap, r2_cap, rm_cap_adj)
            if res is None:
                d += step; continue  # infeasible probe: skip
            lift = float(res["profit"] - base_profit)
            net = lift - relax_cost
            if net > best_gain:  # keep the delta with the best NET gain
                best_gain, best_delta, best_lift = net, d, lift
            d += step
        rationale = ("Beneficial at small Ξ”" if best_gain>0 else
                     ("Capacity not binding; try demand/cost levers" if all_slack else
                      "No positive net gain within feasible range"))
        recs.append({"lever":name,"recommended":round(best_delta,2),
                     "est_profit_lift":round(best_lift,2),
                     "est_net_gain":round(best_gain,2),
                     "rationale":rationale})
    # Best levers first
    recs.sort(key=lambda r: r["est_net_gain"], reverse=True)
    return json.dumps({"status":"OK","diagnostics":diag,"recommendations":recs})
# ==== Agent (end-to-end) ======================================================
def make_agent():
    """Build the CodeAgent wired to every tool; requires OPENAI_API_KEY in the env."""
    key = os.environ.get("OPENAI_API_KEY", "")
    if not key:
        raise RuntimeError("OPENAI_API_KEY not set. Add it as a Space secret.")
    llm = OpenAIServerModel(model_id="gpt-4o-mini", api_key=key, temperature=0)
    toolbox = [
        forecast_tool, optimize_supply_tool, update_sap_md61_tool,
        data_quality_guard_tool, scenario_explorer_tool, plan_explainer_tool,
        bottleneck_search_tool,
    ]
    return CodeAgent(tools=toolbox, model=llm, add_base_tools=False, stream_outputs=False)
# Instruction template handed to the agent: run the three tools in order and
# return a single JSON object carrying all three results.
SYSTEM_PLAN = (
    "Run the pipeline and return one JSON:\n"
    "1) forecast_tool(...)\n"
    "2) optimize_supply_tool(forecast_json)\n"
    "3) update_sap_md61_tool(forecast_json, ...)\n"
    "Return: {'forecast': <json>, 'plan': <json>, 'md61': <json>}"
)
def run_agentic(h, plant, demo_flag, file_obj):
    """Run the full pipeline through the agent; an uploaded CSV overrides demo mode."""
    agent = make_agent()
    if file_obj is None:
        step1 = f"Use forecast_tool(horizon_months={int(h)}, use_demo={bool(demo_flag)}). "
    else:
        step1 = (f"Use forecast_tool(horizon_months={int(h)}, use_demo=False, "
                 f"history_csv_path='{file_obj.name}'). ")
    prompt = (f"{SYSTEM_PLAN}\n"
              + step1
              + "Then run the other two steps as specified. Return only the final JSON.")
    return agent.run(prompt)
# ==== UI Helpers (pretty) =====================================================
def parse_forecast(json_str):
    """Convert forecast_tool JSON into a rounded display table with friendly headers."""
    records = json.loads(json_str)
    table = pd.DataFrame(records)[["product_id", "period_start", "forecast_qty"]]
    table.columns = ["Product", "Period Start", "Forecast Qty"]
    return _round_df(table)
def parse_plan(json_str):
    """Unpack optimizer JSON into (status, kpis, products, raw materials, resources)."""
    plan = json.loads(json_str)
    kpis = pd.DataFrame([plan["kpis"]]).rename(columns={
        "Conv. Cost": "Conversion Cost", "RM Purchase Cost": "RM Purchase Cost"
    })
    prod, raw, res = (
        _round_df(pd.DataFrame(plan[key]))
        for key in ("products", "raw_materials", "resources")
    )
    return plan["status"], _round_df(kpis), prod, raw, res
def parse_md61(json_str):
    """Unpack MD61 JSON into (status, rounded preview table, CSV path)."""
    payload = json.loads(json_str)
    preview = _round_df(pd.DataFrame(payload.get("preview", [])))
    return payload.get("status", ""), preview, payload.get("csv_path", "")
def parse_data_quality(json_str):
    """Turn data_quality_guard_tool output into a rounded issues table."""
    issues = json.loads(json_str).get("issues", [])
    return _round_df(pd.DataFrame(issues))
def parse_scenarios(json_str):
    """Turn scenario_explorer_tool output into a rounded, renamed display table.

    Uses reindex rather than hard column selection: when every scenario FAILED,
    the rows carry only name/status/profit and the original code raised
    KeyError on the missing usage/slack columns.
    """
    d = json.loads(json_str)
    df = _round_df(pd.DataFrame(d.get("scenarios", [])))
    if not df.empty:
        cols = ["name","status","profit","r1_used","r1_slack","r2_used","r2_slack"]
        df = df.reindex(columns=cols)  # missing columns become NaN instead of raising
        df = df.rename(columns={"name":"Scenario","status":"Status","profit":"Profit",
                                "r1_used":"R1 Used","r1_slack":"R1 Slack","r2_used":"R2 Used","r2_slack":"R2 Slack"})
    return df
def parse_recs(json_str):
    """Turn bottleneck_search_tool output into a rounded, renamed table."""
    recs = json.loads(json_str).get("recommendations", [])
    table = _round_df(pd.DataFrame(recs))
    if table.empty:
        return table
    pretty = {
        "lever": "Lever",
        "recommended": "Recommended Ξ”",
        "est_profit_lift": "Est. Profit Lift",
        "est_net_gain": "Est. Net Gain",
        "rationale": "Rationale",
    }
    return table.rename(columns=pretty)
# ==== Gradio UI ==============================================================
# Build the Gradio UI: a manual step-by-step tab, an agentic end-to-end tab,
# and an "Upgrades" tab exposing the auxiliary analysis tools.
with gr.Blocks(title="Forecast β†’ Optimize β†’ SAP MD61") as demo:
    gr.Markdown("## 🧭 Workflow\n"
                "### 1) **Forecast** β†’ 2) **Optimize Supply** β†’ 3) **Prepare MD61**\n"
                "Run them **manually** below, or use the **agent** to do end-to-end in one click.")
    with gr.Tab("Manual (Step-by-step)"):
        with gr.Row():
            horizon = gr.Number(label="Horizon (months)", value=1, precision=0)
            plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01")
        with gr.Row():
            use_demo = gr.Checkbox(label="Use demo synthetic history", value=True)
            file = gr.File(label="Or upload history.csv (product_id,date,qty)", file_types=[".csv"])
        # States: raw JSON strings handed between the three steps
        forecast_state = gr.State("")
        plan_state = gr.State("")
        md61_state = gr.State("")
        gr.Markdown("### ➀ Step 1: Forecast")
        run_f = gr.Button("Run Step 1 β€” Forecast")
        forecast_tbl = gr.Dataframe(label="Forecast (first horizon month per SKU)", interactive=False)
        forecast_note = gr.Markdown("")
        gr.Markdown("### ➀ Step 2: Optimize Supply")
        run_o = gr.Button("Run Step 2 β€” Optimize")
        plan_status = gr.Markdown("")
        plan_kpis = gr.Dataframe(label="KPIs", interactive=False)
        plan_prod = gr.Dataframe(label="Products Plan", interactive=False)
        plan_raw = gr.Dataframe(label="Raw Materials", interactive=False)
        plan_res = gr.Dataframe(label="Resources", interactive=False)
        gr.Markdown("### ➀ Step 3: Prepare MD61 (Simulated)")
        run_m = gr.Button("Run Step 3 β€” MD61")
        md61_status = gr.Markdown("")
        md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False)
        md61_file = gr.File(label="Download CSV", interactive=False)
        # Handlers
        def do_forecast(h, demo_flag, f):
            # An uploaded file takes precedence over the demo checkbox
            hist_path = "" if (f is None) else f.name
            fj = forecast_tool(horizon_months=int(h), use_demo=(f is None) and bool(demo_flag),
                               history_csv_path=hist_path)
            df = parse_forecast(fj)
            return fj, df, f"Forecast generated for {df['Product'].nunique()} product(s)."
        run_f.click(do_forecast, inputs=[horizon, use_demo, file], outputs=[forecast_state, forecast_tbl, forecast_note])
        def do_optimize(fj):
            if not fj:
                return "", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), "⚠️ Run Step 1 first."
            pj = optimize_supply_tool(fj)
            status, kpis, prod, raw, res = parse_plan(pj)
            return pj, kpis, prod, raw, res, f"Optimization status: **{status}**"
        run_o.click(do_optimize, inputs=[forecast_state],
                    outputs=[plan_state, plan_kpis, plan_prod, plan_raw, plan_res, plan_status])
        def do_md61(fj, plant):
            if not fj:
                return "", pd.DataFrame(), None, "⚠️ Run Step 1 first."
            mj = update_sap_md61_tool(fj, plant=plant, uom="EA")
            status, prev, path = parse_md61(mj)
            return mj, prev, path, f"MD61 status: **{status}**"
        run_m.click(do_md61, inputs=[forecast_state, plant], outputs=[md61_state, md61_prev, md61_file, md61_status])
    with gr.Tab("Agentic (End-to-end)"):
        gr.Markdown("One click: the agent runs all three steps with OpenAI.")
        with gr.Row():
            a_horizon = gr.Number(label="Horizon (months)", value=1, precision=0)
            a_plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01")
        with gr.Row():
            a_demo = gr.Checkbox(label="Use demo synthetic history", value=True)
            a_file = gr.File(label="Or upload history.csv", file_types=[".csv"])
        run_all = gr.Button("Run End-to-end (Agent)")
        out_json = gr.Textbox(label="Agent Raw JSON (for inspection)", lines=6)
        with gr.Accordion("Pretty Outputs", open=True):
            a_forecast_tbl = gr.Dataframe(label="Forecast", interactive=False)
            a_plan_kpis = gr.Dataframe(label="KPIs", interactive=False)
            a_plan_prod = gr.Dataframe(label="Products Plan", interactive=False)
            a_plan_raw = gr.Dataframe(label="Raw Materials", interactive=False)
            a_plan_res = gr.Dataframe(label="Resources", interactive=False)
            a_md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False)
            a_md61_file = gr.File(label="Download MD61 CSV", interactive=False)
        def do_agent(h, p, demo_flag, f):
            # The agent may hand back a dict or a JSON string; normalize both ways
            def to_obj(x):
                return x if isinstance(x, (dict, list)) else json.loads(x)
            def to_str(x):
                return x if isinstance(x, str) else json.dumps(x)
            try:
                res = run_agentic(h, p, demo_flag, f)  # may be dict or str
                out = to_obj(res)
                forecast_json = to_str(out["forecast"])
                plan_json = to_str(out["plan"])
                md61_json = to_str(out["md61"])
                f_df = parse_forecast(forecast_json)
                _, kpis, prod, raw, res_tbl = parse_plan(plan_json)
                _, prev, csv_path = parse_md61(md61_json)
                pretty = {
                    "forecast": json.loads(forecast_json),
                    "plan": json.loads(plan_json),
                    "md61": json.loads(md61_json),
                }
                return (json.dumps(pretty, indent=2), f_df, kpis, prod, raw, res_tbl, prev, csv_path)
            except Exception as e:
                # Surface the error in the JSON box and clear every table
                return (f"Agent error: {e}", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(),
                        pd.DataFrame(), pd.DataFrame(), None)
        run_all.click(do_agent, inputs=[a_horizon, a_plant, a_demo, a_file],
                      outputs=[out_json, a_forecast_tbl, a_plan_kpis, a_plan_prod, a_plan_raw, a_plan_res, a_md61_prev, a_md61_file])
    # --------- Upgrades ---------
    with gr.Tab("Upgrades"):
        gr.Markdown("### 🧹 Data Quality Guard")
        with gr.Row():
            dq_use_demo = gr.Checkbox(label="Use demo synthetic history", value=True)
            dq_file = gr.File(label="Or upload history.csv", file_types=[".csv"])
            dq_z = gr.Number(label="Robust z-threshold", value=3.5)
        run_dq = gr.Button("Run Data Quality Guard")
        dq_tbl = gr.Dataframe(label="Issues (flags & suggestions)", interactive=False)
        def do_dq(demo_flag, f, zval):
            # Same upload-wins precedence as the manual forecast handler
            hist_path = "" if (f is None) else f.name
            out = data_quality_guard_tool(use_demo=(f is None) and bool(demo_flag),
                                          history_csv_path=hist_path, z=float(zval))
            return parse_data_quality(out)
        run_dq.click(do_dq, inputs=[dq_use_demo, dq_file, dq_z], outputs=[dq_tbl])
        gr.Markdown("### πŸ”€ Scenario Explorer")
        run_sc = gr.Button("Run Scenarios on current Forecast (Step 1)")
        sc_tbl = gr.Dataframe(label="Scenario Outcomes", interactive=False)
        def do_sc(fj):
            if not fj:
                return pd.DataFrame()  # Step 1 not run yet
            out = scenario_explorer_tool(fj)
            return parse_scenarios(out)
        run_sc.click(do_sc, inputs=[forecast_state], outputs=[sc_tbl])
        gr.Markdown("### πŸ“ Plan Explainer")
        run_ex = gr.Button("Explain Current Plan (Step 2)")
        expl_md = gr.Markdown("")
        def do_explain(pj):
            if not pj:
                return "⚠️ Run Step 2 first."
            out = plan_explainer_tool(pj)
            return json.loads(out)["summary"]
        run_ex.click(do_explain, inputs=[plan_state], outputs=[expl_md])
        gr.Markdown("### 🎯 Bottleneck Finder (Practical Levers)")
        policy_box = gr.Textbox(
            label="Optional policy JSON (overtime/expedite limits & costs). Leave blank for sensible defaults.",
            lines=4, value=""
        )
        run_bn = gr.Button("Find Best Bottleneck (uses current Forecast)")
        bn_tbl = gr.Dataframe(label="Ranked Recommendations", interactive=False)
        def do_bn(fj, policy):
            if not fj:
                return pd.DataFrame()  # Step 1 not run yet
            out = bottleneck_search_tool(fj, policy_json=policy or "")
            return parse_recs(out)
        run_bn.click(do_bn, inputs=[forecast_state, policy_box], outputs=[bn_tbl])
# Script entry point: launch the Gradio app.
if __name__ == "__main__":
    # Needs OPENAI_API_KEY in env for agent tab; manual tabs work without it.
    if not os.environ.get("OPENAI_API_KEY"):
        print("⚠️ Set OPENAI_API_KEY (Space secret) to use Agentic tab.")
    demo.launch()