# UniVA-Leaderboard / src/populate.py
# Author: Rui1121 — "Update src/populate.py" (commit cf77e87, verified)
import glob
import json
import os

import pandas as pd
from huggingface_hub import snapshot_download

from src.about import Tasks
from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn
def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results.

    Args:
        results_path: HF dataset repo id (or an already-local directory) that
            holds the per-model result JSON files.
        requests_path: unused here; kept for interface compatibility.
        cols: final column set, as objects with a ``.name`` attribute or plain
            strings; missing columns are created empty.
        benchmark_cols: the subset of columns holding per-task scores.

    Returns:
        One row per valid result entry, sorted by the average score
        (descending), restricted to ``cols``; an empty DataFrame when no
        valid result files are found.
    """
    print("🚀 Starting Custom Leaderboard Logic...")
    print(f"📥 Attempting to download results from: {results_path}")

    # Step 1: force-download the data (the key fix for the empty-leaderboard issue).
    local_dir = _download_results(results_path)

    # Step 2: read every result JSON under the download directory.
    all_data_json = _load_result_entries(local_dir)
    print(f"📦 Valid entries loaded: {len(all_data_json)}")
    if not all_data_json:
        print("❌ No valid data found! Returning empty DataFrame.")
        return pd.DataFrame()

    # Step 3: flatten each raw payload into one leaderboard row.
    df = pd.DataFrame.from_records([_flatten_entry(entry) for entry in all_data_json])

    # Step 4: coerce benchmark columns to numeric, creating missing ones as NaN.
    numeric_cols = [c.name if hasattr(c, "name") else str(c) for c in benchmark_cols]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        else:
            df[col] = float('nan')

    # Resolve the display name of the Average column (fall back to the default).
    avg_col_name = getattr(AutoEvalColumn, 'average', None)
    if avg_col_name and hasattr(avg_col_name, 'name'):
        avg_col_name = avg_col_name.name
    else:
        avg_col_name = "Average ⬆️"

    # Average over the benchmark columns, ignoring missing scores.
    df[avg_col_name] = df[numeric_cols].mean(axis=1, skipna=True).round(2)
    df = df.sort_values(by=[avg_col_name], ascending=False)

    # Align to the requested column set, creating any columns still missing.
    target_cols = []
    for c in cols:
        c_name = c.name if hasattr(c, "name") else str(c)
        target_cols.append(c_name)
        if c_name not in df.columns:
            if "Average" in c_name and avg_col_name in df.columns:
                df[c_name] = df[avg_col_name]
            else:
                df[c_name] = ""
    df = df[target_cols]
    print(f"✅ Final DataFrame shape: {df.shape}")
    return df


def _download_results(results_path: str) -> str:
    """Download the results dataset locally; fall back to using the path as-is.

    Returns the directory to read result JSONs from: the local cache on a
    successful download, otherwise ``results_path`` itself (which may already
    be a local directory).
    """
    local_dir = "./eval-results-cache"  # local cache directory for the snapshot
    try:
        snapshot_download(
            repo_id=results_path,
            repo_type="dataset",
            local_dir=local_dir,
            # NOTE(review): deprecated/ignored by recent huggingface_hub
            # releases; kept for compatibility with older versions.
            local_dir_use_symlinks=False,
            token=os.environ.get("HF_TOKEN")  # required to read private datasets
        )
        print(f"✅ Data successfully downloaded to: {local_dir}")
        return local_dir
    except Exception as e:
        # Best effort: the path may already point at local data.
        print(f"⚠️ Download warning (using path as is): {e}")
        return results_path


def _load_result_entries(local_dir: str) -> list:
    """Recursively read every ``*.json`` under ``local_dir``.

    Keeps only dict payloads that carry a ``results`` or ``config`` key,
    filtering out unrelated metadata JSON files; unreadable files are logged
    and skipped.
    """
    all_data_json = []
    json_files = glob.glob(os.path.join(local_dir, "**/*.json"), recursive=True)
    print(f"📂 Found {len(json_files)} JSON files in {local_dir}")
    for file_path in json_files:
        # Skip anything living under a .git directory.
        if ".git" in file_path:
            continue
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Simple validation: must expose 'results' or 'config'.
            if isinstance(data, dict) and (data.get("results") or data.get("config")):
                all_data_json.append(data)
        except Exception as e:
            print(f"❌ Error reading {file_path}: {e}")
    return all_data_json


def _flatten_entry(entry: dict) -> dict:
    """Flatten one raw result payload into a single leaderboard row dict."""
    results_root = entry.get("results", entry)
    config_root = entry.get("config", entry)
    flat_entry = {}
    # A. Model name.
    flat_entry["Model"] = config_root.get("model_name", entry.get("model", "Unknown Model"))
    # B. Per-task scores: try results[benchmark][metric] first, then the
    # flattened results["{benchmark}_{metric}"] layout.
    for task in Tasks:
        t = task.value
        score = None
        try:
            task_data = results_root.get(t.benchmark)
            if isinstance(task_data, dict):
                score = task_data.get(t.metric)
            else:
                score = results_root.get(f"{t.benchmark}_{t.metric}")
        except Exception:
            # results_root may not be a dict at all; leave the score missing.
            pass
        flat_entry[t.col_name] = score
    # C. Metadata placeholders (this leaderboard does not track these yet).
    flat_entry["T"] = "🟢"
    flat_entry["Type"] = config_root.get("model_dtype", "Pretrained")
    flat_entry["Architecture"] = "Unknown"
    flat_entry["Precision"] = "-"
    flat_entry["License"] = "Unknown"
    flat_entry["#Params (B)"] = 0
    flat_entry["Hub ❤️"] = 0
    flat_entry["Available on the hub"] = False
    flat_entry["Model sha"] = config_root.get("model_sha", "")
    return flat_entry
def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests.

    Scans ``save_path`` for request JSON files — both at the top level and one
    directory level down — and splits the entries by their ``status`` field.

    Args:
        save_path: directory containing the request JSON files.
        cols: columns to keep in the returned dataframes.

    Returns:
        ``(finished, running, pending)`` dataframes, in that order.
    """
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []
    for entry in entries:
        # Fixed: was `".json" in entry`, a substring match that also caught
        # names like `foo.json.tmp`.
        if entry.endswith(".json"):
            file_path = os.path.join(save_path, entry)
            all_evals.append(_read_request_file(file_path))
        elif ".md" not in entry:
            # this is a folder holding per-model request files
            folder = os.path.join(save_path, entry)
            # Fixed: the original tested os.path.isfile(e) on the bare
            # filename, which resolves against the CWD and silently skipped
            # every sub-entry; test the full path instead.
            sub_entries = [
                e for e in os.listdir(folder)
                if os.path.isfile(os.path.join(folder, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                all_evals.append(_read_request_file(os.path.join(folder, sub_entry)))

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]
    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]


def _read_request_file(file_path: str) -> dict:
    """Load one request JSON and add the clickable-model / revision fields.

    NOTE(review): ``EvalQueueColumn`` and ``make_clickable_model`` must be
    imported at module level — they are missing from this file's imports.
    """
    with open(file_path) as fp:
        data = json.load(fp)
    data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
    data[EvalQueueColumn.revision.name] = data.get("revision", "main")
    return data