from __future__ import annotations

import argparse
import csv
import json
import logging
import os
import re
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from statistics import mean, pstdev
from time import perf_counter
from typing import Any, Dict, Sequence

import requests
from dotenv import load_dotenv
from openai import OpenAI
from openenv.core.containers.runtime.providers import LocalDockerProvider

from grid2op_env import BaselineRequest, BaselineScores, GridAction, GridEnv
from grid2op_env.models import (
    GridObservation,
    RedispatchGeneratorContext,
    ScenarioMode,
    TaskId,
)
from grid2op_env.server.tasks import TASKS, benchmark_tiers_for_task


def configure_logging(level: int = logging.WARNING) -> None:
    """Set the root logger level, installing a default handler if none exists.

    When the root logger already has handlers only its level is changed;
    otherwise ``logging.basicConfig`` installs a handler with a timestamped,
    pipe-separated format.
    """
    root_logger = logging.getLogger()
    if root_logger.handlers:
        root_logger.setLevel(level)
        return
    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


def _load_env() -> None:
    """Load every ``.env`` file found in the known candidate locations.

    Candidates are the current working directory, this module's directory and
    its parent. ``override=False`` is passed so variables that are already set
    are not replaced by later files.
    """
    env_dir = Path(__file__).resolve().parent
    candidate_paths = [
        Path.cwd() / ".env",
        env_dir / ".env",
        env_dir.parent / ".env",
    ]
    for path in candidate_paths:
        if path.exists():
            load_dotenv(path, override=False)


# Environment files must be loaded before any constant below reads os.environ.
_load_env()
configure_logging()
logger = logging.getLogger(__name__)

# Per-task seed counts that take precedence over the configured ``num_seeds``.
TASK_SEED_OVERRIDES: dict[TaskId, int] = {
    "single_fault": 1,
    "n_minus_1": 4,
    "cascade_prevent": 1,
    "multi_stage_cascade": 4,
}
HF_ROUTER_BASE_URL = "https://router.huggingface.co/v1"
HF_ROUTER_DEFAULT_MODEL = "openai/gpt-oss-20b:groq"
DEFAULT_ENV_BASE_URL = "https://sidharth1743-grid2op-openenv.hf.space"
DEFAULT_BENCHMARK_NAME = "grid2op_env"
SUBMISSION_SUCCESS_SCORE_THRESHOLD = float(
    os.getenv("SUCCESS_SCORE_THRESHOLD", "0.1")
)
# Submission scores are always reported inside this closed interval.
_SUBMISSION_SCORE_FLOOR = 0.01
_SUBMISSION_SCORE_CEILING = 0.99


@dataclass
class BaselineConfig:
    """Sampling and evaluation settings shared by the baseline runners."""

    model: str
    max_tokens: int
    temperature: float
    top_p: float
    presence_penalty: float
    top_k: int
    min_p: float
    repetition_penalty: float
    enable_thinking: bool
    num_seeds: int
    seed_start: int
    scenario_mode: ScenarioMode


@dataclass
class SimulationOutcome:
    """One simulated candidate action together with the proposal trace it came from."""

    candidate_index: int
    action: GridAction
    trace: dict[str, Any]
    done: bool
    simulated_reward: float
    max_rho: float
    overloaded_line_ids: list[int]
    disconnected_lines: list[int]
    convergence_failed: bool
    exceptions: list[str]
    raw_result: dict[str, Any]


def _default_model_name() -> str:
    """Return ``MODEL_NAME`` from the environment, or the router default model."""
    return os.environ.get("MODEL_NAME", HF_ROUTER_DEFAULT_MODEL)


def _llm_api_base_url() -> str:
    """Return ``API_BASE_URL`` from the environment, or the router base URL."""
    return os.environ.get("API_BASE_URL", HF_ROUTER_BASE_URL)


def _build_llm_client() -> OpenAI:
    """Create the OpenAI-compatible client used for every LLM request.

    Raises:
        RuntimeError: if neither ``HF_TOKEN`` nor ``API_KEY`` is set (or both
            are empty).
    """
    api_key = os.environ.get("HF_TOKEN") or os.environ.get("API_KEY")
    if not api_key:
        raise RuntimeError(
            "Set HF_TOKEN or API_KEY to use Hugging Face Router inference."
        )
    return OpenAI(
        base_url=_llm_api_base_url(),
        api_key=api_key,
    )


def _chat_completion_kwargs(
    llm_config: BaselineConfig,
    prompt: str,
) -> dict[str, Any]:
    """Build the keyword arguments for a single-message chat completion.

    Only ``model``, ``max_tokens``, ``temperature``, ``top_p`` and
    ``presence_penalty`` are forwarded; the remaining ``BaselineConfig``
    sampling fields (``top_k``, ``min_p``, ``repetition_penalty``,
    ``enable_thinking``) are not sent.
    """
    request_kwargs: dict[str, Any] = {
        "model": llm_config.model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": llm_config.max_tokens,
        "temperature": llm_config.temperature,
        "top_p": llm_config.top_p,
        "presence_penalty": llm_config.presence_penalty,
        "stream": False,
    }
    return request_kwargs


