"""Gradio front-end for the UGI Leaderboard.

Builds the leaderboard, comparison, custom-weights, about and (optional)
diagnostics tabs on top of the data loaded by ``DataLoader`` and scored by
``ScoringEngine``.
"""

import atexit
import logging
import math
import os
import re
import tempfile
import threading

import gradio as gr
import numpy as np
import pandas as pd
import plotly.graph_objects as go

# Local imports keep their original relative order so that any name
# re-exported by ``config`` still overrides earlier imports as before.
from data_loader import DataLoader
from scoring import ScoringEngine, PRESET_CONFIGS
from dev_tools import DevSuite
# NOTE(review): this module relies on the star import for
# FORCE_REFRESH_ON_STARTUP, COMPARE_METRICS, DEFAULT_TOP_N and
# SHOW_DIAGNOSTICS - consider importing them explicitly.
from config import *  # noqa: F401,F403
from about import get_about_markdown

# === SETUP ===
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the data loader
loader = DataLoader()

# Thread-safe globals
_cache_lock = threading.Lock()
_CACHED_DF = None
_temp_files = []

# Characters escaped by ``escape_markdown``; compiled once at import time.
_MARKDOWN_SPECIALS = re.compile(r'([\[\]()\*_#~`])')

# Architecture labels that mean "no real value" and are hidden from the UI.
_PLACEHOLDER_ARCHS = frozenset({'unknown', 'nan', 'null', 'none'})

# Relative date presets mapped to their look-back window in days.
_DATE_PRESET_DAYS = {"Last Week": 7, "Last Month": 30, "Last Year": 365}

# Model-type checkbox label -> boolean flag column in the dataframe.
_MODEL_TYPE_COLUMNS = (
    ("Foundation", "Is Foundation"),
    ("Finetuned", "Is Finetuned"),
    ("Merged", "Is Merged"),
)

# Weight names passed to ``ScoringEngine.calculate_weighted_score``; the order
# matches the order of the sliders in ``weight_inputs`` below.
_CUSTOM_WEIGHT_KEYS = (
    'Textbook', 'Pop Culture', 'World Model', 'Instruction', 'Writing Style',
    'Originality', 'Dialogue', 'Unbound', 'Redundancy',
)


# === CLEANUP ===
@atexit.register
def cleanup_temp_files():
    """Removes temporary CSV files on exit."""
    for path in _temp_files:
        try:
            os.remove(path)
        except OSError:
            # Best effort: the file may already be gone or be locked.
            pass


# === DATA ACCESS ===
def get_dataframe():
    """Thread-safe lazy loader for dataframe.

    Loads the data through ``loader`` on the first call (or after
    ``invalidate_cache``), scores it with ``ScoringEngine`` when it is not
    empty, and returns the cached result on later calls.
    """
    global _CACHED_DF
    with _cache_lock:
        if _CACHED_DF is None:
            if FORCE_REFRESH_ON_STARTUP:
                logger.info("First load: Clearing cache...")
                loader.clear_cache()

            df = loader.load_data()
            if df is not None and not df.empty:
                _CACHED_DF = ScoringEngine(df).calculate_all()
            else:
                _CACHED_DF = df
        return _CACHED_DF


def invalidate_cache():
    """Unified cache invalidation logic.

    Drops the in-memory dataframe and clears the loader cache.

    Returns:
        Whatever ``loader.clear_cache()`` returns (used by the caller as the
        list of deleted items).
    """
    global _CACHED_DF
    with _cache_lock:
        _CACHED_DF = None
        deleted = loader.clear_cache()
    return deleted


# === FORMATTING & UI HELPERS ===
def get_header_content(df):
    """Return the markdown header showing update time, model count and PID."""
    count = len(df) if df is not None else 0
    current_time = loader.last_updated
    return (
        "\n"
        "# 🏆 UGI Leaderboard: Presets Edition v3.7\n"
        f"**Last Updated:** {current_time} | **Models:** {count} | **PID:** {os.getpid()}\n"
    )


def _as_float(value):
    """Return ``value`` as a float, or NaN when it is missing or not numeric."""
    try:
        return float(value)
    except (ValueError, TypeError):
        return float('nan')


def _format_billions(value):
    """Format a parameter count given in billions as ``"350M"`` / ``"7.0B"``."""
    val = _as_float(value)
    if math.isnan(val) or val <= 0:
        return "?"
    if val < 1:
        return f"{val * 1000:.0f}M"
    return f"{val:.1f}B"


