import gradio as gr
from models.llm_client import get_response
from ui.formatting import format_history
from datetime import datetime
import json
from datasets import Dataset, load_dataset, concatenate_datasets
import os
from concurrent.futures import ThreadPoolExecutor
import random
from huggingface_hub import HfApi
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

DATASET_ID = "rachelmkim/llm-leaderboard-data"  # TODO: move this to config
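# One possible shape for that TODO (a sketch; the env var name is assumed):
#
#     DATASET_ID = os.getenv("LEADERBOARD_DATASET_ID", "rachelmkim/llm-leaderboard-data")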

def send_message(model_a, model_b, model_a_state, model_b_state, prompt, history, messages_a, messages_b,
                 last_user_msg, last_response_a, last_response_b,
                 waiting_for_vote):
    """Send message to both models and update histories."""
    if not prompt.strip():
        return (history, messages_a, messages_b, "",
                gr.update(visible=False), gr.update(visible=False), gr.update(interactive=True),
                "", "", "",
                model_a_state, model_b_state, waiting_for_vote)

    # Check if we are still waiting for a vote from the previous turn
    if waiting_for_vote:
        return (history, messages_a, messages_b, "",
                gr.update(visible=False), gr.update(visible=len(history) > 0), gr.update(interactive=True),
                prompt, last_response_a, last_response_b,
                model_a_state, model_b_state, waiting_for_vote)

    # Track which models were used for this turn
    model_a_state.append(model_a)
    model_b_state.append(model_b)

    # Add the user message to both conversation histories
    messages_a.append({"role": "user", "content": prompt})
    messages_b.append({"role": "user", "content": prompt})

    # Get responses in parallel
    with ThreadPoolExecutor(max_workers=2) as executor:
        future_a = executor.submit(get_response, model_a, messages_a)
        future_b = executor.submit(get_response, model_b, messages_b)
        response_a = future_a.result()
        response_b = future_b.result()

    # Add assistant responses to the conversation histories
    messages_a.append({"role": "assistant", "content": response_a})
    messages_b.append({"role": "assistant", "content": response_b})

    # Update the chat history for display with side-by-side responses
    history.append({
        "user": prompt,
        "response_a": response_a,
        "response_b": response_b,
        "highlight": None,
    })

    # Format for display
    display_html = format_history(history)

    # Show voting buttons, disable the prompt input, hide the save row, set the waiting flag
    return (display_html, messages_a, messages_b, "",
            gr.update(visible=True), gr.update(visible=False), gr.update(interactive=False),
            prompt, response_a, response_b,
            model_a_state, model_b_state, True)
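
# A minimal wiring sketch (assumed component/state names; the real Blocks
# layout lives elsewhere in this Space). send_message takes 12 inputs and
# returns 13 values, so both lists must match it positionally, e.g.:
#
#     send_btn.click(
#         send_message,
#         inputs=[model_a_dd, model_b_dd, model_a_state, model_b_state,
#                 prompt_box, history_state, messages_a_state, messages_b_state,
#                 last_user_msg, last_response_a, last_response_b,
#                 waiting_state],
#         outputs=[chat_html, messages_a_state, messages_b_state, prompt_box,
#                  vote_row, save_row, ..., waiting_state],  # 13 targets total
#     )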

def vote_and_continue(choice, history, messages_a, messages_b, last_user_msg, last_response_a,
                      last_response_b, votes):
    """Record the vote and keep both conversation histories in sync with the kept response."""
    votes.append(choice)

    if history:
        # Highlight the winning side and record which model's response is kept
        if choice == "Left is better":
            kept = "model_a"
            history[-1]["highlight"] = "left"
        elif choice == "Right is better":
            kept = "model_b"
            history[-1]["highlight"] = "right"
        else:  # "Both are the same" or "Both are bad"
            # Randomly choose a model to keep
            kept = "model_a" if random.randint(0, 1) == 0 else "model_b"
            history[-1]["highlight"] = "random_left" if kept == "model_a" else "random_right"
        history[-1]["kept"] = kept

        # Overwrite the losing side's last assistant message so both
        # conversations continue from the kept response
        if kept == "model_a":
            messages_b[-1]["content"] = messages_a[-1]["content"]
        else:
            messages_a[-1]["content"] = messages_b[-1]["content"]

    # Format the updated history
    display_html = format_history(history)

    # Hide voting buttons, show the save row, enable the prompt input, clear the waiting flag
    return (display_html, history, messages_a, messages_b, votes,
            gr.update(visible=False), gr.update(visible=True),
            gr.update(interactive=True), gr.update(interactive=True), False)
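
# Wiring sketch for the vote buttons (assumed names): each button can bind its
# own label as `choice`, e.g.
#
#     left_btn.click(
#         lambda *args: vote_and_continue("Left is better", *args),
#         inputs=[history_state, messages_a_state, messages_b_state,
#                 last_user_msg, last_response_a, last_response_b, votes_state],
#         outputs=[chat_html, history_state, messages_a_state, messages_b_state,
#                  votes_state, vote_row, save_row, ..., waiting_state],  # 10 targets total
#     )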

def reset_conversation():
    """Reset the conversation."""
    return ("", [], [], [], "", gr.update(visible=False), gr.update(visible=False),
            gr.update(interactive=True), gr.update(interactive=True),
            "", "", "", [], [], [], False)

def save_conversation_data(token, model_as, model_bs, history, votes):
    """Save conversation data to the HuggingFace dataset."""
    # Extract prompts, responses, and kept flags from the history
    prompts = []
    response_as = []
    response_bs = []
    kepts = []
    for turn in history:
        prompts.append(turn["user"])
        response_as.append(turn["response_a"])
        response_bs.append(turn["response_b"])
        kepts.append(turn["kept"])
    assert len(prompts) == len(model_as) == len(model_bs) == len(response_as) == len(response_bs) == len(votes) == len(kepts), "Data length mismatch"

    turns = list(range(len(prompts)))
    winners = []
    for vote in votes:
        if vote == "Left is better":
            winners.append("model_a")
        elif vote == "Right is better":
            winners.append("model_b")
        elif vote == "Both are the same":
            winners.append("tie")
        else:
            winners.append("both_bad")

    # Create the data structure, one row per turn
    data = {
        "token": [token] * len(turns),
        "turn": turns,
        "model_a": model_as,
        "model_b": model_bs,
        "response_a": response_as,
        "response_b": response_bs,
        "prompt": prompts,
        "winners": winners,
        "timestamp": [datetime.now().isoformat()] * len(turns),
        "kept": kepts,
    }

    api = HfApi()

    # Save as a parquet file
    filename = f"data_{token}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"
    df = pd.DataFrame(data)
    df.to_parquet(filename)

    # Upload as a new file (doesn't touch existing files)
    api.upload_file(
        path_or_fileobj=filename,
        path_in_repo=f"data/{filename}",  # store in the data/ folder
        repo_id=DATASET_ID,
        repo_type="dataset",
        token=os.getenv("HF_API_TOKEN"),
        commit_message=f"Add conversation {token}",
    )
    os.remove(filename)  # clean up the local file
    return "✅ Data saved to HuggingFace"
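
    # Reading the accumulated shards back later (a sketch; assumes every
    # upload lands under data/ in the dataset repo, as above):
    #
    #     from datasets import load_dataset
    #     ds = load_dataset(DATASET_ID, data_files="data/*.parquet", split="train")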

    # Earlier approach, kept for reference: append everything to a single
    # "train" split instead of uploading per-conversation parquet shards.
    # # 1. Try to load the existing dataset
    # try:
    #     # Use streaming=True if the dataset is very large
    #     existing_ds = load_dataset(DATASET_ID, split="train")
    # except Exception:
    #     # If it doesn't exist, create an empty one
    #     existing_ds = Dataset.from_dict({})
    # # 2. Convert the new data dict to a Dataset object
    # new_ds = Dataset.from_dict(data)
    # # 3. Concatenate (combine) the datasets
    # combined_ds = concatenate_datasets([existing_ds, new_ds])
    # # 4. Push the combined dataset back to the Hub
    # try:
    #     combined_ds.push_to_hub(
    #         DATASET_ID,
    #         split="train",
    #         commit_message="Append new Space output",
    #         token=os.environ.get("HF_API_TOKEN"),
    #     )
    #     return "✅ Data saved to HuggingFace"
    # except Exception as e:
    #     return f"❌ Error saving data: {str(e)}"

def redirect_to_url(return_url):
    """Return an HTML snippet whose script redirects the browser to the URL."""
    return """
    <script>
        window.location.href = "{return_url}";
    </script>
    """.format(return_url=return_url)