# ProximileAdmin's picture
# Update app.py
# 37a4dd1 verified
import os
import textwrap
import datetime
import json
import gradio as gr
from openai import OpenAI
import urllib.request
import feedparser
import time
from typing import Dict, List, Optional, Any
import pubmed_parser
import requests
# When True, lgs() prints its argument to stdout; when False, lgs() is silent.
VERBOSE_SHELL = True
# Base URL handed to the OpenAI client in LLM.__init__.
ENDPOINT_URL = "https://api.hyperbolic.xyz/v1"
# Both keys are read with os.environ[...], so a missing variable raises
# KeyError at import time rather than at first use.
OAI_API_KEY = os.environ['HYPERBOLIC_XYZ_API_KEY']
WEATHER_API_KEY = os.environ["WEATHER_API_KEY"]
# Model identifier sent with every chat completion request.
MODEL_NAME = "meta-llama/Llama-3.3-70B-Instruct"
def lgs(log_string):
    """Print ``log_string`` to stdout, but only while ``VERBOSE_SHELL`` is truthy."""
    if not VERBOSE_SHELL:
        return
    print(log_string)
# Sampling settings passed to iterate_chat (and on to LLM.generate) by
# user_conversation for every turn.
sampling_params = {
    "temperature": 0.8,
    "top_p": 0.95,
    "max_tokens": 2048,
}
# Formatted once at import time as zero-padded day, full month name, year;
# system_prompt_format embeds it in the system prompt.
todays_date_string = datetime.date.today().strftime("%d %B %Y")
def system_prompt_format(function_descriptions, function_jsons):
    """Build the system prompt text from tool descriptions and tool JSON definitions.

    Args:
        function_descriptions: Iterable of strings, joined with newlines and
            placed after the introductory sentence.
        function_jsons: Iterable of JSON-serializable objects; each is dumped
            with ``indent=2`` and the dumps are joined with newlines.

    Returns:
        The complete prompt string, including the module-level
        ``todays_date_string``.
    """
    described_tools = "\n".join(function_descriptions)
    tool_schemas = "\n".join(json.dumps(schema, indent=2) for schema in function_jsons)

    header = """Cutting Knowledge Date: December 2023
Today Date: """
    introduction = """
You are a helpful assistant with tool calling capabilities.
"""
    calling_rules = """
If you choose to use one of the following functions, respond with a JSON for a function call with its proper arguments that best answers the given prompt.
Your tool request should be in the exact format {"name": function name, "parameters": dictionary of argument name and its value}. Do not use variables. Just a two-key dictionary, starting with the function name, followed by a dictionary of parameters.
"""
    result_handling = """
After receiving the results back from a function (formatted as {"name": function name, "return": returned data after running function}) formulate your response to the user. If the information needed is not found in the returned data, either attempt a new function call, or inform the user that you cannot answer based on your available knowledge. The user cannot see the function results. You have to interpret the data and provide a response based on it.
If the user request does not necessitate a function call, simply respond to the user's query directly."""

    return "".join([
        header,
        todays_date_string,
        introduction,
        described_tools,
        calling_rules,
        tool_schemas,
        result_handling,
    ])
def build_sys_prompt(tool_objects):
    """Return the system prompt assembled from each tool's prompt paragraph and JSON definition."""
    return system_prompt_format(
        [tool.system_prompt_paragraph for tool in tool_objects],
        [tool.json_definition_of_function for tool in tool_objects],
    )
class ToolBase:
    """Base class pairing a tool's JSON function definition with its system-prompt text.

    Subclasses provide the behaviour by overriding ``actual_function``.
    """

    def __init__(self,
                 programmatic_name: str,
                 natural_name: str,
                 active_voice_description_of_capability: str,
                 passive_voice_description_of_function: str,
                 prescriptive_conditional: str,
                 input_params: Dict[str, Dict],
                 required_params: Optional[List[str]] = None,
                 ):
        """Record the tool's metadata.

        Args:
            programmatic_name: Function name used in the JSON definition.
            natural_name: Accepted but not stored by this constructor.
            active_voice_description_of_capability: First sentence of the
                system-prompt paragraph.
            passive_voice_description_of_function: Description used in the
                JSON definition.
            prescriptive_conditional: Second sentence of the system-prompt
                paragraph.
            input_params: Mapping used as the schema's ``properties``.
            required_params: Names listed under ``required``; the key is
                omitted when this is ``None`` or empty.
        """
        self.json_name = programmatic_name
        self.json_description = passive_voice_description_of_function

        schema = {"type": "object", "properties": input_params}
        if required_params:
            schema["required"] = required_params

        function_entry = {
            "name": self.json_name,
            "description": self.json_description,
            "parameters": schema,
        }
        self.json_definition_of_function = {"type": "function", "function": function_entry}
        self.system_prompt_paragraph = " ".join(
            [active_voice_description_of_capability, prescriptive_conditional]
        )

    def actual_function(self, **kwargs):
        """Run the tool; the base implementation always raises NotImplementedError."""
        raise NotImplementedError("Subclasses must implement this method.")
