# VOIDER's picture
# 🛡️ Security & Stability Update
# a597782 verified
import gradio as gr
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import tempfile
import os
import re
import math
import threading
import atexit
import logging
from data_loader import DataLoader
from scoring import ScoringEngine, PRESET_CONFIGS
from dev_tools import DevSuite
from config import *
from about import get_about_markdown
# === SETUP ===
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize the data loader (shared by the functions below)
loader = DataLoader()
# Thread-safe globals
# Lock held while _CACHED_DF is read or replaced in get_dataframe()/invalidate_cache().
_cache_lock = threading.Lock()
# Cached dataframe; None means "not loaded yet" (or invalidated).
_CACHED_DF = None
# Paths of exported CSV temp files; removed by cleanup_temp_files() at exit.
_temp_files = []
# === CLEANUP ===
@atexit.register
def cleanup_temp_files():
    """Remove the temporary CSV files tracked in ``_temp_files`` on exit.

    Cleanup is best effort: a file that has already disappeared is ignored,
    and any other ``OSError`` is logged rather than raised so interpreter
    shutdown is never interrupted.
    """
    # Iterate over a snapshot so a concurrent append cannot disturb the loop.
    for path in list(_temp_files):
        try:
            # EAFP: removing directly avoids the exists()/remove() race.
            os.remove(path)
        except FileNotFoundError:
            # Already gone - nothing left to clean up.
            pass
        except OSError as exc:
            logger.warning("Could not remove temp file %s: %s", path, exc)
# === DATA ACCESS ===
def get_dataframe():
    """Return the cached dataframe, loading and scoring it if necessary.

    The whole check-and-load sequence runs while holding ``_cache_lock``.
    When nothing is cached, the loader cache is cleared first if
    ``FORCE_REFRESH_ON_STARTUP`` is truthy, the data is loaded, and a
    non-empty frame is passed through ``ScoringEngine(...).calculate_all()``
    before being cached. An empty frame is cached as-is.
    """
    global _CACHED_DF
    with _cache_lock:
        # Fast path: something is already cached.
        if _CACHED_DF is not None:
            return _CACHED_DF
        if FORCE_REFRESH_ON_STARTUP:
            logger.info("First load: Clearing cache...")
            loader.clear_cache()
        loaded = loader.load_data()
        _CACHED_DF = loaded if loaded.empty else ScoringEngine(loaded).calculate_all()
        return _CACHED_DF
def invalidate_cache():
    """Drop the in-memory dataframe and clear the loader's cache.

    Returns:
        Whatever ``loader.clear_cache()`` returns (callers in this file
        treat it as a collection of deleted item names).
    """
    global _CACHED_DF
    with _cache_lock:
        # Forget the cached frame so the next get_dataframe() reloads it.
        _CACHED_DF = None
        return loader.clear_cache()
# === FORMATTING & UI HELPERS ===
def get_header_content(df):
    """Build the markdown header shown at the top of the page.

    Args:
        df: Dataframe whose row count is reported, or ``None`` (reported as 0).

    Returns:
        A markdown string with the title, ``loader.last_updated``, the model
        count and the current process id.
    """
    if df is None:
        count = 0
    else:
        count = len(df)
    current_time = loader.last_updated
    return f"""
# 🏆 UGI Leaderboard: Presets Edition v3.7
**Last Updated:** {current_time} | **Models:** {count} | **PID:** {os.getpid()}
"""
def format_params(row):
    """Format a model's parameter counts for display.

    Values below 1 are rendered as millions (``"350M"``), everything else
    with a ``B`` suffix and one decimal (``"7.0B"``).

    Args:
        row: Mapping-like object (pandas Series or dict) read through
            ``.get`` for ``'Total Parameters'`` and ``'Active Parameters'``.

    Returns:
        ``"N/A"`` when the total is missing, non-numeric or not positive.
        Otherwise the formatted total, with ``" (Act: ...)"`` appended when
        the active count is positive and strictly smaller than the total.
    """
    def as_number(value):
        # Coerce ints, floats and numeric strings; anything else (None,
        # pd.NA, free text) becomes NaN instead of raising later on.
        try:
            return float(value)
        except (ValueError, TypeError):
            return float('nan')

    def fmt(val):
        # Only called with positive, finite-or-inf floats.
        if val < 1:
            return f"{val*1000:.0f}M"
        return f"{val:.1f}B"

    total = as_number(row.get('Total Parameters'))
    active = as_number(row.get('Active Parameters'))
    if math.isnan(total) or total <= 0:
        return "N/A"
    formatted_total = fmt(total)
    # Show active params if MoE. Comparisons with NaN are False, so a
    # missing active count falls through to the plain total.
    if 0 < active < total:
        return f"{formatted_total} (Act: {fmt(active)})"
    return formatted_total
