sgbaird commited on
Commit
b7afc5f
·
1 Parent(s): 10091e7

chore: Add Streamlit and MQTT implementation for light mixing control panel

Browse files
app.py CHANGED
@@ -1,19 +1,12 @@
1
- """Based on
2
- https://github.com/AccelerationConsortium/ac-training-lab/blob/main/src/ac_training_lab/picow/digital-pipette/scripts/streamlit_webapp.py
3
-
4
- permalink:
5
- https://github.com/AccelerationConsortium/ac-training-lab/blob/230c72f3d9d6e8a5d0b9cece044515dfd386acdc/src/ac_training_lab/picow/digital-pipette/scripts/streamlit_webapp.py]
6
-
7
- Streamlit was preferred over the gradio implementation
8
- """
9
-
10
  import json
 
 
11
 
12
  import paho.mqtt.client as mqtt
13
  import streamlit as st
14
 
15
  # Initialize Streamlit app
16
- st.title("Actuator Control Panel")
17
 
18
  # MQTT Configuration
19
  HIVEMQ_HOST = st.text_input("Enter your HiveMQ host:", "", type="password")
@@ -26,66 +19,80 @@ PORT = st.number_input(
26
  # User input for the Pico ID
27
  pico_id = st.text_input("Enter your Pico ID:", "", type="password")
28
 
29
- # Slider for position value
30
- position = st.slider(
31
- "Select the position value:", min_value=1.1, max_value=1.9, value=1.5
32
- )
33
 
 
 
 
34
 
35
- # singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
 
 
 
 
36
  @st.cache_resource
37
- def get_paho_client(hostname, username, password=None, port=8883, tls=True):
 
 
 
 
38
 
39
- client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
 
40
 
41
- # The callback for when the client receives a CONNACK response from the server.
42
  def on_connect(client, userdata, flags, rc, properties=None):
43
  if rc != 0:
44
  print("Connected with result code " + str(rc))
 
45
 
46
  client.on_connect = on_connect
 
47
 
48
- # enable TLS for secure connection
49
  if tls:
50
- client.tls_set()
51
- # set username and password
52
  client.username_pw_set(username, password)
53
- # connect to HiveMQ Cloud on port 8883 (default for MQTT)
54
  client.connect(hostname, port)
55
  client.loop_start() # Use a non-blocking loop
56
 
57
  return client
58
 
59
 
60
- def send_command(client, pico_id, position):
61
- # Topic
62
- command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
63
 
64
- # Create and send command
65
- command = {"position": position}
66
-
67
- try:
68
- result = client.publish(command_topic, json.dumps(command), qos=1)
69
- result.wait_for_publish() # Ensure the message is sent
70
- if result.rc == mqtt.MQTT_ERR_SUCCESS:
71
- return f"Command sent: {command} to topic {command_topic}"
72
- else:
73
- return f"Failed to send command: {result.rc}"
74
- except Exception as e:
75
- return f"An error occurred: {e}"
76
 
77
 
78
  # Publish button
79
- if st.button("Send Command"):
80
  if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
81
  st.error("Please enter all required fields.")
82
  else:
 
 
 
83
  client = get_paho_client(
 
84
  HIVEMQ_HOST,
85
  HIVEMQ_USERNAME,
86
  password=HIVEMQ_PASSWORD,
87
  port=int(PORT),
88
  tls=True,
89
  )
90
- success_msg = send_command(client, pico_id, position)
91
- st.success(success_msg)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
+ import queue
3
+ import threading
4
 
5
  import paho.mqtt.client as mqtt
6
  import streamlit as st
7
 
8
  # Initialize Streamlit app
9
+ st.title("RGB Command and Sensor Data Panel")
10
 
11
  # MQTT Configuration
12
  HIVEMQ_HOST = st.text_input("Enter your HiveMQ host:", "", type="password")
 
19
  # User input for the Pico ID
20
  pico_id = st.text_input("Enter your Pico ID:", "", type="password")
21
 
22
+ # Sliders for RGB values
23
+ R = st.slider("Select the Red value:", min_value=0, max_value=255, value=0)
24
+ G = st.slider("Select the Green value:", min_value=0, max_value=255, value=0)
25
+ B = st.slider("Select the Blue value:", min_value=0, max_value=255, value=0)
26
 
27
+ # Initialize session state for messages
28
+ if "messages" not in st.session_state:
29
+ st.session_state.messages = []
30
 
31
+ # Queue to hold sensor data
32
+ sensor_data_queue = queue.Queue()
33
+
34
+
35
+ # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
36
  @st.cache_resource
37
+ def get_paho_client(
38
+ sensor_data_topic, hostname, username, password=None, port=8883, tls=True
39
+ ):
40
+
41
+ client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance
42
 
43
+ def on_message(client, userdata, msg):
44
+ sensor_data_queue.put(json.loads(msg.payload))
45
 
 
46
  def on_connect(client, userdata, flags, rc, properties=None):
47
  if rc != 0:
48
  print("Connected with result code " + str(rc))
49
+ client.subscribe(sensor_data_topic, qos=1)
50
 
51
  client.on_connect = on_connect
52
+ client.on_message = on_message
53
 
 
54
  if tls:
55
+ client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
 
56
  client.username_pw_set(username, password)
 
57
  client.connect(hostname, port)
58
  client.loop_start() # Use a non-blocking loop
59
 
60
  return client
61
 
62
 
63
+ def send_and_receive(client, command_topic, msg, queue_timeout=60):
64
+ client.publish(command_topic, json.dumps(msg), qos=2)
 
65
 
66
+ while True:
67
+ sensor_data = sensor_data_queue.get(True, queue_timeout)
68
+ return sensor_data
 
 
 
 
 
 
 
 
 
69
 
70
 
71
  # Publish button
72
+ if st.button("Send RGB Command"):
73
  if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
74
  st.error("Please enter all required fields.")
75
  else:
76
+ command_topic = f"{pico_id}/neopixel"
77
+ sensor_data_topic = f"{pico_id}/as7341"
78
+
79
  client = get_paho_client(
80
+ sensor_data_topic,
81
  HIVEMQ_HOST,
82
  HIVEMQ_USERNAME,
83
  password=HIVEMQ_PASSWORD,
84
  port=int(PORT),
85
  tls=True,
86
  )
87
+
88
+ command_msg = {"R": R, "G": G, "B": B}
89
+ sensor_data = send_and_receive(
90
+ client, command_topic, command_msg, queue_timeout=30
91
+ )
92
+
93
+ st.success("Command sent successfully!")
94
+ st.write("Sensor Data Received:", sensor_data)
95
+
96
+ # Display received messages
97
+ for message in st.session_state.messages:
98
+ st.write(f"Received message: {message}")
requirements.txt CHANGED
@@ -1 +1,2 @@
1
- paho-mqtt
 
 
1
+ paho-mqtt
2
+ streamlit
scripts/digital_pipette_picow_basic.py DELETED
@@ -1,28 +0,0 @@
1
- import utime
2
- from machine import PWM, Pin
3
-
4
- # Setup PWM
5
- pwm = PWM(Pin(0)) # Use the appropriate GPIO pin
6
- pwm.freq(50) # 50 Hz frequency
7
-
8
-
9
- def set_position(pulse_ms):
10
- duty = int((pulse_ms / 20.0) * 65535)
11
- pwm.duty_u16(duty)
12
-
13
-
14
- # Example to set the actuator to different positions
15
- set_position(1.1) # Almost full retraction
16
- utime.sleep(5)
17
- set_position(1.5) # Halfway
18
- utime.sleep(5)
19
- set_position(1.9) # Almost full extension
20
- utime.sleep(5)
21
- set_position(1.1) # Almost full retraction
22
- utime.sleep(5)
23
- set_position(1.5) # Halfway
24
- utime.sleep(5)
25
-
26
- # Add your logic to set it to the desired intermediate positions
27
-
28
- pwm.deinit() # Deinitialize PWM
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
scripts/onboard_led_temp.py ADDED
@@ -0,0 +1,73 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/1.4.1-onboard-led-temp.html
3
+
4
+ permalink: https://github.com/AccelerationConsortium/ac-microcourses/blob/07dc9a0286ded2e21ea64f02d8ae697717e786b9/docs/courses/hello-world/1.4.1-onboard-led-temp.ipynb
5
+ """
6
+
7
+ COURSE_ID = "<your_id_here>" # UPDATE THIS TO YOUR ID
8
+
9
+ command_topic = f"{COURSE_ID}/onboard_led"
10
+ sensor_data_topic = f"{COURSE_ID}/onboard_temp"
11
+
12
+ HIVEMQ_USERNAME = "sgbaird"
13
+ HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L"
14
+ HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
15
+
16
+ import paho.mqtt.client as mqtt
17
+ import json
18
+ from queue import Queue
19
+
20
+ sensor_data_queue: "Queue[dict]" = Queue()
21
+
22
+
23
+ def get_paho_client(
24
+ sensor_data_topic, hostname, username, password=None, port=8883, tls=True
25
+ ):
26
+
27
+ client = mqtt.Client(
28
+ mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5
29
+ ) # create new instance
30
+
31
+ def on_message(client, userdata, msg):
32
+ sensor_data_queue.put(json.loads(msg.payload))
33
+
34
+ # The callback for when the client receives a CONNACK response from the server.
35
+ def on_connect(client, userdata, flags, rc, properties=None):
36
+ if rc != 0:
37
+ print("Connected with result code " + str(rc))
38
+ # Subscribing in on_connect() means that if we lose the connection and
39
+ # reconnect then subscriptions will be renewed.
40
+ client.subscribe(sensor_data_topic, qos=1)
41
+
42
+ client.on_connect = on_connect
43
+ client.on_message = on_message
44
+
45
+ # enable TLS for secure connection
46
+ if tls:
47
+ client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
48
+ # set username and password
49
+ client.username_pw_set(username, password)
50
+ # connect to HiveMQ Cloud on port 8883 (default for MQTT)
51
+ client.connect(hostname, port)
52
+ client.subscribe(sensor_data_topic, qos=2)
53
+
54
+ return client
55
+
56
+
57
+ def send_and_receive(client, command_topic, msg, queue_timeout=60):
58
+ client.publish(command_topic, msg, qos=2)
59
+
60
+ client.loop_start()
61
+
62
+ while True:
63
+ sensor_data = sensor_data_queue.get(True, queue_timeout)
64
+ client.loop_stop()
65
+ return sensor_data
66
+
67
+
68
+ client = get_paho_client(
69
+ sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD
70
+ )
71
+
72
+ onboard_temp = send_and_receive(client, command_topic, "toggle", queue_timeout=30)
73
+ print(onboard_temp)
scripts/rgb_led_sensor.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/1.4.1-onboard-led-temp.html
3
+
4
+ permalink: https://github.com/AccelerationConsortium/ac-microcourses/blob/07dc9a0286ded2e21ea64f02d8ae697717e786b9/docs/courses/hello-world/1.4.1-onboard-led-temp.ipynb
5
+ """
6
+
7
+ import paho.mqtt.client as mqtt
8
+ import json
9
+ from queue import Queue
10
+
11
+ PICO_ID = "test" # UPDATE THIS TO YOUR ID
12
+
13
+ command_topic = f"{PICO_ID}/neopixel"
14
+ sensor_data_topic = f"{PICO_ID}/as7341"
15
+
16
+ HIVEMQ_USERNAME = "sgbaird"
17
+ HIVEMQ_PASSWORD = "D.Pq5gYtejYbU#L"
18
+ HIVEMQ_HOST = "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
19
+
20
+ sensor_data_queue: "Queue[dict]" = Queue()
21
+
22
+
23
+ def get_paho_client(
24
+ sensor_data_topic, hostname, username, password=None, port=8883, tls=True
25
+ ):
26
+ client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance
27
+
28
+ def on_message(client, userdata, msg):
29
+ sensor_data_queue.put(json.loads(msg.payload))
30
+
31
+ def on_connect(client, userdata, flags, rc, properties=None):
32
+ if rc != 0:
33
+ print("Connected with result code " + str(rc))
34
+ client.subscribe(sensor_data_topic, qos=1)
35
+
36
+ client.on_connect = on_connect
37
+ client.on_message = on_message
38
+
39
+ if tls:
40
+ client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
41
+ client.username_pw_set(username, password)
42
+ client.connect(hostname, port)
43
+ client.subscribe(sensor_data_topic, qos=2)
44
+
45
+ return client
46
+
47
+
48
+ def send_and_receive(client, command_topic, msg, queue_timeout=60):
49
+ client.publish(command_topic, json.dumps(msg), qos=2)
50
+
51
+ client.loop_start()
52
+
53
+ while True:
54
+ sensor_data = sensor_data_queue.get(True, queue_timeout)
55
+ client.loop_stop()
56
+ return sensor_data
57
+
58
+
59
+ client = get_paho_client(
60
+ sensor_data_topic, HIVEMQ_HOST, HIVEMQ_USERNAME, password=HIVEMQ_PASSWORD
61
+ )
62
+
63
+ command_msg = {"R": 50, "G": 20, "B": 240}
64
+ sensor_data = send_and_receive(client, command_topic, command_msg, queue_timeout=30)
65
+ print(sensor_data)
scripts/single_submit.py DELETED
@@ -1,39 +0,0 @@
1
- """This streamlit implementation is preferred over the gradio implementation"""
2
-
3
- import json
4
-
5
- import paho.mqtt.client as mqtt
6
-
7
- # MQTT Configuration
8
- HIVEMQ_HOST = ""
9
- HIVEMQ_USERNAME = ""
10
- HIVEMQ_PASSWORD = ""
11
- PORT = 8883
12
-
13
- # User input for the Pico ID
14
- pico_id = ""
15
-
16
- # Slider for position value
17
- position = 1.35
18
-
19
-
20
- def send_command(client, pico_id, position):
21
- # Topic
22
- command_topic = f"digital-pipette/picow/{pico_id}/L16-R"
23
-
24
- # Create and send command
25
- command = {"position": position}
26
- client.publish(command_topic, json.dumps(command), qos=1)
27
-
28
- return f"Command sent: {command} to topic {command_topic}"
29
-
30
-
31
- # Initialize MQTT client
32
- client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv5)
33
- client.tls_set()
34
- client.username_pw_set(HIVEMQ_USERNAME, HIVEMQ_PASSWORD)
35
- client.connect(HIVEMQ_HOST, PORT, 60)
36
-
37
- success_msg = send_command(client, pico_id, position)
38
-
39
- client.disconnect()