Spaces:
Sleeping
Sleeping
import random
import time
import uuid
from collections import defaultdict
from queue import PriorityQueue
from typing import List, Dict, Optional

import gradio as gr
import torch
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
# Checkpoint identifier shared by the tokenizer and the model loader.
MODEL_NAME = "unit-mesh/autodev-coder-deepseek-6.7b-finetunes"

# Both objects are created at import time; chat_completions reads them as
# module-level globals.
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=torch.float16,
    device_map="auto",
)

# Stylesheet handed to gr.Blocks(css=...). The #chatbot and #sidebar selectors
# correspond to the elem_id values given to the two layout columns; the
# .dark-mode rules only apply when an ancestor element carries that class.
custom_css = """
#chatbot {
font-family: Arial, sans-serif;
max-width: 800px;
margin: auto;
padding: 20px;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
#sidebar {
background-color: #f5f5f5;
padding: 20px;
border-radius: 10px;
}
.message.user {
background-color: #007bff;
color: white;
border-radius: 10px 10px 0 10px;
padding: 10px;
margin: 5px 0;
max-width: 70%;
margin-left: auto;
}
.message.bot {
background-color: #e9ecef;
color: black;
border-radius: 10px 10px 10px 0;
padding: 10px;
margin: 5px 0;
max-width: 70%;
margin-right: auto;
}
.dark-mode #chatbot {
background-color: #2d2d2d;
color: #ffffff;
}
.dark-mode #sidebar {
background-color: #1e1e1e;
color: #ffffff;
}
.dark-mode .message.user {
background-color: #0056b3;
}
.dark-mode .message.bot {
background-color: #3d3d3d;
color: #ffffff;
}
"""
| # Enhanced Reasoning Algorithms | |
class DeductiveReasoner:
    """Rule-table reasoner: maps trigger substrings to canned conclusions."""

    def __init__(self, rules: Dict[str, str]):
        """Keep a reference to the rule table.

        Args:
            rules: Mapping of trigger substring -> conclusion text. Rules are
                tried in the mapping's iteration order.
        """
        self.rules = rules

    def infer(self, premise: str, specific_case: str) -> str:
        """Return the conclusion of the first rule whose trigger occurs in the case.

        Matching is a plain, case-sensitive substring test.

        Args:
            premise: Text echoed back in the answer; not used for matching.
            specific_case: Text searched for rule triggers.

        Returns:
            A sentence stating the matched conclusion, or a sentence saying
            that no rule applied.
        """
        trigger = next(
            (candidate for candidate in self.rules if candidate in specific_case),
            None,
        )
        if trigger is None:
            return f"Given the premise '{premise}', no applicable rule was found for the specific case '{specific_case}'."
        conclusion = self.rules[trigger]
        return f"Given the premise '{premise}' and the specific case '{specific_case}', the conclusion is: {conclusion}"
class InductiveReasoner:
    """Counts adjacent word pairs in example sentences and reports the top one."""

    def __init__(self):
        """Start with an empty pair-frequency table."""
        # Keys are (word, next_word) tuples; values are occurrence counts.
        self.patterns = defaultdict(int)

    def learn(self, examples: List[str]):
        """Add every adjacent word pair of each example to the frequency table.

        Args:
            examples: Sentences, split on whitespace. Sentences with fewer
                than two words contribute nothing.
        """
        for sentence in examples:
            tokens = sentence.split()
            for pair in zip(tokens, tokens[1:]):
                self.patterns[pair] += 1

    def infer(self) -> str:
        """Describe the most frequent word pair seen so far.

        Returns:
            A sentence naming the pair with the highest count (the earliest
            recorded pair wins ties), or a notice that nothing was learned.
        """
        if not self.patterns:
            return "No patterns have been learned yet."
        top_pair = max(self.patterns, key=self.patterns.get)
        lead, follow = top_pair[0], top_pair[1]
        return f"From the learned examples, the most common pattern is: '{lead} {follow}'."
