"""Gradio front end for the ChinaTravel benchmark evaluation.

A user uploads a zip archive of results and picks an evaluation split; the
archive is unpacked, handed to ``evaluate`` and the resulting JSON file is
offered for download while three sliders report per-stage progress.
"""

import json
import os
import shutil
import time
import traceback
import zipfile
from datetime import datetime

import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler

from eval_exp import evaluate


def load_splits():
    """Return the sorted names of the available evaluation splits.

    Every ``*.txt`` file in the default splits directory is one split; its
    name is the file name without the extension.
    """
    splits_dir = "chinatravel/evaluation/default_splits"
    # splitext strips only the trailing extension (str.replace would also
    # remove ".txt" occurring in the middle of a name); sorting makes the
    # order of the choices independent of the file system.
    return sorted(
        os.path.splitext(filename)[0]
        for filename in os.listdir(splits_dir)
        if filename.endswith(".txt")
    )


SPLITS_LIST = load_splits()

SUBMIT_DIR = os.path.abspath("submissions")
OUTPUT_DIR = os.path.abspath("outputs")

# Clear the directories if they already exist, then recreate them empty.
shutil.rmtree(SUBMIT_DIR, ignore_errors=True)
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
os.makedirs(SUBMIT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Submission directory: {SUBMIT_DIR}")
print(f"Output directory: {OUTPUT_DIR}")


def clean_old_outputs(folder, keep_hours=24):
    """Delete regular files in *folder* last modified over *keep_hours* ago.

    Sub-directories are left alone. A file that cannot be inspected or
    removed is skipped so that one failure does not abort the whole sweep.
    """
    cutoff = time.time() - keep_hours * 3600
    for fname in os.listdir(folder):
        fpath = os.path.join(folder, fname)
        try:
            if os.path.isfile(fpath) and os.path.getmtime(fpath) < cutoff:
                os.remove(fpath)
        except OSError:
            # The file vanished or could not be removed; skip it for now.
            continue


# Sweep stale result files out of OUTPUT_DIR every six hours.
scheduler = BackgroundScheduler()
scheduler.add_job(lambda: clean_old_outputs(OUTPUT_DIR), "interval", hours=6)
scheduler.start()


class Arguments:
    """Container for the options passed to ``evaluate``."""

    def __init__(self, splits, result_dir):
        self.splits = splits  # name of the split selected in the UI
        self.result_dir = result_dir  # directory holding the unpacked results


def _extract_submission(zip_file):
    """Unpack *zip_file* into a freshly emptied SUBMIT_DIR.

    Returns the directory that holds the results: the single top-level
    sub-directory of the archive when there is exactly one, otherwise
    SUBMIT_DIR itself.

    Raises whatever ``zipfile`` / ``os`` raise for an unreadable archive.
    """
    # NOTE: SUBMIT_DIR is shared by all submissions, so two evaluations
    # running at the same time would overwrite each other's files.
    shutil.rmtree(SUBMIT_DIR, ignore_errors=True)
    os.makedirs(SUBMIT_DIR, exist_ok=True)

    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        print(f"Extracting {zip_file} to {SUBMIT_DIR}...")
        zip_ref.extractall(SUBMIT_DIR)

    print(f"Submission dir: {SUBMIT_DIR}")
    subdirs = [
        d for d in os.listdir(SUBMIT_DIR)
        if os.path.isdir(os.path.join(SUBMIT_DIR, d))
    ]
    if len(subdirs) == 1:
        return os.path.join(SUBMIT_DIR, subdirs[0])
    # No sub-directory, or several of them: use SUBMIT_DIR as is.
    return SUBMIT_DIR


def handle_submission(zip_file, dataset_choice):
    """Evaluate an uploaded archive, streaming progress to the UI.

    Args:
        zip_file: The uploaded archive as delivered by the ``gr.File``
            input, or ``None`` when nothing was uploaded.
        dataset_choice: Name of the split selected in the UI.

    Yields:
        5-tuples ``(message, schema progress, commonsense progress,
        logic progress, result file)`` in the order of the ``outputs`` wired
        to the submit button. ``result file`` is ``None`` until the final
        yield, which carries the path of the saved JSON result.

    Any exception raised while unpacking or evaluating is printed to the
    console and reported to the user as an error message instead of being
    propagated.
    """
    if zip_file is None:
        yield "❌ Please upload a zip file!", 0, 0, 0, None
        return

    try:
        unzipped_dir = _extract_submission(zip_file)
        print(f"Unzipped directory: {unzipped_dir}")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(OUTPUT_DIR, f"result_main_{timestamp}.json")
        args = Arguments(splits=dataset_choice, result_dir=unzipped_dir)

        yield "🚀 Starting evaluation...", 0, 0, 0, None

        result = {}
        for progress in evaluate(args, result):
            stage = progress.get("stage", "")
            progress_value = progress.get("progress", 0)

            # Each stage drives its own slider; the stages before it are
            # shown as complete and the ones after it as not started.
            if stage == "schema":
                yield (
                    "Schema evaluation in progress...",
                    progress_value, 0, 0, None,
                )
            elif stage == "commonsense":
                yield (
                    "Commonsense evaluation in progress...",
                    100, progress_value, 0, None,
                )
            elif stage == "logic":
                yield (
                    "Logic evaluation in progress...",
                    100, 100, progress_value, None,
                )
            elif stage == "final":
                result.update(progress.get("result", {}))

        yield "Evaluation completed, saving results...", 100, 100, 100, None

        # Save the results to a file.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=4)

        # The download component is updated through the yielded path;
        # assigning attributes on the shared component object from inside a
        # handler does not reach the client.
        yield "✅ Evaluation completed!", 100, 100, 100, output_path
    except Exception as e:
        # UI boundary: log the traceback and show the error to the user.
        traceback.print_exc()
        yield f"❌ Evaluation error: {e}", 0, 0, 0, None


# Pre-select "validation" when that split exists, otherwise the first one.
if "validation" in SPLITS_LIST:
    _DEFAULT_SPLIT = "validation"
else:
    _DEFAULT_SPLIT = SPLITS_LIST[0] if SPLITS_LIST else None

with gr.Blocks() as demo:
    gr.Markdown("# 📊 ChinaTravel Benchmark Evaluation")

    zip_input = gr.File(label="Upload zip file of results", file_types=[".zip"])
    dataset_choice = gr.Radio(
        SPLITS_LIST, label="Select evaluation dataset", value=_DEFAULT_SPLIT
    )

    submit_btn = gr.Button("Start Evaluation")

    schema_progress = gr.Slider(
        label="Schema Stage Progress",
        minimum=0,
        maximum=100,
        value=0,
        interactive=False,
    )
    commonsense_progress = gr.Slider(
        label="Commonsense Stage Progress",
        minimum=0,
        maximum=100,
        value=0,
        interactive=False,
    )
    logic_progress = gr.Slider(
        label="Logic Stage Progress",
        minimum=0,
        maximum=100,
        value=0,
        interactive=False,
    )

    output_msg = gr.Markdown()
    result_file = gr.File(label="Result File Download")

    # The order of `outputs` must match the tuples yielded by
    # handle_submission.
    submit_btn.click(
        handle_submission,
        inputs=[zip_input, dataset_choice],
        outputs=[
            output_msg,
            schema_progress,
            commonsense_progress,
            logic_progress,
            result_file,
        ],
    )

demo.launch(debug=True)