def format_params(row):
    """Return a display string for a row's parameter counts.

    Args:
        row: Mapping-like row exposing ``.get`` (e.g. a ``pd.Series``) with
            optional 'Total Parameters' and 'Active Parameters' entries.

    Returns:
        ``"N/A"`` when the total is missing, non-numeric or not positive;
        otherwise the formatted total, with the active count appended when it
        is positive and smaller than the total (MoE models).
    """
    total = _as_float(row.get('Total Parameters', np.nan))
    if math.isnan(total) or total <= 0:
        return "N/A"

    formatted_total = _format_billions(total)

    # Show active params if MoE (a NaN ``active`` fails both comparisons).
    active = _as_float(row.get('Active Parameters', np.nan))
    if 0 < active < total:
        return f"{formatted_total} (Act: {_format_billions(active)})"
    return formatted_total


def escape_markdown(text):
    """Backslash-escape markdown control characters in ``text``."""
    return _MARKDOWN_SPECIALS.sub(r'\\\1', str(text))


def format_model_link(row):
    """Return the model name as a markdown link when the row has an http link."""
    name = str(row.get('author/model_name', 'Unknown'))
    link = row.get('Model Link', '')
    safe_name = escape_markdown(name)
    if pd.notna(link) and isinstance(link, str) and link.startswith('http'):
        return f"[{safe_name}]({link})"
    return safe_name


def get_architecture_choices(df):
    """Return the sorted distinct architectures, skipping placeholder values."""
    if df is None or df.empty or 'Architecture' not in df.columns:
        return []
    valid_archs = [
        arch for arch in df['Architecture'].dropna().unique()
        if str(arch).lower() not in _PLACEHOLDER_ARCHS
    ]
    # key=str keeps the sort from raising if the column mixes value types.
    return sorted(valid_archs, key=str)


# === FILTERING LOGIC (Refactored) ===
def apply_search_filter(df, query):
    """Keep rows whose model name or architecture contains ``query``.

    The match is a case-insensitive *literal* substring match: the query comes
    straight from a text box, so it must not be interpreted as a regular
    expression (e.g. ``"c++"`` or ``"("`` would otherwise raise).
    """
    needle = str(query).strip() if query is not None else ""
    if not needle:
        return df
    name_hit = df['author/model_name'].astype(str).str.contains(
        needle, case=False, na=False, regex=False)
    arch_hit = df['Architecture'].astype(str).str.contains(
        needle, case=False, na=False, regex=False)
    return df[name_hit | arch_hit]


def apply_param_filter(df, param_min, param_max, proprietary):
    """Filter rows by total parameter count.

    Args:
        df: Dataframe with a 'Total Parameters' column.
        param_min: Lower bound (inclusive); ``None`` means 0.
        param_max: Upper bound (inclusive); ``None`` means 99999.
        proprietary: When true, rows without a known positive parameter count
            are kept regardless of the range.
    """
    params = pd.to_numeric(df['Total Parameters'], errors='coerce')
    has_params = params.notna() & (params > 0)

    p_min = float(param_min) if param_min is not None else 0.0
    p_max = float(param_max) if param_max is not None else 99999.0
    in_range = (params >= p_min) & (params <= p_max)

    if proprietary:
        # Include if in range OR if proprietary (no params known)
        return df[(has_params & in_range) | (~has_params)]
    # Strict range check
    return df[has_params & in_range]


def apply_date_filter(df, date_preset, date_start, date_end):
    """Filter rows by 'Release Date' according to the selected preset.

    "All Time" (or a dataframe without a 'Release Date' column) returns ``df``
    unchanged. Relative presets keep rows released within the look-back
    window. "Custom Range" applies whichever of ``date_start`` / ``date_end``
    parses as a date; unparsable bounds are ignored.
    """
    if date_preset == "All Time" or 'Release Date' not in df.columns:
        return df

    release_dates = pd.to_datetime(df['Release Date'], errors='coerce')
    mask = pd.Series(True, index=df.index)

    if date_preset in _DATE_PRESET_DAYS:
        cutoff = pd.Timestamp.now() - pd.Timedelta(days=_DATE_PRESET_DAYS[date_preset])
        mask = release_dates >= cutoff
    elif date_preset == "Custom Range":
        if date_start:
            start = pd.to_datetime(date_start, errors='coerce')
            if pd.notna(start):
                mask &= (release_dates >= start)
        if date_end:
            end = pd.to_datetime(date_end, errors='coerce')
            if pd.notna(end):
                mask &= (release_dates <= end)

    return df[mask]


