import os import subprocess import threading import uuid import time from fastapi import FastAPI, UploadFile, File from fastapi.middleware.cors import CORSMiddleware import streamlit as st import requests UPLOAD_DIR = "uploads" LOG_DIR = "logs" os.makedirs(UPLOAD_DIR, exist_ok=True) os.makedirs(LOG_DIR, exist_ok=True) # مدیریت اجرای فایل‌ها executions = {} exec_counter = 0 # ------------------ FastAPI ------------------ # app = FastAPI() app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) @app.post("/upload/") async def upload(file: UploadFile = File(...)): global exec_counter save_path = os.path.join(UPLOAD_DIR, file.filename) with open(save_path, "wb") as f: f.write(await file.read()) exec_counter += 1 exec_id = exec_counter log_path = os.path.join(LOG_DIR, f"{exec_id}.log") def run_code(): with open(log_path, "w") as out: process = subprocess.Popen(["python", save_path], stdout=out, stderr=subprocess.STDOUT) executions[exec_id]["process"] = process process.wait() executions[exec_id]["status"] = "finished" executions[exec_id] = { "status": "running", "file": save_path, "log": log_path } threading.Thread(target=run_code).start() return {"exec_id": exec_id} @app.get("/status/{exec_id}") def status(exec_id: int): if exec_id not in executions: return {"error": "not found"} return {"status": executions[exec_id]["status"]} @app.get("/log/{exec_id}") def get_log(exec_id: int): path = executions.get(exec_id, {}).get("log") if not path or not os.path.exists(path): return {"log": ""} with open(path) as f: lines = f.readlines() return {"log": "".join(lines[-10:])} @app.post("/stop/{exec_id}") def stop(exec_id: int): proc = executions.get(exec_id, {}).get("process") if proc and proc.poll() is None: proc.terminate() executions[exec_id]["status"] = "stopped" return {"stopped": True} return {"stopped": False} @app.get("/executions/") def list_execs(): return [{"id": k, "file": v["file"], "status": v["status"]} for k, v in executions.items()] # ------------------ Streamlit ------------------ # def streamlit_ui(): st.set_page_config("اجرای فایل پایتون", layout="centered") st.title("آپلود و اجرای فایل پایتون") file = st.file_uploader("فایل پایتون را انتخاب کنید", type=["py"]) if file and st.button("آپلود و اجرا"): res = requests.post("http://localhost:8000/upload/", files={"file": file}) exec_id = res.json()["exec_id"] st.success(f"اجرا با آیدی {exec_id} شروع شد") st.divider() st.subheader("فایل‌های اجرا شده") execs = requests.get("http://localhost:8000/executions/").json() if execs: exec_ids = [str(e["id"]) + f" - {os.path.basename(e['file'])}" for e in execs] selected = st.selectbox("انتخاب اجرا برای مشاهده:", exec_ids) selected_id = int(selected.split(" - ")[0]) col1, col2 = st.columns(2) with col1: if st.button("توقف اجرا", key="stop"): requests.post(f"http://localhost:8000/stop/{selected_id}") with col2: if st.button("نمایش لاگ زنده", key="log"): st.subheader(f"لاگ اجرای {selected_id}") log_box = st.empty() status_box = st.empty() while True: status = requests.get(f"http://localhost:8000/status/{selected_id}").json() status_box.markdown(f"**وضعیت:** {status['status']}") logs = requests.get(f"http://localhost:8000/log/{selected_id}").json() log_box.code(logs["log"]) if status["status"] != "running": break time.sleep(2) else: st.info("فعلاً هیچ اجرایی انجام نشده.") # ------------------ اجرا ------------------ # if __name__ == "__main__": def start_api(): subprocess.run(["uvicorn", "app:app", "--port", "8000", "--reload"]) threading.Thread(target=start_api).start() time.sleep(2) streamlit_ui()