import json
import os
import glob

import pandas as pd
from huggingface_hub import snapshot_download

from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn
from src.about import Tasks


def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results."""
    print("🚀 Starting custom leaderboard logic...")
    print(f"📥 Attempting to download results from: {results_path}")

    # ---------------------------------------------------------
    # Step 1: Force-download the data (key to fixing the empty-leaderboard issue)
    # ---------------------------------------------------------
    local_dir = "./eval-results-cache"  # local cache directory

    try:
        # Download the results dataset locally.
        snapshot_download(
            repo_id=results_path,
            repo_type="dataset",
            local_dir=local_dir,
            local_dir_use_symlinks=False,
            token=os.environ.get("HF_TOKEN"),  # needed to read private datasets
        )
        print(f"✅ Data successfully downloaded to: {local_dir}")
    except Exception as e:
        print(f"⚠️ Download warning (using path as is): {e}")
        # If the download fails (e.g. results_path is already a local path), use it directly.
        local_dir = results_path

    # ---------------------------------------------------------
    # Step 2: Read the JSON files
    # ---------------------------------------------------------
    all_data_json = []
    # Recursively collect every .json file under the download directory.
    json_files = glob.glob(os.path.join(local_dir, "**/*.json"), recursive=True)
    print(f"📂 Found {len(json_files)} JSON files in {local_dir}")

    for file_path in json_files:
        # Skip non-result files (e.g. git metadata JSON).
        if ".git" in file_path:
            continue
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
                # Basic validation: the entry must contain 'results' or 'config'.
                if isinstance(data, dict) and (data.get("results") or data.get("config")):
                    all_data_json.append(data)
        except Exception as e:
            print(f"❌ Error reading {file_path}: {e}")

    print(f"📦 Valid entries loaded: {len(all_data_json)}")

    if not all_data_json:
        print("❌ No valid data found! Returning empty DataFrame.")
        return pd.DataFrame()

    # ---------------------------------------------------------
    # Step 3: Parse the data
    # ---------------------------------------------------------
    processed_data = []

    for entry in all_data_json:
        results_root = entry.get("results", entry)
        config_root = entry.get("config", entry)

        flat_entry = {}

        # A. Model name
        flat_entry["Model"] = config_root.get("model_name", entry.get("model", "Unknown Model"))

        # B. Extract the task scores
        for task in Tasks:
            t = task.value
            score = None
            try:
                # Try results -> task -> metric.
                task_data = results_root.get(t.benchmark)
                if isinstance(task_data, dict):
                    score = task_data.get(t.metric)
                else:
                    # Fall back to the flattened results -> "task_metric" layout.
                    score = results_root.get(f"{t.benchmark}_{t.metric}")
            except Exception:
                pass
            flat_entry[t.col_name] = score
        # C. Fill in metadata
        flat_entry["T"] = "🟢"
        flat_entry["Type"] = config_root.get("model_dtype", "Pretrained")
        flat_entry["Architecture"] = "Unknown"
        flat_entry["Precision"] = "-"
        flat_entry["License"] = "Unknown"
        flat_entry["#Params (B)"] = 0
        flat_entry["Hub ❤️"] = 0
        flat_entry["Available on the hub"] = False
        flat_entry["Model sha"] = config_root.get("model_sha", "")

        processed_data.append(flat_entry)

    df = pd.DataFrame.from_records(processed_data)

    # ---------------------------------------------------------
    # Step 4: Compute and align columns
    # ---------------------------------------------------------
    # Collect the benchmark column names, accepting either column objects or plain strings.
    numeric_cols = []
    for c in benchmark_cols:
        col_name = c.name if hasattr(c, "name") else str(c)
        numeric_cols.append(col_name)

    # Coerce the benchmark columns to numeric.
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")
        else:
            df[col] = float("nan")

    # Resolve the name of the Average column.
    avg_col_name = getattr(AutoEvalColumn, "average", None)
    if avg_col_name and hasattr(avg_col_name, "name"):
        avg_col_name = avg_col_name.name
    else:
        avg_col_name = "Average ⬆️"

    # Compute the average score.
    df[avg_col_name] = df[numeric_cols].mean(axis=1, skipna=True).round(2)

    # Sort by average score, best first.
    df = df.sort_values(by=[avg_col_name], ascending=False)

    # Final column alignment (create any missing columns).
    target_cols = []
    for c in cols:
        c_name = c.name if hasattr(c, "name") else str(c)
        target_cols.append(c_name)
        if c_name not in df.columns:
            if "Average" in c_name and avg_col_name in df.columns:
                df[c_name] = df[avg_col_name]
            else:
                df[c_name] = ""

    df = df[target_cols]
    print(f"✅ Final DataFrame shape: {df.shape}")
    return df


def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests."""
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        if ".json" in entry:
            file_path = os.path.join(save_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)

            data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
            data[EvalQueueColumn.revision.name] = data.get("revision", "main")

            all_evals.append(data)
        elif ".md" not in entry:
            # This is a folder: scan the request files inside it.
            # Note: isfile must be checked against the full path, not the bare filename.
            sub_entries = [
                e
                for e in os.listdir(os.path.join(save_path, entry))
                if os.path.isfile(os.path.join(save_path, entry, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                file_path = os.path.join(save_path, entry, sub_entry)
                with open(file_path) as fp:
                    data = json.load(fp)

                data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
                data[EvalQueueColumn.revision.name] = data.get("revision", "main")
                all_evals.append(data)

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]
    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
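

# ---------------------------------------------------------
# Usage sketch (illustrative only): wires the two loaders above together
# the way an app entry point might. The repo ids and local path below are
# hypothetical placeholders, and the column lists are rebuilt inline from
# Tasks rather than taken from the project's real display config.
# ---------------------------------------------------------
if __name__ == "__main__":
    benchmark_cols = [t.value.col_name for t in Tasks]
    display_cols = ["Model", "Average ⬆️", *benchmark_cols]  # assumed display order

    leaderboard_df = get_leaderboard_df(
        results_path="your-org/eval-results",  # placeholder dataset repo id
        requests_path="your-org/eval-requests",  # placeholder dataset repo id
        cols=display_cols,
        benchmark_cols=benchmark_cols,
    )
    print(leaderboard_df.head())

    finished_df, running_df, pending_df = get_evaluation_queue_df(
        save_path="./eval-queue",  # placeholder local checkout of the requests repo
        cols=[EvalQueueColumn.model.name, EvalQueueColumn.revision.name],  # assumed subset
    )
    print(f"Queue: {len(pending_df)} pending / {len(running_df)} running / {len(finished_df)} finished")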