Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from typing import Optional, List | |
| import asyncio | |
| from contextlib import asynccontextmanager | |
| import pymongo | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| import uvicorn | |
| # LangChain imports | |
| from langchain_core.messages import ( | |
| BaseMessage, HumanMessage, AIMessage, | |
| message_to_dict, messages_from_dict | |
| ) | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_groq import ChatGroq | |
| from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser | |
| from langchain_core.chat_history import BaseChatMessageHistory | |
| from langchain_core.runnables.history import RunnableWithMessageHistory | |
| # ------------------------------------------------------------------- | |
| # CONFIGURATION | |
| # ------------------------------------------------------------------- | |
# Send INFO-and-above records through the root handler; everything in this
# module logs under the module's own name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- ENVIRONMENT VARIABLES ---
# Required secrets are read and validated one at a time, so a missing value
# aborts the import with a message naming the exact variable.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    raise ValueError("GROQ_API_KEY environment variable is required")

MONGO_URI = os.environ.get("MONGO_URI")
if not MONGO_URI:
    raise ValueError("MONGO_URI environment variable is required for chat history")

# Optional settings fall back to fixed database / collection names.
MONGO_DB_NAME = os.environ.get("MONGO_DB_NAME", "chat_history_db")
MONGO_COLLECTION_NAME = os.environ.get("MONGO_COLLECTION_NAME", "session_history")

# Global variables
# Both stay None until the application lifespan hook assigns them; request
# handlers check for None and answer 503 in the meantime.
llm = None
history_collection = None
| # ------------------------------------------------------------------- | |
| # CHAT HISTORY HANDLER (MongoDB) | |
| # ------------------------------------------------------------------- | |
class MongoDBChatMessageHistory(BaseChatMessageHistory):
    """Chat message history stored in MongoDB.

    Each session is kept as a single document of the form
    ``{"session_id": <id>, "history": [<message dict>, ...]}``.
    """

    def __init__(self, collection, session_id: str):
        """Bind the history to one session inside ``collection``.

        Args:
            collection: Collection object exposing ``create_index``,
                ``find_one``, ``update_one`` and ``delete_one``.
            session_id: Value stored in, and matched against, the
                ``session_id`` field of the session document.
        """
        self.collection = collection
        self.session_id = session_id
        # Runs on every construction. MongoDB is expected to treat a repeated
        # request for an existing index as a no-op -- TODO confirm, and
        # consider creating the index once at startup instead.
        self.collection.create_index("session_id")

    @property
    def messages(self) -> List[BaseMessage]:
        """Retrieve messages from MongoDB.

        Exposed as a property because the callers in this file read
        ``history.messages`` as an attribute and iterate over it directly.

        Returns:
            The stored messages in insertion order, or an empty list when the
            session has no document or the document has no ``history`` field.
        """
        document = self.collection.find_one({"session_id": self.session_id})
        if document and "history" in document:
            return messages_from_dict(document["history"])
        return []

    def add_message(self, message: BaseMessage) -> None:
        """Append the message to MongoDB history.

        The session document is created on first use (``upsert=True``).
        """
        message_dict = message_to_dict(message)
        self.collection.update_one(
            {"session_id": self.session_id},
            {"$push": {"history": message_dict}},
            upsert=True,
        )

    def clear(self) -> None:
        """Clear history for a session by deleting its document."""
        self.collection.delete_one({"session_id": self.session_id})