def escape_markdown(text):
    """Backslash-escape markdown control characters in ``text``.

    The input is converted with ``str()`` first; each of the characters
    ``[ ] ( ) * _ # ~ `` and the backtick is prefixed with a backslash.
    """
    special_chars = r'([\[\]()\*_#~`])'
    escaped_form = r'\\\1'
    as_text = str(text)
    return re.sub(special_chars, escaped_form, as_text)
def format_model_link(row):
    """Render a model name as a markdown link when a usable URL is present.

    Args:
        row: Mapping-like object read through ``.get`` for
            ``'author/model_name'`` and ``'Model Link'``.

    Returns:
        ``[escaped name](url)`` when the link is an ``http://`` or
        ``https://`` string, otherwise just the escaped name.
    """
    name = str(row.get('author/model_name', 'Unknown'))
    link = row.get('Model Link', '')
    safe_name = escape_markdown(name)
    # isinstance() also rejects NaN/None, which are not strings.
    if isinstance(link, str):
        url = link.strip()
        if url.lower().startswith(('http://', 'https://')):
            # Percent-encode characters that would end the link target early
            # or let the data inject extra markdown after it.
            for char, encoded in ((' ', '%20'), ('(', '%28'), (')', '%29'),
                                  ('<', '%3C'), ('>', '%3E')):
                url = url.replace(char, encoded)
            return f"[{safe_name}]({url})"
    return safe_name
def get_architecture_choices(df):
    """Return the sorted, distinct architecture names usable as choices.

    Missing values and the placeholder names ``unknown``/``nan``/``null``/
    ``none`` (compared case-insensitively) are left out. ``None`` or an
    empty dataframe yields an empty list.
    """
    if df is None or df.empty:
        return []
    placeholders = ['unknown', 'nan', 'null', 'none']
    choices = []
    for arch in df['Architecture'].dropna().unique():
        if str(arch).lower() in placeholders:
            continue
        choices.append(arch)
    choices.sort()
    return choices
# === FILTERING LOGIC (Refactored) ===
def apply_search_filter(df, query):
    """Keep rows whose model name or architecture contains ``query``.

    The match is a case-insensitive *literal* substring test. The query comes
    straight from a search box, so it must not be interpreted as a regular
    expression: input such as ``(`` or ``c++`` would otherwise raise
    ``re.error`` or match unintended rows.

    Args:
        df: Dataframe with ``'author/model_name'`` and ``'Architecture'``.
        query: Search text; a falsy value returns ``df`` unchanged.

    Returns:
        The filtered dataframe (``df`` itself when no query is given).
    """
    if not query:
        return df
    name_hit = df['author/model_name'].astype(str).str.contains(
        query, case=False, na=False, regex=False)
    arch_hit = df['Architecture'].astype(str).str.contains(
        query, case=False, na=False, regex=False)
    return df[name_hit | arch_hit]
def apply_param_filter(df, param_min, param_max, proprietary):
    """Filter rows by ``'Total Parameters'`` within ``[param_min, param_max]``.

    Args:
        df: Dataframe with a ``'Total Parameters'`` column.
        param_min: Lower bound, or ``None`` for 0.
        param_max: Upper bound, or ``None`` for 99999.
        proprietary: When truthy, rows without a known positive parameter
            count are kept in addition to the rows inside the range.

    Returns:
        The filtered dataframe.
    """
    totals = df['Total Parameters']
    known = totals.notna() & (totals > 0)
    lower = 0.0 if param_min is None else float(param_min)
    upper = 99999.0 if param_max is None else float(param_max)
    keep = known & ((totals >= lower) & (totals <= upper))
    if proprietary:
        # Unknown-size models pass regardless of the range.
        keep = keep | ~known
    return df[keep]