def log_start(task: str, env: str, model: str) -> None:
    """Print the ``[START]`` line that opens an episode's stdout record."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(
    step: int,
    action: GridAction,
    reward: float,
    done: bool,
    error: str | None,
) -> None:
    """Print one ``[STEP]`` line.

    The action is dumped as compact JSON with sorted keys; a missing or empty
    ``error`` is printed as the literal ``null``.
    """
    error_val = error if error else "null"
    done_val = str(done).lower()
    action_str = json.dumps(action.model_dump(), separators=(",", ":"), sort_keys=True)
    print(
        f"[STEP] step={step} action={action_str} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
    """Print the ``[END]`` line that closes an episode's stdout record."""
    rewards_str = ",".join(f"{reward:.2f}" for reward in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


def _docker_image_name() -> str | None:
    """Return ``LOCAL_IMAGE_NAME`` or ``IMAGE_NAME`` if either is set and non-empty."""
    return os.environ.get("LOCAL_IMAGE_NAME") or os.environ.get("IMAGE_NAME")


def _create_sync_env_client(
    preferred_base_url: str | None = None,
) -> tuple[Any, str, str]:
    """Create a synchronous environment client.

    When a Docker image name is configured, a container is started through
    ``LocalDockerProvider`` and the client targets that container. Otherwise
    the client targets ``preferred_base_url``, falling back to
    ``GRID2OP_BASE_URL`` and then ``DEFAULT_ENV_BASE_URL``.

    Returns:
        ``(sync_client, base_url, source_description)`` where ``base_url`` is
        the URL the client actually talks to.
    """
    image_name = _docker_image_name()
    if image_name:
        provider = LocalDockerProvider()
        base_url = provider.start_container(image_name)
        provider.wait_for_ready(base_url)
        async_client = GridEnv(base_url=base_url, provider=provider)
        return (
            async_client.sync(),
            base_url,
            f"docker image {image_name}",
        )
    base_url = preferred_base_url or os.environ.get(
        "GRID2OP_BASE_URL", DEFAULT_ENV_BASE_URL
    )
    return GridEnv(base_url=base_url).sync(), base_url, f"base_url {base_url}"


def run_submission_episodes(task_ids: Sequence[TaskId] | None = None) -> dict[TaskId, float]:
    """Run the submission episodes and return the mean graded score per task.

    Each episode is reported on stdout as ``[START]``, ``[STEP]`` and ``[END]``
    lines. Settings come from environment variables. If the environment cannot
    be reached, every selected task is reported as failed with the floor score
    and the function returns without running any episode.

    Args:
        task_ids: tasks to run; all keys of ``TASKS`` when ``None``.

    Returns:
        Mapping of task id to the mean clamped episode score (6 decimals). A
        task with no graded episode gets the floor score.

    Raises:
        Exception: any error raised while running or grading an episode is
            re-raised after that episode's ``[END]`` line has been printed.
    """
    benchmark_name = os.environ.get("GRID2OP_BENCHMARK", DEFAULT_BENCHMARK_NAME)
    scenario_mode = os.environ.get("GRID2OP_SCENARIO_MODE", "benchmark")
    selected_task_ids = list(task_ids) if task_ids is not None else list(TASKS.keys())
    llm_config = BaselineConfig(
        model=_default_model_name(),
        max_tokens=int(os.environ.get("MAX_TOKENS", "300")),
        temperature=float(os.environ.get("TEMPERATURE", "0.7")),
        top_p=float(os.environ.get("TOP_P", "0.8")),
        presence_penalty=float(os.environ.get("PRESENCE_PENALTY", "1.5")),
        top_k=int(os.environ.get("TOP_K", "20")),
        min_p=float(os.environ.get("MIN_P", "0.0")),
        repetition_penalty=float(os.environ.get("REPETITION_PENALTY", "1.0")),
        enable_thinking=False,
        num_seeds=int(os.environ.get("NUM_SEEDS", "5")),
        seed_start=int(os.environ.get("SEED_START", "0")),
        scenario_mode=scenario_mode,  # type: ignore[arg-type]
    )
    client = _build_llm_client()
    task_scores: dict[TaskId, float] = {}

    # Pre-initialised so the warning below can name the source even when
    # client creation itself is what failed.
    env_source = "unknown"
    try:
        env_ctx, grader_base_url, env_source = _create_sync_env_client()
        env_ctx.connect()
    except Exception as exc:
        logger.warning(
            "Submission runner could not connect to environment source=%s error=%s",
            env_source,
            exc,
        )
        for task_id in selected_task_ids:
            log_start(task=task_id, env=benchmark_name, model=llm_config.model)
            log_end(
                success=False,
                steps=0,
                score=_SUBMISSION_SCORE_FLOOR,
                rewards=[],
            )
            task_scores[task_id] = _SUBMISSION_SCORE_FLOOR
        return task_scores

    with env_ctx as env:
        for task_id in selected_task_ids:
            task = TASKS[task_id]
            benchmark_tiers = benchmark_tiers_for_task(task_id)
            task_num_seeds = TASK_SEED_OVERRIDES.get(task_id, llm_config.num_seeds)
            task_episode_scores: list[float] = []
            for benchmark_tier in benchmark_tiers:
                for seed in range(
                    llm_config.seed_start, llm_config.seed_start + task_num_seeds
                ):
                    rewards: list[float] = []
                    steps_taken = 0
                    # Start at the floor so a crashed episode is reported in
                    # the same score range as a graded one.
                    score = _SUBMISSION_SCORE_FLOOR
                    success = False
                    log_start(task=task_id, env=benchmark_name, model=llm_config.model)
                    try:
                        result = env.reset(
                            task_id=task_id,
                            seed=seed,
                            difficulty_level=1,
                            scenario_mode=scenario_mode,  # type: ignore[arg-type]
                            benchmark_tier=benchmark_tier,
                        )
                        state = env.state()
                        step_idx = 0
                        while not result.done and step_idx < task.max_steps:
                            action, _planning_trace = choose_action_with_qwen(
                                client=client,
                                env=env,
                                episode_id=state.episode_id,
                                task_id=task_id,
                                observation=result.observation,
                                step_count=step_idx,
                                max_steps=task.max_steps,
                                include_task_description=(step_idx == 0),
                                llm_config=llm_config,
                            )
                            error: str | None = None
                            try:
                                result = env.step(action)
                            except Exception as exc:
                                # Record the failed step before aborting.
                                error = str(exc)
                                log_step(
                                    step=step_idx + 1,
                                    action=action,
                                    reward=0.0,
                                    done=True,
                                    error=error,
                                )
                                raise
                            reward = float(result.reward or 0.0)
                            rewards.append(reward)
                            steps_taken = step_idx + 1
                            log_step(
                                step=steps_taken,
                                action=action,
                                reward=reward,
                                done=bool(result.done),
                                error=error,
                            )
                            step_idx += 1
                        state = env.state()
                        response = requests.post(
                            f"{grader_base_url}/grader",
                            json={
                                "task_id": task_id,
                                "episode_log": [
                                    entry.model_dump() for entry in state.episode_log
                                ],
                            },
                            timeout=60,
                        )
                        response.raise_for_status()
                        score = float(response.json()["score"])
                        score = max(
                            _SUBMISSION_SCORE_FLOOR,
                            min(_SUBMISSION_SCORE_CEILING, score),
                        )
                        task_episode_scores.append(score)
                        success = score >= SUBMISSION_SUCCESS_SCORE_THRESHOLD
                    finally:
                        # Always close the episode record, even on failure.
                        log_end(
                            success=success,
                            steps=steps_taken,
                            score=score,
                            rewards=rewards,
                        )
            task_scores[task_id] = (
                round(mean(task_episode_scores), 6)
                if task_episode_scores
                else _SUBMISSION_SCORE_FLOOR
            )
    return task_scores
def run_baseline_suite( base_url: str, config: BaselineRequest | None = None, task_ids: Sequence[TaskId] | None = None, ) -> BaselineScores: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") run_paths = prepare_run_paths(timestamp) attach_file_logger(run_paths["log"]) request_config = config or BaselineRequest(model=_default_model_name()) llm_config = BaselineConfig( model=request_config.model, max_tokens=request_config.max_tokens, temperature=request_config.temperature, top_p=request_config.top_p, presence_penalty=request_config.presence_penalty, top_k=request_config.top_k, min_p=request_config.min_p, repetition_penalty=request_config.repetition_penalty, enable_thinking=request_config.enable_thinking, num_seeds=request_config.num_seeds, seed_start=request_config.seed_start, scenario_mode=request_config.scenario_mode, ) client = _build_llm_client() selected_task_ids = list(task_ids) if task_ids is not None else list(TASKS.keys()) scores: Dict[TaskId, float] = {} episode_lengths: Dict[TaskId, int] = {} evaluation_records: list[dict[str, Any]] = [] logger.info( "Starting baseline suite base_url=%s llm_api_base_url=%s model=%s num_seeds=%s seed_start=%s", base_url, _llm_api_base_url(), llm_config.model, llm_config.num_seeds, llm_config.seed_start, ) env_ctx, _, _ = _create_sync_env_client(preferred_base_url=base_url) with env_ctx as env: task_metrics: Dict[TaskId, list[dict[str, Any]]] = { task_id: [] for task_id in selected_task_ids } task_episode_counts: Dict[TaskId, int] = { task_id: 0 for task_id in selected_task_ids } for task_id in selected_task_ids: task = TASKS[task_id] benchmark_tiers = benchmark_tiers_for_task(task_id) task_num_seeds = TASK_SEED_OVERRIDES.get(task_id, llm_config.num_seeds) for benchmark_tier in benchmark_tiers: for seed in range( llm_config.seed_start, llm_config.seed_start + task_num_seeds ): task_episode_counts[task_id] += 1 curriculum_episode = task_episode_counts[task_id] episode_started_at = perf_counter() logger.info( "Baseline 
episode start task_id=%s seed=%s curriculum_episode=%s benchmark_tier=%s max_steps=%s task_num_seeds=%s", task_id, seed, curriculum_episode, benchmark_tier, task.max_steps, task_num_seeds, ) result = env.reset( task_id=task_id, seed=seed, difficulty_level=curriculum_episode, scenario_mode=llm_config.scenario_mode, benchmark_tier=benchmark_tier, ) state = env.state() logger.info( "Initial state task_id=%s seed=%s episode_id=%s scenario_metadata=%s max_rho=%.4f disconnected=%s", task_id, seed, state.episode_id, state.scenario_metadata, float(max(result.observation.rho)) if result.observation.rho else 0.0, [ idx for idx, status in enumerate(result.observation.line_status) if not status ], ) step_idx = 0 do_nothing_steps = 0 raw_outputs: list[dict[str, Any]] = [] while not result.done and step_idx < task.max_steps: action, planning_trace = choose_action_with_qwen( client=client, env=env, episode_id=state.episode_id, task_id=task_id, observation=result.observation, step_count=step_idx, max_steps=task.max_steps, include_task_description=(step_idx == 0), llm_config=llm_config, ) raw_outputs.append( { "step": step_idx + 1, "proposal_prompt": planning_trace["proposal_prompt"], "proposal_raw_output": planning_trace[ "proposal_raw_output" ], "proposal_trace": planning_trace["proposal_trace"], "graph_intelligence": planning_trace[ "graph_intelligence" ], "simulations": planning_trace["simulations"], "final_prompt": planning_trace["final_prompt"], "final_raw_output": planning_trace["final_raw_output"], "final_trace": planning_trace["final_trace"], "selected_action": action.model_dump(), "observation_summary": { "max_rho": max(result.observation.rho) if result.observation.rho else 0.0, "disconnected_lines": [ idx for idx, status in enumerate( result.observation.line_status ) if not status ], "timestep_overflow": result.observation.timestep_overflow, }, } ) if action.do_nothing: do_nothing_steps += 1 logger.info( "Baseline action task_id=%s seed=%s step=%s action=%s trace=%s", 
task_id, seed, step_idx + 1, action.model_dump(), planning_trace["final_trace"], ) result = env.step(action) step_idx += 1 state = env.state() logger.info( "Baseline episode finished task_id=%s seed=%s step_count=%s done=%s last_reward=%.4f", task_id, seed, state.step_count, state.done, state.last_reward, ) response = requests.post( f"{base_url}/grader", json={ "task_id": task_id, "episode_log": [ entry.model_dump() for entry in state.episode_log ], }, timeout=60, ) response.raise_for_status() score_payload = response.json() episode_score = float(score_payload["score"]) episode_length = int(state.step_count) episode_log = [entry.model_dump() for entry in state.episode_log] episode_wall_time_s = round(perf_counter() - episode_started_at, 6) total_redispatch_mw = round( sum( float(entry.get("redispatch_mw", 0.0)) for entry in episode_log ), 6, ) total_action_penalty = round( sum( float(entry.get("action_penalty", 0.0)) for entry in episode_log ), 6, ) record = { "task_id": task_id, "seed": seed, "curriculum_episode": curriculum_episode, "benchmark_tier": benchmark_tier, "score": episode_score, "episode_length": episode_length, "episode_wall_time_s": episode_wall_time_s, "done": state.done, "do_nothing_steps": do_nothing_steps, "non_do_nothing_steps": max( 0, episode_length - do_nothing_steps ), "episode_total_redispatch_mw": total_redispatch_mw, "episode_action_penalty_total": total_action_penalty, "episode_action_penalty_mean": round( total_action_penalty / max(1, episode_length), 6, ), "episode_log": episode_log, "raw_outputs": raw_outputs, "scenario_metadata": state.scenario_metadata, } evaluation_records.append(record) task_metrics[task_id].append(record) logger.info( "Baseline score task_id=%s seed=%s benchmark_tier=%s score=%.6f episode_length=%s episode_wall_time_s=%.6f do_nothing_steps=%s", task_id, seed, benchmark_tier, episode_score, episode_length, episode_wall_time_s, do_nothing_steps, ) for task_id, records in task_metrics.items(): task_scores = 
[float(record["score"]) for record in records] task_lengths = [int(record["episode_length"]) for record in records] scores[task_id] = round(mean(task_scores), 6) if task_scores else 0.0 episode_lengths[task_id] = round(mean(task_lengths)) if task_lengths else 0 logger.info("Baseline suite complete scores=%s", scores) baseline_scores = BaselineScores( model=llm_config.model, scores=scores, episode_lengths=episode_lengths, ) write_evaluation_outputs( timestamp=timestamp, run_paths=run_paths, model=llm_config.model, base_url=base_url, llm_config=llm_config, baseline_scores=baseline_scores, evaluation_records=evaluation_records, selected_task_ids=selected_task_ids, ) append_evaluation_markdown( timestamp=timestamp, model=llm_config.model, llm_config=llm_config, baseline_scores=baseline_scores, evaluation_records=evaluation_records, run_paths=run_paths, selected_task_ids=selected_task_ids, ) return baseline_scores def choose_action_with_qwen( client: OpenAI, env: GridEnv, episode_id: str, task_id: TaskId, observation: GridObservation, step_count: int, max_steps: int, include_task_description: bool, llm_config: BaselineConfig, ) -> tuple[GridAction, dict[str, Any]]: planning_context = env.planning_context(episode_id) graph_intelligence = planning_context.graph_intelligence redispatchable_generators = planning_context.redispatchable_generators redispatch_generators = planning_context.redispatch_generators logger.info( "Graph intelligence task_id=%s step=%s bridges=%s safe_disconnect=%s central_buses=%s islanded=%s corridor=%s stressed=%s", task_id, step_count + 1, graph_intelligence.get("bridge_lines", []), graph_intelligence.get("safe_to_disconnect", []), graph_intelligence.get("high_centrality_buses", []), graph_intelligence.get("islanded_clusters", []), graph_intelligence.get("congestion_corridor", "none"), graph_intelligence.get("stressed_lines", []), ) proposal_prompt = build_proposal_prompt( task_id=task_id, observation=observation, 
graph_intelligence=graph_intelligence, redispatchable_generators=redispatchable_generators, redispatch_generators=redispatch_generators, step_count=step_count, max_steps=max_steps, include_task_description=include_task_description, ) response = client.chat.completions.create( **_chat_completion_kwargs(llm_config=llm_config, prompt=proposal_prompt) ) proposal_raw_output = response.choices[0].message.content or "" logger.info( "Received proposal response task_id=%s chars=%s content=%r", task_id, len(proposal_raw_output), proposal_raw_output, ) proposal_candidates, proposal_trace = parse_candidate_proposals( proposal_raw_output, task_id=task_id, n_line=len(observation.line_status), n_gen=len(observation.gen_p), redispatchable_generators=redispatchable_generators, redispatch_generators=redispatch_generators, ) proposal_candidates = supplement_candidate_proposals( task_id=task_id, observation=observation, graph_intelligence=graph_intelligence, redispatch_generators=redispatch_generators, proposal_candidates=proposal_candidates, parsed_candidate_count=int(proposal_trace.get("parsed_candidate_count", 0)), ) proposal_candidates, prefilter_trace = filter_candidate_proposals( task_id=task_id, observation=observation, graph_intelligence=graph_intelligence, proposal_candidates=proposal_candidates, ) simulation_response = env.simulate_candidates( episode_id, [action for action, _ in proposal_candidates], ) simulations = [ SimulationOutcome( candidate_index=index, action=result.action, trace=proposal_candidates[index - 1][1], done=result.done, simulated_reward=result.simulated_reward, max_rho=result.max_rho, overloaded_line_ids=result.overloaded_line_ids, disconnected_lines=result.disconnected_lines, convergence_failed=result.convergence_failed, exceptions=result.exceptions, raw_result=result.raw_result, ) for index, result in enumerate(simulation_response.results, start=1) ] logger.info( "Simulation results task_id=%s step=%s results=%s", task_id, step_count + 1, [ { 
"candidate_index": outcome.candidate_index, "action": outcome.action.model_dump(), "max_rho": outcome.max_rho, "done": outcome.done, "convergence_failed": outcome.convergence_failed, "overloaded_line_ids": outcome.overloaded_line_ids, "exceptions": outcome.exceptions, } for outcome in simulations ], ) selectable_simulations = filter_selectable_simulations(simulations) if not selectable_simulations: return GridAction(do_nothing=True), { "proposal_prompt": proposal_prompt, "proposal_raw_output": proposal_raw_output, "proposal_trace": {**proposal_trace, **prefilter_trace}, "graph_intelligence": graph_intelligence, "simulations": [ serialize_simulation_outcome(outcome) for outcome in simulations ], "final_prompt": "", "final_raw_output": "", "final_trace": { "decision": "do_nothing_all_candidates_failed", "reason": "no selectable candidates survived simulation", "selectable_candidate_count": 0, }, } if task_id == "single_fault": selected_outcome = choose_best_simulation( task_id=task_id, observation=observation, simulations=selectable_simulations, ) return selected_outcome.action, { "proposal_prompt": proposal_prompt, "proposal_raw_output": proposal_raw_output, "proposal_trace": {**proposal_trace, **prefilter_trace}, "graph_intelligence": graph_intelligence, "simulations": [ serialize_simulation_outcome(outcome) for outcome in simulations ], "final_prompt": "", "final_raw_output": "", "final_trace": { "decision": "deterministic_post_simulation_selection", "reason": selected_outcome.trace.get("reason", ""), "selected_candidate": selected_outcome.candidate_index, "task_id": task_id, }, } final_prompt = build_final_selection_prompt( task_id=task_id, observation=observation, step_count=step_count, max_steps=max_steps, simulations=selectable_simulations, ) final_response = client.chat.completions.create( **_chat_completion_kwargs(llm_config=llm_config, prompt=final_prompt) ) final_raw_output = final_response.choices[0].message.content or "" logger.info( "Received final 
selection task_id=%s chars=%s content=%r", task_id, len(final_raw_output), final_raw_output, ) action, final_trace = select_final_action( task_id=task_id, observation=observation, final_raw_output=final_raw_output, simulations=simulations, n_line=len(observation.line_status), n_gen=len(observation.gen_p), ) return action, { "proposal_prompt": proposal_prompt, "proposal_raw_output": proposal_raw_output, "proposal_trace": {**proposal_trace, **prefilter_trace}, "graph_intelligence": graph_intelligence, "simulations": [ serialize_simulation_outcome(outcome) for outcome in simulations ], "final_prompt": final_prompt, "final_raw_output": final_raw_output, "final_trace": final_trace, } def build_proposal_prompt( task_id: TaskId, observation: GridObservation, graph_intelligence: dict[str, Any], redispatchable_generators: Sequence[int], redispatch_generators: Sequence[RedispatchGeneratorContext], step_count: int, max_steps: int, include_task_description: bool, ) -> str: line_count = len(observation.line_status) gen_count = len(observation.gen_p) stressed_lines = summarize_lines(observation.rho, limit=8, minimum_rho=0.7) sensitivity_guidance = observation.sensitivity_guidance[:3] topology_guidance = [ item for item in sensitivity_guidance if item.get("action_type") == "disconnect_line" ] redispatch_guidance = [ item for item in sensitivity_guidance if item.get("action_type") == "redispatch" ] disconnected = [ {"line_id": idx, "status": "disconnected"} for idx, status in enumerate(observation.line_status) if not status ] generator_summary = summarize_generators(observation.gen_p, limit=6) cooldown_info = observation.metadata.get("time_before_cooldown_line", []) stage_index = int(observation.metadata.get("stage_index", 1)) steps_to_stage_boundary = int( observation.metadata.get("steps_to_stage_boundary", 0) ) available_load_ratio = float(observation.metadata.get("available_load_ratio", 1.0)) available_island_ratio = float( observation.metadata.get("available_island_ratio", 
1.0) ) stage_boundary_assessed = bool( observation.metadata.get("stage_boundary_assessed", False) ) majority_islands_available = bool( observation.metadata.get("majority_islands_available", False) ) action_schema = ( '{"action_type":"disconnect_line|reconnect_line|redispatch|do_nothing","line_id":null|int,"gen_id":null|int,"delta_mw":null|float,"reason":"short string"}' ) response_schema = ( '{"primary_action":' + action_schema + ',"backup_action_1":' + action_schema + ',"backup_action_2":' + action_schema + "}" if task_id == "single_fault" else '{"candidates":[' + action_schema + "," + action_schema + "," + action_schema + "]}" ) lines = [ "You are a grid operator proposing actions for a deterministic simulator.", "Propose exactly 3 candidate actions to test in the physics sandbox.", "Allowed action types: disconnect_line, reconnect_line, redispatch, do_nothing.", "Return a single JSON object only.", "Use this exact schema: " + response_schema, "Rules: no markdown, no prose, no code fences, no extra keys, exactly 3 candidates.", "Diversity rule: use at least two different action types when plausible.", "CRITICAL PHYSICS RULE: You must prioritize candidates from the sensitivity_guidance list. 
These actions have been mathematically verified by power-flow sensitivity factors to reduce the load on the stressed line.", f"task_id={task_id}", f"step={step_count + 1}/{max_steps}", f"max_rho={max(observation.rho):.3f}" if observation.rho else "max_rho=0.0", f"line_count={line_count}", f"generator_count={gen_count}", "redispatchable_generators=" + json.dumps( [int(x) for x in redispatchable_generators], separators=(",", ":") ), "redispatch_generator_bounds=" + json.dumps( [context.model_dump() for context in redispatch_generators], separators=(",", ":"), ), "stressed_lines=" + json.dumps(stressed_lines, separators=(",", ":")), "sensitivity_guidance=" + json.dumps(sensitivity_guidance, separators=(",", ":")), "disconnected_lines=" + json.dumps(disconnected, separators=(",", ":")), "generators=" + json.dumps(generator_summary, separators=(",", ":")), "timestep_overflow=" + json.dumps(observation.timestep_overflow, separators=(",", ":")), "grid_topology_intelligence=" + json.dumps(graph_intelligence, separators=(",", ":")), ] if task_id == "single_fault": lines.insert( 6, "TASK RULE: For single_fault, do not propose disconnect_line or reconnect_line. Use redispatch and do_nothing only. Solve congestion by shifting generation, not by cutting topology.", ) lines.insert( 7, "TASK RULE: Rank your output strictly as primary_action first, then backup_action_1, then backup_action_2. The simulator will test all three and execute the highest-ranked safe option.", ) if task_id == "n_minus_1": danger_lines = [ entry for entry in stressed_lines if float(entry["rho"]) >= 0.92 ] warning_lines = [ entry for entry in stressed_lines if 0.80 <= float(entry["rho"]) < 0.92 ] cooldown_zero_lines = ( [int(idx) for idx, value in enumerate(cooldown_info) if int(value) == 0] if isinstance(cooldown_info, list) else [] ) lines.insert( 6, "TASK RULE: In n_minus_1, operate the degraded topology safely for 20 steps. 
Reconnect the faulted line when cooldown allows and when simulation shows it is safe.", ) lines.insert( 7, f"FAULTED_LINE=0; disconnected_now={json.dumps([entry['line_id'] for entry in disconnected], separators=(',', ':'))}", ) lines.insert( 8, f"N-1 PHASE={'emergency' if step_count < 5 else 'steady_state'}; emergency_window_steps_remaining={max(0, 5 - step_count)}", ) lines.insert( 9, "EMERGENCY OBJECTIVE: In steps 1-5, prioritize actions that bring max_rho below 0.92 as fast as possible. Clearing the emergency window is the top priority.", ) lines.insert( 10, "STEADY-STATE OBJECTIVE: From step 6 onward, prioritize keeping max_rho below 0.90 on as many steps as possible while preserving survivability.", ) lines.insert( 11, "RECONNECTION OBJECTIVE: When line 0 cooldown reaches 0, include a reconnect_line candidate for line 0 unless graph intelligence or current overloads strongly suggest it is unsafe.", ) lines.insert( 12, "CANDIDATE RULE: In the emergency phase, include at least one redispatch candidate aimed at immediate rho reduction. 
Do not fill the set with passive do_nothing-style choices.", ) lines.insert( 13, "CANDIDATE RULE: If no action looks clearly better, still propose the smallest safe redispatch or a safe reconnect test rather than defaulting all candidates toward do_nothing.", ) lines.insert( 14, f"N-1 STRUCTURAL SECURITY: score={float(graph_intelligence.get('n1_security_score', 0.0)):.3f}; bridge_lines={json.dumps(graph_intelligence.get('bridge_lines', []), separators=(',', ':'))}", ) lines.insert( 15, "THRESHOLDS: EMERGENCY if any line rho >= 0.92, WARNING for 0.80 <= rho < 0.92, SAFE if all lines are below 0.80.", ) lines.insert( 16, "EMERGENCY_LINES=" + json.dumps(danger_lines, separators=(",", ":")), ) lines.insert( 17, "WARNING_LINES=" + json.dumps(warning_lines, separators=(",", ":")), ) lines.insert( 18, "RECONNECT_WINDOW_LINES=" + json.dumps(cooldown_zero_lines, separators=(",", ":")), ) if task_id == "cascade_prevent": overflow_urgent = [ { "line_id": idx, "rho": round(float(observation.rho[idx]), 4), "timestep_overflow": int(value), } for idx, value in enumerate(observation.timestep_overflow) if int(value) > 0 ] overflow_urgent.sort( key=lambda item: (item["timestep_overflow"], item["rho"]), reverse=True ) lines.insert( 6, "TASK RULE: In cascade_prevent, prioritize lines with active overflow countdowns. A line with timestep_overflow=2 is more urgent than a line with high rho but overflow=0.", ) lines.insert( 7, "CASCADE RULE: Prevent automatic trips first, then improve thermal margin. 
Triaging imminent countdown expirations is more important than slightly reducing global max_rho.", ) lines.insert( 8, "OVERFLOW_COUNTDOWNS=" + json.dumps(overflow_urgent[:8], separators=(",", ":")), ) if task_id == "multi_stage_cascade": overflow_urgent = [ { "line_id": idx, "rho": round(float(observation.rho[idx]), 4), "timestep_overflow": int(value), } for idx, value in enumerate(observation.timestep_overflow) if int(value) > 0 ] overflow_urgent.sort( key=lambda item: (item["timestep_overflow"], item["rho"]), reverse=True ) lines.insert( 6, "TASK RULE: In multi_stage_cascade, assume the collapse will continue across three stages. Do not optimize only for this step; position the grid so later stages keep more load available.", ) lines.insert( 7, f"STAGE_CONTEXT=stage_{stage_index}_of_3; steps_to_stage_boundary={steps_to_stage_boundary}; available_load_ratio={available_load_ratio:.4f}; available_island_ratio={available_island_ratio:.4f}", ) lines.insert( 8, f"BOUNDARY_STATUS=assessed:{str(stage_boundary_assessed).lower()}; majority_islands_available:{str(majority_islands_available).lower()}", ) lines.insert( 9, "MSCF RULE: Prefer actions that preserve transferable generation and keep islands self-sustaining at the next boundary. Avoid short-term fixes that strand load in islands with insufficient generation.", ) lines.insert( 10, "TASK RULE: With multiple overloaded lines, topology cuts risk bus isolation. 
def build_final_selection_prompt(
    task_id: TaskId,
    observation: GridObservation,
    step_count: int,
    max_steps: int,
    simulations: Sequence[SimulationOutcome],
) -> str:
    """Build the prompt asking the model to pick one simulated candidate.

    The prompt is a newline-joined sequence of: a fixed instruction preamble,
    any task-specific ``RULE:`` lines, then the state lines (task id, step
    counter, current max rho and the JSON-serialized simulation outcomes).

    Args:
        task_id: Task whose extra rules (if any) are spliced into the prompt.
        observation: Current observation; only ``rho`` is read here.
        step_count: Zero-based step index; rendered one-based.
        max_steps: Episode horizon shown next to the step counter.
        simulations: Outcomes serialized under ``simulation_results=``.

    Returns:
        The complete prompt text.
    """
    selection_hints: dict[str, Any] = {}
    if task_id == "multi_stage_cascade":
        selection_hints = build_task4_selection_hints(simulations)

    if observation.rho:
        rho_line = f"current_max_rho={max(observation.rho):.3f}"
    else:
        rho_line = "current_max_rho=0.0"
    serialized_outcomes = json.dumps(
        [serialize_simulation_outcome(outcome) for outcome in simulations],
        separators=(",", ":"),
    )

    preamble = [
        "You are a grid operator choosing a final action after reviewing simulator outcomes.",
        "Select the safest candidate that reduces stress without ending the episode.",
        "You must select one simulated candidate or explicit do_nothing.",
        "Return a single JSON object only.",
        'Use this exact schema: {"selected_candidate":1|2|3|0,"reason":"short string"}',
        "Use selected_candidate=0 only if every candidate is unsafe.",
        "Rules: no markdown, no prose, no code fences, no extra keys.",
    ]
    state_lines = [
        f"task_id={task_id}",
        f"step={step_count + 1}/{max_steps}",
        rho_line,
        "simulation_results=" + serialized_outcomes,
    ]

    # Task rules sit between the generic instructions and the state lines.
    task_rules: list[str] = []
    if task_id == "single_fault":
        task_rules = [
            "RULE: If a simulated candidate safely reduces max_rho compared to the current state, you MUST select it over do_nothing, no matter how small the reduction is. Do not choose do_nothing unless every other candidate increases max_rho or causes a failure. Safe, incremental redispatch improvements are the only way to win.",
        ]
    elif task_id == "n_minus_1":
        task_rules = [
            "RULE: In steps 1-5, prioritize candidates that clear the emergency by bringing max_rho below 0.92. Do not choose do_nothing in the emergency window if a safe simulated action lowers max_rho.",
            "RULE: When a safe reconnect_line action for line 0 is available after cooldown, strongly prefer it if it improves or preserves security.",
            "RULE: After step 5, prefer candidates that keep max_rho below 0.90 on future steps rather than merely surviving at higher stress.",
        ]
    elif task_id == "multi_stage_cascade":
        task_rules = [
            "RULE: Prefer the candidate that preserves future survivability across stage boundaries. A slightly higher short-term max_rho can be acceptable if it keeps more load in islands that still have enough generation.",
            "RULE: The listed candidates already exclude failed simulations. Choose only from these surviving candidates, or choose 0 only if none are listed.",
            "RULE: If a safe redispatch candidate strictly improves max_rho over do_nothing, choose that redispatch instead of do_nothing. Do not pick do_nothing when a safe redispatch is better, even by a small margin.",
            "RULE: A controlled-islanding candidate is justified only if it materially beats the best redispatch on survivability, not merely because it changes topology.",
            "TASK4_SELECTION_HINTS="
            + json.dumps(selection_hints, separators=(",", ":")),
        ]

    return "\n".join(preamble + task_rules + state_lines)


def build_task4_selection_hints(
    simulations: Sequence[SimulationOutcome],
) -> dict[str, Any]:
    """Derive index hints for the multi-stage cascade selection prompt.

    Returns a dict with:
        * ``best_do_nothing_index``: index of the first do-nothing outcome.
        * ``best_redispatch_index``: redispatch-only outcome with the lowest
          ``max_rho`` (ties broken by candidate index).
        * ``prefer_redispatch_indices``: redispatch-only outcomes whose
          ``max_rho`` is strictly below the do-nothing outcome's.
        * ``topology_justified_indices``: line-changing outcomes that beat the
          best redispatch by more than 0.02 rho, or overload fewer lines, or
          exist when there is no redispatch outcome at all.
    """
    noop_outcome = None
    redispatch_only: list[SimulationOutcome] = []
    topology_changes: list[SimulationOutcome] = []
    for outcome in simulations:
        action = outcome.action
        if noop_outcome is None and action.do_nothing:
            noop_outcome = outcome
        if action.redispatch and not action.line_set:
            redispatch_only.append(outcome)
        if action.line_set:
            topology_changes.append(outcome)

    best_redispatch = None
    if redispatch_only:
        best_redispatch = min(
            redispatch_only,
            key=lambda outcome: (outcome.max_rho, outcome.candidate_index),
        )

    prefer_redispatch: list[int] = []
    if noop_outcome is not None:
        threshold = noop_outcome.max_rho - 1e-9
        prefer_redispatch = [
            outcome.candidate_index
            for outcome in redispatch_only
            if outcome.max_rho < threshold
        ]

    if best_redispatch is None:
        # Nothing to compare against: every topology candidate is listed.
        topology_justified = [
            outcome.candidate_index for outcome in topology_changes
        ]
    else:
        rho_bar = best_redispatch.max_rho - 0.02
        overload_bar = len(best_redispatch.overloaded_line_ids)
        topology_justified = [
            outcome.candidate_index
            for outcome in topology_changes
            if outcome.max_rho < rho_bar
            or len(outcome.overloaded_line_ids) < overload_bar
        ]

    return {
        "best_do_nothing_index": (
            None if noop_outcome is None else noop_outcome.candidate_index
        ),
        "best_redispatch_index": (
            None if best_redispatch is None else best_redispatch.candidate_index
        ),
        "prefer_redispatch_indices": prefer_redispatch,
        "topology_justified_indices": topology_justified,
    }


def summarize_lines(
    rho: Sequence[float], limit: int, minimum_rho: float
) -> list[dict[str, Any]]:
    """Return up to ``limit`` lines with ``rho >= minimum_rho``, highest first.

    Each entry is ``{"line_id": index, "rho": value rounded to 4 decimals}``.
    """
    loaded = [
        {"line_id": line_id, "rho": round(float(loading), 4)}
        for line_id, loading in enumerate(rho)
        if float(loading) >= minimum_rho
    ]
    loaded.sort(key=lambda entry: entry["rho"], reverse=True)
    return loaded[:limit]
def summarize_generators(gen_p: Sequence[float], limit: int) -> list[dict[str, Any]]:
    """Return up to ``limit`` generators ordered by absolute output, largest first.

    Each entry is ``{"gen_id": index, "p_mw": value rounded to 4 decimals}``.
    """
    entries = [
        {"gen_id": gen_id, "p_mw": round(float(power), 4)}
        for gen_id, power in enumerate(gen_p)
    ]
    entries.sort(key=lambda entry: abs(entry["p_mw"]), reverse=True)
    return entries[:limit]


def parse_json_action(content: str) -> Dict[str, Any]:
    """Extract the first JSON object embedded in a model response.

    The text is scanned left to right; at every ``{`` a JSON decode is
    attempted and the first object that decodes is returned. Unlike a greedy
    "first brace to last brace" match, this tolerates braces in surrounding
    prose, markdown fences and trailing commentary.

    Args:
        content: Raw model output. Non-string values (for example ``None``)
            are treated as containing no JSON.

    Returns:
        The decoded object, or ``{"do_nothing": True}`` when no JSON object
        can be decoded.
    """
    fallback: Dict[str, Any] = {"do_nothing": True}
    if not isinstance(content, str):
        return fallback

    decoder = json.JSONDecoder()
    start = content.find("{")
    while start != -1:
        try:
            candidate, _end = decoder.raw_decode(content, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = content.find("{", start + 1)
    return fallback


def parse_selected_candidate(content: str) -> int | None:
    """Pull the ``selected_candidate`` integer out of raw text with a regex.

    This is the lenient path used when the response is not valid JSON.

    Returns:
        The integer following ``"selected_candidate":``, or ``None`` when the
        key is absent or ``content`` is not a string.
    """
    if not isinstance(content, str):
        return None
    match = re.search(r'"selected_candidate"\s*:\s*(-?\d+)', content)
    if match is None:
        return None
    try:
        return int(match.group(1))
    except (TypeError, ValueError):
        return None
def parse_candidate_proposals(
    content: str,
    n_line: int,
    n_gen: int,
    redispatchable_generators: Sequence[int] | None = None,
    redispatch_generators: Sequence[RedispatchGeneratorContext] | None = None,
    task_id: TaskId = "n_minus_1",
) -> tuple[list[tuple[GridAction, dict[str, Any]]], dict[str, Any]]:
    """Turn a proposal response into at most three distinct validated actions.

    For ``single_fault`` the candidates are read from the ``primary_action``,
    ``backup_action_1`` and ``backup_action_2`` keys; for every other task from
    the ``candidates`` list. Only the first three entries are considered and
    non-dict entries are ignored. Duplicates (by serialized action) are
    dropped, and the remaining slots are filled from the fallback pool.

    Returns:
        ``(candidates, stats)``. ``parsed_candidate_count`` is the number of
        entries validated from the response; ``deduped_candidate_count`` is
        the length of the returned list (fallback fill-ins included).
    """
    payload = parse_json_action(content)
    if task_id == "single_fault":
        slots = ("primary_action", "backup_action_1", "backup_action_2")
        raw_candidates = [payload.get(slot) for slot in slots]
    else:
        raw_candidates = payload.get("candidates", [])

    parsed: list[tuple[GridAction, dict[str, Any]]] = []
    if isinstance(raw_candidates, list):
        parsed = [
            validate_baseline_action(
                entry,
                task_id=task_id,
                n_line=n_line,
                n_gen=n_gen,
                redispatchable_generators=redispatchable_generators,
                redispatch_generators=redispatch_generators,
            )
            for entry in raw_candidates[:3]
            if isinstance(entry, dict)
        ]

    unique: list[tuple[GridAction, dict[str, Any]]] = []
    fingerprints: set[str] = set()

    def _admit(action: GridAction, trace: dict[str, Any]) -> None:
        # Two actions are the same candidate when their dumps serialize equally.
        fingerprint = json.dumps(action.model_dump(), sort_keys=True)
        if fingerprint not in fingerprints:
            fingerprints.add(fingerprint)
            unique.append((action, trace))

    for action, trace in parsed:
        _admit(action, trace)

    # Top up with deterministic fallbacks until three candidates exist.
    for action, trace in build_diverse_fallback_pool(redispatch_generators):
        if len(unique) >= 3:
            break
        _admit(action, trace)

    selected = unique[:3]
    return selected, {
        "parsed_candidate_count": len(parsed),
        "deduped_candidate_count": len(selected),
    }


def build_diverse_fallback_pool(
    redispatch_generators: Sequence[RedispatchGeneratorContext] | None,
) -> list[tuple[GridAction, dict[str, Any]]]:
    """Build the ordered list of fallback actions used to pad candidate lists.

    When generator contexts are given, each contributes its most negative and
    most positive non-zero allowed delta; these are ordered by decreasing
    magnitude, then generator id, then signed delta. Without contexts the pool
    is +10 / -10 on generator 0. A do-nothing action always closes the pool.
    """
    pool: list[tuple[GridAction, dict[str, Any]]] = []

    if redispatch_generators:
        extremes: list[tuple[int, float]] = []
        for context in redispatch_generators:
            nonzero = [
                float(value)
                for value in context.allowed_deltas
                if abs(float(value)) > 1e-9
            ]
            downward = [value for value in nonzero if value < 0.0]
            upward = [value for value in nonzero if value > 0.0]
            if downward:
                extremes.append((int(context.gen_id), min(downward)))
            if upward:
                extremes.append((int(context.gen_id), max(upward)))

        # Largest swing first; generator id then signed delta break ties.
        extremes.sort(key=lambda pair: (-abs(pair[1]), pair[0], pair[1]))
        for gen_id, delta in extremes:
            pool.append(
                (
                    GridAction(redispatch={gen_id: float(delta)}),
                    {
                        "decision": "redispatch",
                        "reason": f"fallback_generator_{gen_id}_{float(delta):.4f}",
                    },
                )
            )
    else:
        pool.append(
            (
                GridAction(redispatch={0: 10.0}),
                {"decision": "redispatch", "reason": "fallback_generator_0_up"},
            )
        )
        pool.append(
            (
                GridAction(redispatch={0: -10.0}),
                {"decision": "redispatch", "reason": "fallback_generator_0_down"},
            )
        )

    pool.append(
        (
            GridAction(do_nothing=True),
            {"decision": "do_nothing", "reason": "fallback_baseline"},
        )
    )
    return pool
def supplement_candidate_proposals(
    task_id: TaskId,
    observation: GridObservation,
    graph_intelligence: dict[str, Any],
    redispatch_generators: Sequence[RedispatchGeneratorContext],
    proposal_candidates: Sequence[tuple[GridAction, dict[str, Any]]],
    parsed_candidate_count: int,
) -> list[tuple[GridAction, dict[str, Any]]]:
    """Merge model proposals with heuristic candidates into at most three actions.

    Heuristics go first when the grid is in an emergency state or the model
    produced no parsable candidate; otherwise the proposals go first and the
    heuristics follow. Duplicates (by serialized action) keep their first
    occurrence, and the merge stops at three distinct actions.
    """
    emergency = is_emergency_state(task_id=task_id, observation=observation)
    heuristics = build_heuristic_candidates(
        task_id=task_id,
        observation=observation,
        graph_intelligence=graph_intelligence,
        redispatch_generators=redispatch_generators,
    )

    ordered: list[tuple[GridAction, dict[str, Any]]] = []
    if emergency or parsed_candidate_count == 0:
        ordered += heuristics
    ordered += proposal_candidates
    if not emergency and parsed_candidate_count > 0:
        ordered += heuristics

    unique: list[tuple[GridAction, dict[str, Any]]] = []
    fingerprints: set[str] = set()
    for action, trace in ordered:
        fingerprint = json.dumps(action.model_dump(), sort_keys=True)
        if fingerprint not in fingerprints:
            fingerprints.add(fingerprint)
            unique.append((action, trace))
            if len(unique) >= 3:
                break
    return unique


def build_heuristic_candidates(
    task_id: TaskId,
    observation: GridObservation,
    graph_intelligence: dict[str, Any],
    redispatch_generators: Sequence[RedispatchGeneratorContext],
) -> list[tuple[GridAction, dict[str, Any]]]:
    """Translate ``observation.sensitivity_guidance`` into candidate actions.

    Guidance entries with a non-integer ``target_id`` are skipped.
    ``disconnect_line`` entries become line disconnections (never for
    ``single_fault``). ``redispatch`` entries are kept only when the generator
    has a context and the delta can be constrained to it. If at least one
    heuristic results, a do-nothing action is appended as a safe fallback;
    otherwise the generic fallback pool is returned.

    ``graph_intelligence`` is accepted for signature compatibility and unused.
    """
    del graph_intelligence

    candidates: list[tuple[GridAction, dict[str, Any]]] = []
    for hint in observation.sensitivity_guidance:
        kind = hint.get("action_type")
        try:
            target = int(hint.get("target_id"))
        except (TypeError, ValueError):
            continue

        if kind == "disconnect_line" and task_id != "single_fault":
            candidates.append(
                (
                    GridAction(line_set={target: -1}),
                    {
                        "decision": "heuristic_disconnect",
                        "reason": f"sensitivity_guidance_{target}",
                    },
                )
            )
            continue
        if kind != "redispatch":
            continue

        try:
            requested = float(hint.get("delta_mw"))
        except (TypeError, ValueError):
            continue

        generator = next(
            (
                candidate
                for candidate in redispatch_generators
                if int(candidate.gen_id) == target
            ),
            None,
        )
        if generator is None:
            continue

        constrained = constrain_redispatch_delta(requested, generator)
        if constrained is None:
            continue
        candidates.append(
            (
                GridAction(redispatch={target: constrained}),
                {
                    "decision": "heuristic_redispatch",
                    "reason": f"sensitivity_guidance_{target}_{constrained}",
                },
            )
        )

    if not candidates:
        return build_diverse_fallback_pool(redispatch_generators)

    candidates.append(
        (
            GridAction(do_nothing=True),
            {"decision": "do_nothing", "reason": "heuristic_safe_fallback"},
        )
    )
    return candidates
f"sensitivity_guidance_{target_id_int}_{constrained}", }, ) ) if heuristics: heuristics.append( ( GridAction(do_nothing=True), {"decision": "do_nothing", "reason": "heuristic_safe_fallback"}, ) ) return heuristics return build_diverse_fallback_pool(redispatch_generators) def filter_candidate_proposals( task_id: TaskId, observation: GridObservation, graph_intelligence: dict[str, Any], proposal_candidates: Sequence[tuple[GridAction, dict[str, Any]]], ) -> tuple[list[tuple[GridAction, dict[str, Any]]], dict[str, Any]]: del observation filtered: list[tuple[GridAction, dict[str, Any]]] = [] rejected: list[dict[str, Any]] = [] safe_disconnect = { int(line_id) for line_id in graph_intelligence.get("safe_to_disconnect", []) } for action, trace in proposal_candidates: if task_id == "multi_stage_cascade": rejected_line_ids = [ int(line_id) for line_id, status in action.line_set.items() if int(status) == -1 and int(line_id) not in safe_disconnect ] if rejected_line_ids: rejected.append( { "action": action.model_dump(), "reason": f"unsafe_disconnect_filtered:{sorted(rejected_line_ids)}", } ) continue filtered.append((action, trace)) if not filtered: filtered = [ ( GridAction(do_nothing=True), {"decision": "do_nothing", "reason": "all_candidates_prefiltered"}, ) ] deduped: list[tuple[GridAction, dict[str, Any]]] = [] seen: set[str] = set() for action, trace in filtered: key = json.dumps(action.model_dump(), sort_keys=True) if key in seen: continue seen.add(key) deduped.append((action, trace)) return deduped[:3], { "prefiltered_candidate_count": len(deduped[:3]), "prefilter_rejections": rejected, } def filter_selectable_simulations( simulations: Sequence[SimulationOutcome], ) -> list[SimulationOutcome]: return [ outcome for outcome in simulations if not outcome.convergence_failed and not outcome.done ] def select_final_action( final_raw_output: str, simulations: Sequence[SimulationOutcome], n_line: int, n_gen: int, task_id: TaskId = "n_minus_1", observation: GridObservation | 
def select_final_action(
    final_raw_output: str,
    simulations: Sequence[SimulationOutcome],
    n_line: int,
    n_gen: int,
    task_id: TaskId = "n_minus_1",
    observation: GridObservation | None = None,
) -> tuple[GridAction, dict[str, Any]]:
    """Resolve the model's final selection into a concrete action.

    Resolution order:
        1. ``selected_candidate == 0`` -> do nothing (model rejected all).
        2. A selectable simulation with that candidate index -> its action.
        3. No selectable simulation at all -> do nothing.
        4. Otherwise -> the deterministic best selectable simulation.

    The deterministic fallback is computed only in case 4. Computing it up
    front would raise ``ValueError`` on an empty ``simulations`` sequence
    before case 3 could return.

    Args:
        final_raw_output: Raw text of the model's selection response.
        simulations: All simulated outcomes for this step.
        n_line: Unused; kept for signature compatibility.
        n_gen: Unused; kept for signature compatibility.
        task_id: Task identifier forwarded to the deterministic fallback.
        observation: Current observation forwarded to the fallback.

    Returns:
        ``(action, trace)`` where ``trace`` records the decision path.
    """
    selectable = filter_selectable_simulations(simulations)

    payload = parse_json_action(final_raw_output)
    reason = payload.get("reason", "")
    try:
        selected_candidate = int(payload.get("selected_candidate"))
    except (TypeError, ValueError, OverflowError):
        # Missing or malformed in the JSON payload: try a lenient regex scan.
        selected_candidate = parse_selected_candidate(final_raw_output)

    if selected_candidate == 0:
        return GridAction(do_nothing=True), {
            "decision": "do_nothing",
            "reason": reason or "model_rejected_all_candidates",
        }

    if selected_candidate is not None:
        for outcome in selectable:
            if outcome.candidate_index == selected_candidate:
                return outcome.action, {
                    "decision": "simulated_candidate_by_index",
                    "reason": reason or outcome.trace.get("reason", ""),
                    "selected_candidate": selected_candidate,
                }

    if not selectable:
        return GridAction(do_nothing=True), {
            "decision": "do_nothing_no_selectable_candidates",
            "reason": reason or "all_candidates_failed_simulation",
        }

    # The model's pick was missing or not selectable: use the ranked best.
    deterministic_best = choose_best_simulation(task_id, observation, selectable)
    return deterministic_best.action, {
        "decision": "fallback_best_simulation",
        "reason": reason or "invalid_final_selection",
        "selected_candidate": deterministic_best.candidate_index,
    }
def choose_best_simulation(
    task_id: TaskId,
    observation: GridObservation | None,
    simulations: Sequence[SimulationOutcome],
) -> SimulationOutcome:
    """Pick the best outcome deterministically, without consulting the model.

    Preference tiers:
        1. Outcomes that neither end the episode nor fail to converge, ranked
           by (max_rho, do_nothing, overloaded-line count, line changes,
           candidate index). For ``single_fault``, active outcomes that lower
           the current max rho are ranked first.
        2. Outcomes that converged, ranked by (done, max_rho, do_nothing,
           line changes, candidate index).
        3. The outcome with the smallest candidate index.

    Raises:
        ValueError: If ``simulations`` is empty.
    """

    def _safety_rank(outcome: SimulationOutcome) -> tuple[Any, ...]:
        return (
            outcome.max_rho,
            outcome.action.do_nothing,
            len(outcome.overloaded_line_ids),
            len(outcome.action.line_set),
            outcome.candidate_index,
        )

    def _survival_rank(outcome: SimulationOutcome) -> tuple[Any, ...]:
        return (
            outcome.done,
            outcome.max_rho,
            outcome.action.do_nothing,
            len(outcome.action.line_set),
            outcome.candidate_index,
        )

    survivors = [
        outcome
        for outcome in simulations
        if not (outcome.done or outcome.convergence_failed)
    ]
    if survivors:
        current_max_rho = (
            max(observation.rho) if observation and observation.rho else None
        )
        survivors = prefer_active_control_in_emergencies(
            task_id=task_id,
            observation=observation,
            simulations=survivors,
        )
        if task_id == "single_fault" and current_max_rho is not None:
            improving = [
                outcome
                for outcome in survivors
                if outcome.max_rho < current_max_rho - 1e-9
                and not outcome.action.do_nothing
            ]
            if improving:
                return min(improving, key=_safety_rank)
        return min(survivors, key=_safety_rank)

    converged = [
        outcome for outcome in simulations if not outcome.convergence_failed
    ]
    if converged:
        return min(converged, key=_survival_rank)

    return min(simulations, key=lambda outcome: outcome.candidate_index)


def is_emergency_state(task_id: TaskId, observation: GridObservation | None) -> bool:
    """Report whether the observation counts as an emergency for ``task_id``.

    Thresholds:
        * ``n_minus_1``: max rho >= 0.92.
        * ``cascade_prevent`` / ``multi_stage_cascade``: max rho > 1.0 or any
          positive ``timestep_overflow`` entry.
        * any other task: max rho > 0.8.

    Returns ``False`` when there is no observation or ``rho`` is empty.
    """
    if observation is None or not observation.rho:
        return False

    peak_rho = max(float(value) for value in observation.rho)
    peak_overflow = max(
        (int(value) for value in observation.timestep_overflow), default=0
    )

    if task_id == "n_minus_1":
        return peak_rho >= 0.92
    if task_id in ("cascade_prevent", "multi_stage_cascade"):
        return peak_rho > 1.0 or peak_overflow > 0
    return peak_rho > 0.8
def prefer_active_control_in_emergencies(
    task_id: TaskId,
    observation: GridObservation | None,
    simulations: Sequence[SimulationOutcome],
) -> list[SimulationOutcome]:
    """Reorder outcomes so viable active interventions lead during emergencies.

    Outside an emergency, or when the outcomes are all do-nothing or all
    active, the input order is returned unchanged (as a new list). Otherwise
    an active outcome is "viable" when, compared with the best do-nothing
    outcome, it has no larger simulated overflow, no more overloaded lines,
    and a max rho within ``epsilon`` (0.02 for ``multi_stage_cascade``, else
    0.01). Viable outcomes are sorted to the front; the rest keep their order.
    """
    ordered = list(simulations)
    if not is_emergency_state(task_id=task_id, observation=observation):
        return ordered

    noops = [outcome for outcome in ordered if outcome.action.do_nothing]
    active = [outcome for outcome in ordered if not outcome.action.do_nothing]
    if not noops or not active:
        return ordered

    def _stress(outcome: SimulationOutcome) -> tuple[Any, ...]:
        return (
            max_simulated_overflow(outcome),
            len(outcome.overloaded_line_ids),
            outcome.max_rho,
        )

    baseline = min(
        noops, key=lambda outcome: _stress(outcome) + (outcome.candidate_index,)
    )
    baseline_overflow = max_simulated_overflow(baseline)
    baseline_overloads = len(baseline.overloaded_line_ids)
    epsilon = 0.02 if task_id == "multi_stage_cascade" else 0.01
    rho_ceiling = baseline.max_rho + epsilon

    viable = [
        outcome
        for outcome in active
        if max_simulated_overflow(outcome) <= baseline_overflow
        and len(outcome.overloaded_line_ids) <= baseline_overloads
        and outcome.max_rho <= rho_ceiling
    ]
    if not viable:
        return ordered

    viable.sort(
        key=lambda outcome: _stress(outcome)
        + (len(outcome.action.line_set), outcome.candidate_index)
    )
    leftovers = [outcome for outcome in ordered if outcome not in viable]
    return viable + leftovers


def max_simulated_overflow(outcome: SimulationOutcome) -> int:
    """Return the largest ``timestep_overflow`` entry in the raw result.

    Yields 0 when the entry is missing, empty, or not a list.
    """
    counters = outcome.raw_result.get("timestep_overflow", [])
    if isinstance(counters, list):
        return max((int(counter) for counter in counters), default=0)
    return 0


def serialize_simulation_outcome(outcome: SimulationOutcome) -> dict[str, Any]:
    """Convert an outcome into a JSON-ready dict for prompts.

    The action is dumped via ``model_dump`` and the proposal trace is exposed
    under ``proposal_trace``; all other fields are copied under their own name.
    """
    serialized: dict[str, Any] = {
        "candidate_index": outcome.candidate_index,
        "action": outcome.action.model_dump(),
        "proposal_trace": outcome.trace,
    }
    for field_name in (
        "done",
        "simulated_reward",
        "max_rho",
        "overloaded_line_ids",
        "disconnected_lines",
        "convergence_failed",
        "exceptions",
        "raw_result",
    ):
        serialized[field_name] = getattr(outcome, field_name)
    return serialized


def actions_equivalent(lhs: GridAction, rhs: GridAction) -> bool:
    """Return True when both actions dump to equal dictionaries."""
    left_dump = lhs.model_dump()
    right_dump = rhs.model_dump()
    return left_dump == right_dump
def _baseline_int_or_none(value: Any) -> int | None:
    """Coerce ``value`` to ``int``; return ``None`` when that is impossible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")), e.g. from a JSON ``Infinity``/``1e999``.
        return None


def _baseline_finite_float_or_none(value: Any) -> float | None:
    """Coerce ``value`` to a finite ``float``; return ``None`` otherwise."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # The chained comparison is False for NaN and for both infinities.
    if not float("-inf") < number < float("inf"):
        return None
    return number


def validate_baseline_action(
    payload: Dict[str, Any],
    task_id: TaskId,
    n_line: int,
    n_gen: int,
    redispatchable_generators: Sequence[int] | None = None,
    redispatch_generators: Sequence[RedispatchGeneratorContext] | None = None,
) -> tuple[GridAction, dict[str, Any]]:
    """Validate one model-proposed action and convert it to a ``GridAction``.

    Two payload shapes are accepted:
        * typed: ``action_type`` is one of ``do_nothing``, ``disconnect_line``,
          ``reconnect_line`` (with ``line_id``) or ``redispatch`` (with
          ``gen_id`` and ``delta_mw``);
        * legacy: ``line_set`` / ``redispatch`` dictionaries.

    Anything invalid degrades to a do-nothing action whose trace names the
    reason. Numeric fields that are non-numeric, NaN or infinite are treated
    as invalid rather than raising or reaching the environment.

    Args:
        payload: Decoded candidate dictionary.
        task_id: ``single_fault`` forbids disconnect/reconnect actions.
        n_line: Number of lines; valid ids are ``0 .. n_line - 1``.
        n_gen: Number of generators; valid ids are ``0 .. n_gen - 1``.
        redispatchable_generators: Generator ids allowed to redispatch;
            ``None`` allows every generator id below ``n_gen``.
        redispatch_generators: Optional contexts used to constrain deltas.

    Returns:
        ``(action, trace)`` with ``trace`` holding ``decision`` and ``reason``.
    """
    if redispatchable_generators is not None:
        allowed_redispatch = {int(gen_id) for gen_id in redispatchable_generators}
    else:
        allowed_redispatch = set(range(n_gen))
    context_by_gen = (
        {int(context.gen_id): context for context in redispatch_generators}
        if redispatch_generators is not None
        else {}
    )

    def _fallback(reason: str) -> tuple[GridAction, dict[str, Any]]:
        return GridAction(do_nothing=True), {
            "decision": "fallback_do_nothing",
            "reason": reason,
        }

    action_type = payload.get("action_type")

    if payload.get("do_nothing") or action_type == "do_nothing":
        return GridAction(do_nothing=True), {
            "decision": "do_nothing",
            "reason": payload.get("reason", "explicit_do_nothing"),
        }

    # Tuple membership compares with ``==`` and so tolerates unhashable values.
    is_line_action = action_type in ("disconnect_line", "reconnect_line")
    if is_line_action and task_id == "single_fault":
        return _fallback(f"{action_type}_forbidden_for_single_fault")

    if is_line_action:
        status = -1 if action_type == "disconnect_line" else 1
        line_id = _baseline_int_or_none(payload.get("line_id"))
        if line_id is not None and 0 <= line_id < n_line:
            return (
                GridAction(line_set={line_id: status}, redispatch={}),
                {"decision": action_type, "reason": payload.get("reason", "")},
            )
        return _fallback(f"invalid_{action_type}")

    if action_type == "redispatch":
        gen_id = _baseline_int_or_none(payload.get("gen_id"))
        delta = _baseline_finite_float_or_none(payload.get("delta_mw"))
        if (
            gen_id is None
            or delta is None
            or not 0 <= gen_id < n_gen
            or gen_id not in allowed_redispatch
        ):
            return _fallback("invalid_redispatch")
        if gen_id in context_by_gen:
            delta = constrain_redispatch_delta(delta, context_by_gen[gen_id])
            if delta is None:
                return _fallback("invalid_redispatch")
        return (
            GridAction(redispatch={gen_id: delta}, line_set={}),
            {"decision": "redispatch", "reason": payload.get("reason", "")},
        )

    # Legacy payload: explicit ``line_set`` / ``redispatch`` dictionaries.
    line_set_payload = payload.get("line_set") or {}
    if not isinstance(line_set_payload, dict):
        line_set_payload = {}
    valid_line_set: dict[int, int] = {}
    for key, value in line_set_payload.items():
        line_id = _baseline_int_or_none(key)
        status = _baseline_int_or_none(value)
        if line_id is None or status is None:
            continue
        if 0 <= line_id < n_line and status in (-1, 1):
            valid_line_set[line_id] = status

    redispatch_payload = payload.get("redispatch") or {}
    if not isinstance(redispatch_payload, dict):
        redispatch_payload = {}
    valid_redispatch: dict[int, float] = {}
    for key, value in redispatch_payload.items():
        gen_id = _baseline_int_or_none(key)
        delta = _baseline_finite_float_or_none(value)
        if gen_id is None or delta is None:
            continue
        if not (0 <= gen_id < n_gen and gen_id in allowed_redispatch):
            continue
        if gen_id in context_by_gen:
            delta = constrain_redispatch_delta(delta, context_by_gen[gen_id])
            if delta is None:
                continue
        valid_redispatch[gen_id] = delta

    if not valid_line_set and not valid_redispatch:
        return _fallback("empty_or_invalid_payload")
    return (
        GridAction(line_set=valid_line_set, redispatch=valid_redispatch),
        {"decision": "legacy_payload", "reason": payload.get("reason", "")},
    )
def constrain_redispatch_delta(
    delta: float,
    context: RedispatchGeneratorContext,
) -> float | None:
    """Snap a requested redispatch delta onto what the generator allows.

    The delta is clamped into ``[allowed_delta_min, allowed_delta_max]``. If
    the context lists discrete ``allowed_deltas`` inside that range, the one
    closest to the clamped value is chosen (ties go to the smaller magnitude,
    then the smaller value). Otherwise the clamped value, rounded to 4
    decimals, is used.

    A result that is effectively zero is never returned. This applies both to
    the clamped value and to a snapped discrete delta, so a ``0.0`` entry in
    ``allowed_deltas`` cannot produce a no-op that looks like an active
    redispatch.

    Returns:
        The constrained non-zero delta, or ``None`` when the generator cannot
        move (both bounds are zero) or the result would be a zero redispatch.
    """
    lower = float(context.allowed_delta_min)
    upper = float(context.allowed_delta_max)
    if lower == 0.0 and upper == 0.0:
        return None

    clamped = min(max(float(delta), lower), upper)
    feasible = [
        float(value)
        for value in context.allowed_deltas
        if lower <= float(value) <= upper
    ]
    if feasible:
        snapped = min(
            feasible,
            key=lambda value: (abs(value - clamped), abs(value), value),
        )
        # Snapping onto an allowed 0.0 would yield a do-nothing redispatch.
        return None if abs(snapped) < 1e-9 else snapped

    if abs(clamped) < 1e-9:
        return None
    return round(clamped, 4)
def _summarize_episode_records(records: Sequence[dict[str, Any]]) -> dict[str, Any]:
    """Compute the rounded summary statistics shared by task and tier views."""

    def _mean(values: Sequence[float]) -> float:
        return round(mean(values), 6) if values else 0.0

    def _std(values: Sequence[float]) -> float:
        # Population std-dev; defined as 0.0 for fewer than two samples.
        return round(pstdev(values), 6) if len(values) > 1 else 0.0

    scores = [float(record["score"]) for record in records]
    lengths = [int(record["episode_length"]) for record in records]
    wall_times = [float(record.get("episode_wall_time_s", 0.0)) for record in records]
    do_nothing = [int(record["do_nothing_steps"]) for record in records]
    redispatch_mw = [
        float(record.get("episode_total_redispatch_mw", 0.0)) for record in records
    ]
    action_penalty = [
        float(record.get("episode_action_penalty_total", 0.0)) for record in records
    ]
    return {
        "num_episodes": len(records),
        "score_mean": _mean(scores),
        "score_std": _std(scores),
        "episode_length_mean": _mean(lengths),
        "episode_length_std": _std(lengths),
        "episode_wall_time_mean_s": _mean(wall_times),
        "episode_wall_time_std_s": _std(wall_times),
        "do_nothing_steps_mean": _mean(do_nothing),
        "episode_total_redispatch_mw_mean": _mean(redispatch_mw),
        "episode_action_penalty_total_mean": _mean(action_penalty),
    }


def write_evaluation_outputs(
    timestamp: str,
    run_paths: dict[str, Path],
    model: str,
    base_url: str,
    llm_config: BaselineConfig,
    baseline_scores: BaselineScores,
    evaluation_records: list[dict[str, Any]],
    selected_task_ids: Sequence[TaskId],
) -> None:
    """Write the run's JSON report and per-episode CSV.

    The JSON report holds run metadata, LLM call counts, sampling settings,
    the ``baseline_scores`` dump, statistics aggregated per task and per
    benchmark tier, and every episode record. Both aggregations use the same
    statistics helper, so the tier view also carries ``episode_length_std``.

    Proposal calls are counted as one per ``raw_outputs`` entry; final calls
    as one per entry with a truthy ``final_prompt``.

    Args:
        timestamp: Run identifier stored in the report.
        run_paths: Must provide the ``"json"`` and ``"csv"`` output paths.
        model: Model name stored in the report.
        base_url: Environment base URL stored in the report.
        llm_config: Sampling/seed configuration copied into the report.
        baseline_scores: Summary scores, serialized via ``model_dump``.
        evaluation_records: One dict per evaluated episode.
        selected_task_ids: Tasks to aggregate, in report order.

    Raises:
        KeyError: If a record lacks one of the mandatory fields.
    """
    json_path = run_paths["json"]
    csv_path = run_paths["csv"]

    total_proposal_calls = 0
    total_final_calls = 0
    per_task_llm_calls: dict[str, dict[str, int]] = {}
    aggregate: dict[str, Any] = {}
    for task_id in selected_task_ids:
        records = [
            record for record in evaluation_records if record["task_id"] == task_id
        ]
        proposal_calls = sum(len(record.get("raw_outputs", [])) for record in records)
        final_calls = sum(
            1
            for record in records
            for step in record.get("raw_outputs", [])
            if step.get("final_prompt")
        )
        total_proposal_calls += proposal_calls
        total_final_calls += final_calls

        llm_calls = {
            "proposal_calls": proposal_calls,
            "final_calls": final_calls,
            "total_llm_calls": proposal_calls + final_calls,
        }
        per_task_llm_calls[task_id] = llm_calls
        aggregate[task_id] = {**_summarize_episode_records(records), **llm_calls}

    benchmark_tiers = sorted(
        {record["benchmark_tier"] for record in evaluation_records}
    )
    aggregate_by_tier: dict[str, Any] = {
        tier: _summarize_episode_records(
            [
                record
                for record in evaluation_records
                if record["benchmark_tier"] == tier
            ]
        )
        for tier in benchmark_tiers
    }

    payload = {
        "timestamp": timestamp,
        "model": model,
        "base_url": base_url,
        "llm_usage": {
            "proposal_calls": total_proposal_calls,
            "final_calls": total_final_calls,
            "total_llm_calls": total_proposal_calls + total_final_calls,
            "per_task_llm_calls": per_task_llm_calls,
        },
        "sampling": {
            "temperature": llm_config.temperature,
            "top_p": llm_config.top_p,
            "top_k": llm_config.top_k,
            "min_p": llm_config.min_p,
            "presence_penalty": llm_config.presence_penalty,
            "repetition_penalty": llm_config.repetition_penalty,
            "enable_thinking": llm_config.enable_thinking,
            "max_tokens": llm_config.max_tokens,
            "num_seeds": llm_config.num_seeds,
            "seed_start": llm_config.seed_start,
            "scenario_mode": llm_config.scenario_mode,
            "task_seed_overrides": TASK_SEED_OVERRIDES,
        },
        "summary": baseline_scores.model_dump(),
        "aggregate": aggregate,
        "aggregate_by_tier": aggregate_by_tier,
        "episodes": evaluation_records,
    }
    json_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    fieldnames = [
        "task_id",
        "seed",
        "curriculum_episode",
        "benchmark_tier",
        "score",
        "episode_length",
        "episode_wall_time_s",
        "done",
        "do_nothing_steps",
        "non_do_nothing_steps",
        "episode_total_redispatch_mw",
        "episode_action_penalty_total",
        "episode_action_penalty_mean",
    ]
    # These columns default to 0.0 when absent; all others are mandatory.
    optional_fields = {
        "episode_wall_time_s",
        "episode_total_redispatch_mw",
        "episode_action_penalty_total",
        "episode_action_penalty_mean",
    }
    with csv_path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {
                name: (
                    record.get(name, 0.0) if name in optional_fields else record[name]
                )
                for name in fieldnames
            }
            for record in evaluation_records
        )

    logger.info("Wrote evaluation outputs json=%s csv=%s", json_path, csv_path)