def search_arxiv_papers(
    query: str,
    max_results: int = 5,
    sort_by: str = 'relevance'
) -> Dict:
    """
    Search for papers on arXiv using their API.
    Args:
        query: Search query string
        max_results: Maximum number of results to return (default: 5)
        sort_by: Sorting criteria (default: 'relevance')
    Returns:
        On success: {'status': 'success', 'total_results': int, 'papers': [...]}.
        On any failure (network, timeout, parsing): {'status': 'error',
        'message': str}. The function never raises.
    """
    # Imported locally so this function does not depend on urllib.request
    # happening to load the urllib.parse submodule as a side effect.
    import urllib.parse

    try:
        params = {
            'search_query': f'all:{query}',
            'start': 0,
            'max_results': max_results,
            'sortBy': sort_by,
            'sortOrder': 'descending',
        }
        # quote_via=quote keeps spaces as %20 (the default quote_plus would
        # emit '+'), matching how the URL was previously assembled by hand.
        url = 'https://export.arxiv.org/api/query?' + urllib.parse.urlencode(
            params, quote_via=urllib.parse.quote
        )

        # The context manager closes the connection even if reading fails; the
        # timeout keeps a stalled request from blocking the chat turn forever.
        with urllib.request.urlopen(url, timeout=30) as response:
            raw_feed = response.read().decode('utf-8')
        feed = feedparser.parse(raw_feed)

        papers = []
        for entry in feed.entries:
            # .get() so an entry lacking authors/tags does not fail the
            # whole search with an AttributeError.
            tags = entry.get('tags')
            papers.append({
                'id': entry.id.split('/abs/')[-1],
                'title': entry.title,
                'authors': [author.name for author in entry.get('authors', [])],
                'summary': entry.summary,
                'published': entry.published,
                'link': entry.link,
                'primary_category': tags[0]['term'] if tags else 'Unknown',
            })

        # NOTE(review): presumably a courtesy delay between arXiv API calls;
        # one pause per request, not per entry - confirm against the original
        # indentation.
        time.sleep(1)
        return {
            'status': 'success',
            'total_results': len(papers),
            'papers': papers
        }
    except Exception as e:
        # Tool boundary: report the failure to the model instead of raising.
        return {
            'status': 'error',
            'message': str(e)
        }
class ArxivSearchTool(ToolBase):
    """Tool definition that exposes ``search_arxiv_papers`` to the model."""

    def __init__(self):
        """Register the tool's name, prompt text and parameter schema with ToolBase."""
        parameter_specs = {
            "query": {
                "type": "string",
                "description": "Search query (e.g., 'deep learning', 'quantum computing')",
            },
            "max_results": {
                "type": "integer",
                "description": "Maximum number of results to return (default: 5)",
            },
            "sort_by": {
                "type": "string",
                "description": "Sort criteria (e.g., 'relevance', 'lastUpdatedDate', 'submittedDate')",
            },
        }
        super().__init__(
            programmatic_name="search_arxiv_papers",
            natural_name="arXiv Paper Search",
            active_voice_description_of_capability="You can search for academic papers on arXiv.",
            passive_voice_description_of_function="a service that searches and retrieves academic papers from arXiv based on various criteria",
            prescriptive_conditional="When given a research topic or paper query, you should call the search_arxiv_papers function to find relevant papers.",
            input_params=parameter_specs,
            required_params=["query"],
        )

    def actual_function(self, **kwargs):
        """Forward the supplied keyword arguments to ``search_arxiv_papers``."""
        return search_arxiv_papers(**kwargs)


