Neil-YL commited on
Commit
6f90cff
·
1 Parent(s): c858b82

update maintenance code

Browse files
.prefectignore ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # prefect artifacts
2
+ .prefectignore
3
+
4
+ # python artifacts
5
+ __pycache__/
6
+ *.py[cod]
7
+ *$py.class
8
+ *.egg-info/
9
+ *.egg
10
+
11
+ # Type checking artifacts
12
+ .mypy_cache/
13
+ .dmypy.json
14
+ dmypy.json
15
+ .pyre/
16
+
17
+ # IPython
18
+ profile_default/
19
+ ipython_config.py
20
+ *.ipynb_checkpoints/*
21
+
22
+ # Environments
23
+ .python-version
24
+ .env
25
+ .venv
26
+ env/
27
+ venv/
28
+
29
+ # MacOS
30
+ .DS_Store
31
+
32
+ # Dask
33
+ dask-worker-space/
34
+
35
+ # Editors
36
+ .idea/
37
+ .vscode/
38
+
39
+ # VCS
40
+ .git/
41
+ .hg/
DB_utls.py CHANGED
@@ -1,6 +1,7 @@
1
- #from lambda_mongo_utils import send_data_to_mongodb
2
  from pymongo import MongoClient
3
  from datetime import datetime
 
 
4
  from prefect import task
5
  import pandas as pd
6
  import os
@@ -56,7 +57,6 @@ def update_used_wells(used_wells):
56
  # close connection
57
  dbclient.close()
58
 
59
- @task
60
  def find_unused_wells():
61
  dbclient = MongoClient(connection_string)
62
  db = dbclient["LCM-OT-2-SLD"]
@@ -77,9 +77,13 @@ def find_unused_wells():
77
  empty_wells = sorted(df["well"].tolist(), key=well_sort_key)
78
  else:
79
  empty_wells = []
80
- #print(empty_wells)
81
-
82
- # close connection
 
 
 
 
83
  dbclient.close()
84
 
85
  # Check if there are any empty wells
@@ -94,7 +98,7 @@ def save_result(result_data):
94
  db = dbclient["LCM-OT-2-SLD"]
95
  collection = db["MSE403_result"]
96
  #collection = db["test_result"]
97
- result_data["timestamp"] = datetime.utcnow() # UTC time
98
  insert_result = collection.insert_one(result_data)
99
  inserted_id = insert_result.inserted_id
100
  # close connection
@@ -144,13 +148,52 @@ def add_student_quota(student_id, quota):
144
  student_data = {"student_id": student_id, "quota": quota}
145
  collection.update_one({"student_id": student_id}, {"$set": student_data}, upsert=True)
146
  dbclient.close()
147
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
  if __name__ == "__main__":
150
- generate_empty_well()
151
- #find_unused_wells()
 
 
152
  #test_id = "test"
153
  #quota = 999
154
  #add_student_quota(test_id, quota)
155
-
156
-
 
 
1
  from pymongo import MongoClient
2
  from datetime import datetime
3
+ import pytz
4
+ from prefect_utils import trigger_maintenance_request
5
  from prefect import task
6
  import pandas as pd
7
  import os
 
57
  # close connection
58
  dbclient.close()
59
 
 
60
  def find_unused_wells():
61
  dbclient = MongoClient(connection_string)
62
  db = dbclient["LCM-OT-2-SLD"]
 
77
  empty_wells = sorted(df["well"].tolist(), key=well_sort_key)
78
  else:
79
  empty_wells = []
80
+
81
+ if len(empty_wells) == 1:
82
+ maintenance_type = "wellplate_maintenance"
83
+ if check_maintenance_status(maintenance_type) == 0:
84
+ trigger_maintenance_request(maintenance_type)
85
+ set_maintenance_status(maintenance_type,1)
86
+
87
  dbclient.close()
88
 
89
  # Check if there are any empty wells
 
98
  db = dbclient["LCM-OT-2-SLD"]
99
  collection = db["MSE403_result"]
100
  #collection = db["test_result"]
101
+ result_data["timestamp"] = datetime.now(pytz.timezone("America/Toronto")) # UTC time
102
  insert_result = collection.insert_one(result_data)
103
  inserted_id = insert_result.inserted_id
104
  # close connection
 
148
  student_data = {"student_id": student_id, "quota": quota}
149
  collection.update_one({"student_id": student_id}, {"$set": student_data}, upsert=True)
150
  dbclient.close()
151
+
152
+ def check_maintenance_status(maintenance_type):
153
+ dbclient = MongoClient(connection_string)
154
+ db = dbclient["LCM-OT-2-SLD"]
155
+ collection = db["HITL_status"]
156
+ status_doc = collection.find_one({"name": maintenance_type})
157
+ dbclient.close()
158
+
159
+ if status_doc is None:
160
+ return 0
161
+ return status_doc.get("flag", 0)
162
+
163
+ def set_maintenance_status(maintenance_type,new_status):
164
+ #flag == 1, request sent; flag == 0, no request undergoing
165
+ dbclient = MongoClient(connection_string)
166
+ db = dbclient["LCM-OT-2-SLD"]
167
+ collection = db["HITL_status"]
168
+
169
+ collection.update_one(
170
+ {"name": maintenance_type},
171
+ {"$set": {"flag": new_status,
172
+ "timestamp": datetime.now(pytz.timezone("America/Toronto"))}
173
+ },
174
+ upsert=True
175
+ )
176
+ dbclient.close()
177
+
178
+ def insert_maintenance_log(maintenance_type, operator="Unknown"):
179
+ dbclient = MongoClient(connection_string)
180
+ db = dbclient["LCM-OT-2-SLD"]
181
+ collection = db["maintenance_logs"]
182
+
183
+ log_data = {
184
+ "maintenance_type": maintenance_type,
185
+ "timestamp": datetime.now(pytz.timezone("America/Toronto")),
186
+ "operator": operator
187
+ }
188
+ collection.insert_one(log_data)
189
+ dbclient.close()
190
+
191
 
192
  if __name__ == "__main__":
193
+ #generate_empty_well()
194
+
195
+ #set_maintenance_status(maintenance_type="wellplate_maintenance",new_status=0)
196
+ find_unused_wells()
197
  #test_id = "test"
198
  #quota = 999
199
  #add_student_quota(test_id, quota)
 
 
__pycache__/DB_utls.cpython-312.pyc ADDED
Binary file (7.4 kB). View file
 
__pycache__/maintenance_flow.cpython-312.pyc ADDED
Binary file (1.83 kB). View file
 
__pycache__/prefect_utils.cpython-311.pyc CHANGED
Binary files a/__pycache__/prefect_utils.cpython-311.pyc and b/__pycache__/prefect_utils.cpython-311.pyc differ
 
__pycache__/prefect_utils.cpython-312.pyc ADDED
Binary file (1.51 kB). View file
 
app.py CHANGED
@@ -308,7 +308,6 @@ def add_to_queue(student_id, R, Y, B):
308
  "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment."
309
  }
310
  return
311
-
312
 
313
  # Validate RYB inputs
314
  validation_result = validate_ryb_input(R, Y, B)
 
308
  "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment."
309
  }
310
  return
 
311
 
312
  # Validate RYB inputs
313
  validation_result = validate_ryb_input(R, Y, B)
maintenance_flow.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from prefect import flow,get_run_logger, pause_flow_run, settings
2
+ from prefect.blocks.notifications import SlackWebhook
3
+ from prefect.context import get_run_context
4
+ from DB_utls import set_maintenance_status,generate_empty_well,insert_maintenance_log
5
+
6
+ @flow
7
+ def request_wells_maintenance(maintenance_type):
8
+ MESSAGE = "HITL request"
9
+ logger = get_run_logger()
10
+ slack_block = SlackWebhook.load("prefect-test")
11
+ message = str(MESSAGE)
12
+ flow_run = get_run_context().flow_run
13
+
14
+ if flow_run and settings.PREFECT_UI_URL:
15
+ flow_run_url = (
16
+ f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
17
+ )
18
+ message += f"\n\nOpen the <{flow_run_url}|paused flow run>, OT2-LCM requests a {maintenance_type}, please complete with your user name and then click 'Resume'"
19
+
20
+ slack_block.notify(message)
21
+
22
+ user = pause_flow_run(wait_for_input=str, timeout=600)
23
+
24
+ set_maintenance_status(maintenance_type,0)
25
+
26
+ if maintenance_type == "wellplate_maintenance":
27
+ generate_empty_well()
28
+ msg_out = f"Updating wells status on DB by {user}"
29
+ logger.info(msg_out)
30
+
31
+ insert_maintenance_log(maintenance_type, user)
32
+
33
+ return msg_out
prefect.yaml ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Welcome to your prefect.yaml file! You can use this file for storing and managing
2
+ # configuration for deploying your flows. We recommend committing this file to source
3
+ # control along with your flow code.
4
+
5
+ # Generic metadata about this project
6
+ name: OT-2-LCM-test
7
+ prefect-version: 3.1.5
8
+
9
+ # build section allows you to manage and build docker images
10
+ build: null
11
+
12
+ # push section allows you to manage if and how this project is uploaded to remote locations
13
+ push: null
14
+
15
+ # pull section allows you to provide instructions for cloning this project in remote locations
16
+ pull:
17
+ - prefect.deployments.steps.set_working_directory:
18
+ directory: /Users/yh/Documents/AC work file/work SDL/OT-2-LCM-test
19
+
20
+ # the deployments section allows you to provide configuration for deploying flows
21
+ deployments:
22
+ - name: wells-maintenance
23
+ version: null
24
+ tags: []
25
+ concurrency_limit: null
26
+ description: null
27
+ entrypoint: maintenance_flow.py:request_wells_maintenance
28
+ parameters: {}
29
+ work_pool:
30
+ name: ot2-pool
31
+ work_queue_name: null
32
+ job_variables: {}
33
+ enforce_parameter_schema: true
34
+ schedules: []
prefect_utils.py CHANGED
@@ -12,14 +12,11 @@ def start_prefect_worker(work_pool_name: str = "ot2-pool"):
12
  worker_thread.start()
13
  print("Prefect Worker started in background thread.")
14
 
 
15
 
16
-
17
- from prefect.blocks.notifications import SlackWebhook
18
- from prefect.input import RunInput
19
-
20
- slack_block = SlackWebhook.load("prefect-test")
21
- class UserInput(RunInput):
22
- github_username: str
23
- comments: str = ""
24
- flag_for_review: bool
25
-
 
12
  worker_thread.start()
13
  print("Prefect Worker started in background thread.")
14
 
15
+ from prefect.deployments import run_deployment
16
 
17
+ def trigger_maintenance_request(maintenance_type: str):
18
+ deployment_name = "request-wells-maintenance/wells-maintenance"
19
+ run_deployment(
20
+ name=deployment_name,
21
+ parameters={"maintenance_type": maintenance_type}
22
+ )