Spaces:
Running
Running
| import json | |
| import os | |
| import glob | |
| import pandas as pd | |
| from huggingface_hub import snapshot_download | |
| from src.display.utils import AutoEvalColumn | |
| from src.about import Tasks | |
def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results.

    Force-downloads the results dataset (if ``results_path`` is a Hub repo
    id), recursively loads every result JSON found under it, flattens the
    scores for the configured ``Tasks``, computes the per-model average and
    aligns the columns to ``cols`` for the leaderboard frontend.

    Args:
        results_path: Hub dataset repo id, or a local directory of results.
        requests_path: Path to the eval requests (currently unused here).
        cols: Final column spec (enum-like objects with ``.name`` or strings).
        benchmark_cols: Benchmark column spec used for the numeric average.

    Returns:
        One row per model, sorted by average score descending; an empty
        dataframe if no valid result file was found.
    """
    print("🚀 Starting Custom Leaderboard Logic...")
    print(f"📥 Attempting to download results from: {results_path}")

    # Step 1: force-download the data (key to fixing the empty-leaderboard issue).
    local_dir = _download_results(results_path)

    # Step 2: read the JSON result files.
    all_data_json = _load_raw_results(local_dir)
    print(f"📦 Valid entries loaded: {len(all_data_json)}")
    if not all_data_json:
        print("❌ No valid data found! Returning empty DataFrame.")
        return pd.DataFrame()

    # Step 3: flatten each raw entry into one leaderboard row.
    df = pd.DataFrame.from_records([_flatten_result(entry) for entry in all_data_json])

    # Step 4: compute the average and align columns.
    # Resolve benchmark column names (specs may be enum-like objects or strings).
    numeric_cols = [c.name if hasattr(c, "name") else str(c) for c in benchmark_cols]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        else:
            # Missing benchmark -> NaN so it is skipped by the mean below.
            df[col] = float('nan')

    # Resolve the "Average" column name from AutoEvalColumn, with a fallback.
    avg_col_name = getattr(AutoEvalColumn, 'average', None)
    if avg_col_name and hasattr(avg_col_name, 'name'):
        avg_col_name = avg_col_name.name
    else:
        avg_col_name = "Average ⬆️"

    df[avg_col_name] = df[numeric_cols].mean(axis=1, skipna=True).round(2)
    df = df.sort_values(by=[avg_col_name], ascending=False)

    # Final column alignment: create any columns the frontend expects but
    # the data does not carry.
    target_cols = []
    for c in cols:
        c_name = c.name if hasattr(c, "name") else str(c)
        target_cols.append(c_name)
        if c_name not in df.columns:
            if "Average" in c_name and avg_col_name in df.columns:
                df[c_name] = df[avg_col_name]
            else:
                df[c_name] = ""
    df = df[target_cols]

    print(f"✅ Final DataFrame shape: {df.shape}")
    return df


def _download_results(results_path: str) -> str:
    """Download the results dataset; return the local directory to scan.

    Falls back to treating ``results_path`` as a local path when the
    download fails (e.g. it already is a local directory).
    """
    local_dir = "./eval-results-cache"  # dedicated local cache directory
    try:
        snapshot_download(
            repo_id=results_path,
            repo_type="dataset",
            local_dir=local_dir,
            local_dir_use_symlinks=False,
            token=os.environ.get("HF_TOKEN")  # needed to read private datasets
        )
        print(f"✅ Data successfully downloaded to: {local_dir}")
        return local_dir
    except Exception as e:
        print(f"⚠️ Download warning (using path as is): {e}")
        return results_path


def _load_raw_results(local_dir: str) -> list:
    """Recursively load every plausible result JSON file under ``local_dir``."""
    all_data_json = []
    json_files = glob.glob(os.path.join(local_dir, "**/*.json"), recursive=True)
    print(f"📂 Found {len(json_files)} JSON files in {local_dir}")
    for file_path in json_files:
        # Skip repo metadata files.
        if ".git" in file_path:
            continue
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Basic validation: a result file must carry 'results' or 'config'.
            if isinstance(data, dict) and (data.get("results") or data.get("config")):
                all_data_json.append(data)
        except Exception as e:
            print(f"❌ Error reading {file_path}: {e}")
    return all_data_json


def _flatten_result(entry: dict) -> dict:
    """Flatten one raw result entry into a single leaderboard row dict."""
    results_root = entry.get("results", entry)
    config_root = entry.get("config", entry)
    flat_entry = {}
    # A. Model name.
    flat_entry["Model"] = config_root.get("model_name", entry.get("model", "Unknown Model"))
    # B. Task scores: support both nested (results -> benchmark -> metric)
    # and flat (results -> "benchmark_metric") layouts.
    for task in Tasks:
        t = task.value
        score = None
        try:
            task_data = results_root.get(t.benchmark)
            if isinstance(task_data, dict):
                score = task_data.get(t.metric)
            else:
                score = results_root.get(f"{t.benchmark}_{t.metric}")
        except Exception:
            pass  # malformed entry -> leave the score as None
        flat_entry[t.col_name] = score
    # C. Placeholder metadata so the frontend columns are populated.
    flat_entry["T"] = "🟢"
    flat_entry["Type"] = config_root.get("model_dtype", "Pretrained")
    flat_entry["Architecture"] = "Unknown"
    flat_entry["Precision"] = "-"
    flat_entry["License"] = "Unknown"
    flat_entry["#Params (B)"] = 0
    flat_entry["Hub ❤️"] = 0
    flat_entry["Available on the hub"] = False
    flat_entry["Model sha"] = config_root.get("model_sha", "")
    return flat_entry
def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests.

    Scans ``save_path`` for request JSON files (both at the top level and
    one directory deep), enriches each record with a clickable model link
    and a revision, then splits the records by eval status.

    Args:
        save_path: Directory holding the eval request JSON files.
        cols: Columns to keep in the returned dataframes.

    Returns:
        Three dataframes: (finished, running, pending) evals.
    """
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    def _append_request(file_path: str) -> None:
        # Read one request file and normalize its display fields.
        with open(file_path) as fp:
            data = json.load(fp)
        data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
        data[EvalQueueColumn.revision.name] = data.get("revision", "main")
        all_evals.append(data)

    for entry in entries:
        # endswith (not substring) so e.g. "foo.json.bak" is not picked up.
        if entry.endswith(".json"):
            _append_request(os.path.join(save_path, entry))
        elif ".md" not in entry:
            # This is a folder: scan its request files one level deep.
            folder = os.path.join(save_path, entry)
            # BUG FIX: isfile must test the full path — the bare filename was
            # resolved against the CWD, silently skipping every real file.
            sub_entries = [
                e for e in os.listdir(folder)
                if os.path.isfile(os.path.join(folder, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                _append_request(os.path.join(folder, sub_entry))

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]