# Module-level instance registered in tool_objects_list.
arxiv_tool = ArxivSearchTool()
def get_snp_info(rsid):
    """
    Fetch RefSNP details for one SNP from the NIH Variation API.
    Args:
        rsid: The numeric part of the identifier, without the "rs" prefix
            (it is interpolated into ``refsnp/{rsid}``).
    Returns:
        ``{"error": ...}`` when the RefSNP request does not return HTTP 200.
        Otherwise a dict with keys ``create_date``, ``last_update_date``,
        ``genes``, ``hgvs``, ``spdi``, ``clinical_significance``,
        ``frequency_data`` and ``citations``.
    Raises:
        requests.RequestException: If either HTTP request fails or times out.
    """
    base_url = "https://api.ncbi.nlm.nih.gov/variation/v0/"
    # Bound every HTTP call so a stalled server cannot hang the chat turn.
    request_timeout = 30

    # Fetch RefSNP data
    response = requests.get(f"{base_url}refsnp/{rsid}", timeout=request_timeout)
    if response.status_code != 200:
        return {"error": f"Failed to retrieve data for rs{rsid}"}
    data = response.json()

    result = {
        "create_date": data.get("create_date", "Unknown"),
        "last_update_date": data.get("last_update_date", "Unknown"),
        "genes": [],
        "hgvs": [],
        "spdi": [],
        "clinical_significance": [],
        "frequency_data": {},
    }
    primary_data = data.get("primary_snapshot_data", {})

    for annotation in primary_data.get("allele_annotations", []):
        # Gene associations. The same locus is listed under several allele
        # annotations, so each one is kept only once (first-seen order).
        for assembly in annotation.get("assembly_annotation", []):
            for gene_info in assembly.get("genes", []):
                locus = gene_info.get("locus", "Unknown")
                if locus not in result["genes"]:
                    result["genes"].append(locus)
        # Clinical significance entries, each truncated to 600 characters.
        for clinical in annotation.get("clinical", []):
            result["clinical_significance"].extend(
                [str(s)[:600] for s in clinical.get("clinical_significances", [])]
            )

    # HGVS and SPDI notation
    for placement in primary_data.get("placements_with_allele", []):
        for allele in placement.get("alleles", []):
            if "hgvs" in allele:
                result["hgvs"].append(allele["hgvs"])
            if "spdi" in allele.get("allele", {}):
                spdi_data = allele["allele"]["spdi"]
                spdi_notation = f"{spdi_data['seq_id']}:{spdi_data['position']}:{spdi_data['deleted_sequence']}:{spdi_data['inserted_sequence']}"
                result["spdi"].append(spdi_notation)

    # Frequency data: keep the counts of the first result that carries them.
    freq_response = requests.get(
        f"{base_url}refsnp/{rsid}/frequency", timeout=request_timeout
    )
    if freq_response.status_code == 200:
        freq_data = freq_response.json().get("results", {})
        for value in freq_data.values():
            if "counts" in value:
                result["frequency_data"] = value["counts"]
                break

    # Only the first six citations are expanded.
    citations = data.get("citations", [])[:6]
    lgs("citations: " + str(citations))
    result["citations"] = []
    for citation in citations:
        # Best effort: one citation that cannot be fetched or parsed must not
        # throw away the SNP data already collected.
        try:
            result["citations"].append(
                pubmed_parser.parse_xml_web(citation, sleep=0.5, save_xml=False)
            )
        except Exception as e:
            lgs(f"Failed to fetch citation {citation}: {str(e)}")
    lgs("full citations data: " + str(result["citations"]))
    return result