| # ------------------------------------------------------------------- | |
| # LIFESPAN INITIALIZATION | |
| # ------------------------------------------------------------------- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared LLM client and MongoDB collection for the app.

    Startup assigns the module-level ``llm`` and ``history_collection``.
    Shutdown closes the MongoDB client.

    Args:
        app: The FastAPI application being started (not used directly).

    Raises:
        Exception: Whatever the LLM or MongoDB initialization raised, after
            it has been logged; the application then fails to start.
    """
    global llm, history_collection
    mongo_client = None
    try:
        # Initialize LLM
        llm = ChatGroq(
            groq_api_key=GROQ_API_KEY,
            model_name="openai/gpt-oss-20b",
            temperature=0,
            max_tokens=600,
        )
        logger.info("ChatGroq LLM initialized successfully")

        # Initialize MongoDB
        mongo_client = pymongo.MongoClient(MONGO_URI)
        # Presumably called so that an unreachable server is reported here
        # at startup, not on the first request.
        mongo_client.server_info()
        # Publish the collection only once the server has answered.
        history_collection = mongo_client[MONGO_DB_NAME][MONGO_COLLECTION_NAME]
        logger.info(f"Connected to MongoDB database '{MONGO_DB_NAME}'")
    except Exception as e:
        logger.exception("Initialization failed: %s", e)
        if mongo_client is not None:
            mongo_client.close()
        raise

    # The yield sits outside the initialization try-block so that an error
    # surfacing during shutdown is not reported as an initialization failure.
    try:
        yield
    finally:
        mongo_client.close()
        logger.info("App shutdown complete.")
| # ------------------------------------------------------------------- | |
| # FASTAPI APP SETUP | |
| # ------------------------------------------------------------------- | |
# Application object; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="India Legal Consultation Service",
    description="A bilingual (Hindi-English) legal consultation API powered by Groq and MongoDB.",
    version="2.0.0",
    lifespan=lifespan,
)

# CORS: any origin, method and header is accepted. Credentialed requests are
# left disabled because the CORS rules do not permit combining credentials
# with wildcard origins, and the handlers in this file identify callers by
# the ids carried in the request itself.
# NOTE(review): if a browser client really needs cookies / Authorization
# headers, replace the wildcards with explicit lists before re-enabling
# allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # ------------------------------------------------------------------- | |
| # PYDANTIC MODELS | |
| # ------------------------------------------------------------------- | |
# Shape the model is asked to produce: the consultation text plus one
# keyword / legal area, which defaults to "general" when omitted.
# NOTE(review): documented with `#` comments and not a class docstring on
# purpose -- Pydantic is understood to copy class docstrings into the
# generated schema, which would alter the format instructions derived from
# this model. Confirm before converting to a docstring.
class ConsultationResponse(BaseModel):
    consultation: str
    key_terms: Optional[str] = "general"
# Parser bound to ConsultationResponse. In the code visible here it is used
# only for get_format_instructions() when the prompt is built; the chain
# itself ends in StrOutputParser, so replies are not parsed into the model.
consultation_parser = PydanticOutputParser(pydantic_object=ConsultationResponse)
# Request body for a consultation: the user's question and the id under
# which the conversation history is stored (legal_consultation passes
# user_id as the session id).
class QueryRequest(BaseModel):
    query: str
    user_id: str
# Reply returned by legal_consultation: the chain's string output plus the
# `content` of every message stored for the session.
class ConsultationResponseWithHistory(BaseModel):
    response: str
    history: List[str]
# Reply returned by get_session_history: one message_to_dict() dictionary
# per stored message; empty list by default.
class HistoryResponse(BaseModel):
    history: List[dict] = Field(default_factory=list)
# Reply returned by delete_session: a status string ("deleted" in this
# file) and the session id that was cleared.
class DeleteResponse(BaseModel):
    status: str
    session_id: str
| # ------------------------------------------------------------------- | |
| # LEGAL CONSULTATION PROMPT (FIXED - NO NEWLINE CHARS IN OUTPUT) | |
| # ------------------------------------------------------------------- | |
# The template is a single line with no newline characters. It is assembled
# from adjacent string literals purely for readability; the concatenated
# text is exactly the original one-line template.
_CONSULTATION_TEMPLATE = (
    "You are an expert Indian legal advisor (behave like a senior advocate "
    "or legal consultant with deep understanding of Indian laws). "
    'Context: Current user query: "{query}" '
    'Previous conversation: "{previous_messages}" '
    "Instructions: "
    "1. Analyze the user's question under Indian law context (IPC, CrPC, "
    "Indian Evidence Act, Constitution, Family Law, Contract Law, IT Act, "
    "etc.). "
    "2. Provide a clear, practical, and lawful explanation that can help a "
    "normal citizen understand their rights and possible actions. "
    "3. Include relevant sections or provisions from Indian law where "
    "applicable. "
    "4. Explain in both Hindi and English (use short, readable paragraphs). "
    "5. Keep the tone professional, supportive, and educational, not "
    "judgmental. "
    "6. Avoid giving direct legal verdicts; instead, provide guidance and "
    "awareness. "
    "7. Extract one main legal keyword or area (e.g., murder, divorce, "
    "contract breach, cybercrime, tenant rights, property dispute). "
    "Output: Strictly follow this JSON structure without newline characters: "
    "{format_instructions}"
)

# `query` and `previous_messages` are filled per request; the format
# instructions are fixed once, from the parser, when the module is imported.
consultation_prompt = PromptTemplate(
    input_variables=["query", "previous_messages"],
    template=_CONSULTATION_TEMPLATE,
    partial_variables={
        "format_instructions": consultation_parser.get_format_instructions(),
    },
)
| # ------------------------------------------------------------------- | |
| # FUNCTIONS TO BUILD CHAINS | |
| # ------------------------------------------------------------------- | |
def get_consultation_chain():
    """Compose prompt -> LLM -> plain-string output.

    Returns:
        The runnable obtained by piping ``consultation_prompt`` into the
        module-level ``llm`` and then into ``StrOutputParser``.

    Raises:
        HTTPException: 503 when ``llm`` has not been assigned yet.
    """
    if llm is not None:
        prompt_then_model = consultation_prompt | llm
        return prompt_then_model | StrOutputParser()
    raise HTTPException(status_code=503, detail="LLM not initialized yet")
def get_consultation_runnable():
    """Wrap the consultation chain with MongoDB-backed message history.

    Returns:
        A ``RunnableWithMessageHistory`` that reads the user input from the
        ``query`` key and supplies prior messages under ``previous_messages``.

    Raises:
        HTTPException: 503, propagated from ``get_consultation_chain``.
    """

    def _history_for(user_id):
        # One history object per call, bound to whatever collection the
        # module-level global refers to at that moment.
        return MongoDBChatMessageHistory(
            collection=history_collection,
            session_id=user_id,
        )

    base_chain = get_consultation_chain()
    return RunnableWithMessageHistory(
        base_chain,
        _history_for,
        input_messages_key="query",
        history_messages_key="previous_messages",
    )
| # ------------------------------------------------------------------- | |
| # API ROUTES | |
| # ------------------------------------------------------------------- | |
# NOTE(review): no route decorator is visible on this handler in the text
# under review -- confirm how it is registered on `app`.
def legal_consultation(request: QueryRequest):
    """Answer a legal query and return the session's accumulated history.

    Args:
        request: Carries the user's ``query`` and the ``user_id`` that is
            used as the chat-history session id.

    Returns:
        ConsultationResponseWithHistory with the chain's output and the
        ``content`` of every message stored for the session.

    Raises:
        HTTPException: 503 when the LLM or the history collection is not
            initialized; 500 for any other failure while running the chain
            or reading the history.
    """
    if llm is None or history_collection is None:
        raise HTTPException(status_code=503, detail="Service not ready. Try again later.")
    try:
        consultation_runnable = get_consultation_runnable()
        result = consultation_runnable.invoke(
            {"query": request.query},
            {"configurable": {"session_id": request.user_id}},
        )
        history_obj = MongoDBChatMessageHistory(history_collection, request.user_id)
        return ConsultationResponseWithHistory(
            response=result,
            history=[msg.content for msg in history_obj.messages],
        )
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 503 raised while building
        # the chain) and do not rewrap them as a generic 500.
        raise
    except Exception as e:
        logger.exception("Consultation failed: %s", e)
        raise HTTPException(status_code=500, detail=f"Consultation failed: {e}") from e
# NOTE(review): no route decorator is visible on this handler in the text
# under review -- confirm how it is registered on `app`.
async def get_session_history(session_id: str):
    """Return every stored message of a session in dictionary form.

    Args:
        session_id: Session whose history is requested.

    Returns:
        HistoryResponse holding one ``message_to_dict`` result per message.

    Raises:
        HTTPException: 503 when the history collection is not initialized;
            500 when loading or converting the history fails.
    """
    if history_collection is None:
        raise HTTPException(status_code=503, detail="Service not ready.")

    def _load_history() -> List[dict]:
        history_obj = MongoDBChatMessageHistory(history_collection, session_id)
        return [message_to_dict(msg) for msg in history_obj.messages]

    try:
        # The MongoDB calls are synchronous; run them in a worker thread so
        # this coroutine does not stall the event loop while waiting.
        history_dicts = await asyncio.to_thread(_load_history)
        return HistoryResponse(history=history_dicts)
    except Exception as e:
        logger.exception("Failed to get history for %s: %s", session_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to retrieve history: {e}") from e
# NOTE(review): no route decorator is visible on this handler in the text
# under review -- confirm how it is registered on `app`.
async def delete_session(session_id: str):
    """Delete the stored history of a session.

    Args:
        session_id: Session whose history document is removed.

    Returns:
        DeleteResponse with status ``"deleted"`` and the given session id.

    Raises:
        HTTPException: 503 when the history collection is not initialized;
            500 when the deletion fails.
    """
    if history_collection is None:
        raise HTTPException(status_code=503, detail="Service not ready.")

    def _clear_history() -> None:
        MongoDBChatMessageHistory(history_collection, session_id).clear()

    try:
        # The MongoDB calls are synchronous; run them in a worker thread so
        # this coroutine does not stall the event loop while waiting.
        await asyncio.to_thread(_clear_history)
        logger.info(f"Deleted session {session_id}")
        return DeleteResponse(status="deleted", session_id=session_id)
    except Exception as e:
        logger.exception("Failed to delete session %s: %s", session_id, e)
        raise HTTPException(status_code=500, detail=f"Failed to delete session: {e}") from e
# NOTE(review): no route decorator is visible on this handler in the text
# under review -- confirm how it is registered on `app`.
async def health_check():
    """Return a fixed liveness payload; no dependency is probed."""
    payload = dict(
        status="healthy",
        message="Groq/Mongo legal consultation service running.",
    )
    return payload
| # ------------------------------------------------------------------- | |
| # RUN SERVER | |
| # ------------------------------------------------------------------- | |
if __name__ == "__main__":
    # Direct execution: serve on all interfaces. The port comes from the
    # PORT environment variable and falls back to 7860. The "app:app" import
    # string means the module must be importable under the name `app`.
    listen_port = int(os.getenv("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port, reload=False)