def apply_date_filter(df, date_preset, date_start, date_end):
    """Filter rows by ``'Release Date'`` according to a preset or custom range.

    Args:
        df: Dataframe to filter.
        date_preset: ``"All Time"``, ``"Last Week"`` (7 days), ``"Last Month"``
            (30 days), ``"Last Year"`` (365 days) or ``"Custom Range"``.
        date_start: Inclusive lower bound for ``"Custom Range"``; ignored when
            falsy or unparsable.
        date_end: Inclusive upper bound for ``"Custom Range"``; ignored when
            falsy or unparsable.

    Returns:
        ``df`` unchanged for ``"All Time"`` or when the column is absent,
        otherwise the filtered dataframe. Unparsable release dates never
        satisfy a date comparison.
    """
    if date_preset == "All Time" or 'Release Date' not in df.columns:
        return df
    released = pd.to_datetime(df['Release Date'], errors='coerce')
    now = pd.Timestamp.now()
    window_days = {"Last Week": 7, "Last Month": 30, "Last Year": 365}
    if date_preset in window_days:
        keep = released >= (now - pd.Timedelta(days=window_days[date_preset]))
    else:
        # Start from "keep everything"; only a custom range narrows it.
        keep = pd.Series(True, index=df.index)
        if date_preset == "Custom Range":
            if date_start:
                lower = pd.to_datetime(date_start, errors='coerce')
                if pd.notna(lower):
                    keep = keep & (released >= lower)
            if date_end:
                upper = pd.to_datetime(date_end, errors='coerce')
                if pd.notna(upper):
                    keep = keep & (released <= upper)
    return df[keep]
def filter_leaderboard_pipeline(df, preset, query, param_min, param_max, proprietary,
                                moe_only, thinking_mode, model_types, architecture, top_n,
                                balance_filter, date_preset, date_start, date_end):
    """Apply every leaderboard filter, rank by the preset score and format.

    Steps run in a fixed order: search, parameter range, MoE, thinking mode,
    model types, architecture, balance threshold, release date, then sorting
    by ``Score_<preset>`` and truncation to ``top_n`` rows.

    Args:
        df: Scored dataframe, or ``None``.
        preset: Preset name; the ranking column is ``f"Score_{preset}"``.
        query: Search text for model name / architecture.
        param_min, param_max, proprietary: See ``apply_param_filter``.
        moe_only: Keep only rows with active < total parameters.
        thinking_mode: ``"Hide Thinking"``, ``"Only Thinking"`` or anything
            else for no filtering.
        model_types: Iterable with any of ``"Foundation"``, ``"Finetuned"``,
            ``"Merged"``; ``None`` is treated as empty.
        architecture: Exact architecture name, or ``"All"``/falsy for none.
        top_n: Number of rows to keep after sorting.
        balance_filter: ``"Show All"`` or a label containing ``"Perfect"``
            (>= 0.7), ``"Good"`` (>= 0.5); any other label means >= 0.3.
        date_preset, date_start, date_end: See ``apply_date_filter``.

    Returns:
        ``(display_df, raw_df)``: the formatted table and the unformatted
        filtered rows. Two empty dataframes when there is no data, the score
        column does not exist, or nothing survives the filters.
    """
    if df is None or df.empty:
        return pd.DataFrame(), pd.DataFrame()
    # 1. Search
    df = apply_search_filter(df, query)
    # 2. Parameters
    df = apply_param_filter(df, param_min, param_max, proprietary)
    # 3. MoE
    if moe_only:
        df = df[df['Active Parameters'] < df['Total Parameters']]
    # 4. Thinking
    if thinking_mode == "Hide Thinking":
        df = df[~df['Is Thinking Model']]
    elif thinking_mode == "Only Thinking":
        df = df[df['Is Thinking Model']]
    # 5. Types
    selected_types = model_types or []
    type_mask = pd.Series(False, index=df.index)
    for m_type, col in [("Foundation", "Is Foundation"), ("Finetuned", "Is Finetuned"), ("Merged", "Is Merged")]:
        if m_type in selected_types and col in df.columns:
            type_mask |= df[col]
    # NOTE: when no row matches the selected types the filter is skipped
    # entirely (all rows stay), mirroring the existing behavior.
    if type_mask.any():
        df = df[type_mask]
    # 6. Architecture
    if architecture and architecture != "All":
        df = df[df['Architecture'] == architecture]
    # 7. Balance
    if balance_filter != "Show All":
        threshold = 0.7 if "Perfect" in balance_filter else (0.5 if "Good" in balance_filter else 0.3)
        target_col = "Score_💎 Perfect Balance"
        if target_col in df.columns:
            df = df[df[target_col] >= threshold]
    # 8. Date
    df = apply_date_filter(df, date_preset, date_start, date_end)
    # 9. Sorting & Top N
    score_col = f"Score_{preset}"
    if score_col not in df.columns:
        return pd.DataFrame(), pd.DataFrame()
    df = df.sort_values(score_col, ascending=False).head(top_n).copy()
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()
    # 10. Formatting for Display
    # Work on a copy so the raw frame returned for export keeps its columns.
    display_df = df.copy()
    display_df['Rank'] = range(1, len(display_df) + 1)
    display_df['Model Name'] = display_df.apply(format_model_link, axis=1)
    display_df['Parameters'] = display_df.apply(format_params, axis=1)
    display_df['Architecture'] = display_df['Architecture'].astype(str)
    if 'Release Date' in display_df.columns:
        display_df['Date'] = pd.to_datetime(display_df['Release Date'], errors='coerce').dt.strftime('%Y-%m-%d').fillna('-')
    else:
        # apply_date_filter tolerates a missing column, so formatting must too.
        display_df['Date'] = '-'
    display_df = display_df.rename(columns={score_col: "⭐ Score"})
    display_cols = ['Rank', 'Model Name', "⭐ Score", 'Date', 'Badges', 'Parameters', 'Architecture']
    return display_df[display_cols], df