class NIHRefSNPTool(ToolBase):
    """Tool definition that exposes ``get_snp_info`` to the model."""

    def __init__(self):
        """Register the tool's name, prompt text and parameter schema with ToolBase."""
        super().__init__(
            programmatic_name="search_nih_refsnp",
            natural_name="NIH RefSNP Searcher",
            active_voice_description_of_capability=(
                "You can search for refSNP data on the NIH Variation API."
            ),
            passive_voice_description_of_function=(
                "a service that retrieves refSNP data from the NIH Variation API "
                "based on a provided SNP identifier"
            ),
            prescriptive_conditional=(
                "When given a refSNP identifier (e.g., 'rs79220014'), "
                "you should call the search_nih_refsnp function "
                "to find its associated data."
            ),
            input_params={
                "snp": {
                    "type": "string",
                    "description": "The refSNP identifier (e.g., 'rs79220014')"
                }
            },
            required_params=["snp"],
        )

    def actual_function(self, **kwargs):
        """Look up the SNP named by ``kwargs["snp"]``.

        The identifier is passed to ``get_snp_info`` without its "rs" prefix.
        """
        snp_id = str(kwargs["snp"]).strip()
        # Remove the prefix only when it is present. Slicing off two characters
        # unconditionally would turn a bare "429358" into "9358" and silently
        # query a different SNP.
        if snp_id[:2].lower() == "rs":
            snp_id = snp_id[2:]
        return get_snp_info(snp_id)


# Module-level instance registered in tool_objects_list.
nih_ref_snp_tool = NIHRefSNPTool()
def get_weather_data(location):
    """
    Fetch current weather data for a given location using WeatherAPI.com.
    Args:
        location (str): The location for which to retrieve weather (e.g., "London", "90210", or "48.8567,2.3510").
    Returns:
        dict: A dictionary containing the current weather data or an error message.
    """
    base_url = "https://api.weatherapi.com/v1/current.json"
    params = {
        "key": WEATHER_API_KEY,
        "q": location,
        "aqi": "no"  # Set to "yes" to include air quality data if desired.
    }
    try:
        # requests encodes the query string itself; the timeout keeps a
        # stalled request from blocking the chat turn indefinitely.
        response = requests.get(base_url, params=params, timeout=30)
        lgs("RAW RESPONSE: " + str(response))
    except Exception as e:
        # The request URL contains the API key and requests includes the URL
        # in many exception messages. Redact the key before the text reaches
        # the log or the error dict, which is passed back to the model.
        error_text = str(e)
        if WEATHER_API_KEY:
            error_text = error_text.replace(WEATHER_API_KEY, "***")
        lgs("FAILED PARAMS: " + str({**params, "key": "***"}))
        lgs("FAILED ERROR: " + error_text)
        return {"error": f"Failed to retrieve weather data for {location}. Error: {error_text}"}
    if response.status_code != 200:
        return {"error": f"Failed to retrieve weather data for {location}. Status code: {response.status_code}"}
    data = response.json()
    current = data.get("current", {})
    # Pass through only the fields the assistant needs from the payload.
    formatted_data = {
        "location": data.get("location", {}),
        "current": {
            "last_updated": current.get("last_updated"),
            "temp_c": current.get("temp_c"),
            "temp_f": current.get("temp_f"),
            "precip_mm": current.get("precip_mm"),
            "precip_in": current.get("precip_in"),
            "humidity": current.get("humidity"),
            "wind_kph": current.get("wind_kph"),
            "wind_mph": current.get("wind_mph"),
            "condition": current.get("condition", {})
        }
    }
    return formatted_data
class WeatherAPITool(ToolBase):
    """Tool definition that exposes ``get_weather_data`` to the model."""

    def __init__(self):
        """Register the tool's name, prompt text and parameter schema with ToolBase."""
        location_spec = {
            "type": "string",
            "description": "The location to retrieve weather data for (e.g., 'London', '90210', or '48.8567,2.3510').",
        }
        super().__init__(
            programmatic_name="get_weather_data",
            natural_name="Weather Report Fetcher",
            active_voice_description_of_capability="You can fetch real-time weather data for any location worldwide.",
            passive_voice_description_of_function="a service that retrieves current weather details including temperature, precipitation, humidity, and wind data.",
            prescriptive_conditional="When provided with a location (city, ZIP, or lat,long) call the get_weather_data function to retrieve its weather information.",
            input_params={"location": location_spec},
            required_params=["location"],
        )

    def actual_function(self, **kwargs):
        """Call ``get_weather_data`` with the ``location`` keyword argument."""
        requested_location = kwargs["location"]
        return get_weather_data(requested_location)