class AbductiveReasoner:
    """Picks the most plausible hypothesis for an observation via Bayes' rule."""

    def __init__(self, hypotheses: Dict[str, float]):
        """Keep a reference to the prior table.

        Args:
            hypotheses: Mapping of hypothesis name -> prior probability.
        """
        self.hypotheses = hypotheses

    def evaluate(self, observation: str, likelihoods: Dict[str, float]) -> str:
        """Report the hypothesis with the highest posterior probability.

        Args:
            observation: Text echoed back in the answer; not used numerically.
            likelihoods: Mapping of hypothesis name -> P(observation | hypothesis).
                Hypotheses missing from this mapping get a likelihood of 0.0.

        Returns:
            A sentence naming the best hypothesis and its normalised posterior,
            or an explanatory sentence when there are no hypotheses or none of
            them has a non-zero joint probability.
        """
        if not self.hypotheses:
            # max() on an empty mapping would raise ValueError.
            return f"Given the observation '{observation}', no hypotheses are available to evaluate."

        # Unnormalised posterior: prior * likelihood for each hypothesis.
        joint = {
            hypothesis: prior * likelihoods.get(hypothesis, 0.0)
            for hypothesis, prior in self.hypotheses.items()
        }

        # Dividing by the total evidence turns the joint values into real
        # probabilities, so the figure reported below is an actual posterior.
        evidence = sum(joint.values())
        if evidence <= 0:
            return f"Given the observation '{observation}', none of the known hypotheses explains it."

        best_hypothesis = max(joint, key=joint.get)
        posterior = joint[best_hypothesis] / evidence
        return f"Given the observation '{observation}', the most plausible explanation is: {best_hypothesis} (posterior probability: {posterior:.2f})."
class BayesianReasoner:
    """Keeps a running probability that is scaled by each new likelihood."""

    def __init__(self, prior: float):
        """Set the starting probability.

        Args:
            prior: Initial value of the running probability.
        """
        self.prior = prior

    def update(self, evidence: str, likelihood: float) -> str:
        """Multiply the stored probability by ``likelihood`` and report it.

        The product is stored back on ``self.prior``, so successive calls
        compound. No normalisation step is applied.

        Args:
            evidence: Text echoed back in the answer; not used numerically.
            likelihood: Factor applied to the current probability.

        Returns:
            A sentence containing the new value formatted to two decimals.
        """
        self.prior = self.prior * likelihood
        return f"Given the evidence '{evidence}', the updated probability is: {self.prior:.2f}."
class HeuristicSearcher:
    """Priority-queue search with unit step cost guided by a heuristic."""

    def __init__(self, heuristic_func):
        """Store the heuristic.

        Args:
            heuristic_func: Callable ``(state, goal) -> number`` estimating the
                remaining cost from ``state`` to ``goal``.
        """
        self.heuristic_func = heuristic_func

    def search(self, start, goal):
        """Search from ``start`` towards ``goal``.

        States are expanded in order of ``cost so far + heuristic``; every
        step costs 1. States must be hashable and, because they are stored in
        ``(priority, state)`` tuples, mutually orderable when priorities tie.

        Args:
            start: Initial state.
            goal: State that ends the search when it is taken off the queue.

        Returns:
            A sentence saying a solution was found, or that none was found
            when the frontier runs empty before the goal is reached. With the
            default unbounded ``get_neighbors`` the frontier never empties, so
            an unreachable goal makes this loop forever.
        """
        frontier = PriorityQueue()
        frontier.put((0, start))
        cost_so_far = {start: 0}
        reached_goal = False

        while not frontier.empty():
            _, current = frontier.get()
            if current == goal:
                reached_goal = True
                break
            # Uniform step cost: every neighbour is one step further than current.
            new_cost = cost_so_far[current] + 1
            for next_state in self.get_neighbors(current):
                if next_state not in cost_so_far or new_cost < cost_so_far[next_state]:
                    cost_so_far[next_state] = new_cost
                    priority = new_cost + self.heuristic_func(next_state, goal)
                    frontier.put((priority, next_state))

        if not reached_goal:
            # The frontier was exhausted; do not claim success.
            return f"No solution found from {start} to {goal}."
        return f"Best solution found from {start} to {goal}."

    def get_neighbors(self, state):
        """Return the successors of a numeric state: one below and one above."""
        return [state - 1, state + 1]
| # Initialize reasoning algorithms | |
# Module-level reasoner instances shared by every chat request.

# Trigger substring -> advice returned when the trigger occurs in a message.
deductive_reasoner = DeductiveReasoner(
    rules={
        "error": "Check for syntax errors in the code.",
        "loop": "Optimize the loop structure for better performance.",
        "null": "Ensure proper null checks are in place.",
    },
)

# Seeded once at import time with two example sentences.
inductive_reasoner = InductiveReasoner()
inductive_reasoner.learn(
    [
        "If it rains, the ground gets wet.",
        "If you study, you pass the exam.",
    ]
)

# Hypothesis name -> prior probability.
abductive_reasoner = AbductiveReasoner(
    hypotheses={
        "syntax error": 0.3,
        "logical error": 0.5,
        "runtime error": 0.2,
    },
)

bayesian_reasoner = BayesianReasoner(prior=0.5)