# === COMPARISON & UTILS ===
def search_models(df, query, limit=10):
    """Return a choices update listing model names that contain ``query``.

    The query is typed by the user and this handler runs on every change of
    the search box, so it is matched as a literal, case-insensitive substring
    (``regex=False``); a lone ``(`` must not raise ``re.error``.

    Args:
        df: Dataframe with an ``'author/model_name'`` column, or ``None``.
        query: Search text.
        limit: Maximum number of names to return.

    Returns:
        ``gr.update(choices=[...])`` with up to ``limit`` matching names, or
        an empty choice list when there is no query or no usable data.
    """
    if not query or df is None or 'author/model_name' not in df.columns:
        return gr.update(choices=[])
    mask = df['author/model_name'].astype(str).str.contains(
        query, case=False, na=False, regex=False)
    matches = df.loc[mask, 'author/model_name'].head(limit).tolist()
    return gr.update(choices=matches)
def compare_models(df, model_names_text):
    """Build a radar chart and a comparison table for the listed models.

    Args:
        df: Scored dataframe, or ``None``.
        model_names_text: Model names, one per line; blank lines are ignored.
            Names are matched exactly against ``'author/model_name'``.

    Returns:
        ``(figure, table)``. ``(None, empty DataFrame)`` when there is no
        data, no names were given, or none of the names match.
    """
    if df is None or df.empty or not model_names_text:
        return None, pd.DataFrame()
    targets = [x.strip() for x in model_names_text.split('\n') if x.strip()]
    subset = df[df['author/model_name'].isin(targets)].copy()
    if subset.empty:
        return None, pd.DataFrame()

    # Use metrics from config
    labels = list(COMPARE_METRICS.keys())
    cols = list(COMPARE_METRICS.values())

    def metric_value(row, col):
        # Missing, NaN or non-numeric metrics are plotted as 0.
        try:
            val = float(row.get(col, 0))
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if pd.isna(val) else val

    # Radar Chart
    fig = go.Figure()
    if labels:
        # Repeat the first label/value so each polygon is closed.
        plot_labels = labels + [labels[0]]
        for _, row in subset.iterrows():
            values = [metric_value(row, col) for col in cols]
            values.append(values[0])
            fig.add_trace(go.Scatterpolar(
                r=values, theta=plot_labels,
                fill='toself',
                name=str(row['author/model_name'])[:30]
            ))
    fig.update_layout(polar=dict(radialaxis=dict(visible=True, range=[0, 1])), showlegend=True, height=500)

    # Comparison Table
    compare_cols = ['author/model_name', 'Total Parameters', 'Score_🌌 Divine RP'] + cols
    # Drop duplicates (order preserved) and columns absent from the data, so
    # the table tolerates the same gaps as the radar chart above.
    present_cols = [c for c in dict.fromkeys(compare_cols) if c in subset.columns]
    # Rename for display
    rename_map = {
        'author/model_name': 'Model',
        'Total Parameters': 'Params',
        'Score_🌌 Divine RP': 'Divine RP'
    }
    # Add metric renames
    for k, v in COMPARE_METRICS.items():
        rename_map[v] = k
    compare_df = subset[present_cols].rename(columns=rename_map)
    return fig, compare_df