# Module-level instance registered in tool_objects_list.
weather_tool = WeatherAPITool()
# Every tool instance made available to the model.
tool_objects_list = [arxiv_tool, nih_ref_snp_tool, weather_tool]
# Built once at import time from the tools listed above.
system_prompt = build_sys_prompt(tool_objects_list)
# Dispatch table: each tool's JSON name mapped to its bound actual_function.
functions_dict = {t.json_name: t.actual_function for t in tool_objects_list}
# Startup banner and the full system prompt; these use print() directly, so
# they appear regardless of VERBOSE_SHELL.
print("===== Application Startup at", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "=====")
print("\n" + system_prompt)
class LLM:
    """Small wrapper around an OpenAI client pointed at ``ENDPOINT_URL``."""

    def __init__(self, max_model_len: int = 4096):
        """Create the API client.

        Args:
            max_model_len: Stored on the instance; nothing in this class reads it.
        """
        self.api_key = OAI_API_KEY
        self.max_model_len = max_model_len
        self.client = OpenAI(base_url=ENDPOINT_URL, api_key=self.api_key)
        self.model_name = MODEL_NAME

    def generate(self, messages: List[Dict[str, str]], sampling_params: dict) -> Any:
        """
        Request one non-streaming chat completion.
        Args:
            messages: List of message dictionaries with 'role' and 'content' keys
            sampling_params: Sampling settings. 'max_tokens', 'temperature',
                'top_p' and 'n' fall back to 2048, 0.8, 0.95 and 1; 'stop',
                'presence_penalty' and 'frequency_penalty' are forwarded only
                when present.
        Returns:
            Whatever ``client.chat.completions.create`` returns.
        Raises:
            Exception: Any error from building or sending the request is
                logged through ``lgs`` and re-raised unchanged.
        """
        try:
            request_kwargs = {
                "model": self.model_name,
                "messages": messages,
                "max_tokens": sampling_params.get("max_tokens", 2048),
                "temperature": sampling_params.get("temperature", 0.8),
                "top_p": sampling_params.get("top_p", 0.95),
                "n": sampling_params.get("n", 1),
                "stream": False,
            }
            # Optional settings are only sent when the caller supplied them.
            for optional_key in ("stop", "presence_penalty", "frequency_penalty"):
                if optional_key in sampling_params:
                    request_kwargs[optional_key] = sampling_params[optional_key]
            return self.client.chat.completions.create(**request_kwargs)
        except Exception as e:
            lgs(f"Error in generate: {str(e)}")
            raise
def build_messages_for_api(message_history: List[Dict]) -> List[Dict[str, str]]:
    """
    Turn the internal history into the message list sent to chat.completions.
    Args:
        message_history: Internal messages; each needs 'role' and 'content'
    Returns:
        A list that starts with the system prompt, followed by one
        role/content dict per history entry. Entries whose role is
        'function' are sent with the role 'assistant'; other keys
        (such as 'metadata') are dropped.
    """
    api_messages = [{"role": "system", "content": system_prompt}]
    api_messages.extend(
        {
            # The API call is made without a 'function' role, so tool results
            # are relabelled as assistant messages.
            "role": "assistant" if entry["role"] == "function" else entry["role"],
            "content": entry["content"],
        }
        for entry in message_history
    )
    return api_messages
def check_assistant_response_for_tool_calls(response: str) -> Optional[Dict]:
    """
    Find the first tool call embedded in an LLM response.

    Scans the text for JSON objects and returns the first one that has a
    'parameters' key and a 'name' that is a registered tool. Unlike slicing
    from the first '{' to the last '}', this still works when the model adds
    prose containing braces, or further JSON, after the tool call.
    Args:
        response: The assistant's response string
    Returns:
        Dictionary with tool call data or None if no tool call found
    """
    decoder = json.JSONDecoder()
    brace_idx = response.find("{")
    while brace_idx != -1:
        try:
            candidate, end_idx = decoder.raw_decode(response, brace_idx)
        except json.JSONDecodeError:
            # No valid JSON value starts at this brace; try the next one.
            brace_idx = response.find("{", brace_idx + 1)
            continue
        if (
            isinstance(candidate, dict)
            and "parameters" in candidate
            # isinstance guard: an unhashable name (list/dict) would make the
            # membership test raise TypeError.
            and isinstance(candidate.get("name"), str)
            and candidate["name"] in functions_dict
        ):
            return candidate
        # Resume after the decoded object so objects nested inside it are not
        # mistaken for top-level tool calls.
        brace_idx = response.find("{", end_idx)
    return None