# Heuristic: absolute distance between the numeric state and the goal.
heuristic_searcher = HeuristicSearcher(
    heuristic_func=lambda state, goal: abs(state - goal),
)
| # Chatbot function with reasoning enhancements | |
def chatbot_response(message, history, reasoning_algorithm, file_content=None):
    """Answer ``message`` with the selected reasoner and extend the chat history.

    Args:
        message: The user's text.
        history: List of ``(user_message, reply)`` tuples, or ``None``/empty on
            the first turn. A non-empty list is extended in place.
        reasoning_algorithm: One of "Deductive", "Inductive", "Abductive",
            "Bayesian" or "Heuristic". Any other value yields the reply
            "Invalid reasoning algorithm.".
        file_content: Optional value appended to the reply under a
            "File Content:" heading when truthy. It is formatted with ``str()``
            as-is. NOTE(review): the UI passes the File component's value here,
            which may be a file handle or path rather than text; verify.

    Returns:
        ``(history, history)``: the same list twice, one for the Chatbot
        display and one for the session state.
    """
    history = history or []

    # Dispatch lazily so only the selected reasoner runs. Building the replies
    # eagerly would call bayesian_reasoner.update() on every message, which
    # mutates its stored prior even when another algorithm is selected, and
    # would also run the heuristic search for nothing.
    dispatch = {
        "Deductive": lambda: deductive_reasoner.infer("General rule", message),
        "Inductive": lambda: inductive_reasoner.infer(),
        "Abductive": lambda: abductive_reasoner.evaluate(
            message, {"syntax error": 0.8, "logical error": 0.5}
        ),
        "Bayesian": lambda: bayesian_reasoner.update(message, likelihood=0.7),
        "Heuristic": lambda: heuristic_searcher.search(start=0, goal=10),
    }
    handler = dispatch.get(reasoning_algorithm)
    reasoning = handler() if handler is not None else "Invalid reasoning algorithm."

    # Append file content if provided
    if file_content:
        reasoning += f"\n\nFile Content:\n{file_content}"

    history.append((message, reasoning))
    return history, history
| # File upload handler | |
def handle_file_upload(file):
    """Read an uploaded file and return its text.

    Args:
        file: Either a filesystem path (``str`` or path-like object) or an
            object exposing the path as ``.name`` (for example a temporary-file
            wrapper). A falsy value means "no file".

    Returns:
        The file's content decoded as UTF-8, or ``None`` when no file is given.
        Bytes that are not valid UTF-8 are replaced with U+FFFD rather than
        raising, so uploading a binary file does not crash the handler.
    """
    if not file:
        return None

    # Path-like values are used directly; anything else is expected to carry
    # its location in ``.name``. The check avoids pathlib.Path.name, which is
    # only the final path component.
    if isinstance(file, str) or hasattr(file, "__fspath__"):
        path = file
    else:
        path = file.name

    # An explicit encoding keeps the result independent of the platform default.
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        return f.read()
| # Theme toggling | |
def toggle_theme(theme):
    """Build a ``gr.update`` carrying the stylesheet for the chosen theme.

    Args:
        theme: "Dark" selects the dark variant; any other value selects the
            base stylesheet.

    Returns:
        ``gr.update(css=...)`` where the CSS is ``custom_css`` with the literal
        text ".dark-mode" appended for the dark theme, or ``custom_css``
        unchanged otherwise.

    NOTE(review): the appended ".dark-mode" is a bare selector with no rule
    body, and the listener that calls this function is registered with
    ``outputs=None``, so the returned update is presumably never applied.
    Confirm whether theme switching actually works.
    """
    stylesheet = (custom_css + ".dark-mode") if theme == "Dark" else custom_css
    return gr.update(css=stylesheet)
| # Gradio interface | |
with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# OpenWebUI-like Chat Interface with Reasoning Enhancements")

    with gr.Row():
        # Settings column; elem_id ties it to the #sidebar CSS rule.
        with gr.Column(scale=1, elem_id="sidebar"):
            gr.Markdown("### Settings")
            # Not referenced by any event listener in this block.
            model_selector = gr.Dropdown(["Model 1", "Model 2"], label="Select Model")
            reasoning_selector = gr.Dropdown(
                ["Deductive", "Inductive", "Abductive", "Bayesian", "Heuristic"],
                label="Select Reasoning Algorithm",
                value="Deductive",
            )
            theme_selector = gr.Radio(["Light", "Dark"], label="Theme", value="Light")
            file_upload = gr.File(label="Upload File")

        # Conversation column; elem_id ties it to the #chatbot CSS rule.
        with gr.Column(scale=3, elem_id="chatbot"):
            chatbot = gr.Chatbot(label="Chat")
            message = gr.Textbox(
                label="Your Message",
                placeholder="Type your message here...",
            )
            submit = gr.Button("Send")
            # Holds the running chat history between submissions.
            state = gr.State()

    # Chat interaction: the handler returns the history twice, once for the
    # visible Chatbot and once for the stored state.
    submit.click(
        fn=chatbot_response,
        inputs=[message, state, reasoning_selector, file_upload],
        outputs=[chatbot, state],
    )

    # File upload handling: the text read from the file is written into the
    # message box.
    file_upload.change(
        fn=handle_file_upload,
        inputs=file_upload,
        outputs=message,
    )

    # Theme toggling: no output component is attached to this listener.
    theme_selector.change(
        fn=toggle_theme,
        inputs=theme_selector,
        outputs=None,
    )