def prepare_run_paths(timestamp: str) -> dict[str, Path]:
    """Create the output directories and return this run's file paths.

    Directories ``outputs/evals`` and ``outputs/logs`` are created next to
    this module if missing.

    Returns:
        A dict with ``"json"``, ``"csv"`` and ``"log"`` paths whose file names
        embed ``timestamp``.
    """
    outputs_root = Path(__file__).resolve().parent / "outputs"
    eval_dir = outputs_root / "evals"
    log_dir = outputs_root / "logs"
    for directory in (eval_dir, log_dir):
        directory.mkdir(parents=True, exist_ok=True)

    eval_stem = f"baseline_eval_{timestamp}"
    return {
        "json": eval_dir / f"{eval_stem}.json",
        "csv": eval_dir / f"{eval_stem}.csv",
        "log": log_dir / f"baseline_run_{timestamp}.log",
    }


def attach_file_logger(log_path: Path) -> None:
    """Attach an INFO-level UTF-8 file handler for ``log_path`` to the root logger.

    Does nothing when a file handler for the same path is already attached.
    """
    root_logger = logging.getLogger()
    for handler in root_logger.handlers:
        if (
            isinstance(handler, logging.FileHandler)
            and Path(handler.baseFilename) == log_path
        ):
            return

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    # NOTE(review): the handler level only filters records that pass the
    # logger's own level; confirm the root level is INFO or lower when INFO
    # records are expected in the file.
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    root_logger.addHandler(file_handler)
    logger.info("Attached file logger log_path=%s", log_path)