def process_tool_request(tool_request_data: Dict) -> Dict:
    """
    Run the tool named in a tool request and wrap its result.
    Args:
        tool_request_data: Dictionary with 'name' and 'parameters' keys
    Returns:
        {"name": <tool name>, "return": <value returned by the tool>}
    Raises:
        ValueError: If the name is not a key of ``functions_dict``.
    """
    requested_name = tool_request_data["name"]
    call_kwargs = tool_request_data["parameters"]
    if requested_name not in functions_dict:
        raise ValueError(f"Unknown tool name: {requested_name}")
    # Look up the registered callable and invoke it with the model's arguments.
    outcome = {
        "name": requested_name,
        "return": functions_dict[requested_name](**call_kwargs),
    }
    lgs("TOOL: " + str(outcome))
    return outcome
def restore_message_history(full_history: List[Dict]) -> List[Dict]:
    """
    Restore the complete message history including tool interactions.

    For each assistant message that carries a non-empty
    ``metadata["tool_interactions"]`` list, those messages are emitted first,
    followed by a copy of the assistant message whose metadata no longer has
    that key. All other messages are passed through unchanged.

    ``full_history`` is never modified, so repeated calls on the same history
    return the same expansion.
    Args:
        full_history: The stored message history
    Returns:
        Restored message history with tool interactions expanded
    """
    restored = []
    for message in full_history:
        metadata = message.get("metadata") if message["role"] == "assistant" else None
        tool_interactions = (metadata or {}).get("tool_interactions")
        if not tool_interactions:
            restored.append(message)
            continue
        restored.extend(tool_interactions)
        final_msg = dict(message)
        # Build a fresh metadata dict instead of deleting the key in place:
        # dict(message) is a shallow copy, so an in-place delete would also
        # strip the tool interactions from the stored history.
        final_msg["metadata"] = {
            key: value for key, value in metadata.items() if key != "tool_interactions"
        }
        restored.append(final_msg)
    return restored
def iterate_chat(llm: LLM, sampling_params: dict, full_history: List[Dict]) -> List[Dict]:
    """
    Run one assistant turn, executing tool calls until a final reply is produced.

    Each pass sends the restored history plus this turn's tool messages to the
    model. A reply that contains a tool call is executed and its result (or
    error) is recorded; a reply without one ends the turn. At most 10 passes
    are made.
    Args:
        llm: The LLM instance
        sampling_params: Sampling parameters
        full_history: The conversation history; it is appended to in place
    Returns:
        ``full_history`` with one assistant message appended: the final reply,
        an error notice, or the iteration-limit notice. Its metadata holds
        the tool messages of this turn.
    """
    pending_tool_msgs = []

    def _close_turn(text):
        # Append the assistant message that ends this turn and return the history.
        full_history.append({
            "role": "assistant",
            "content": text,
            "metadata": {
                "tool_interactions": pending_tool_msgs
            }
        })
        return full_history

    for attempt in range(10):  # Hard cap so a tool-call loop cannot run forever
        api_messages = build_messages_for_api(
            restore_message_history(full_history) + pending_tool_msgs
        )
        try:
            output = llm.generate(api_messages, sampling_params)
            if VERBOSE_SHELL:
                print(f"\n--- Iteration {attempt + 1} ---")
                print(f"Messages sent: {json.dumps(api_messages, indent=2)[:500]}...")
                print("-" * 50)
            if not (output and output.choices):
                raise ValueError("Invalid completion response")

            reply_text = output.choices[0].message.content.strip()
            lgs("ASSISTANT: " + reply_text.replace("\n", "\\n")[:200] + "...")

            tool_request = check_assistant_response_for_tool_calls(reply_text)
            if not tool_request:
                # No tool call: this reply is the final answer.
                return _close_turn(reply_text)

            # Record the call itself, then its result or failure.
            pending_tool_msgs.append({
                "role": "assistant",
                "content": json.dumps(tool_request),
            })
            try:
                tool_payload = json.dumps(process_tool_request(tool_request))
            except Exception as e:
                lgs(f"Tool execution error: {str(e)}")
                tool_payload = json.dumps({"error": str(e)})
            pending_tool_msgs.append({
                "role": "function",
                "content": tool_payload
            })
        except Exception as e:
            lgs(f"Error in iterate_chat: {str(e)}")
            return _close_turn(
                f"I encountered an error processing your request: {str(e)}. Please try again."
            )

    # All passes were used up by tool calls.
    return _close_turn(
        "I've processed multiple tool calls but couldn't complete the task. Please try rephrasing your request."
    )