def _flag_column(df, column):
    """Return ``df[column]`` as a strict boolean mask (missing/NaN -> False)."""
    if column not in df.columns:
        return pd.Series(False, index=df.index)
    return df[column].eq(True)


def filter_leaderboard_pipeline(df, preset, query, param_min, param_max, proprietary,
                                moe_only, thinking_mode, model_types, architecture,
                                top_n, balance_filter, date_preset, date_start, date_end):
    """Apply every leaderboard filter, then rank and format the result.

    Returns:
        ``(display_df, raw_df)``: the formatted table for the UI and the
        unformatted top-N rows (used for CSV export). Both are empty
        dataframes when nothing matches or the score column for ``preset``
        does not exist.
    """
    if df is None or df.empty:
        return pd.DataFrame(), pd.DataFrame()

    # 1. Search
    df = apply_search_filter(df, query)

    # 2. Parameters
    df = apply_param_filter(df, param_min, param_max, proprietary)

    # 3. MoE
    if moe_only:
        if {'Active Parameters', 'Total Parameters'} <= set(df.columns):
            df = df[df['Active Parameters'] < df['Total Parameters']]
        else:
            # Without both columns no row can be identified as MoE.
            df = df.iloc[0:0]

    # 4. Thinking
    if thinking_mode in ("Hide Thinking", "Only Thinking"):
        # Coerce to a real boolean mask: ``~`` on a non-bool column would
        # either raise or perform a bitwise negation.
        is_thinking = _flag_column(df, 'Is Thinking Model')
        df = df[is_thinking] if thinking_mode == "Only Thinking" else df[~is_thinking]

    # 5. Types
    selected_types = model_types or []
    type_mask = pd.Series(False, index=df.index)
    type_filter_active = False
    for m_type, col in _MODEL_TYPE_COLUMNS:
        if m_type in selected_types and col in df.columns:
            type_mask |= _flag_column(df, col)
            type_filter_active = True
    # Apply the mask whenever a selected type has a backing column, even if it
    # matches nothing; an empty selection still means "no type filter".
    if type_filter_active:
        df = df[type_mask]

    # 6. Architecture
    if architecture and architecture != "All":
        df = df[df['Architecture'] == architecture]

    # 7. Balance
    if balance_filter and balance_filter != "Show All":
        if "Perfect" in balance_filter:
            threshold = 0.7
        elif "Good" in balance_filter:
            threshold = 0.5
        else:
            threshold = 0.3
        target_col = "Score_💎 Perfect Balance"
        if target_col in df.columns:
            df = df[df[target_col] >= threshold]

    # 8. Date
    df = apply_date_filter(df, date_preset, date_start, date_end)

    # 9. Sorting & Top N
    score_col = f"Score_{preset}"
    if score_col not in df.columns:
        return pd.DataFrame(), pd.DataFrame()

    # The slider value may arrive as a float; ``head`` needs an int.
    limit = int(top_n) if top_n is not None else len(df)
    df = df.sort_values(score_col, ascending=False).head(limit).copy()

    if df.empty:
        return pd.DataFrame(), pd.DataFrame()

    # 10. Formatting for Display
    display_df = df.copy()
    display_df['Rank'] = range(1, len(display_df) + 1)
    display_df['Model Name'] = display_df.apply(format_model_link, axis=1)
    display_df['Parameters'] = display_df.apply(format_params, axis=1)
    display_df['Architecture'] = display_df['Architecture'].astype(str)
    if 'Release Date' in display_df.columns:
        display_df['Date'] = (
            pd.to_datetime(display_df['Release Date'], errors='coerce')
            .dt.strftime('%Y-%m-%d')
            .fillna('-')
        )
    else:
        # apply_date_filter tolerates a missing column; so does the display.
        display_df['Date'] = '-'
    if 'Badges' not in display_df.columns:
        display_df['Badges'] = ''

    display_df = display_df.rename(columns={score_col: "⭐ Score"})
    display_cols = ['Rank', 'Model Name', "⭐ Score", 'Date', 'Badges', 'Parameters', 'Architecture']

    return display_df[display_cols], df


