# app.py import os, json, tempfile, logging import gradio as gr import pandas as pd import numpy as np # Quiet noisy logs logging.getLogger("cmdstanpy").setLevel(logging.WARNING) logging.getLogger("prophet").setLevel(logging.WARNING) # ==== Tools (your @tool template) ============================================ from smolagents import tool, CodeAgent, OpenAIServerModel # ---------- Helper (rounding) ---------- def _round_df(df: pd.DataFrame, places: int = 2) -> pd.DataFrame: if df is None or df.empty: return df out = df.copy() num_cols = out.select_dtypes(include=["number"]).columns out[num_cols] = out[num_cols].astype(float).round(places) return out # ---------- Tool 1: Forecast ---------- from smolagents import tool import json, pandas as pd, numpy as np @tool def forecast_tool(horizon_months: int = 1, use_demo: bool = True, history_csv_path: str = "", use_covariates: bool = False) -> str: """ Forecast monthly demand using a GLOBAL N-HiTS model (fast & accurate). Args: horizon_months (int): Number of future months to forecast (>=1). use_demo (bool): If True, generates synthetic history for FG100/FG200. history_csv_path (str): Optional CSV with columns [product_id,date,qty,(optional extra covariates...)]. use_covariates (bool): If True and extra numeric columns exist, use them as past covariates (for future effects you must provide future values too). Returns: str: JSON list of {"product_id","period_start","forecast_qty"} for the next horizon_months. """ # --- 1) Load data in long form --- if use_demo or not history_csv_path: rng = pd.date_range("2023-01-01", periods=24, freq="MS") rows = [] np.random.seed(0) for pid, base in [("FG100", 1800), ("FG200", 900)]: season = 1 + 0.15 * np.sin(2 * np.pi * (np.arange(len(rng)) / 12.0)) qty = (base * season).astype(float) for d, q in zip(rng, qty): rows.append({"product_id": pid, "date": d, "qty": float(q)}) df = pd.DataFrame(rows) else: df = pd.read_csv(history_csv_path) assert {"product_id","date","qty"} <= set(df.columns), "CSV must have product_id,date,qty" df["date"] = pd.to_datetime(df["date"], errors="coerce") df = df.dropna(subset=["date"]) df["qty"] = pd.to_numeric(df["qty"], errors="coerce").fillna(0.0) # Ensure proper monthly frequency per SKU df = df.copy() df["product_id"] = df["product_id"].astype(str) # --- 2) Build Darts series (GLOBAL model across SKUs) --- from darts import TimeSeries series_list = [] past_cov_list = [] # optional extra_cols = [c for c in df.columns if c not in ["product_id","date","qty"]] # keep only numeric covariates (categoricals must be pre-encoded) num_covs = [c for c in extra_cols if pd.api.types.is_numeric_dtype(df[c])] for pid, g in df.groupby("product_id"): g = (g.set_index("date") .sort_index() .resample("MS") .agg({**{"qty":"sum"}, **{c:"last" for c in num_covs}}) .fillna(method="ffill") .fillna(0.0)) y = TimeSeries.from_dataframe(g.reset_index(), time_col="date", value_cols="qty", freq="MS") series_list.append(y) if use_covariates and num_covs: pc = TimeSeries.from_dataframe(g.reset_index(), time_col="date", value_cols=num_covs, freq="MS") past_cov_list.append(pc) else: past_cov_list.append(None) # --- 3) Train N-HiTS (fast settings) --- from darts.models import NHiTSModel H = max(1, int(horizon_months)) # keep chunk length small for short histories; model is global input_chunk = max(6, min(12, min(len(s) for s in series_list) - 1)) if series_list else 12 model = NHiTSModel( input_chunk_length=input_chunk, output_chunk_length=min(H, 3), # can roll to reach H n_epochs=60, # keep fast; tune up if needed batch_size=32, random_state=0, dropout=0.0, ) if use_covariates and any(pc is not None for pc in past_cov_list): model.fit(series=series_list, past_covariates=past_cov_list, verbose=False) else: model.fit(series=series_list, verbose=False) # --- 4) Predict per SKU and return JSON --- out = [] for pid, s, pc in zip(df["product_id"].unique(), series_list, past_cov_list): if use_covariates and pc is not None: pred = model.predict(n=H, series=s, past_covariates=pc) else: pred = model.predict(n=H, series=s) for ts, val in zip(pred.time_index, pred.values().flatten()): out.append({ "product_id": str(pid), "period_start": pd.Timestamp(ts).strftime("%Y-%m-%d"), "forecast_qty": float(val) }) return json.dumps(out) # ---------- Tool 2: Optimize (LP) ---------- @tool def optimize_supply_tool(forecast_json: str) -> str: """ Optimize a single-month supply plan (LP) using the forecast. Args: forecast_json (str): JSON string returned by forecast_tool. Returns: str: JSON with summary + readable tables (not raw solver output). """ from scipy.optimize import linprog rows = json.loads(forecast_json) # Use first month per product demand = {} for r in rows: p = r["product_id"] if p not in demand: demand[p] = float(r["forecast_qty"]) P = sorted(demand.keys()) or ["FG100", "FG200"] price = {"FG100": 98.0, "FG200": 120.0} conv = {"FG100": 12.5, "FG200": 15.0} r1 = {"FG100": 0.03, "FG200": 0.05} r2 = {"FG100": 0.02, "FG200": 0.01} RMs = ["RM_A", "RM_B"] rm_cost = {"RM_A": 20.0, "RM_B": 30.0} rm_start = {"RM_A": 1000.0, "RM_B": 100.0} rm_cap = {"RM_A": 5000.0, "RM_B": 5000.0} bom = { "FG100": {"RM_A": 0.8, "RM_B": 0.2 * 1.02}, "FG200": {"RM_A": 1.0, "RM_B": 0.1}, } r1_cap, r2_cap = 320.0, 480.0 nP, nR = len(P), len(RMs) pidx = {p:i for i,p in enumerate(P)} ridx = {r:i for i,r in enumerate(RMs)} def i_prod(p): return pidx[p] def i_sell(p): return nP + pidx[p] def i_einv(p): return 2*nP + pidx[p] def i_pur(r): return 3*nP + ridx[r] def i_einr(r): return 3*nP + nR + ridx[r] n_vars = 3*nP + 2*nR c = np.zeros(n_vars) bounds = [None]*n_vars for p in P: c[i_prod(p)] += conv[p] c[i_sell(p)] -= price[p] c[i_einv(p)] += 0.0 bounds[i_prod(p)] = (0, None) bounds[i_sell(p)] = (0, demand[p]) # demand as upper bound (no backorders) bounds[i_einv(p)] = (0, None) for r in RMs: c[i_pur(r)] += rm_cost[r] c[i_einr(r)] += 0.0 bounds[i_pur(r)] = (0, rm_cap[r]) bounds[i_einr(r)] = (0, None) # Equalities Aeq, beq = [], [] for p in P: row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1 Aeq.append(row); beq.append(0.0) # start_inv=0 in this demo for r in RMs: row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1 for p in P: row[i_prod(p)] -= bom[p].get(r,0.0) Aeq.append(row); beq.append(-rm_start[r]) # Inequalities (resources) Aub, bub = [], [] row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap) row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap) from scipy.optimize import linprog res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq), bounds=bounds, method="highs") if not res.success: return json.dumps({"status": "FAILED", "message": res.message}) x = res.x def v(idx): return float(x[idx]) # Compose human-friendly tables prod_tbl = [] revenue = 0.0; conv_cost = 0.0 for p in P: produce = v(i_prod(p)); sell = v(i_sell(p)) prod_tbl.append({"Product": p, "Produce": produce, "Sell": sell, "Unit Price": price[p], "Conv. Cost/u": conv[p]}) revenue += sell*price[p]; conv_cost += produce*conv[p] raw_tbl = [] rm_purch_cost = 0.0 for r in RMs: purchase = v(i_pur(r)) cons = float(sum(bom[p].get(r,0.0)*v(i_prod(p)) for p in P)) cost = purchase*rm_cost[r]; rm_purch_cost += cost raw_tbl.append({"Raw": r, "Purchase": purchase, "Consume": cons, "Cost/u": rm_cost[r], "Total Cost": cost}) r1_used = float(sum(r1[p]*v(i_prod(p)) for p in P)) r2_used = float(sum(r2[p]*v(i_prod(p)) for p in P)) res_tbl = [ {"Resource": "R1", "Used": r1_used, "Cap": r1_cap, "Slack": r1_cap - r1_used}, {"Resource": "R2", "Used": r2_used, "Cap": r2_cap, "Slack": r2_cap - r2_used}, ] profit = revenue - conv_cost - rm_purch_cost out = { "status": "OPTIMAL", "kpis": {"Profit": profit, "Revenue": revenue, "Conv. Cost": conv_cost, "RM Purchase Cost": rm_purch_cost}, "products": prod_tbl, "raw_materials": raw_tbl, "resources": res_tbl } return json.dumps(out) # ---------- Tool 3: MD61 (simulate) ---------- @tool def update_sap_md61_tool(forecast_json: str, plant: str = "PLANT01", uom: str = "EA", mrp_area: str = "") -> str: """ Prepare an MD61-style demand upload (SIMULATION ONLY). Args: forecast_json (str): JSON string returned by forecast_tool. plant (str): SAP plant (WERKS). Defaults to 'PLANT01'. uom (str): Unit of measure. Defaults to 'EA'. mrp_area (str): Optional MRP area. Returns: str: JSON with {"status":"SIMULATED","csv_path":"...","preview":[...]}. """ rows = json.loads(forecast_json) md61 = [{ "Material": r["product_id"], "Plant": plant, "MRP_Area": mrp_area, "Req_Date": r["period_start"], "Req_Qty": float(r["forecast_qty"]), "UoM": uom, "Version": "00" } for r in rows] df = pd.DataFrame(md61) tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv") df.to_csv(tmp.name, index=False) return json.dumps({"status": "SIMULATED", "csv_path": tmp.name, "preview": md61[:5]}) # ---------- Upgrade A: Data Quality Guard ---------- @tool def data_quality_guard_tool(use_demo: bool = True, history_csv_path: str = "", z: float = 3.5) -> str: """ Scan monthly history for outliers and likely stockout zeros. Demo-friendly. Args: use_demo (bool): If True, use synthetic FG100/FG200 history. history_csv_path (str): Optional CSV path with columns [product_id,date,qty]. z (float): Robust z-threshold (MAD-based) for outlier flags. Default 3.5. Returns: str: JSON {"status":"OK","issues":[{product_id,period_start,qty,flag,suggested_action,suggested_value?}]} """ # Load history if use_demo or not history_csv_path: rng = pd.date_range("2023-01-01", periods=24, freq="MS") rows = [] np.random.seed(0) for pid, base in [("FG100", 1800), ("FG200", 900)]: season = 1 + 0.15 * np.sin(2 * np.pi * (np.arange(len(rng)) / 12.0)) qty = (base * season).astype(float) for d, q in zip(rng, qty): rows.append({"product_id": pid, "date": d, "qty": float(q)}) df = pd.DataFrame(rows) else: df = pd.read_csv(history_csv_path) assert {"product_id","date","qty"} <= set(df.columns), "CSV must have product_id,date,qty" df["date"] = pd.to_datetime(df["date"], errors="coerce") df = df.dropna(subset=["date"]) df["qty"] = pd.to_numeric(df["qty"], errors="coerce").fillna(0.0) issues = [] for pid, g in df.groupby("product_id"): s = g.set_index("date")["qty"].resample("MS").sum().asfreq("MS").fillna(0.0) if len(s) < 6: continue med = s.rolling(6, min_periods=3).median() mad = (s - med).abs().rolling(6, min_periods=3).median() robust_z = (0.6745 * (s - med) / mad.replace(0, np.nan)).abs() for ts, y in s.items(): flag = None; action = None; val = None rz = robust_z.get(ts, np.nan) # Outlier if not np.isnan(rz) and rz > z: flag = "OUTLIER" action = "cap_to_rolling_median" val = float(med.get(ts, np.nan)) if not np.isnan(med.get(ts, np.nan)) else None # Likely stockout zero in-between non-zeros prev_nonzero = s[:ts].iloc[:-1].ne(0).any() if len(s[:ts])>1 else False next_nonzero = s[ts:].iloc[1:].ne(0).any() if len(s[ts:])>1 else False if y == 0 and prev_nonzero and next_nonzero: flag = flag or "ZERO_BETWEEN_NONZERO" action = action or "impute_neighbor_avg" idx = s.index.get_loc(ts) neighbors = [] if idx>0: neighbors.append(s.iloc[idx-1]) if idx str: """ Run a small set of what-if scenarios and summarize results. Args: forecast_json (str): JSON string from forecast_tool (uses first month per SKU). Returns: str: JSON {"status":"OK","scenarios":[{name,profit,r1_used,r1_slack,r2_used,r2_slack}]} """ from scipy.optimize import linprog base_rows = json.loads(forecast_json) demand = {} for r in base_rows: p = r["product_id"] if p not in demand: demand[p] = float(r["forecast_qty"]) P = sorted(demand.keys()) or ["FG100","FG200"] # Base constants price = {"FG100": 98.0, "FG200": 120.0} conv = {"FG100": 12.5, "FG200": 15.0} r1 = {"FG100": 0.03, "FG200": 0.05} r2 = {"FG100": 0.02, "FG200": 0.01} RMs = ["RM_A","RM_B"] rm_cost = {"RM_A": 20.0, "RM_B": 30.0} rm_start = {"RM_A": 1000.0, "RM_B": 100.0} rm_cap = {"RM_A": 5000.0, "RM_B": 5000.0} bom = { "FG100": {"RM_A": 0.8, "RM_B": 0.2*1.02}, "FG200": {"RM_A": 1.0, "RM_B": 0.1}, } r1_cap0, r2_cap0 = 320.0, 480.0 scenarios = [ {"name":"Base", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0}, {"name":"+10% Demand", "demand_mult":1.10, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0}, {"name":"-10% R1 Cap", "mult":1.0, "r1_cap":0.9*r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0}, {"name":"+₹5 RM_B", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":5.0, "conv_fg100_mult":1.0, "rmA_start_mult":1.0}, {"name":"+10% FG100 ConvCost", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.10, "rmA_start_mult":1.0}, {"name":"-20% RM_A Start", "mult":1.0, "r1_cap":r1_cap0, "r2_cap":r2_cap0, "rm_cost_B_add":0.0, "conv_fg100_mult":1.0, "rmA_start_mult":0.80}, ] results = [] for sc in scenarios: dem_mult = sc.get("demand_mult", sc.get("mult",1.0)) d = {p: demand[p]*dem_mult for p in P} r1_cap = sc["r1_cap"]; r2_cap = sc["r2_cap"] rm_cost_local = rm_cost.copy(); rm_cost_local["RM_B"] += sc["rm_cost_B_add"] conv_local = conv.copy(); conv_local["FG100"] = conv_local["FG100"] * sc["conv_fg100_mult"] rm_start_local = rm_start.copy(); rm_start_local["RM_A"] = rm_start_local["RM_A"] * sc["rmA_start_mult"] # Build and solve LP nP, nR = len(P), len(RMs) pidx = {p:i for i,p in enumerate(P)}; ridx = {r:i for i,r in enumerate(RMs)} def i_prod(p): return pidx[p] def i_sell(p): return nP + pidx[p] def i_einv(p): return 2*nP + pidx[p] def i_pur(r): return 3*nP + ridx[r] def i_einr(r): return 3*nP + nR + ridx[r] n_vars = 3*nP + 2*nR c = np.zeros(n_vars); bounds = [None]*n_vars for p in P: c[i_prod(p)] += conv_local[p]; c[i_sell(p)] -= price[p]; c[i_einv(p)] += 0.0 bounds[i_prod(p)] = (0, None); bounds[i_sell(p)] = (0, d[p]); bounds[i_einv(p)] = (0, None) for r in RMs: c[i_pur(r)] += rm_cost_local[r]; c[i_einr(r)] += 0.0 bounds[i_pur(r)] = (0, rm_cap[r]); bounds[i_einr(r)] = (0, None) Aeq, beq = [], [] for p in P: row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1 Aeq.append(row); beq.append(0.0) for r in RMs: row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1 for p in P: row[i_prod(p)] -= bom[p].get(r,0.0) Aeq.append(row); beq.append(-rm_start_local[r]) Aub, bub = [], [] row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap) row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap) from scipy.optimize import linprog res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq), bounds=bounds, method="highs") if not res.success: results.append({"name": sc["name"], "status":"FAILED", "profit": None}) continue x = res.x def v(idx): return float(x[idx]) r1_used = float(sum(r1[p]*v(i_prod(p)) for p in P)) r2_used = float(sum(r2[p]*v(i_prod(p)) for p in P)) revenue = float(sum(price[p]*v(i_sell(p)) for p in P)) conv_cost = float(sum(conv_local[p]*v(i_prod(p)) for p in P)) rm_purch_cost = float(sum(v(i_pur(r))*rm_cost_local[r] for r in RMs)) profit = revenue - conv_cost - rm_purch_cost results.append({ "name": sc["name"], "status":"OPTIMAL", "profit": profit, "r1_used": r1_used, "r1_slack": r1_cap - r1_used, "r2_used": r2_used, "r2_slack": r2_cap - r2_used }) return json.dumps({"status":"OK","scenarios":results}) # ---------- Upgrade C: Plan Explainer ---------- @tool def plan_explainer_tool(plan_json: str) -> str: """ Generate a plain-language explanation of the plan (demo assumptions). Args: plan_json (str): JSON from optimize_supply_tool. Returns: str: JSON {"status":"OK","summary": "... human-readable text ..."} """ d = json.loads(plan_json) k = d.get("kpis", {}) products = d.get("products", []) resources = d.get("resources", []) # Contribution by product contribs = [] for row in products: rev = row["Sell"] * row["Unit Price"] conv = row["Produce"] * row["Conv. Cost/u"] contribs.append((row["Product"], rev - conv)) contribs.sort(key=lambda x: x[1], reverse=True) top_prod = contribs[0][0] if contribs else "N/A" # Binding resource (min slack) bind_res = None if resources: r_sorted = sorted(resources, key=lambda r: r.get("Slack", 0.0)) bind_res = r_sorted[0]["Resource"] summary = ( f"Profit: ₹{k.get('Profit',0):,.2f} (Revenue ₹{k.get('Revenue',0):,.2f} − " f"Conv. Cost ₹{k.get('Conv. Cost',0):,.2f} − RM Cost ₹{k.get('RM Purchase Cost',0):,.2f}). " f"Top driver: {top_prod}. " f"Bottleneck check: {bind_res} has the least slack." ) return json.dumps({"status":"OK","summary": summary}) # ---------- NEW: Bottleneck Search (finite-difference + policy) ---------- @tool def bottleneck_search_tool(forecast_json: str, policy_json: str = "") -> str: """ Find the best practical lever to relax (within ~1 month) via small scenario probes. Supports resource overtime, RM expedite (cap), and demand_lift (promo ROI-checked). Args: forecast_json (str): JSON from forecast_tool (first month per SKU used). policy_json (str): Optional JSON with levers and costs. Returns: str: JSON {"status":"OK","diagnostics":{...},"recommendations":[...]} """ import json, numpy as np from scipy.optimize import linprog # --- Defaults (edit to your reality) --- policy = { "R1_overtime": {"type":"resource","target":"R1","step":10.0,"max_delta":80.0,"unit_cost":600.0}, "R2_overtime": {"type":"resource","target":"R2","step":10.0,"max_delta":40.0,"unit_cost":800.0}, "RM_A_expedite": {"type":"rm_expedite","target":"RM_A","step":200.0,"max_delta":1500.0,"unit_premium":8.0}, "RM_B_expedite": {"type":"rm_expedite","target":"RM_B","step":100.0,"max_delta":800.0,"unit_premium":12.0}, "Factory_expansion": {"type":"blocked","reason":"Not relaxable within 1 month"} # You can add demand levers via policy_json: # "promo_FG100": {"type":"demand_lift","target":"FG100","step":100,"max_delta":600,"unit_cost":10} } if policy_json: try: policy.update(json.loads(policy_json)) except Exception: pass # --- Problem primitives (same as your optimizer) --- rows = json.loads(forecast_json) demand = {} for r in rows: p = r["product_id"] if p not in demand: demand[p] = float(r["forecast_qty"]) P = sorted(demand.keys()) or ["FG100","FG200"] price = {"FG100":98.0,"FG200":120.0} conv = {"FG100":12.5,"FG200":15.0} r1 = {"FG100":0.03,"FG200":0.05} r2 = {"FG100":0.02,"FG200":0.01} RMs = ["RM_A","RM_B"] rm_cost = {"RM_A":20.0,"RM_B":30.0} rm_start = {"RM_A":1000.0,"RM_B":100.0} rm_cap = {"RM_A":5000.0,"RM_B":5000.0} bom = {"FG100":{"RM_A":0.8,"RM_B":0.204},"FG200":{"RM_A":1.0,"RM_B":0.1}} r1_cap0, r2_cap0 = 320.0, 480.0 # Helpful unit margin for demand_lift (approx variable cost) unit_var_cost = {p: conv[p] + sum(bom[p].get(rm,0.0)*rm_cost[rm] for rm in RMs) for p in P} unit_margin = {p: price[p] - unit_var_cost[p] for p in P} # --- LP builder (inner solve) --- def solve(dem, r1_cap, r2_cap, rm_cap_adj): nP, nR = len(P), len(RMs) pidx = {p:i for i,p in enumerate(P)}; ridx = {r:i for i,r in enumerate(RMs)} def i_prod(p): return pidx[p] def i_sell(p): return nP + pidx[p] def i_einv(p): return 2*nP + pidx[p] def i_pur(r): return 3*nP + ridx[r] def i_einr(r): return 3*nP + nR + ridx[r] n_vars = 3*nP + 2*nR c = np.zeros(n_vars); bounds=[None]*n_vars for p in P: c[i_prod(p)] += conv[p] c[i_sell(p)] -= price[p] bounds[i_prod(p)] = (0,None) bounds[i_sell(p)] = (0, dem[p]) bounds[i_einv(p)] = (0,None) for r in RMs: c[i_pur(r)] += rm_cost[r] bounds[i_pur(r)] = (0, rm_cap_adj[r]) bounds[i_einr(r)] = (0,None) Aeq, beq = [], [] for p in P: row = np.zeros(n_vars); row[i_prod(p)]=1; row[i_sell(p)]=-1; row[i_einv(p)]=-1 Aeq.append(row); beq.append(0.0) for r in RMs: row = np.zeros(n_vars); row[i_pur(r)]=1; row[i_einr(r)]=-1 for p in P: row[i_prod(p)] -= bom[p].get(r,0.0) Aeq.append(row); beq.append(-rm_start[r]) Aub, bub = [], [] row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r1[p]) for p in P]; Aub.append(row); bub.append(r1_cap) row = np.zeros(n_vars); [row.__setitem__(i_prod(p), r2[p]) for p in P]; Aub.append(row); bub.append(r2_cap) res = linprog(c, A_ub=np.array(Aub), b_ub=np.array(bub), A_eq=np.array(Aeq), b_eq=np.array(beq), bounds=bounds, method="highs") if not res.success: return None x = res.x def v(idx): return float(x[idx]) revenue = sum(price[p]*v(i_sell(p)) for p in P) conv_cost = sum(conv[p]*v(i_prod(p)) for p in P) rm_purch_cost = sum(v(i_pur(r))*rm_cost[r] for r in RMs) r1_used = sum(r1[p]*v(i_prod(p)) for p in P) r2_used = sum(r2[p]*v(i_prod(p)) for p in P) return { "profit": revenue - conv_cost - rm_purch_cost, "r1_used": r1_used, "r2_used": r2_used, } # Baseline base = solve(demand, r1_cap0, r2_cap0, rm_cap.copy()) if base is None: return json.dumps({"status":"FAILED","message":"Baseline infeasible."}) base_profit = float(base["profit"]) diag = { "demand_met": True, # sells hit demand ceiling in this model "r1_slack": round(r1_cap0 - base["r1_used"], 2), "r2_slack": round(r2_cap0 - base["r2_used"], 2), "rm_caps_hit": {rm: False for rm in RMs} } # If everything is slack, tell the user plainly all_slack = (diag["r1_slack"] > 1e-3 and diag["r2_slack"] > 1e-3) # --- Probe levers --- recs = [] for name, cfg in policy.items(): if cfg.get("type") == "blocked": recs.append({"lever":name,"recommended":0,"est_profit_lift":0,"est_net_gain":0, "rationale":cfg.get("reason","Not feasible in horizon")}) continue best_gain = 0.0; best_delta = 0.0; best_lift = 0.0 step = float(cfg.get("step",0)); maxd = float(cfg.get("max_delta",0)) d = 0.0 while d <= maxd + 1e-6: dem = demand.copy() r1_cap = r1_cap0; r2_cap = r2_cap0 rm_cap_adj = rm_cap.copy() relax_cost = 0.0 t = cfg["type"] if t == "resource" and cfg["target"] == "R1": r1_cap += d; relax_cost = d * float(cfg["unit_cost"]) elif t == "resource" and cfg["target"] == "R2": r2_cap += d; relax_cost = d * float(cfg["unit_cost"]) elif t == "rm_expedite": key = "RM_A" if cfg["target"]=="RM_A" else "RM_B" rm_cap_adj[key] = rm_cap[key] + d relax_cost = d * float(cfg["unit_premium"]) elif t == "demand_lift": sku = cfg["target"] if sku in dem: dem[sku] = dem[sku] + d relax_cost = d * float(cfg["unit_cost"]) else: d += step; continue else: d += step; continue res = solve(dem, r1_cap, r2_cap, rm_cap_adj) if res is None: d += step; continue lift = float(res["profit"] - base_profit) net = lift - relax_cost if net > best_gain: best_gain, best_delta, best_lift = net, d, lift d += step rationale = ("Beneficial at small Δ" if best_gain>0 else ("Capacity not binding; try demand/cost levers" if all_slack else "No positive net gain within feasible range")) recs.append({"lever":name,"recommended":round(best_delta,2), "est_profit_lift":round(best_lift,2), "est_net_gain":round(best_gain,2), "rationale":rationale}) recs.sort(key=lambda r: r["est_net_gain"], reverse=True) return json.dumps({"status":"OK","diagnostics":diag,"recommendations":recs}) # ==== Agent (end-to-end) ====================================================== def make_agent(): api_key = os.environ.get("OPENAI_API_KEY", "") if not api_key: raise RuntimeError("OPENAI_API_KEY not set. Add it as a Space secret.") model = OpenAIServerModel(model_id="gpt-4o-mini", api_key=api_key, temperature=0) return CodeAgent(tools=[ forecast_tool, optimize_supply_tool, update_sap_md61_tool, data_quality_guard_tool, scenario_explorer_tool, plan_explainer_tool, bottleneck_search_tool ], model=model, add_base_tools=False, stream_outputs=False) SYSTEM_PLAN = ( "Run the pipeline and return one JSON:\n" "1) forecast_tool(...)\n" "2) optimize_supply_tool(forecast_json)\n" "3) update_sap_md61_tool(forecast_json, ...)\n" "Return: {'forecast': , 'plan': , 'md61': }" ) def run_agentic(h, plant, demo_flag, file_obj): agent = make_agent() if file_obj is not None: path = file_obj.name prompt = (f"{SYSTEM_PLAN}\n" f"Use forecast_tool(horizon_months={int(h)}, use_demo=False, history_csv_path='{path}'). " f"Then run the other two steps as specified. Return only the final JSON.") else: prompt = (f"{SYSTEM_PLAN}\n" f"Use forecast_tool(horizon_months={int(h)}, use_demo={bool(demo_flag)}). " f"Then run the other two steps as specified. Return only the final JSON.") return agent.run(prompt) # ==== UI Helpers (pretty) ===================================================== def parse_forecast(json_str): df = pd.DataFrame(json.loads(json_str)) df = df[["product_id","period_start","forecast_qty"]].rename(columns={ "product_id":"Product","period_start":"Period Start","forecast_qty":"Forecast Qty" }) return _round_df(df) def parse_plan(json_str): d = json.loads(json_str) kpis = pd.DataFrame([d["kpis"]]).rename(columns={ "Conv. Cost":"Conversion Cost", "RM Purchase Cost":"RM Purchase Cost" }) prod = pd.DataFrame(d["products"]) raw = pd.DataFrame(d["raw_materials"]) res = pd.DataFrame(d["resources"]) return d["status"], _round_df(kpis), _round_df(prod), _round_df(raw), _round_df(res) def parse_md61(json_str): d = json.loads(json_str) prev = _round_df(pd.DataFrame(d.get("preview", []))) path = d.get("csv_path", "") return d.get("status",""), prev, path def parse_data_quality(json_str): d = json.loads(json_str) df = _round_df(pd.DataFrame(d.get("issues", []))) return df def parse_scenarios(json_str): d = json.loads(json_str) df = _round_df(pd.DataFrame(d.get("scenarios", []))) if not df.empty: cols = ["name","status","profit","r1_used","r1_slack","r2_used","r2_slack"] df = df[cols] df = df.rename(columns={"name":"Scenario","status":"Status","profit":"Profit", "r1_used":"R1 Used","r1_slack":"R1 Slack","r2_used":"R2 Used","r2_slack":"R2 Slack"}) return df def parse_recs(json_str): d = json.loads(json_str) df = _round_df(pd.DataFrame(d.get("recommendations", []))) if not df.empty: df = df.rename(columns={ "lever":"Lever","recommended":"Recommended Δ","est_profit_lift":"Est. Profit Lift", "est_net_gain":"Est. Net Gain","rationale":"Rationale" }) return df # ==== Gradio UI ============================================================== with gr.Blocks(title="Forecast → Optimize → SAP MD61") as demo: gr.Markdown("## 🧭 Workflow\n" "### 1) **Forecast** → 2) **Optimize Supply** → 3) **Prepare MD61**\n" "Run them **manually** below, or use the **agent** to do end-to-end in one click.") with gr.Tab("Manual (Step-by-step)"): with gr.Row(): horizon = gr.Number(label="Horizon (months)", value=1, precision=0) plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01") with gr.Row(): use_demo = gr.Checkbox(label="Use demo synthetic history", value=True) file = gr.File(label="Or upload history.csv (product_id,date,qty)", file_types=[".csv"]) # States forecast_state = gr.State("") plan_state = gr.State("") md61_state = gr.State("") gr.Markdown("### ➤ Step 1: Forecast") run_f = gr.Button("Run Step 1 — Forecast") forecast_tbl = gr.Dataframe(label="Forecast (first horizon month per SKU)", interactive=False) forecast_note = gr.Markdown("") gr.Markdown("### ➤ Step 2: Optimize Supply") run_o = gr.Button("Run Step 2 — Optimize") plan_status = gr.Markdown("") plan_kpis = gr.Dataframe(label="KPIs", interactive=False) plan_prod = gr.Dataframe(label="Products Plan", interactive=False) plan_raw = gr.Dataframe(label="Raw Materials", interactive=False) plan_res = gr.Dataframe(label="Resources", interactive=False) gr.Markdown("### ➤ Step 3: Prepare MD61 (Simulated)") run_m = gr.Button("Run Step 3 — MD61") md61_status = gr.Markdown("") md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False) md61_file = gr.File(label="Download CSV", interactive=False) # Handlers def do_forecast(h, demo_flag, f): hist_path = "" if (f is None) else f.name fj = forecast_tool(horizon_months=int(h), use_demo=(f is None) and bool(demo_flag), history_csv_path=hist_path) df = parse_forecast(fj) return fj, df, f"Forecast generated for {df['Product'].nunique()} product(s)." run_f.click(do_forecast, inputs=[horizon, use_demo, file], outputs=[forecast_state, forecast_tbl, forecast_note]) def do_optimize(fj): if not fj: return "", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), "⚠️ Run Step 1 first." pj = optimize_supply_tool(fj) status, kpis, prod, raw, res = parse_plan(pj) return pj, kpis, prod, raw, res, f"Optimization status: **{status}**" run_o.click(do_optimize, inputs=[forecast_state], outputs=[plan_state, plan_kpis, plan_prod, plan_raw, plan_res, plan_status]) def do_md61(fj, plant): if not fj: return "", pd.DataFrame(), None, "⚠️ Run Step 1 first." mj = update_sap_md61_tool(fj, plant=plant, uom="EA") status, prev, path = parse_md61(mj) return mj, prev, path, f"MD61 status: **{status}**" run_m.click(do_md61, inputs=[forecast_state, plant], outputs=[md61_state, md61_prev, md61_file, md61_status]) with gr.Tab("Agentic (End-to-end)"): gr.Markdown("One click: the agent runs all three steps with OpenAI.") with gr.Row(): a_horizon = gr.Number(label="Horizon (months)", value=1, precision=0) a_plant = gr.Textbox(label="SAP Plant (WERKS)", value="PLANT01") with gr.Row(): a_demo = gr.Checkbox(label="Use demo synthetic history", value=True) a_file = gr.File(label="Or upload history.csv", file_types=[".csv"]) run_all = gr.Button("Run End-to-end (Agent)") out_json = gr.Textbox(label="Agent Raw JSON (for inspection)", lines=6) with gr.Accordion("Pretty Outputs", open=True): a_forecast_tbl = gr.Dataframe(label="Forecast", interactive=False) a_plan_kpis = gr.Dataframe(label="KPIs", interactive=False) a_plan_prod = gr.Dataframe(label="Products Plan", interactive=False) a_plan_raw = gr.Dataframe(label="Raw Materials", interactive=False) a_plan_res = gr.Dataframe(label="Resources", interactive=False) a_md61_prev = gr.Dataframe(label="MD61 Preview", interactive=False) a_md61_file = gr.File(label="Download MD61 CSV", interactive=False) def do_agent(h, p, demo_flag, f): def to_obj(x): return x if isinstance(x, (dict, list)) else json.loads(x) def to_str(x): return x if isinstance(x, str) else json.dumps(x) try: res = run_agentic(h, p, demo_flag, f) # may be dict or str out = to_obj(res) forecast_json = to_str(out["forecast"]) plan_json = to_str(out["plan"]) md61_json = to_str(out["md61"]) f_df = parse_forecast(forecast_json) _, kpis, prod, raw, res_tbl = parse_plan(plan_json) _, prev, csv_path = parse_md61(md61_json) pretty = { "forecast": json.loads(forecast_json), "plan": json.loads(plan_json), "md61": json.loads(md61_json), } return (json.dumps(pretty, indent=2), f_df, kpis, prod, raw, res_tbl, prev, csv_path) except Exception as e: return (f"Agent error: {e}", pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), None) run_all.click(do_agent, inputs=[a_horizon, a_plant, a_demo, a_file], outputs=[out_json, a_forecast_tbl, a_plan_kpis, a_plan_prod, a_plan_raw, a_plan_res, a_md61_prev, a_md61_file]) # --------- Upgrades --------- with gr.Tab("Upgrades"): gr.Markdown("### 🧹 Data Quality Guard") with gr.Row(): dq_use_demo = gr.Checkbox(label="Use demo synthetic history", value=True) dq_file = gr.File(label="Or upload history.csv", file_types=[".csv"]) dq_z = gr.Number(label="Robust z-threshold", value=3.5) run_dq = gr.Button("Run Data Quality Guard") dq_tbl = gr.Dataframe(label="Issues (flags & suggestions)", interactive=False) def do_dq(demo_flag, f, zval): hist_path = "" if (f is None) else f.name out = data_quality_guard_tool(use_demo=(f is None) and bool(demo_flag), history_csv_path=hist_path, z=float(zval)) return parse_data_quality(out) run_dq.click(do_dq, inputs=[dq_use_demo, dq_file, dq_z], outputs=[dq_tbl]) gr.Markdown("### 🔀 Scenario Explorer") run_sc = gr.Button("Run Scenarios on current Forecast (Step 1)") sc_tbl = gr.Dataframe(label="Scenario Outcomes", interactive=False) def do_sc(fj): if not fj: return pd.DataFrame() out = scenario_explorer_tool(fj) return parse_scenarios(out) run_sc.click(do_sc, inputs=[forecast_state], outputs=[sc_tbl]) gr.Markdown("### 📝 Plan Explainer") run_ex = gr.Button("Explain Current Plan (Step 2)") expl_md = gr.Markdown("") def do_explain(pj): if not pj: return "⚠️ Run Step 2 first." out = plan_explainer_tool(pj) return json.loads(out)["summary"] run_ex.click(do_explain, inputs=[plan_state], outputs=[expl_md]) gr.Markdown("### 🎯 Bottleneck Finder (Practical Levers)") policy_box = gr.Textbox( label="Optional policy JSON (overtime/expedite limits & costs). Leave blank for sensible defaults.", lines=4, value="" ) run_bn = gr.Button("Find Best Bottleneck (uses current Forecast)") bn_tbl = gr.Dataframe(label="Ranked Recommendations", interactive=False) def do_bn(fj, policy): if not fj: return pd.DataFrame() out = bottleneck_search_tool(fj, policy_json=policy or "") return parse_recs(out) run_bn.click(do_bn, inputs=[forecast_state, policy_box], outputs=[bn_tbl]) if __name__ == "__main__": # Needs OPENAI_API_KEY in env for agent tab; manual tabs work without it. if not os.environ.get("OPENAI_API_KEY"): print("⚠️ Set OPENAI_API_KEY (Space secret) to use Agentic tab.") demo.launch()