def calculate_custom_score(df, weights_dict):
    """Rank models by a user-defined weighted score and return the top 50.

    Args:
        df: Scored dataframe, or ``None``.
        weights_dict: Weights passed to
            ``ScoringEngine.calculate_weighted_score``.

    Returns:
        A dataframe with the columns ``Model``, ``⭐ Score``, ``Params`` and
        ``Badges``, sorted by score descending; an empty dataframe when there
        is no data.
    """
    if df is None or df.empty:
        return pd.DataFrame()
    # A throwaway engine computes just the weighted score.
    # NOTE(review): presumably the engine does not mutate df - confirm.
    engine = ScoringEngine(df)
    custom_scores = engine.calculate_weighted_score(weights_dict).round(3)
    # Attach the score to a copy so the caller's dataframe stays untouched.
    ranked = df.copy()
    ranked['Custom_Score'] = custom_scores
    top_rows = ranked.sort_values('Custom_Score', ascending=False).head(50)
    column_labels = {'author/model_name': 'Model', 'Custom_Score': '⭐ Score', 'Total Parameters': 'Params'}
    selected = top_rows[['author/model_name', 'Custom_Score', 'Total Parameters', 'Badges']].copy()
    return selected.rename(columns=column_labels)
def run_diagnostics(df):
    """Run the ``DevSuite`` checks on ``df``.

    Returns:
        ``(report, anomalies_df, statistics_df)`` from the suite, or an error
        message plus two empty dataframes when there is no data.
    """
    no_data = df is None or df.empty
    if no_data:
        return "❌ No data loaded", pd.DataFrame(), pd.DataFrame()
    suite = DevSuite(df)
    report = suite.run_all_tests()
    anomalies = suite.get_anomalies_df()
    statistics = suite.get_statistics_df()
    return report, anomalies, statistics
def clear_and_reload_ui():
    """Clear all caches, reload the data and report what happened.

    Returns:
        ``(new_df, status)`` where ``status`` is a human-readable summary of
        the deleted cache entries and the reloaded row count.
    """
    deleted = invalidate_cache()
    new_df = get_dataframe()
    deleted_text = ', '.join(deleted) if deleted else 'None'
    row_count = len(new_df)
    status = f"✅ Cache cleared!\nDeleted: {deleted_text}\n🔄 Data reloaded: {row_count} rows"
    return new_df, status
def export_handler(df):
    """Write ``df`` to a temporary CSV file and expose it for download.

    Args:
        df: Dataframe to export, or ``None``.

    Returns:
        ``gr.update`` making the file component visible with the CSV path, or
        hiding it when there is nothing to export or the export fails.
    """
    if df is None or df.empty:
        return gr.update(value=None, visible=False)
    try:
        # The temp file is only used to reserve a unique path. Closing the
        # handle right away avoids leaking a descriptor per export and lets
        # to_csv reopen the path on platforms that lock open files.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w', encoding='utf-8') as temp_file:
            csv_path = temp_file.name
        _temp_files.append(csv_path)  # Track for cleanup
        df.to_csv(csv_path, index=False)
        return gr.update(value=csv_path, visible=True)
    except Exception as e:
        # UI boundary: report the failure (with traceback) and hide the file.
        logger.exception("Export failed: %s", e)
        return gr.update(value=None, visible=False)
