Spaces:
Runtime error
Runtime error
| import math | |
| import time | |
| import pygame as pg | |
| from vi import Agent, Config, Window, HeadlessSimulation | |
| from typing import Optional | |
| from queue import Queue | |
| import numpy as np | |
| from pygame.math import Vector2 | |
| import py_trees as pt | |
| import parser | |
| import xml.etree.ElementTree as ET | |
| import threading | |
| # import pyttsx3 | |
| class MyWindow(Window): | |
| """Custom window class for simulation.""" | |
| def __init__(self, width=800, height=600): | |
| super().__init__(width, height) | |
| class MyConfig(Config): | |
| """Custom configuration for simulation.""" | |
| def __init__(self, radius=25, visualise_chunks=True, window=None, movement_speed=2.0): | |
| super().__init__( | |
| radius=radius, | |
| visualise_chunks=visualise_chunks, | |
| window=window or MyWindow(800, 600), | |
| movement_speed=movement_speed | |
| ) | |
| class SwarmAgent(Agent): | |
| def __init__(self, images, simulation, pos, nest_pos, target_pos): | |
| super().__init__(images=images, simulation=simulation) | |
| # Ensure the agent gets the configuration from the simulation. | |
| self.config = simulation.config | |
| self.pos = pos | |
| self.nest_pos = nest_pos | |
| self.target_pos = target_pos | |
| self.target_detected_flag = False | |
| self.target_reached_flag = False | |
| self.is_agent_in_nest_flag = False | |
| self.obstacle_radius = 3 | |
| self.state = "seeking" | |
| self.bt_active = True # Add a flag | |
| # self.tts_engine = pyttsx3.init() # Initialize text-to-speech engine | |
| file_path = "tree.xml" | |
| trees = parser.parse_behavior_trees(file_path) | |
| subtree_mapping = { tree.attributes.get("ID"): tree for tree in trees } | |
| xml_tree = ET.parse(file_path) | |
| xml_root = xml_tree.getroot() | |
| main_tree_id = xml_root.attrib.get("main_tree_to_execute") | |
| if not main_tree_id or main_tree_id not in subtree_mapping: | |
| raise ValueError("Main tree not found in the XML!") | |
| main_tree_node = subtree_mapping[main_tree_id] | |
| # Build the py_trees tree: | |
| self.bt = parser.build_behavior(main_tree_node, subtree_mapping) | |
| # Inject the agent instance into all leaf behaviors. | |
| self._inject_agent(self.bt) | |
| def _inject_agent(self, node): | |
| """Recursively set the agent for any custom BT nodes.""" | |
| if hasattr(node, "agent"): | |
| node.agent = self | |
| if hasattr(node, "children"): | |
| for child in node.children: | |
| self._inject_agent(child) | |
| def update(self): | |
| if self.bt_active: | |
| self.bt.tick_once() | |
| def obstacle(self): | |
| """ | |
| Check for obstacle intersections within a predefined radius. | |
| Returns: True if an obstacle is detected within the radius, False otherwise. | |
| """ | |
| for intersection in self.obstacle_intersections(scale=self.obstacle_radius): | |
| return True | |
| return False | |
| # def update(self): | |
| # self.bt.tick_once() | |
| # # self.root_node.run(self) | |
| # def say(self, message: str): | |
| # """ | |
| # Action Node: Speak the provided message using text-to-speech if it hasn't been spoken before. | |
| # Args: message (str): The message to be spoken. | |
| # Returns: Always returns SUCCESS, indicating the action was executed. | |
| # """ | |
| # if not hasattr(self, 'old_message'): | |
| # self.old_message = [] | |
| # # Only speak the message if it has not been spoken before (i.e. not in old_message) | |
| # if message not in self.old_message: | |
| # self.tts_engine.say(message) | |
| # self.tts_engine.runAndWait() | |
| # self.old_message.append(message) | |
| # return pt.common.Status.SUCCESS | |
| def flocking(self): | |
| """ | |
| Action Node: Adjust the agent's move vector by blending alignment and separation forces from nearby agents. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| nearby_agents = list(self.in_proximity_accuracy().without_distance()) | |
| if not nearby_agents: | |
| return pt.common.Status.SUCCESS | |
| alignment = Vector2(0, 0) | |
| separation = Vector2(0, 0) | |
| separation_count = 0 | |
| # Desired minimum separation distance (adjust as needed) | |
| separation_threshold = 3 | |
| # Calculate alignment and separation contributions. | |
| for other in nearby_agents: | |
| alignment += other.move | |
| diff = self.pos - other.pos | |
| distance = diff.length() | |
| if 0 < distance < separation_threshold: | |
| # The closer the neighbor, the stronger the repulsive force. | |
| separation += diff.normalize() * (separation_threshold - distance) | |
| separation_count += 1 | |
| # Average the alignment vector over all neighbors. | |
| alignment /= len(nearby_agents) | |
| # If any agents are too close, average the separation vector. | |
| if separation_count > 0: | |
| separation /= separation_count | |
| # Blend the two influences. Here, alignment has a stronger influence than separation. | |
| # Adjust the blend factor (e.g., 0.3) to control separation influence. | |
| blended_force = alignment.lerp(separation, 0.3) | |
| # Smoothly blend the current move with the blended force. | |
| self.move = self.move.lerp(blended_force, 0.5) | |
| # Normalize and scale to the configured movement speed. | |
| if self.move.length() > 0: | |
| self.move = self.move.normalize() * self.config.movement_speed | |
| # Update position and apply wrap-around if necessary. | |
| self.pos += self.move | |
| self.there_is_no_escape() | |
| return pt.common.Status.SUCCESS | |
| def align_with_swarm(self): | |
| """ | |
| Action Node: Align the agent's move vector with the average movement of nearby agents. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| nearby_agents = list(self.in_proximity_accuracy().without_distance()) | |
| if not nearby_agents: | |
| return pt.common.Status.SUCCESS | |
| avg_direction = Vector2(0, 0) | |
| for other in nearby_agents: | |
| avg_direction += other.move | |
| avg_direction /= len(nearby_agents) | |
| # Blend current movement with average direction. | |
| self.move = self.move.lerp(avg_direction, 0.5) | |
| if self.move.length() > 0: | |
| self.move = self.move.normalize() * self.config.movement_speed | |
| # Update position and wrap-around if necessary. | |
| self.pos += self.move | |
| self.there_is_no_escape() | |
| return pt.common.Status.SUCCESS | |
| def is_obstacle_detected(self): | |
| """ | |
| Condition node: Determine if any obstacles are detected in the vicinity of the agent. | |
| Returns: SUCCESS if an obstacle is detected, FAILURE otherwise. | |
| """ | |
| if self.obstacle(): | |
| return pt.common.Status.SUCCESS | |
| else: | |
| return pt.common.Status.FAILURE | |
| def avoid_obstacle(self): | |
| """ | |
| Action node: Execute an action to avoid detected obstacles. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| self.move.rotate_ip(180) | |
| return pt.common.Status.SUCCESS | |
| def is_target_detected(self): | |
| """ | |
| Condition node: Check if the target is within a detectable distance from the agent's position. | |
| Returns: SUCCESS if the target is within 20 units of distance, FAILURE otherwise. | |
| """ | |
| distance = math.dist(self.target_pos, self.pos) | |
| if distance <= 20: | |
| self.target_detected_flag = True | |
| if self.target_detected_flag: | |
| return pt.common.Status.SUCCESS | |
| return pt.common.Status.FAILURE | |
| def is_target_reached(self): | |
| """ | |
| Condition node: Check if the agent has reached the target. | |
| Returns: SUCCESS if the target is within 15 units of distance, FAILURE otherwise. | |
| """ | |
| distance = math.dist(self.target_pos, self.pos) | |
| if distance <= 15: | |
| self.target_reached_flag = True | |
| if self.target_reached_flag: | |
| return pt.common.Status.SUCCESS | |
| return pt.common.Status.FAILURE | |
| def change_color(self, color): | |
| """ | |
| Action Node: Change the agent's color to 'white', 'green', or 'red'. | |
| Args: color (str): Color name. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| color = color.lower() | |
| if color == "white": | |
| self.change_image(0) | |
| elif color == "green": | |
| self.change_image(1) | |
| elif color == "red": | |
| self.change_image(2) | |
| return pt.common.Status.SUCCESS | |
| def is_agent_in_nest(self): | |
| """ | |
| Condition node: Determine if the agent is in the nest. | |
| Returns: SUCCESS if the agent is in the nest, FAILURE otherwise. | |
| """ | |
| distance = math.dist(self.nest_pos, self.pos) | |
| if distance <= 17 and (self.target_reached_flag==True or self.target_detected_flag == True or self.state == "completed" ) : | |
| self.state = "seeking" | |
| # self.target_detected_flag = False | |
| # self.target_reached_flag = False | |
| self.is_agent_in_nest_flag = True | |
| if self.is_agent_in_nest_flag: | |
| return pt.common.Status.SUCCESS | |
| return pt.common.Status.FAILURE | |
| def agent_movement_freeze(self): | |
| """ | |
| Action node: Freeze the agent's movement, typically to indicate a stop in activity. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| self.freeze_movement() | |
| return pt.common.Status.SUCCESS | |
| def continue_movement_agent(self): | |
| """ | |
| Action node: Continue the agent's movement after it has been previously frozen. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| self.continue_movement() | |
| return pt.common.Status.SUCCESS | |
| def move_randomly(self): | |
| """ | |
| Action node: Perform a wandering action where the agent moves randomly within the environment. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| Agent.change_position(self) | |
| return pt.common.Status.SUCCESS | |
| def is_path_clear(self): | |
| """ | |
| Condition node: Check if the path ahead of the agent is clear of obstacles. | |
| Returns: SUCCESS if no obstacles are detected ahead, FAILURE if obstacles are present. | |
| """ | |
| # return not self.obstacle() | |
| if not self.obstacle(): | |
| return pt.common.Status.SUCCESS | |
| else: | |
| return pt.common.Status.FAILURE | |
| def is_line_formed(self): | |
| """ | |
| Condition node: Determine if the agent has formed a line with a reference point at the center of the window. | |
| Returns: SUCCESS if the line is formed with the center, FAILURE otherwise. | |
| """ | |
| center_x = self.config.window.width / 2 | |
| direction = Vector2(center_x, self.pos.y) - self.pos | |
| if direction.length() > 0.5: | |
| return pt.common.Status.FAILURE | |
| return pt.common.Status.SUCCESS | |
| def form_line(self): | |
| """ | |
| Action node: Direct the agent to form a line towards the center of the window. This function adjuststhe agent's position to align it with the center. | |
| Returns: Always returns SUCCESS, indicating the action was executed. | |
| """ | |
| # print("form_line") | |
| center_x = self.config.window.width / 2 | |
| direction = Vector2(center_x, self.pos.y) - self.pos | |
| if direction.length() > 0.5: | |
| direction.scale_to_length(self.config.movement_speed) | |
| self.pos += direction | |
| return pt.common.Status.SUCCESS | |
| # def task_completed(self): | |
| # """ | |
| # Action node: Signal that the agent has completed its designated task. Returns: Always returns True, indicating that the task completion action was executed. | |
| # """ | |
| # self.state = "completed" | |
| # return pt.common.Status.SUCCESS | |
| class StreamableSimulation(HeadlessSimulation): | |
| """Modified Simulation class that captures frames for streaming.""" | |
| def __init__(self, config: Optional[Config] = None): | |
| super().__init__(config) | |
| pg.init() | |
| size = self.config.window.as_tuple() | |
| self._screen = pg.Surface(size, pg.SRCALPHA) | |
| self._background = pg.Surface(size, pg.SRCALPHA) | |
| self._background.fill((0, 0, 0)) | |
| self.frame_queue = Queue(maxsize=30) | |
| self.running = True | |
| self._frame_lock = threading.Lock() | |
| def get_frame(self): | |
| with self._frame_lock: | |
| surf_copy = self._screen.copy() | |
| frame = np.array(pg.surfarray.pixels3d(surf_copy)) | |
| return np.transpose(frame, (1, 0, 2)) | |
| def tick(self): | |
| """Run a simulation step and capture frames.""" | |
| super().tick() | |
| with self._frame_lock: | |
| self._screen.blit(self._background, (0, 0)) | |
| for sprite in self._all.sprites(): | |
| self._screen.blit(sprite.image, sprite.rect) | |
| try: | |
| frame = self.get_frame() | |
| self.frame_queue.put(frame, block=False) | |
| except Queue.Full: | |
| print("Frame queue is full. Dropping frame.") | |
| # def _load_image(self, path: str) -> pg.surface.Surface: | |
| # """Load an image from the given path.""" | |
| # return pg.image.load(path) | |
| def _load_image(self, paths): | |
| """Load one or more images from given paths.""" | |
| if isinstance(paths, str): # If it's a single string, load normally | |
| return pg.image.load(paths) | |
| elif isinstance(paths, list): # If it's a list, load all images | |
| return [pg.image.load(path) for path in paths] | |
| raise TypeError("Expected a string (file path) or a list of file paths") | |
| def stop(self): | |
| """Stop the simulation.""" | |
| # Do not try to call self.bt.stop() because simulation does not own a BT. | |
| # self.running = False | |
| super().stop() | |
| pg.quit() # Quit the Pygame environment | |
| # if __name__=="__main__": | |
| # # Define nest and target positions | |
| # nest_x, nest_y = 450, 400 | |
| # target_x, target_y = 200, 100 | |
| # nest_pos = Vector2(nest_x, nest_y) | |
| # target_pos = Vector2(target_x, target_y) | |
| # # Load images for agents | |
| # agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"] | |
| # config = MyConfig(radius=250, visualise_chunks=True, movement_speed=2) | |
| # sim = StreamableSimulation(config=config) | |
| # # Load images | |
| # loaded_agent_images = sim._load_image(agent_images_paths) | |
| # # Initialize agents with behavior tree parsing | |
| # for _ in range(50): | |
| # agent = SwarmAgent( | |
| # images=loaded_agent_images, | |
| # simulation=sim, | |
| # pos=Vector2(nest_x, nest_y), | |
| # nest_pos=nest_pos, | |
| # target_pos=target_pos, | |
| # ) | |
| # sim._agents.add(agent) | |
| # sim._all.add(agent) | |
| # # Draw environment elements | |
| # sim.spawn_obstacle("./images/rect_obst.png", 350, 100) | |
| # sim.spawn_obstacle("./images/rect_obst (1).png", 100, 350) | |
| # sim.spawn_site("./images/rect.png", target_x, target_y) | |
| # sim.spawn_site("./images/nest.png", nest_x, nest_y) | |
| # for agent in sim._agents: | |
| # agent.bt.tick_once() | |
| # # Then run your simulation loop without ticking the BT further. | |
| # while sim.running: | |
| # sim.tick() | |
| # if not sim.frame_queue.empty(): | |
| # frame = sim.frame_queue.get() | |
| # # update_frame(frame) or display the frame as needed. | |
| # time.sleep(1/30) |