import os import textwrap import datetime import json import gradio as gr from openai import OpenAI import urllib.request import feedparser import time from typing import Dict, List, Optional, Any import pubmed_parser import requests VERBOSE_SHELL = True ENDPOINT_URL = "https://api.hyperbolic.xyz/v1" OAI_API_KEY = os.environ['HYPERBOLIC_XYZ_API_KEY'] WEATHER_API_KEY = os.environ["WEATHER_API_KEY"] MODEL_NAME = "meta-llama/Llama-3.3-70B-Instruct" def lgs(log_string): if VERBOSE_SHELL: print(log_string) sampling_params = { "temperature": 0.8, "top_p": 0.95, "max_tokens": 2048, } todays_date_string = datetime.date.today().strftime("%d %B %Y") def system_prompt_format(function_descriptions, function_jsons): return """Cutting Knowledge Date: December 2023 Today Date: """ + todays_date_string + """ You are a helpful assistant with tool calling capabilities. """ + "\n".join(function_descriptions) + """ If you choose to use one of the following functions, respond with a JSON for a function call with its proper arguments that best answers the given prompt. Your tool request should be in the exact format {"name": function name, "parameters": dictionary of argument name and its value}. Do not use variables. Just a two-key dictionary, starting with the function name, followed by a dictionary of parameters. """ + "\n".join([json.dumps(d, indent=2) for d in function_jsons]) + """ After receiving the results back from a function (formatted as {"name": function name, "return": returned data after running function}) formulate your response to the user. If the information needed is not found in the returned data, either attempt a new function call, or inform the user that you cannot answer based on your available knowledge. The user cannot see the function results. You have to interpret the data and provide a response based on it. If the user request does not necessitate a function call, simply respond to the user's query directly.""" def build_sys_prompt(tool_objects): function_descriptions = [t.system_prompt_paragraph for t in tool_objects] function_jsons = [t.json_definition_of_function for t in tool_objects] return system_prompt_format(function_descriptions, function_jsons) class ToolBase: def __init__(self, programmatic_name: str, natural_name: str, active_voice_description_of_capability: str, passive_voice_description_of_function: str, prescriptive_conditional: str, input_params: Dict[str, Dict], required_params: Optional[List[str]] = None, ): self.json_name = programmatic_name self.json_description = passive_voice_description_of_function parameters_schema = { "type": "object", "properties": input_params } if required_params: parameters_schema["required"] = required_params self.json_definition_of_function = { "type": "function", "function": { "name": self.json_name, "description": self.json_description, "parameters": parameters_schema } } self.system_prompt_paragraph = active_voice_description_of_capability + " " + prescriptive_conditional def actual_function(self, **kwargs): raise NotImplementedError("Subclasses must implement this method.") def search_arxiv_papers( query: str, max_results: int = 5, sort_by: str = 'relevance' ) -> Dict: """ Search for papers on arXiv using their API. Args: query: Search query string max_results: Maximum number of results to return (default: 5) sort_by: Sorting criteria (default: 'relevance') Returns: Dictionary containing search results and metadata """ try: # Construct the search query search_query = f'all:{query}' # Construct the API URL base_url = 'https://export.arxiv.org/api/query?' params = { 'search_query': search_query, 'start': 0, 'max_results': max_results, 'sortBy': sort_by, 'sortOrder': 'descending' } query_string = '&'.join([f'{k}={urllib.parse.quote(str(v))}' for k, v in params.items()]) url = base_url + query_string # Make the API request response = urllib.request.urlopen(url) feed = feedparser.parse(response.read().decode('utf-8')) # Process the results papers = [] for entry in feed.entries: paper = { 'id': entry.id.split('/abs/')[-1], 'title': entry.title, 'authors': [author.name for author in entry.authors], 'summary': entry.summary, 'published': entry.published, 'link': entry.link, 'primary_category': entry.tags[0]['term'] if entry.tags else 'Unknown' } papers.append(paper) time.sleep(1) return { 'status': 'success', 'total_results': len(papers), 'papers': papers } except Exception as e: return { 'status': 'error', 'message': str(e) } class ArxivSearchTool(ToolBase): def __init__(self): super().__init__( programmatic_name="search_arxiv_papers", natural_name="arXiv Paper Search", active_voice_description_of_capability="You can search for academic papers on arXiv.", passive_voice_description_of_function="a service that searches and retrieves academic papers from arXiv based on various criteria", prescriptive_conditional="When given a research topic or paper query, you should call the search_arxiv_papers function to find relevant papers.", input_params={ "query": { "type": "string", "description": "Search query (e.g., 'deep learning', 'quantum computing')" }, "max_results": { "type": "integer", "description": "Maximum number of results to return (default: 5)" }, "sort_by": { "type": "string", "description": "Sort criteria (e.g., 'relevance', 'lastUpdatedDate', 'submittedDate')" } }, required_params=["query"], ) def actual_function(self, **kwargs): return search_arxiv_papers(**kwargs) arxiv_tool = ArxivSearchTool() def get_snp_info(rsid): base_url = "https://api.ncbi.nlm.nih.gov/variation/v0/" result = {"rsid": rsid, "error": "No data found"} # Fetch RefSNP data snp_url = f"{base_url}refsnp/{rsid}" response = requests.get(snp_url) if response.status_code != 200: return {"error": f"Failed to retrieve data for rs{rsid}"} data = response.json() # Extract useful information result = { "create_date": data.get("create_date", "Unknown"), "last_update_date": data.get("last_update_date", "Unknown"), "genes": [], "hgvs": [], "spdi": [], "clinical_significance": [], "frequency_data": {}, } # Extract gene associations primary_data = data.get("primary_snapshot_data", {}) if "allele_annotations" in primary_data: for annotation in primary_data["allele_annotations"]: for gene in annotation.get("assembly_annotation", []): for gene_info in gene.get("genes", []): result["genes"].append(gene_info.get("locus", "Unknown")) # Extract HGVS notation for placement in primary_data.get("placements_with_allele", []): for allele in placement.get("alleles", []): if "hgvs" in allele: result["hgvs"].append(allele["hgvs"]) if "spdi" in allele.get("allele", {}): spdi_data = allele["allele"]["spdi"] spdi_notation = f"{spdi_data['seq_id']}:{spdi_data['position']}:{spdi_data['deleted_sequence']}:{spdi_data['inserted_sequence']}" result["spdi"].append(spdi_notation) # Extract clinical significance from ClinVar for annotation in primary_data.get("allele_annotations", []): for clinical in annotation.get("clinical", []): result["clinical_significance"].extend([str(s)[:600] for s in clinical.get("clinical_significances", [])]) # Fetch ALFA frequency data freq_url = f"{base_url}refsnp/{rsid}/frequency" freq_response = requests.get(freq_url) if freq_response.status_code == 200: freq_data = freq_response.json().get("results", {}) for key, value in freq_data.items(): if "counts" in value: result["frequency_data"] = value["counts"] break citations = data.get("citations", [])[:6] lgs("citations: " + str(citations)) result["citations"] = [pubmed_parser.parse_xml_web(c, sleep=0.5, save_xml=False,) for c in citations] lgs("full citations data: " + str(result["citations"])) return result class NIHRefSNPTool(ToolBase): def __init__(self): super().__init__( programmatic_name="search_nih_refsnp", natural_name="NIH RefSNP Searcher", active_voice_description_of_capability=( "You can search for refSNP data on the NIH Variation API." ), passive_voice_description_of_function=( "a service that retrieves refSNP data from the NIH Variation API " "based on a provided SNP identifier" ), prescriptive_conditional=( "When given a refSNP identifier (e.g., 'rs79220014'), " "you should call the search_nih_refsnp function " "to find its associated data." ), input_params={ "snp": { "type": "string", "description": "The refSNP identifier (e.g., 'rs79220014')" } }, required_params=["snp"], ) def actual_function(self, **kwargs): return get_snp_info(kwargs["snp"][2:]) nih_ref_snp_tool = NIHRefSNPTool() def get_weather_data(location): """ Fetch current weather data for a given location using WeatherAPI.com. Args: location (str): The location for which to retrieve weather (e.g., "London", "90210", or "48.8567,2.3510"). Returns: dict: A dictionary containing the current weather data or an error message. """ base_url = "https://api.weatherapi.com/v1/current.json" params = { "key": WEATHER_API_KEY, "q": location, "aqi": "no" # Set to "yes" to include air quality data if desired. } full_url = base_url + "?" + "&".join([f"{k}={urllib.parse.quote(str(v))}" for k, v in params.items()]) try: response = requests.get(full_url) lgs("RAW RESPONSE: " + str(response)) except Exception as e: lgs("FAILED PARAMS: " + str(params)) lgs("FAILED ERROR: " + str(e)) return {"error": f"Failed to retrieve weather data for {location}. Error: {str(e)}"} if response.status_code != 200: return {"error": f"Failed to retrieve weather data for {location}. Status code: {response.status_code}"} data = response.json() formatted_data = { "location": data.get("location", {}), "current": { "last_updated": data.get("current", {}).get("last_updated"), "temp_c": data.get("current", {}).get("temp_c"), "temp_f": data.get("current", {}).get("temp_f"), "precip_mm": data.get("current", {}).get("precip_mm"), "precip_in": data.get("current", {}).get("precip_in"), "humidity": data.get("current", {}).get("humidity"), "wind_kph": data.get("current", {}).get("wind_kph"), "wind_mph": data.get("current", {}).get("wind_mph"), "condition": data.get("current", {}).get("condition", {}) } } return formatted_data class WeatherAPITool(ToolBase): def __init__(self): super().__init__( programmatic_name="get_weather_data", natural_name="Weather Report Fetcher", active_voice_description_of_capability="You can fetch real-time weather data for any location worldwide.", passive_voice_description_of_function="a service that retrieves current weather details including temperature, precipitation, humidity, and wind data.", prescriptive_conditional="When provided with a location (city, ZIP, or lat,long) call the get_weather_data function to retrieve its weather information.", input_params={ "location": { "type": "string", "description": "The location to retrieve weather data for (e.g., 'London', '90210', or '48.8567,2.3510')." }, }, required_params=["location"], ) def actual_function(self, **kwargs): return get_weather_data(kwargs["location"]) weather_tool = WeatherAPITool() tool_objects_list = [arxiv_tool, nih_ref_snp_tool, weather_tool] system_prompt = build_sys_prompt(tool_objects_list) functions_dict = {t.json_name: t.actual_function for t in tool_objects_list} print("===== Application Startup at", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "=====") print("\n" + system_prompt) class LLM: def __init__(self, max_model_len: int = 4096): self.api_key = OAI_API_KEY self.max_model_len = max_model_len self.client = OpenAI(base_url=ENDPOINT_URL, api_key=self.api_key) self.model_name = MODEL_NAME def generate(self, messages: List[Dict[str, str]], sampling_params: dict) -> Any: """ Generate a chat completion using the chat.completions API. Args: messages: List of message dictionaries with 'role' and 'content' keys sampling_params: Dictionary of sampling parameters Returns: ChatCompletion response object """ try: completion_params = { "model": self.model_name, "messages": messages, "max_tokens": sampling_params.get("max_tokens", 2048), "temperature": sampling_params.get("temperature", 0.8), "top_p": sampling_params.get("top_p", 0.95), "n": sampling_params.get("n", 1), "stream": False, } if "stop" in sampling_params: completion_params["stop"] = sampling_params["stop"] if "presence_penalty" in sampling_params: completion_params["presence_penalty"] = sampling_params["presence_penalty"] if "frequency_penalty" in sampling_params: completion_params["frequency_penalty"] = sampling_params["frequency_penalty"] response = self.client.chat.completions.create(**completion_params) return response except Exception as e: lgs(f"Error in generate: {str(e)}") raise def build_messages_for_api(message_history: List[Dict]) -> List[Dict[str, str]]: """ Convert message history to the format expected by the chat.completions API. Args: message_history: Internal message history format Returns: List of messages in OpenAI chat format """ messages = [ {"role": "system", "content": system_prompt} ] for message in message_history: role = message["role"] content = message["content"] # Convert 'function' role to 'assistant' for the API if role == "function": # Function results are typically shown as assistant messages messages.append({"role": "assistant", "content": content}) else: messages.append({"role": role, "content": content}) return messages def check_assistant_response_for_tool_calls(response: str) -> Optional[Dict]: """ Check if the LLM response contains a function call. Args: response: The assistant's response string Returns: Dictionary with tool call data or None if no tool call found """ # Look for JSON structure in the response if "{" in response and "}" in response: # Try to extract JSON from the response try: # Find the first { and last } start_idx = response.find("{") end_idx = response.rfind("}") + 1 json_str = response[start_idx:end_idx] # Parse the JSON data = json.loads(json_str) # Check if it's a valid tool call format if "name" in data and "parameters" in data: if data["name"] in functions_dict: return data except json.JSONDecodeError: pass return None def process_tool_request(tool_request_data: Dict) -> Dict: """ Process tool requests from the LLM. Args: tool_request_data: Dictionary with 'name' and 'parameters' keys Returns: Dictionary with tool name and return value """ tool_name = tool_request_data["name"] tool_parameters = tool_request_data["parameters"] if tool_name not in functions_dict: raise ValueError(f"Unknown tool name: {tool_name}") # Call the appropriate tool function tool_function = functions_dict[tool_name] search_results = tool_function(**tool_parameters) tool_return = {"name": tool_name, "return": search_results} lgs("TOOL: " + str(tool_return)) return tool_return def restore_message_history(full_history: List[Dict]) -> List[Dict]: """ Restore the complete message history including tool interactions. Args: full_history: The stored message history Returns: Restored message history with tool interactions expanded """ restored = [] for message in full_history: if message["role"] == "assistant" and "metadata" in message: tool_interactions = message["metadata"].get("tool_interactions", []) if tool_interactions: for tool_msg in tool_interactions: restored.append(tool_msg) final_msg = message.copy() if "metadata" in final_msg and "tool_interactions" in final_msg["metadata"]: del final_msg["metadata"]["tool_interactions"] restored.append(final_msg) else: restored.append(message) else: restored.append(message) return restored def iterate_chat(llm: LLM, sampling_params: dict, full_history: List[Dict]) -> List[Dict]: """ Handle conversation turns with tool calling using the chat.completions API. Args: llm: The LLM instance sampling_params: Sampling parameters full_history: The conversation history Returns: Updated conversation history """ tool_interactions = [] for iteration in range(10): # Maximum 10 iterations to prevent infinite loops # Prepare messages for the API current_messages = restore_message_history(full_history) + tool_interactions api_messages = build_messages_for_api(current_messages) try: # Call the chat.completions API output = llm.generate(api_messages, sampling_params) if VERBOSE_SHELL: print(f"\n--- Iteration {iteration + 1} ---") print(f"Messages sent: {json.dumps(api_messages, indent=2)[:500]}...") print("-" * 50) if not output or not output.choices: raise ValueError("Invalid completion response") # Extract the assistant's response assistant_response = output.choices[0].message.content.strip() lgs("ASSISTANT: " + assistant_response.replace("\n", "\\n")[:200] + "...") # Check if the response contains a tool call tool_request_data = check_assistant_response_for_tool_calls(assistant_response) if not tool_request_data: # No tool call, this is the final response final_message = { "role": "assistant", "content": assistant_response, "metadata": { "tool_interactions": tool_interactions } } full_history.append(final_message) return full_history else: # Tool call detected assistant_message = { "role": "assistant", "content": json.dumps(tool_request_data), } tool_interactions.append(assistant_message) # Process the tool request try: tool_return_data = process_tool_request(tool_request_data) # Add tool result to interactions tool_message = { "role": "function", "content": json.dumps(tool_return_data) } tool_interactions.append(tool_message) except Exception as e: lgs(f"Tool execution error: {str(e)}") error_message = { "role": "function", "content": json.dumps({"error": str(e)}) } tool_interactions.append(error_message) except Exception as e: lgs(f"Error in iterate_chat: {str(e)}") # Add error response error_message = { "role": "assistant", "content": f"I encountered an error processing your request: {str(e)}. Please try again.", "metadata": { "tool_interactions": tool_interactions } } full_history.append(error_message) return full_history # If we've exhausted iterations, return with a message timeout_message = { "role": "assistant", "content": "I've processed multiple tool calls but couldn't complete the task. Please try rephrasing your request.", "metadata": { "tool_interactions": tool_interactions } } full_history.append(timeout_message) return full_history def user_conversation(user_message: str, chat_history: List, full_history: Optional[List]) -> tuple: """ Handle user input and maintain conversation state. Args: user_message: The user's input message chat_history: Gradio chat history (list of tuples) full_history: Full conversation history with metadata Returns: Tuple of (empty string, updated chat_history, updated full_history) """ if full_history is None: full_history = [] lgs("USER: " + user_message.replace("\n", "\\n")) full_history.append({"role": "user", "content": user_message}) try: updated_history = iterate_chat(llm, sampling_params, full_history) assistant_answer = updated_history[-1]["content"] chat_history.append((user_message, assistant_answer)) except Exception as e: lgs(f"Error in user_conversation: {str(e)}") error_response = f"I encountered an error: {str(e)}" chat_history.append((user_message, error_response)) full_history.append({"role": "assistant", "content": error_response}) return "", chat_history, full_history # Initialize the LLM llm = LLM(max_model_len=32000) lgs("STARTING NEW CHAT") # Create the Gradio interface with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🛠️ Weather/Arxiv/SNP Multi-tool Calling Bot This bot can help you with: - 🌤️ Current weather information for any location - 📚 Searching academic papers on arXiv - 🧬 Looking up SNP (Single Nucleotide Polymorphism) information """) # State management chat_state = gr.State([]) # Chat interface with proper type specification chatbot = gr.Chatbot( label="Chat with the multi-tool bot", type="tuples", # Explicitly set to avoid deprecation warning height=500, show_copy_button=True ) # User input with gr.Row(): user_input = gr.Textbox( lines=2, placeholder="Type your message here... (Press Enter to send)", label="Your Message", scale=4 ) send_button = gr.Button("Send", variant="primary", scale=1) # Examples gr.Examples( examples=[ "What is the current weather in Åfjord?", "What's the weather in Tokyo, Japan?", "List some papers about humor in LLMs", "Find recent papers on quantum computing", "What does this SNP do?: rs429358", ], inputs=[user_input], label="Example Queries", ) # Clear button clear_button = gr.Button("Clear Chat", variant="secondary") # Event handlers def clear_chat(): return None, [] user_input.submit( fn=user_conversation, inputs=[user_input, chatbot, chat_state], outputs=[user_input, chatbot, chat_state], queue=True ) send_button.click( fn=user_conversation, inputs=[user_input, chatbot, chat_state], outputs=[user_input, chatbot, chat_state], queue=True ) clear_button.click( fn=clear_chat, outputs=[chatbot, chat_state], queue=False ) # Add footer gr.Markdown(""" --- **Note:** This bot uses the Llama-3.3-70B-Instruct model via Hyperbolic API with tool calling capabilities. """) # Launch the app if __name__ == "__main__": demo.queue(max_size=20) demo.launch( show_api=False, # Disable API documentation to avoid schema issues quiet=False, # Show startup messages share=False, # Set to True if you want a public link debug=True # Enable debug mode for better error messages )