# === COMPARISON & UTILS ===
def search_models(df, query, limit=10):
    """Efficient search for model selection.

    Returns a ``gr.update`` whose choices are up to ``limit`` model names
    containing ``query`` (case-insensitive, literal match).
    """
    if not query or df is None:
        return gr.update(choices=[])

    mask = df['author/model_name'].astype(str).str.contains(
        query, case=False, na=False, regex=False)
    matches = df.loc[mask, 'author/model_name'].head(limit).tolist()
    return gr.update(choices=matches)


def _metric_value(row, column):
    """Return the row's value for ``column`` as a float, 0 when missing/NaN."""
    value = _as_float(row.get(column, 0))
    return 0.0 if math.isnan(value) else value


def compare_models(df, model_names_text):
    """Build a radar chart and comparison table for the listed models.

    Args:
        df: Scored dataframe.
        model_names_text: Model names, one per line.

    Returns:
        ``(figure, table)``; ``(None, empty dataframe)`` when there is no
        input or none of the names is found in ``df``.
    """
    if df is None or not model_names_text:
        return None, pd.DataFrame()

    targets = [line.strip() for line in model_names_text.splitlines() if line.strip()]
    subset = df[df['author/model_name'].isin(targets)].copy()

    if subset.empty:
        return None, pd.DataFrame()

    # Use metrics from config
    labels = list(COMPARE_METRICS.keys())
    cols = list(COMPARE_METRICS.values())

    # Radar Chart
    fig = go.Figure()
    if labels:
        # Repeat the first axis at the end to close the polygon.
        plot_labels = labels + [labels[0]]
        for _, row in subset.iterrows():
            values = [_metric_value(row, col) for col in cols]
            values.append(values[0])
            fig.add_trace(go.Scatterpolar(
                r=values,
                theta=plot_labels,
                fill='toself',
                name=str(row['author/model_name'])[:30]
            ))

    # NOTE(review): the fixed [0, 1] range assumes normalized metrics - confirm.
    fig.update_layout(polar=dict(radialaxis=dict(visible=True, range=[0, 1])),
                      showlegend=True, height=500)

    # Comparison Table: keep only columns that exist, without duplicates, so a
    # metric missing from the data cannot raise KeyError here.
    wanted_cols = ['author/model_name', 'Total Parameters', 'Score_🌌 Divine RP'] + cols
    compare_cols = [c for c in dict.fromkeys(wanted_cols) if c in subset.columns]

    # Rename for display
    rename_map = {
        'author/model_name': 'Model',
        'Total Parameters': 'Params',
        'Score_🌌 Divine RP': 'Divine RP'
    }
    # Add metric renames
    for label, column in COMPARE_METRICS.items():
        rename_map[column] = label

    compare_df = subset[compare_cols].rename(columns=rename_map)
    return fig, compare_df


def calculate_custom_score(df, weights_dict):
    """Score ``df`` with user-supplied weights and return the top 50 rows."""
    if df is None or df.empty:
        return pd.DataFrame()

    # Using lightweight engine to avoid full re-calc
    temp_engine = ScoringEngine(df)

    # Calculate score on the original dataframe (the engine copies it internally)
    scores = temp_engine.calculate_weighted_score(weights_dict).round(3)

    # Create result view
    result = df.copy()
    result['Custom_Score'] = scores
    result = result.sort_values('Custom_Score', ascending=False).head(50)

    display = result[['author/model_name', 'Custom_Score', 'Total Parameters', 'Badges']].copy()
    display = display.rename(columns={
        'author/model_name': 'Model',
        'Custom_Score': '⭐ Score',
        'Total Parameters': 'Params',
    })
    return display


def run_diagnostics(df):
    """Run the ``DevSuite`` checks; returns (report, anomalies, statistics)."""
    if df is None or df.empty:
        return "❌ No data loaded", pd.DataFrame(), pd.DataFrame()
    dev = DevSuite(df)
    return dev.run_all_tests(), dev.get_anomalies_df(), dev.get_statistics_df()


def clear_and_reload_ui():
    """Clear all caches, reload the data and return (dataframe, status text)."""
    deleted = invalidate_cache()
    new_df = get_dataframe()
    row_count = len(new_df) if new_df is not None else 0
    deleted_text = ', '.join(map(str, deleted)) if deleted else 'None'
    status = f"✅ Cache cleared!\nDeleted: {deleted_text}\n🔄 Data reloaded: {row_count} rows"
    return new_df, status