def user_conversation(user_message: str, chat_history: List, full_history: Optional[List]) -> tuple:
    """
    Handle user input and maintain conversation state.

    Blank input (empty or whitespace only) is ignored: no model call is made
    and both histories are returned unchanged.
    Args:
        user_message: The user's input message
        chat_history: Gradio chat history (list of tuples); None is treated as empty
        full_history: Full conversation history with metadata; None is treated as empty
    Returns:
        Tuple of (empty string, updated chat_history, updated full_history)
    """
    if full_history is None:
        full_history = []
    # The clear handler resets the chatbot to None, so tolerate that here
    # instead of failing on chat_history.append.
    if chat_history is None:
        chat_history = []
    # Submitting an empty textbox should not cost an API call or add an
    # empty user turn to the history.
    if not user_message or not user_message.strip():
        return "", chat_history, full_history

    lgs("USER: " + user_message.replace("\n", "\\n"))
    full_history.append({"role": "user", "content": user_message})
    try:
        updated_history = iterate_chat(llm, sampling_params, full_history)
        assistant_answer = updated_history[-1]["content"]
        chat_history.append((user_message, assistant_answer))
    except Exception as e:
        # UI boundary: show the failure in the chat rather than raising.
        lgs(f"Error in user_conversation: {str(e)}")
        error_response = f"I encountered an error: {str(e)}"
        chat_history.append((user_message, error_response))
        full_history.append({"role": "assistant", "content": error_response})
    return "", chat_history, full_history
# Module-level LLM instance; user_conversation passes it to iterate_chat.
llm = LLM(max_model_len=32000)
# Logged once when the module is loaded (only if VERBOSE_SHELL is set).
lgs("STARTING NEW CHAT")
# Build the Gradio UI: header, chat window, input row, examples, clear button, footer.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    header_markdown = """
# 🛠️ Weather/Arxiv/SNP Multi-tool Calling Bot
This bot can help you with:
- 🌤️ Current weather information for any location
- 📚 Searching academic papers on arXiv
- 🧬 Looking up SNP (Single Nucleotide Polymorphism) information
"""
    footer_markdown = """
---
**Note:** This bot uses the Llama-3.3-70B-Instruct model via Hyperbolic API with tool calling capabilities.
"""
    example_queries = [
        "What is the current weather in Åfjord?",
        "What's the weather in Tokyo, Japan?",
        "List some papers about humor in LLMs",
        "Find recent papers on quantum computing",
        "What does this SNP do?: rs429358",
    ]

    gr.Markdown(header_markdown)

    # Per-session full history (with tool metadata), separate from the visible chat.
    chat_state = gr.State([])

    chatbot = gr.Chatbot(
        label="Chat with the multi-tool bot",
        type="tuples",  # Explicitly set to avoid deprecation warning
        height=500,
        show_copy_button=True
    )

    with gr.Row():
        user_input = gr.Textbox(
            lines=2,
            placeholder="Type your message here... (Press Enter to send)",
            label="Your Message",
            scale=4
        )
        send_button = gr.Button("Send", variant="primary", scale=1)

    gr.Examples(
        examples=example_queries,
        inputs=[user_input],
        label="Example Queries",
    )

    clear_button = gr.Button("Clear Chat", variant="secondary")

    def clear_chat():
        # Reset the visible chat and the stored full history.
        return None, []

    # Enter in the textbox and the Send button run the same handler.
    for register_send in (user_input.submit, send_button.click):
        register_send(
            fn=user_conversation,
            inputs=[user_input, chatbot, chat_state],
            outputs=[user_input, chatbot, chat_state],
            queue=True
        )

    clear_button.click(
        fn=clear_chat,
        outputs=[chatbot, chat_state],
        queue=False
    )

    gr.Markdown(footer_markdown)
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.queue(max_size=20)
    launch_options = {
        "show_api": False,  # Disable API documentation to avoid schema issues
        "quiet": False,     # Show startup messages
        "share": False,     # Set to True if you want a public link
        "debug": True,      # Enable debug mode for better error messages
    }
    demo.launch(**launch_options)