# OpenAI-compatible API using FastAPI
app = FastAPI()
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completion handler."""

    # Model name supplied by the client; echoed back in the response.
    model: str
    # Conversation so far, one dict per message. The handler reads the
    # "content" key of the last entry.
    messages: List[dict]
    # Generation length limit forwarded to the model.
    max_tokens: Optional[int] = 500
    # Sampling temperature forwarded to the model.
    temperature: Optional[float] = 0.7
class ChatCompletionResponse(BaseModel):
    """Response body returned by the chat-completion handler."""

    # Identifier of this completion.
    id: str
    # Object type tag; fixed to "chat.completion" unless overridden.
    object: str = "chat.completion"
    # Creation time as an integer timestamp.
    created: int
    # Model name copied from the request.
    model: str
    # Generated alternatives; each dict holds "message", "finish_reason"
    # and "index".
    choices: List[dict]
    # Token accounting: "prompt_tokens", "completion_tokens", "total_tokens".
    usage: dict
@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completions(request: ChatCompletionRequest):
    """Generate a completion for the last message of an OpenAI-style request.

    Only the ``content`` of the final entry in ``request.messages`` is used as
    the prompt; earlier messages are ignored.

    Args:
        request: Parsed request body.

    Returns:
        A ``ChatCompletionResponse`` with a single choice containing only the
        newly generated text, plus token counts taken from the tensors that
        were actually fed to and produced by the model.

    Raises:
        HTTPException: 400 when ``messages`` is empty or its last entry has no
            string ``content``; 500 when tokenisation or generation fails.
    """
    # Reject malformed input up front so it is reported as a client error
    # instead of surfacing as a 500 from an IndexError/KeyError.
    last_message = request.messages[-1] if request.messages else {}
    user_message = last_message.get("content")
    if not isinstance(user_message, str):
        raise HTTPException(
            status_code=400,
            detail="'messages' must be non-empty and its last entry must have string 'content'.",
        )

    try:
        inputs = tokenizer(user_message, return_tensors="pt").to(model.device)
        prompt_tokens = inputs["input_ids"].shape[-1]

        generation_kwargs = {}
        if request.max_tokens is not None:
            # max_tokens limits the completion only; max_length would also
            # count the prompt and can leave no room for any new token.
            generation_kwargs["max_new_tokens"] = request.max_tokens
        if request.temperature is not None and request.temperature > 0:
            # temperature only has an effect when sampling is enabled.
            generation_kwargs["do_sample"] = True
            generation_kwargs["temperature"] = request.temperature
        else:
            generation_kwargs["do_sample"] = False

        # Gradients are not needed for inference.
        with torch.no_grad():
            outputs = model.generate(**inputs, **generation_kwargs)

        # generate() returns prompt + continuation; keep the continuation only
        # so the reply does not echo the user's message.
        completion_ids = outputs[0][prompt_tokens:]
        completion_tokens = int(completion_ids.shape[-1])
        response_text = tokenizer.decode(completion_ids, skip_special_tokens=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Format the response in OpenAI-compatible format
    return ChatCompletionResponse(
        id=f"chatcmpl-{uuid.uuid4().hex}",
        created=int(time.time()),
        model=request.model,
        choices=[
            {
                "message": {
                    "role": "assistant",
                    "content": response_text,
                },
                "finish_reason": "stop",
                "index": 0,
            }
        ],
        usage={
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    )
| # Run the FastAPI server | |
def run_api():
    """Serve the FastAPI ``app`` with uvicorn on 0.0.0.0:8000."""
    bind = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **bind)
| # Run the Gradio app | |
def run_gradio():
    """Launch the Gradio ``demo`` on 0.0.0.0:7860."""
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)
| # Entry point | |
if __name__ == "__main__":
    import threading

    # Start the FastAPI server in a separate thread. It is a daemon thread so
    # that it does not keep the interpreter alive once the Gradio app in the
    # main thread has stopped.
    api_thread = threading.Thread(target=run_api, daemon=True)
    api_thread.start()

    # Start the Gradio app in the main thread.
    run_gradio()