def export_handler(df):
    """Write ``df`` to a temporary CSV and return an update for the file widget.

    The temporary file is registered in ``_temp_files`` so it is removed at
    interpreter exit. On any failure the error is logged and the file widget
    is hidden.
    """
    if df is None or df.empty:
        return gr.update(value=None, visible=False)

    try:
        # The ``with`` block closes the handle (the original leaked it and
        # re-opened the path while it was still open). newline='' is what
        # pandas expects for text-mode file objects.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.csv', mode='w',
                                         encoding='utf-8', newline='') as handle:
            _temp_files.append(handle.name)  # Track for cleanup
            df.to_csv(handle, index=False)
        return gr.update(value=handle.name, visible=True)
    except Exception as e:  # UI boundary: report and degrade gracefully.
        logger.exception("Export failed: %s", e)
        return gr.update(value=None, visible=False)


# === UI BUILDER ===
with gr.Blocks() as demo:
    # Initial Load
    initial_df = get_dataframe()
    df_state = gr.State(initial_df)
    filtered_raw_state = gr.State()

    # Dynamic Params for Sliders
    actual_max_params = 100
    if initial_df is not None and not initial_df.empty:
        m = initial_df['Total Parameters'].max()
        if pd.notna(m):
            actual_max_params = math.ceil(m)
    max_params_state = gr.State(actual_max_params)

    # Header
    header_md = gr.Markdown(get_header_content(initial_df))

    with gr.Tabs():
        with gr.Tab("🏅 Leaderboard"):
            # Controls
            with gr.Row(variant="panel", equal_height=True):
                with gr.Column(scale=5):
                    preset_dropdown = gr.Radio(
                        choices=list(PRESET_CONFIGS.keys()) + ["⚡ Efficiency King"],
                        value="🌌 Divine RP",
                        label="🎯 Preset",
                        interactive=True
                    )
                with gr.Column(scale=1, min_width=150):
                    refresh_btn = gr.Button("🔄 Refresh Data", variant="secondary", size="lg")

            # Filters
            with gr.Accordion("⚙️ Hardware & Filters", open=False):
                with gr.Row():
                    param_min = gr.Slider(0, actual_max_params, 0, step=1,
                                          label="Min Parameters (B)")
                    param_max = gr.Slider(0, actual_max_params, actual_max_params, step=1,
                                          label="Max Parameters (B)")

                with gr.Row():
                    date_preset = gr.Radio(
                        choices=["All Time", "Last Week", "Last Month", "Last Year", "Custom Range"],
                        value="All Time",
                        label="📅 Release Date Filter"
                    )

                with gr.Row(visible=False) as custom_date_row:
                    date_start = gr.Textbox(placeholder="YYYY-MM-DD", label="From")
                    date_end = gr.Textbox(placeholder="YYYY-MM-DD", label="To")

                with gr.Row():
                    proprietary_check = gr.Checkbox(value=True,
                                                    label="Include Proprietary (unknown params)")
                    moe_check = gr.Checkbox(value=False, label="MoE Only")
                    thinking_mode = gr.Radio(["Show All", "Hide Thinking", "Only Thinking"],
                                             value="Show All", label="Reasoning Models")

                with gr.Row():
                    model_types = gr.CheckboxGroup(["Foundation", "Finetuned", "Merged"],
                                                   value=["Foundation", "Finetuned", "Merged"],
                                                   label="Model Types")
                    arch_dropdown = gr.Dropdown(["All"] + get_architecture_choices(initial_df),
                                                value="All", label="Architecture")
                    top_n_slider = gr.Slider(10, 500, DEFAULT_TOP_N, step=10, label="Top N")

                with gr.Row():
                    balance_filter = gr.Radio(
                        choices=["Show All", "💎 Perfect (≥0.7)", "🏅 Good (≥0.5)", "⚖️ Basic (≥0.3)"],
                        value="Show All",
                        label="🛡️ Robustness Filter",
                        info="Filters out models with weak spots."
                    )

            search_box = gr.Textbox(label="🔍 Search Models", placeholder="e.g., Llama, Qwen...")

            leaderboard_table = gr.Dataframe(
                datatype=["number", "markdown", "number", "str", "str", "str", "str"],
                wrap=True,
                interactive=False
            )

            with gr.Row():
                with gr.Column(scale=1):
                    export_btn = gr.Button("📥 Export CSV", variant="primary", size="sm")
                with gr.Column(scale=4):
                    export_file = gr.File(label="Download CSV", visible=False, height=50)

        with gr.Tab("⚖️ Compare"):
            gr.Markdown("### Compare Multiple Models")
            with gr.Row():
                with gr.Column(scale=2):
                    search_compare = gr.Textbox(label="🔍 Search to Add Models",
                                                placeholder="Type model name...")
                    search_results_radio = gr.Radio(choices=[], label="Select from results",
                                                    interactive=True)
                    add_model_btn = gr.Button("➕ Add Model", variant="secondary")
                with gr.Column(scale=3):
                    compare_textbox = gr.Textbox(label="📋 Comparing (one per line)", lines=8,
                                                 placeholder="Add models using search...")
                    compare_btn = gr.Button("📊 Generate Comparison", variant="primary")
            with gr.Row():
                radar_plot = gr.Plot(label="📈 Radar Chart")
                compare_table = gr.Dataframe(label="📊 Comparison Table")

        with gr.Tab("🎨 Custom Weights"):
            gr.Markdown("### Create Your Own Preset")
            with gr.Row():
                with gr.Column():
                    w_textbook = gr.Slider(0, 1, 0.12, step=0.01, label="📚 Textbook Knowledge")
                    w_popculture = gr.Slider(0, 1, 0.08, step=0.01, label="🎬 Pop Culture")
                    w_worldmodel = gr.Slider(0, 1, 0.10, step=0.01, label="🌍 World Model")
                    w_instruction = gr.Slider(0, 1, 0.10, step=0.01, label="📋 Instruction Following")
                    w_style = gr.Slider(0, 1, 0.25, step=0.01, label="✍️ Writing Style")
                with gr.Column():
                    w_originality = gr.Slider(0, 1, 0.10, step=0.01, label="✨ Originality")
                    w_dialogue = gr.Slider(0, 1, 0.15, step=0.01, label="💬 Dialogue Balance")
                    w_unbound = gr.Slider(0, 1, 0.05, step=0.01, label="🔓 Unbound")
                    w_redundancy = gr.Slider(0, 1, 0.05, step=0.01, label="🧹 Low Redundancy")

            weight_sum_display = gr.Markdown("**Total Weight:** 1.00")
            calc_custom_btn = gr.Button("🎯 Calculate Custom Score", variant="primary")
            custom_results = gr.Dataframe(label="Top 50 Models")

        with gr.Tab("📖 About"):
            gr.Markdown(get_about_markdown(loader.last_updated))

        # Diagnostics Tab (Conditional)
        diag_btn, clear_btn = None, None
        if SHOW_DIAGNOSTICS:
            with gr.Tab("🛠️ Diagnostics"):
                with gr.Row():
                    diag_btn = gr.Button("🧪 Run Diagnostics", variant="primary")
                    clear_btn = gr.Button("🗑️ Clear Cache & Reload Data", variant="stop")
                cache_status = gr.Textbox(label="Status", lines=3, interactive=False)
                diag_report = gr.Code(label="📋 Diagnostic Report", language="markdown")
                with gr.Accordion("🔍 Anomalies", open=False):
                    anomalies_table = gr.Dataframe(label="Detected Anomalies")
                with gr.Accordion("📊 Statistics", open=False):
                    stats_table = gr.Dataframe(label="Normalization Statistics")

    # === EVENT BINDINGS ===

    # 1. Date Toggle
    date_preset.change(lambda x: gr.update(visible=(x == "Custom Range")),
                       inputs=[date_preset], outputs=[custom_date_row])

    # 2. Filter Inputs Bundle (order must match filter_leaderboard_pipeline)
    filter_inputs = [
        df_state,           # 0
        preset_dropdown,    # 1
        search_box,         # 2
        param_min,          # 3
        param_max,          # 4
        proprietary_check,  # 5
        moe_check,          # 6
        thinking_mode,      # 7
        model_types,        # 8
        arch_dropdown,      # 9
        top_n_slider,       # 10
        balance_filter,     # 11
        date_preset,        # 12
        date_start,         # 13
        date_end            # 14
    ]
    filter_outputs = [leaderboard_table, filtered_raw_state]

    # Positions inside ``filter_inputs`` that the handlers below refer to.
    _IDX_DF, _IDX_PARAM_MIN, _IDX_PARAM_MAX, _IDX_PROPRIETARY = 0, 3, 4, 5

    # 3. Slider Interaction Wrapper
    def slider_interaction(max_p_val, *args):
        """Re-filter after a parameter slider moved.

        While the parameter range is restricted the "Include Proprietary"
        checkbox is forced off and disabled; ``args`` are the values of
        ``filter_inputs`` in order.
        """
        p_min = args[_IDX_PARAM_MIN]
        p_max = args[_IDX_PARAM_MAX]
        prop_val = args[_IDX_PROPRIETARY]

        is_restricted = (p_min > 0 or p_max < max_p_val)
        new_prop_val = False if is_restricted else prop_val
        new_label = ("Include Proprietary (Disabled by params)" if is_restricted
                     else "Include Proprietary (unknown params)")

        cb_update = gr.update(value=new_prop_val, label=new_label,
                              interactive=not is_restricted)

        # Modify list for pipeline call
        pipeline_args = list(args)
        pipeline_args[_IDX_PROPRIETARY] = new_prop_val

        table, raw = filter_leaderboard_pipeline(*pipeline_args)
        return cb_update, table, raw

    slider_args = [max_params_state] + filter_inputs
    slider_outs = [proprietary_check] + filter_outputs

    param_min.change(slider_interaction, inputs=slider_args, outputs=slider_outs)
    param_max.change(slider_interaction, inputs=slider_args, outputs=slider_outs)

    # 4. General Filter Change
    # Skip df(0), and parameters sliders (3, 4) to avoid double firing
    _skip_change = {_IDX_DF, _IDX_PARAM_MIN, _IDX_PARAM_MAX}
    for i, inp in enumerate(filter_inputs):
        if i not in _skip_change:
            inp.change(filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs)

    # 5. Refresh
    def refresh_wrapper():
        """Reload the data and refresh the architecture choices and header."""
        invalidate_cache()
        new_df = get_dataframe()
        if new_df is not None and not new_df.empty:
            archs = ["All"] + get_architecture_choices(new_df)
            return new_df, gr.update(choices=archs), get_header_content(new_df)
        return new_df, gr.update(), get_header_content(None)

    refresh_btn.click(refresh_wrapper, outputs=[df_state, arch_dropdown, header_md]).then(
        filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs
    )

    # 6. Export
    export_btn.click(export_handler, inputs=[filtered_raw_state], outputs=[export_file])

    # 7. Compare
    def _append_model(current, selected):
        """Append ``selected`` on its own line to the comparison list text."""
        current = current or ""
        if not selected:
            return current
        return current + ("\n" if current else "") + selected

    search_compare.change(search_models, inputs=[df_state, search_compare],
                          outputs=[search_results_radio])
    add_model_btn.click(_append_model, inputs=[compare_textbox, search_results_radio],
                        outputs=[compare_textbox])
    compare_btn.click(compare_models, inputs=[df_state, compare_textbox],
                      outputs=[radar_plot, compare_table])

    # 8. Custom Weights
    def _format_weight_total(*weights):
        """Return the markdown line showing the sum of all weight sliders."""
        return f"**Total Weight:** {sum(weights):.2f}"

    def _custom_score_handler(df, *weights):
        """Pair slider values with their weight names and score ``df``."""
        return calculate_custom_score(df, dict(zip(_CUSTOM_WEIGHT_KEYS, weights)))

    weight_inputs = [w_textbook, w_popculture, w_worldmodel, w_instruction, w_style,
                     w_originality, w_dialogue, w_unbound, w_redundancy]
    for w in weight_inputs:
        w.change(_format_weight_total, inputs=weight_inputs, outputs=[weight_sum_display])

    calc_custom_btn.click(_custom_score_handler, inputs=[df_state] + weight_inputs,
                          outputs=[custom_results])

    # 9. Diagnostics
    if SHOW_DIAGNOSTICS and diag_btn is not None and clear_btn is not None:
        diag_btn.click(run_diagnostics, inputs=[df_state],
                       outputs=[diag_report, anomalies_table, stats_table])
        clear_btn.click(clear_and_reload_ui, outputs=[df_state, cache_status]).then(
            filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs
        )

    # Load on Start
    demo.load(filter_leaderboard_pipeline, inputs=filter_inputs, outputs=filter_outputs)

if __name__ == "__main__":
    demo.launch()