from openai import OpenAI

# Prefer the new Google GenAI SDK if available (import style: `from google import genai`).
# Fall back to the legacy `google.generativeai` package if that's what's installed.
try:
    from google import genai  # new `google-genai` package
except Exception:
    try:
        import google.generativeai as genai  # older, deprecated package
    except Exception:
        genai = None

import os
import requests
import json
import gradio as gr
import time
import re

# export GRADIO_DEBUG=1
# ----------- CONFIGURATION ----------------------------------------------------
# OPENAI_API_KEY must be set in the environment when GENAI_API=openai;
# GEMINI_API_KEY must be set when using the Gemini backend.
# Model name for LLM calls. Can be overridden by setting the LLM_MODEL
# environment variable. Falls back to a sensible default if unset.
MODEL_NAME = os.getenv("LLM_MODEL", "models/gemini-flash-latest")
# Backend selector: "gemini" (default), "google", "genai", or "openai".
GENAI_API = os.getenv("GENAI_API", "gemini")
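# Example configuration (illustrative values only):
#   export GENAI_API=gemini
#   export GEMINI_API_KEY=<your key>
#   export LLM_MODEL=models/gemini-flash-latest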
def _extract_text_from_message(message):
    """Extract plain text from a message entry used in this codebase.

    Messages in this project often look like:
        {"role": "user", "content": [{"type": "text", "text": "..."}]}
    This helper normalizes that shape to a single string.
    """
    content = message.get("content")
    if isinstance(content, list) and len(content) > 0:
        first = content[0]
        if isinstance(first, dict) and "text" in first:
            return first.get("text", "")
        return str(first)
    if isinstance(content, dict) and "text" in content:
        return content.get("text", "")
    if isinstance(content, str):
        return content
    return str(content)
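# Illustrative example (not executed):
#   _extract_text_from_message({"role": "user", "content": [{"type": "text", "text": "hi"}]})
#   returns "hi".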
def create_chat_response(messages, model, temperature=0, max_tokens=2048):
    """Unified helper to produce a text response from either the OpenAI or
    Google GenAI backend.

    Returns a plain string with the assistant reply.
    """
    # OpenAI-style client: keep calling the same API.
    if GENAI_API == "openai":
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        # Expect an OpenAI-style response object.
        try:
            return response.choices[0].message.content
        except Exception:
            # Fallback: stringify the whole response.
            return str(response)

    # Google GenAI path: convert the messages to a single prompt and call
    # the available model API (best-effort mapping).
    prompt = "\n\n".join(
        f"{m.get('role', '')}: {_extract_text_from_message(m)}" for m in messages
    )
    try:
        # Modern `google-genai` SDK pattern: client.models.generate_content.
        if hasattr(client, "models") and hasattr(client.models, "generate_content"):
            try:
                resp = client.models.generate_content(
                    model=model,
                    contents=prompt,
                    temperature=temperature,
                    max_output_tokens=max_tokens,
                )
            except TypeError:
                # Some versions do not accept those named args; try a minimal call.
                resp = client.models.generate_content(model=model, contents=prompt)
            # The response object usually exposes `.text` or `.content`.
            text = getattr(resp, "text", None) or getattr(resp, "content", None)
            if text is None:
                return str(resp)
            return text
        # The legacy `google.generativeai` package had a different surface; try a
        # generous fallback: look for a top-level `generate` or `generate_text`.
        if hasattr(client, "generate"):
            resp = client.generate(prompt)
            return getattr(resp, "text", str(resp))
        if hasattr(client, "generate_text"):
            resp = client.generate_text(prompt)
            return getattr(resp, "text", str(resp))
    except Exception as e:
        # Surface the error with context to help debugging.
        raise RuntimeError(f"GenAI model call failed: {e}")
    raise RuntimeError(
        "No suitable GenAI method found on `client`; install/initialize a "
        "supported SDK or set GENAI_API=openai"
    )
def search_inspire(query, size=10):
    """Search the INSPIRE HEP literature database using a fulltext query.

    Args:
        query (str): Search query.
        size (int): Number of results to return.
    """
    base_url = "https://inspirehep.net/api/literature"
    params = {
        "q": query,
        "size": size,
        "format": "json",
    }
    response = requests.get(base_url, params=params)
    return response.json()
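# Illustrative call (queries the live INSPIRE API):
#   search_inspire('ft "black hole distance"', size=5)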
def format_reference(metadata):
    """Format a single INSPIRE record as a Markdown reference line."""
    output = f"{', '.join(author.get('full_name', '') for author in metadata.get('authors', []))} "
    output += f"({metadata.get('publication_info', [{}])[0].get('year', 'N/A')}). "
    output += f"*{metadata.get('titles', [{}])[0].get('title', 'N/A')}*. "
    output += f"DOI: {metadata.get('dois', [{}])[0].get('value', 'N/A') if metadata.get('dois') else 'N/A'}. "
    output += f"[INSPIRE record {metadata['control_number']}](https://inspirehep.net/literature/{metadata['control_number']})"
    output += "\n\n"
    return output
def format_results(results):
    """Return the search results formatted as a Markdown reference list."""
    output = ""
    for i, hit in enumerate(results['hits']['hits']):
        metadata = hit['metadata']
        output += f"**[{i}]** "
        output += format_reference(metadata)
    return output
def results_context(results):
    """ Prepare a context from the results for the LLM """
    context = ""
    for i, hit in enumerate(results['hits']['hits']):
        metadata = hit['metadata']
        context += f"Result [{i}]\n\n"
        context += f"Title: {metadata.get('titles', [{}])[0].get('title', 'N/A')}\n\n"
        context += f"Abstract: {metadata.get('abstracts', [{}])[0].get('value', 'N/A')}\n\n"
    return context
def user_prompt(query, context):
    """ Generate a prompt for the LLM """
    prompt = f"""
QUERY: {query}

CONTEXT:
{context}

ANSWER:
"""
    return prompt
def llm_expand_query(query):
    """ Expand a query into variations of fulltext searches """
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": f"""
Expand this query into the query format used for a fulltext search
over the INSPIRE HEP database. Propose alternatives of the query to
maximize the recall, join those variants using OR operators, and
prepend each variant with the ft prefix. Just provide the expanded
query, without explanations.

Example of query:
how far are black holes?

Expanded query:
ft "how far are black holes" OR ft "distance from black holes" OR ft
"distances to black holes" OR ft "measurement of distance to black
holes" OR ft "remoteness of black holes" OR ft "distance to black
holes" OR ft "how far are singularities" OR ft "distance to
singularities" OR ft "distances to event horizon" OR ft "distance
from Schwarzschild radius" OR ft "black hole distance"

Query: {query}

Expanded query:
"""
                }
            ]
        }
    ]
    return create_chat_response(messages=messages, model=MODEL_NAME, temperature=0, max_tokens=2048)
def llm_generate_answer(prompt):
    """ Generate a response from the LLM """
    messages = [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": """You are part of a Retrieval Augmented Generation (RAG)
system and are given a query and a context of results. Generate an
answer substantiated by the results provided, citing them by their
index whenever they support the answer text. Do not put two or more
references together (e.g., use [1][2] instead of [1,2]). Do not generate an
answer that cannot be entailed from the cited abstracts, so every paragraph
should cite a search result. End the answer with the query and a brief answer
as a summary of the previously discussed results. Do not consider results
that are not related to the query and, if no specific answer can be
provided, state that in the brief answer."""
                }
            ]
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": prompt
                }
            ]
        }
    ]
    return create_chat_response(messages=messages, model=MODEL_NAME, temperature=0, max_tokens=2048)
def clean_refs(answer, results):
    """ Clean and renumber the references cited in the answer """
    # Find the reference indices cited in the answer, in order of first appearance.
    unique_ordered = []
    for match in re.finditer(r'\[(\d+)\]', answer):
        ref_num = int(match.group(1))
        if ref_num not in unique_ordered:
            unique_ordered.append(ref_num)
    # Keep only the cited results and assign them new sequential numbers.
    ref_map = {}
    new_i = 1
    new_results = ""
    for i, hit in enumerate(results['hits']['hits']):
        if i not in unique_ordered:
            continue
        metadata = hit['metadata']
        new_results += f"**[{new_i}]** "
        new_results += format_reference(metadata)
        ref_map[i] = new_i
        new_i += 1
    # Rewrite the citations in the answer with the same mapping, so the numbers
    # in the text match the renumbered reference list.
    for i in unique_ordered:
        if i not in ref_map:
            continue
        answer = answer.replace(f"[{i}]", f" **[__NEW_REF_ID_{ref_map[i]}]**")
    answer = answer.replace("__NEW_REF_ID_", "")
    return answer, new_results
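# Illustrative example (hypothetical answer text):
#   clean_refs("They are far away [3]. They are massive [1].", results)
#   keeps only hits 1 and 3, renumbers them **[1]** and **[2]** in the
#   reference list, and rewrites the citations in the answer to match.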
def search(query, progress=gr.Progress()):
    """ Run the full pipeline: expand the query, search INSPIRE, and generate an answer """
    time.sleep(1)
    progress(0, desc="Expanding query...")
    expanded_query = llm_expand_query(query)
    progress(0.25, desc="Searching INSPIRE HEP...")
    results = search_inspire(expanded_query)
    progress(0.50, desc="Generating answer...")
    context = results_context(results)
    prompt = user_prompt(query, context)
    answer = llm_generate_answer(prompt)
    new_answer, references = clean_refs(answer, results)
    progress(1, desc="Done!")
    # json_str = json.dumps(results['hits']['hits'][0]['metadata'], indent=4)
    return "**Answer**:\n\n" + new_answer + "\n\n**References**:\n\n" + references  # + "\n\n <pre>\n" + json_str + "</pre>"
# ----------- MAIN ------------------------------------------------------------

if GENAI_API == "openai":
    client = OpenAI()
elif GENAI_API and GENAI_API.lower() in ("gemini", "google", "genai"):
    # If the genai package couldn't be imported earlier, tell the user.
    if genai is None:
        raise RuntimeError(
            "GENAI_API is set to Gemini but no Google GenAI SDK is installed. "
            "Install `google-genai` (preferred) or `google-generativeai`, or set GENAI_API=openai."
        )
    # Prefer the new `genai.Client()` style when available (google-genai SDK).
    if hasattr(genai, "Client"):
        client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
    else:
        # Legacy SDK: configure the module-level API key and use the module as client.
        genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
        client = genai
else:
    # Default to the OpenAI client if GENAI_API is unrecognized or unset.
    client = OpenAI()
with gr.Blocks() as demo:
    gr.Markdown("# Feynbot on INSPIRE HEP Search")
    gr.Markdown("""Specialized academic search tool that combines traditional
    database searching with AI-powered query expansion and result
    synthesis, focused on High Energy Physics research papers.""")
    with gr.Row():
        with gr.Column():
            query = gr.Textbox(label="Search Query")
            search_btn = gr.Button("Search")
            examples = gr.Examples([
                ["Which is the closest star?"],
                ["Into which particles does the Higgs boson decay?"],
                ["What are the 'swampland criteria' in inflation?"]], query)
            with gr.Row():
                gr.HTML("<a href='https://sinai.ujaen.es'><img src='https://sinai.ujaen.es/sites/default/files/SINAI%20-%20logo%20tx%20azul%20%5Baf%5D.png' width='200'></a>")
                gr.HTML("<a href='https://www.ujaen.es'><img src='https://diariodigital.ujaen.es/sites/default/files/general/logo-uja.svg' width='180'></a>")
        with gr.Column():
            results = gr.Markdown("Answer will appear here...", label="Search Results")
    search_btn.click(fn=search, inputs=query, outputs=results, api_name="search", show_progress=True)

demo.launch()

# print(search("how far are black holes?"))