def append_evaluation_markdown(
    timestamp: str,
    model: str,
    llm_config: BaselineConfig,
    baseline_scores: BaselineScores,
    evaluation_records: list[dict[str, Any]],
    run_paths: dict[str, Path],
    selected_task_ids: Sequence[TaskId],
) -> None:
    """Append a Markdown section for this run to ``evaluation.md``.

    The file lives one directory above this module. The section lists the run
    settings, links to the output files, a table with one row per
    (task, benchmark tier) in first-seen order, and the summary scores as JSON.
    """
    report_path = Path(__file__).resolve().parent.parent / "evaluation.md"

    # task -> tier -> records; plain dicts keep first-seen ordering.
    by_task_and_tier: dict[str, dict[str, list[dict[str, Any]]]] = {}
    for record in evaluation_records:
        tiers = by_task_and_tier.setdefault(record["task_id"], {})
        tiers.setdefault(record["benchmark_tier"], []).append(record)

    last_seed = llm_config.seed_start + llm_config.num_seeds - 1
    lines = [
        f"## Run {timestamp}",
        "",
        f"- Model: `{model}`",
        f"- Tasks: `{', '.join(selected_task_ids)}`",
        f"- Seeds: `{llm_config.seed_start}` to `{last_seed}`",
        f"- Scenario mode: `{llm_config.scenario_mode}`",
        f"- Sampling: `temperature={llm_config.temperature}`, `top_p={llm_config.top_p}`, `top_k={llm_config.top_k}`, `min_p={llm_config.min_p}`, `presence_penalty={llm_config.presence_penalty}`, `repetition_penalty={llm_config.repetition_penalty}`",
        f"- JSON output: [{run_paths['json']}]({run_paths['json']})",
        f"- CSV output: [{run_paths['csv']}]({run_paths['csv']})",
        f"- Log file: [{run_paths['log']}]({run_paths['log']})",
        "",
        "| Task | Tier | Mean Score | Mean Episode Length | Mean Time (s) | Mean Do-Nothing Steps |",
        "| --- | --- | ---: | ---: | ---: | ---: |",
    ]

    for task_id, tiers in by_task_and_tier.items():
        for benchmark_tier, tier_records in tiers.items():
            score_mean = mean(float(record["score"]) for record in tier_records)
            length_mean = mean(
                int(record["episode_length"]) for record in tier_records
            )
            wall_time_mean = mean(
                float(record.get("episode_wall_time_s", 0.0))
                for record in tier_records
            )
            do_nothing_mean = mean(
                int(record["do_nothing_steps"]) for record in tier_records
            )
            lines.append(
                f"| `{task_id}` | `{benchmark_tier}` | `{score_mean:.6f}` | `{length_mean:.2f}` | `{wall_time_mean:.2f}` | `{do_nothing_mean:.2f}` |"
            )

    lines += [
        "",
        "Summary scores:",
        f"```json\n{baseline_scores.model_dump_json(indent=2)}\n```",
        "",
    ]

    with report_path.open("a", encoding="utf-8") as handle:
        handle.write("\n".join(lines) + "\n")


if __name__ == "__main__":
    # CLI entry point: run the internal baseline suite or the submission runner.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--baseline-suite",
        action="store_true",
        help="Run the internal multi-task baseline suite instead of the submission episode runner.",
    )
    cli.add_argument(
        "--task-id",
        dest="task_ids",
        nargs="+",
        choices=sorted(TASKS.keys()),
        help="Run only the selected task ids for --baseline-suite. Defaults to all tasks.",
    )
    cli_args = cli.parse_args()

    if not cli_args.baseline_suite:
        run_submission_episodes(task_ids=cli_args.task_ids)
    else:
        env_base_url = os.environ.get("GRID2OP_BASE_URL", DEFAULT_ENV_BASE_URL)
        suite_result = run_baseline_suite(
            base_url=env_base_url, task_ids=cli_args.task_ids
        )
        print(suite_result.model_dump_json(indent=2))