# === UI BUILDER ===
# Builds the whole Gradio UI: shared state, the tabs with their widgets, and
# the event bindings that connect the widgets to the functions above.
with gr.Blocks() as demo:
    # Initial Load
    initial_df = get_dataframe()
    # df_state holds the full scored dataframe; filtered_raw_state receives
    # the unformatted filter result (second pipeline output) used for export.
    df_state = gr.State(initial_df)
    filtered_raw_state = gr.State()
    # Dynamic Params for Sliders: the upper bound is the largest parameter
    # count in the data (rounded up), falling back to 100 without data.
    actual_max_params = 100
    if initial_df is not None and not initial_df.empty:
        m = initial_df['Total Parameters'].max()
        if pd.notna(m):
            actual_max_params = math.ceil(m)
    max_params_state = gr.State(actual_max_params)
    # Header
    header_md = gr.Markdown(get_header_content(initial_df))
    with gr.Tabs():
        with gr.Tab("🏅 Leaderboard"):
            # Controls
            with gr.Row(variant="panel", equal_height=True):
                with gr.Column(scale=5):
                    preset_dropdown = gr.Radio(
                        choices=list(PRESET_CONFIGS.keys()) + ["⚡ Efficiency King"],
                        value="🌌 Divine RP",
                        label="🎯 Preset",
                        interactive=True
                    )
                with gr.Column(scale=1, min_width=150):
                    refresh_btn = gr.Button("🔄 Refresh Data", variant="secondary", size="lg")
            # Filters
            with gr.Accordion("⚙️ Hardware & Filters", open=False):
                with gr.Row():
                    param_min = gr.Slider(0, actual_max_params, 0, step=1, label="Min Parameters (B)")
                    param_max = gr.Slider(0, actual_max_params, actual_max_params, step=1, label="Max Parameters (B)")
                with gr.Row():
                    date_preset = gr.Radio(
                        choices=["All Time", "Last Week", "Last Month", "Last Year", "Custom Range"],
                        value="All Time",
                        label="📅 Release Date Filter"
                    )
                # Hidden until "Custom Range" is selected (see the date toggle binding below).
                with gr.Row(visible=False) as custom_date_row:
                    date_start = gr.Textbox(placeholder="YYYY-MM-DD", label="From")
                    date_end = gr.Textbox(placeholder="YYYY-MM-DD", label="To")
                with gr.Row():
                    proprietary_check = gr.Checkbox(value=True, label="Include Proprietary (unknown params)")
                    moe_check = gr.Checkbox(value=False, label="MoE Only")
                    thinking_mode = gr.Radio(["Show All", "Hide Thinking", "Only Thinking"], value="Show All", label="Reasoning Models")
                with gr.Row():
                    model_types = gr.CheckboxGroup(["Foundation", "Finetuned", "Merged"], value=["Foundation", "Finetuned", "Merged"], label="Model Types")
                    arch_dropdown = gr.Dropdown(["All"] + get_architecture_choices(initial_df), value="All", label="Architecture")
                    top_n_slider = gr.Slider(10, 500, DEFAULT_TOP_N, step=10, label="Top N")
                with gr.Row():
                    balance_filter = gr.Radio(
                        choices=["Show All", "💎 Perfect (≥0.7)", "🏅 Good (≥0.5)", "⚖️ Basic (≥0.3)"],
                        value="Show All",
                        label="🛡️ Robustness Filter",
                        info="Filters out models with weak spots."
                    )
            # NOTE(review): search box and table are placed below the collapsed
            # accordion here - confirm this matches the intended layout.
            search_box = gr.Textbox(label="🔍 Search Models", placeholder="e.g., Llama, Qwen...")
            # One datatype per column returned by filter_leaderboard_pipeline;
            # "markdown" is for the 'Model Name' column, which may hold a link.
            leaderboard_table = gr.Dataframe(
                datatype=["number", "markdown", "number", "str", "str", "str", "str"],
                wrap=True, interactive=False
            )
            with gr.Row():
                with gr.Column(scale=1):
                    export_btn = gr.Button("📥 Export CSV", variant="primary", size="sm")
                with gr.Column(scale=4):
                    # Made visible by export_handler once a CSV has been written.
                    export_file = gr.File(label="Download CSV", visible=False, height=50)
        with gr.Tab("⚖️ Compare"):
            gr.Markdown("### Compare Multiple Models")
            with gr.Row():
                with gr.Column(scale=2):
                    search_compare = gr.Textbox(label="🔍 Search to Add Models", placeholder="Type model name...")
                    search_results_radio = gr.Radio(choices=[], label="Select from results", interactive=True)
                    add_model_btn = gr.Button("➕ Add Model", variant="secondary")
                with gr.Column(scale=3):
                    compare_textbox = gr.Textbox(label="📋 Comparing (one per line)", lines=8, placeholder="Add models using search...")
                    compare_btn = gr.Button("📊 Generate Comparison", variant="primary")
            with gr.Row():
                radar_plot = gr.Plot(label="📈 Radar Chart")
                compare_table = gr.Dataframe(label="📊 Comparison Table")
        with gr.Tab("🎨 Custom Weights"):
            gr.Markdown("### Create Your Own Preset")
            with gr.Row():
                with gr.Column():
                    w_textbook = gr.Slider(0, 1, 0.12, step=0.01, label="📚 Textbook Knowledge")
                    w_popculture = gr.Slider(0, 1, 0.08, step=0.01, label="🎬 Pop Culture")
                    w_worldmodel = gr.Slider(0, 1, 0.10, step=0.01, label="🌍 World Model")
                    w_instruction = gr.Slider(0, 1, 0.10, step=0.01, label="📋 Instruction Following")
                    w_style = gr.Slider(0, 1, 0.25, step=0.01, label="✍️ Writing Style")
                with gr.Column():
                    w_originality = gr.Slider(0, 1, 0.10, step=0.01, label="✨ Originality")
                    w_dialogue = gr.Slider(0, 1, 0.15, step=0.01, label="💬 Dialogue Balance")
                    w_unbound = gr.Slider(0, 1, 0.05, step=0.01, label="🔓 Unbound")
                    w_redundancy = gr.Slider(0, 1, 0.05, step=0.01, label="🧹 Low Redundancy")
            # Kept in sync with the sliders by the "Custom Weights" binding below.
            weight_sum_display = gr.Markdown("**Total Weight:** 1.00")
            calc_custom_btn = gr.Button("🎯 Calculate Custom Score", variant="primary")
            custom_results = gr.Dataframe(label="Top 50 Models")
        with gr.Tab("📖 About"):
            gr.Markdown(get_about_markdown(loader.last_updated))
        # Diagnostics Tab (Conditional)
        # The buttons stay None when the tab is disabled so the binding code
        # below can check for them.
        diag_btn, clear_btn = None, None
        if SHOW_DIAGNOSTICS:
            with gr.Tab("🛠️ Diagnostics"):
                with gr.Row():
                    diag_btn = gr.Button("🧪 Run Diagnostics", variant="primary")
                    clear_btn = gr.Button("🗑️ Clear Cache & Reload Data", variant="stop")
                cache_status = gr.Textbox(label="Status", lines=3, interactive=False)
                diag_report = gr.Code(label="📋 Diagnostic Report", language="markdown")
                with gr.Accordion("🔍 Anomalies", open=False):
                    anomalies_table = gr.Dataframe(label="Detected Anomalies")
                with gr.Accordion("📊 Statistics", open=False):
                    stats_table = gr.Dataframe(label="Normalization Statistics")
    # === EVENT BINDINGS ===
    # 1. Date Toggle: show the custom date row only for "Custom Range".
    date_preset.change(lambda x: gr.update(visible=(x == "Custom Range")), inputs=[date_preset], outputs=[custom_date_row])
    # 2. Filter Inputs Bundle
    # Order must match the positional parameters of filter_leaderboard_pipeline.
    filter_inputs = [
        df_state,           # 0
        preset_dropdown,    # 1
        search_box,         # 2
        param_min,          # 3
        param_max,          # 4
        proprietary_check,  # 5
        moe_check,          # 6
        thinking_mode,      # 7
        model_types,        # 8
        arch_dropdown,      # 9
        top_n_slider,       # 10
        balance_filter,     # 11
        date_preset,        # 12
        date_start,         # 13
        date_end            # 14
    ]
    filter_outputs = [leaderboard_table, filtered_raw_state]

    # 3. Slider Interaction Wrapper
    def slider_interaction(max_p_val, *args):
        """Handle a parameter-slider change and re-run the filter pipeline.

        ``max_p_val`` is the slider upper bound; ``args`` are the values of
        ``filter_inputs`` in order. While the range is restricted (min > 0 or
        max below the upper bound) the "Include Proprietary" checkbox is
        forced off and made non-interactive.

        Returns the checkbox update followed by the two pipeline outputs.
        """
        # args contains the values from filter_inputs, indexed as listed there:
        # 0: df, 1: preset, 2: search
        # 3: param_min, 4: param_max, 5: proprietary_check
        p_min = args[3]
        p_max = args[4]
        prop_val = args[5]
        is_restricted = (p_min > 0 or p_max < max_p_val)
        new_prop_val = False if is_restricted else prop_val
        new_interactive = not is_restricted
        new_label = "Include Proprietary (Disabled by params)" if is_restricted else "Include Proprietary (unknown params)"
        cb_update = gr.update(value=new_prop_val, label=new_label, interactive=new_interactive)
        # Modify list for pipeline call so it sees the forced checkbox value
        pipeline_args = list(args)
        pipeline_args[5] = new_prop_val
        table, raw = filter_leaderboard_pipeline(*pipeline_args)
        return cb_update, table, raw

    slider_args = [max_params_state] + filter_inputs
    slider_outs = [proprietary_check] + filter_outputs
    param_min.change(slider_interaction, inputs=slider_args, outputs=slider_outs)
    param_max.change(slider_interaction, inputs=slider_args, outputs=slider_outs)
    # 4. General Filter Change
    for i, inp in enumerate(filter_inputs):
        # Skip df(0), and parameters sliders (3, 4) to avoid double firing
        # (the sliders are already bound to slider_interaction above).
        if i not in [0, 3, 4]:
            inp.change(filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs)

    # 5. Refresh
    def refresh_wrapper():
        """Invalidate the cache, reload the data and refresh dependent widgets.

        Returns the new dataframe, an update for the architecture dropdown
        (new choices when data is available, otherwise a no-op update) and
        the header markdown.
        """
        invalidate_cache()
        new_df = get_dataframe()
        if new_df is not None and not new_df.empty:
            archs = ["All"] + get_architecture_choices(new_df)
            return new_df, gr.update(choices=archs), get_header_content(new_df)
        return new_df, gr.update(), get_header_content(None)

    # Re-run the filters once the state has been replaced.
    refresh_btn.click(refresh_wrapper, outputs=[df_state, arch_dropdown, header_md]).then(
        filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs
    )
    # 6. Export
    export_btn.click(export_handler, inputs=[filtered_raw_state], outputs=[export_file])
    # 7. Compare
    search_compare.change(search_models, inputs=[df_state, search_compare], outputs=[search_results_radio])
    # Append the selected result on a new line; leave the text unchanged when nothing is selected.
    add_model_btn.click(lambda t, s: t + ("\n" if t else "") + s if s else t, inputs=[compare_textbox, search_results_radio], outputs=[compare_textbox])
    compare_btn.click(compare_models, inputs=[df_state, compare_textbox], outputs=[radar_plot, compare_table])
    # 8. Custom Weights
    weight_inputs = [w_textbook, w_popculture, w_worldmodel, w_instruction, w_style, w_originality, w_dialogue, w_unbound, w_redundancy]
    # Any slider change recomputes the displayed total of all nine weights.
    for w in weight_inputs: w.change(lambda *args: f"**Total Weight:** {sum(args):.2f}", inputs=weight_inputs, outputs=[weight_sum_display])
    # The weight names are zipped with the slider values in weight_inputs order.
    calc_custom_btn.click(
        lambda df, *args: calculate_custom_score(df, {k: v for k, v in zip(['Textbook', 'Pop Culture', 'World Model', 'Instruction', 'Writing Style', 'Originality', 'Dialogue', 'Unbound', 'Redundancy'], args)}),
        inputs=[df_state] + weight_inputs,
        outputs=[custom_results]
    )
    # 9. Diagnostics (only bound when the tab was created above)
    if SHOW_DIAGNOSTICS and diag_btn and clear_btn:
        diag_btn.click(run_diagnostics, inputs=[df_state], outputs=[diag_report, anomalies_table, stats_table])
        clear_btn.click(clear_and_reload_ui, outputs=[df_state, cache_status]).then(
            filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs
        )
    # Load on Start: populate the table with the default filter values.
    demo.load(filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs)
# Launch the Gradio app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()