# File size: 12,542 Bytes
# 2f0ff12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# 20c959e
# 2f0ff12
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import gradio as gr
from pygame import Vector2
import time
import threading
import queue
from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow

import speech_processing
import text_processing
import safety_module
import bt_generator

from pathlib import Path

BASE = Path(__file__).parent

class GradioStreamer:
    """Singleton that runs the swarm simulation on a worker thread.

    The worker thread ticks the simulation and publishes the newest frame in
    ``latest_frame``; ``stream()`` is a generator that hands that frame to
    Gradio.  Because ``__new__`` always returns the same object, every
    ``GradioStreamer()`` call shares one simulation.

    Attributes:
        latest_frame: Most recent frame taken from the simulation's
            ``frame_queue``, or ``None`` when nothing is available.
        running: Loop flag polled by ``run_simulation``.
        quit: Set by ``stop_simulation`` and cleared by ``start_simulation``.
        sim: The active ``StreamableSimulation`` or ``None``.
        sim_thread: The worker ``threading.Thread`` or ``None``.
        initialized: Guards ``__init__`` so state is only set up once.
    """

    _instance = None

    # Pause between simulation ticks and between streamed frames (~30 FPS).
    FRAME_INTERVAL = 1 / 30
    # Upper bound, in seconds, for a single simulation run.
    MAX_RUNTIME_SECONDS = 120
    # How long stop_simulation() waits for the worker thread to exit.
    JOIN_TIMEOUT_SECONDS = 2

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def __init__(self):
        """Initialise state once; later constructions leave it untouched."""
        if not self.initialized:
            self.latest_frame = None
            self.running = True
            self.quit = False
            self.sim = None
            self.sim_thread = None
            self.initialized = True

    def update_frame(self, frame):
        """Publish ``frame`` as the frame ``stream()`` will yield next."""
        self.latest_frame = frame

    def run_simulation(self):
        """Build the simulation and tick it until stopped or timed out.

        Intended to be the target of the worker thread started by
        ``start_simulation``.  The loop ends when ``running`` becomes false
        or after ``MAX_RUNTIME_SECONDS``.
        """
        nest_pos = Vector2(450, 400)
        target_pos = Vector2(300, 200)

        images_dir = BASE / "images"
        agent_images = ["white.png", "green.png", "red circle.png"]
        image_paths = [str(images_dir / fname) for fname in agent_images]

        config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0)
        # Keep a local reference: stop_simulation() may reset self.sim to
        # None from another thread while this loop is still running.
        sim = StreamableSimulation(config=config)
        self.sim = sim
        loaded_agent_images = sim._load_image(image_paths)

        # Every agent starts at the nest position with its own position vector.
        for _ in range(50):
            agent = SwarmAgent(
                images=loaded_agent_images,
                simulation=sim,
                pos=Vector2(450, 400),
                nest_pos=nest_pos,
                target_pos=target_pos,
            )
            sim._agents.add(agent)
            sim._all.add(agent)

        sim.spawn_obstacle(str(images_dir / "rect_obst.png"), 350, 50)
        sim.spawn_obstacle(str(images_dir / "rect_obst (1).png"), 100, 350)

        sim.spawn_site(str(images_dir / "rect.png"), 300, 200)
        sim.spawn_site(str(images_dir / "nest.png"), 450, 400)

        # Monotonic clock: the elapsed time is immune to wall-clock changes.
        started = time.monotonic()
        while self.running:
            sim.tick()

            # get_nowait() instead of empty()+get(): another thread may drain
            # the queue in between, and a blocking get() would then hang.
            try:
                frame = sim.frame_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                self.update_frame(frame)

            time.sleep(self.FRAME_INTERVAL)
            if time.monotonic() - started >= self.MAX_RUNTIME_SECONDS:
                print(f"Simulation stopped after {self.MAX_RUNTIME_SECONDS} seconds.")
                break

    def stream(self):
        """Yield the latest frame forever, roughly ``1 / FRAME_INTERVAL`` times per second.

        Yields ``None`` while no simulation exists or no frame has been
        produced yet.
        """
        while True:
            if self.sim is not None and self.latest_frame is not None:
                yield self.latest_frame
            else:
                yield None
            time.sleep(self.FRAME_INTERVAL)

    def start_simulation(self):
        """Start the simulation on a new daemon thread unless one is already alive."""
        if not self.sim_thread or not self.sim_thread.is_alive():
            self.running = True       # Reset running flag
            self.quit = False         # Reset quit flag
            self.latest_frame = None  # Clear out the old frame
            self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True)
            self.sim_thread.start()

    def clear_frame_queue(self):
        """Discard every frame still waiting in the simulation's queue."""
        if self.sim is None:
            return
        try:
            while True:
                self.sim.frame_queue.get_nowait()
        except queue.Empty:
            pass

    def stop_simulation(self):
        """Stop the worker thread, tear down the simulation and clear the frame.

        The stop flags are raised first, then the worker thread is joined,
        and only afterwards is the simulation stopped and released, so the
        worker is not ticking a simulation that is being torn down.
        """
        print("Stopping Simulation...")
        self.running = False
        self.quit = True

        sim = self.sim
        if sim is not None:
            for agent in sim._agents:
                agent.bt_active = False
            sim.running = False

        worker = self.sim_thread
        if (
            worker
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(timeout=self.JOIN_TIMEOUT_SECONDS)
            if worker.is_alive():
                print("Simulation thread did not terminate within the timeout.")
            else:
                print("Simulation thread terminated.")

        # Re-read: the worker may have assigned self.sim while we were waiting.
        sim = self.sim
        if sim is not None:
            sim.stop()
            self.clear_frame_queue()
            self.sim = None
        self.latest_frame = None  # Clear the displayed frame
        print("Simulation stopped successfully.")




            

def test(temp):
    return "test"

def test_safe(temp, checkbox):
    return "Safe"

def test_LLM_generate_BT(temp):
    print(temp)
    return None





def stop_gradio_interface():
    """Abort the current callback by raising ``Exception("Simulation stopped!")``."""
    reason = "Simulation stopped!"
    raise Exception(reason)


def create_gradio_interface():
    """Build the SwarmChat Gradio app and wire it to the shared simulation.

    Two tabs (microphone and text) run the same pipeline when **Start** is
    pressed: translate the input, run the safety check, stop the chain
    unless the check returned exactly "Safe", generate the behaviour tree,
    then start the simulation and reveal the live-stream image.  **Stop**
    halts the simulation and hides the image again.

    Returns:
        The assembled ``gr.Blocks`` app; the caller is responsible for
        launching it.
    """
    streamer = GradioStreamer()

    def on_translate_or_process():
        """Start the simulation and make the live-stream image visible."""
        streamer.start_simulation()
        return gr.update(visible=True)

    def on_stop():
        """Stop the simulation and hide the live-stream image."""
        print("Simulation on_stop")
        streamer.stop_simulation()
        return gr.update(visible=False)

    def require_safe(verdict):
        """Pass "Safe" through; otherwise raise so the `.success` steps are skipped."""
        return verdict if verdict == "Safe" else stop_gradio_interface()

    behaviors = bt_generator.call_behaviors()
    # A behaviour without a docstring would otherwise crash on None.split().
    formatted_behaviors = "\n".join(
        f"- **{name}**: {(doc or '').split('Returns:')[0].strip()}"
        for name, doc in behaviors.items()
    )
    primitives_markdown = f"""**🛠 Primitive behaviours available.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet."""

    # Gradio Interface
    with gr.Blocks() as demo:
        gr.Markdown(
            """
        # 🐝 **SwarmChat:** Enabling Human–Swarm Interaction and Robot Control via Natural Language
        Easily talk to virtual robots, and see the result live.  
        """
        )
        gr.Markdown(
            """
        **How it works**

        1.  Speak or type a task in *any EU language* (e.g. “Find the target, then line up by colour”).
        2.  Press **Start** to launch the simulator. Use **Stop** to halt & reset.
        3.  SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT).          

        > The BT XML is shown on the right so you can copy / save it for real robots.
        """
        )
        with gr.Tabs():
            # Tab for microphone input
            with gr.Tab("Microphone Input"):
                gr.Markdown("## 🎙️ Voice mode")
                gr.Markdown("""
                Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT.
                """)
                with gr.Row():
                    with gr.Column():
                        microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ Record Audio")
                        safety_checkbox_audio = gr.Checkbox(label="Turn off Safety Model")
                    with gr.Column():
                        output_text_audio = gr.Textbox(label="📄 Translated Instructions to English")
                        safety_check_audio = gr.Textbox(label="✅ Safety Check")

                translate_button_audio = gr.Button("Start")

                simulation_output_audio = gr.Image(label="Live Stream", streaming=True, visible=False)
                stop_button_audio = gr.Button("Stop")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown(primitives_markdown)

                    with gr.Column():
                        generated_BT_audio = gr.Textbox(label="Generated behavior tree")

                translate_button_audio.click(
                    fn=speech_processing.translate_audio,
                    inputs=microphone_input,
                    outputs=output_text_audio
                ).then(
                    fn=safety_module.check_safety,
                    inputs=[output_text_audio, safety_checkbox_audio],
                    outputs=safety_check_audio
                ).then(
                    fn=require_safe,
                    inputs=safety_check_audio,
                    outputs=None
                ).success(
                    fn=bt_generator.generate_behavior_tree,
                    inputs=output_text_audio,
                    outputs=generated_BT_audio
                ).success(
                    fn=on_translate_or_process,
                    outputs=simulation_output_audio
                )

                stop_button_audio.click(fn=on_stop, outputs=simulation_output_audio)
                demo.load(fn=streamer.stream, outputs=simulation_output_audio)

            # Tab for text input
            with gr.Tab("📝 Text Input"):
                gr.Markdown("## 📝 Text mode")
                gr.Markdown("""
                Enter text-based instructions for the swarm. The system translates them into a robot-executable BT.
                """)
                with gr.Row():
                    with gr.Column():
                        text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="📝 Input Text")
                        safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model")
                    with gr.Column():
                        output_text_text = gr.Textbox(label="📄 Translated Instructions to English", lines=2)
                        safety_check_text = gr.Textbox(label="✅ Safety Check")

                process_button_text = gr.Button("Start")

                simulation_output_text = gr.Image(label="Live Stream", streaming=True, visible=False)
                stop_button_text = gr.Button("Stop")
                with gr.Row():
                    with gr.Column():
                        gr.Markdown(primitives_markdown)

                    with gr.Column():
                        generated_BT_text = gr.Textbox(label="Generated behavior tree")

                process_button_text.click(
                    fn=text_processing.translate_text,
                    inputs=text_input,
                    outputs=output_text_text
                ).then(
                    fn=safety_module.check_safety,
                    inputs=[output_text_text, safety_checkbox_text],
                    outputs=safety_check_text
                ).then(
                    fn=require_safe,
                    inputs=safety_check_text,
                    outputs=None
                ).success(
                    fn=bt_generator.generate_behavior_tree,
                    inputs=output_text_text,
                    outputs=generated_BT_text
                ).success(
                    fn=on_translate_or_process,
                    outputs=simulation_output_text
                )

                stop_button_text.click(fn=on_stop, outputs=simulation_output_text)
                demo.load(fn=streamer.stream, outputs=simulation_output_text)

    return demo

if __name__ == "__main__":
    # Serve on all interfaces at port 7860; whatever makes launch() return
    # or raise, make sure the shared simulation is shut down afterwards.
    demo = create_gradio_interface()
    try:
        demo.launch(server_name="0.0.0.0", server_port=7860)
    finally:
        GradioStreamer().stop_simulation()