consult / app.py
aravsaxena884's picture
final
4379a9c
import os
import logging
from typing import Optional, List
import asyncio
from contextlib import asynccontextmanager
import pymongo
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import uvicorn
# LangChain imports
from langchain_core.messages import (
BaseMessage, HumanMessage, AIMessage,
message_to_dict, messages_from_dict
)
from langchain_core.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain_core.output_parsers import PydanticOutputParser, StrOutputParser
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
# -------------------------------------------------------------------
# CONFIGURATION
# -------------------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- ENVIRONMENT VARIABLES ---
MONGO_URI = os.getenv("MONGO_URI")
MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "chat_history_db")
MONGO_COLLECTION_NAME = os.getenv("MONGO_COLLECTION_NAME", "session_history")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
raise ValueError("GROQ_API_KEY environment variable is required")
if not MONGO_URI:
raise ValueError("MONGO_URI environment variable is required for chat history")
# Global variables
llm = None
history_collection = None
# -------------------------------------------------------------------
# CHAT HISTORY HANDLER (MongoDB)
# -------------------------------------------------------------------
class MongoDBChatMessageHistory(BaseChatMessageHistory):
"""Chat message history stored in MongoDB."""
def __init__(self, collection, session_id: str):
self.collection = collection
self.session_id = session_id
self.collection.create_index("session_id")
@property
def messages(self) -> List[BaseMessage]:
"""Retrieve messages from MongoDB"""
document = self.collection.find_one({"session_id": self.session_id})
if document and "history" in document:
return messages_from_dict(document["history"])
return []
def add_message(self, message: BaseMessage) -> None:
"""Append the message to MongoDB history"""
message_dict = message_to_dict(message)
self.collection.update_one(
{"session_id": self.session_id},
{"$push": {"history": message_dict}},
upsert=True,
)
def clear(self) -> None:
"""Clear history for a session"""
self.collection.delete_one({"session_id": self.session_id})
# -------------------------------------------------------------------
# LIFESPAN INITIALIZATION
# -------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
global llm, history_collection
try:
# Initialize LLM
llm = ChatGroq(
groq_api_key=GROQ_API_KEY,
model_name="openai/gpt-oss-20b",
temperature=0,
max_tokens=600
)
logger.info("ChatGroq LLM initialized successfully")
# Initialize MongoDB
mongo_client = pymongo.MongoClient(MONGO_URI)
db = mongo_client[MONGO_DB_NAME]
history_collection = db[MONGO_COLLECTION_NAME]
mongo_client.server_info()
logger.info(f"Connected to MongoDB database '{MONGO_DB_NAME}'")
yield
logger.info("App shutdown complete.")
except Exception as e:
logger.error(f"Initialization failed: {e}")
raise e
# -------------------------------------------------------------------
# FASTAPI APP SETUP
# -------------------------------------------------------------------
app = FastAPI(
title="India Legal Consultation Service",
description="A bilingual (Hindi-English) legal consultation API powered by Groq and MongoDB.",
version="2.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# -------------------------------------------------------------------
# PYDANTIC MODELS
# -------------------------------------------------------------------
class ConsultationResponse(BaseModel):
consultation: str
key_terms: Optional[str] = "general"
consultation_parser = PydanticOutputParser(pydantic_object=ConsultationResponse)
class QueryRequest(BaseModel):
query: str
user_id: str
class ConsultationResponseWithHistory(BaseModel):
response: str
history: List[str]
class HistoryResponse(BaseModel):
history: List[dict] = Field(default_factory=list)
class DeleteResponse(BaseModel):
status: str
session_id: str
# -------------------------------------------------------------------
# LEGAL CONSULTATION PROMPT (FIXED - NO NEWLINE CHARS IN OUTPUT)
# -------------------------------------------------------------------
consultation_prompt = PromptTemplate(
input_variables=["query", "previous_messages"],
template="""You are an expert Indian legal advisor (behave like a senior advocate or legal consultant with deep understanding of Indian laws). Context: Current user query: "{query}" Previous conversation: "{previous_messages}" Instructions: 1. Analyze the user's question under Indian law context (IPC, CrPC, Indian Evidence Act, Constitution, Family Law, Contract Law, IT Act, etc.). 2. Provide a clear, practical, and lawful explanation that can help a normal citizen understand their rights and possible actions. 3. Include relevant sections or provisions from Indian law where applicable. 4. Explain in both Hindi and English (use short, readable paragraphs). 5. Keep the tone professional, supportive, and educational, not judgmental. 6. Avoid giving direct legal verdicts; instead, provide guidance and awareness. 7. Extract one main legal keyword or area (e.g., murder, divorce, contract breach, cybercrime, tenant rights, property dispute). Output: Strictly follow this JSON structure without newline characters: {format_instructions}""",
partial_variables={"format_instructions": consultation_parser.get_format_instructions()},
)
# -------------------------------------------------------------------
# FUNCTIONS TO BUILD CHAINS
# -------------------------------------------------------------------
def get_consultation_chain():
if llm is None:
raise HTTPException(status_code=503, detail="LLM not initialized yet")
return consultation_prompt | llm | StrOutputParser()
def get_consultation_runnable():
consultation_chain = get_consultation_chain()
return RunnableWithMessageHistory(
consultation_chain,
lambda user_id: MongoDBChatMessageHistory(collection=history_collection, session_id=user_id),
input_messages_key="query",
history_messages_key="previous_messages",
)
# -------------------------------------------------------------------
# API ROUTES
# -------------------------------------------------------------------
@app.post("/legal-consultation", response_model=ConsultationResponseWithHistory)
def legal_consultation(request: QueryRequest):
if llm is None or history_collection is None:
raise HTTPException(status_code=503, detail="Service not ready. Try again later.")
try:
consultation_runnable = get_consultation_runnable()
result = consultation_runnable.invoke(
{"query": request.query},
{"configurable": {"session_id": request.user_id}}
)
history_obj = MongoDBChatMessageHistory(history_collection, request.user_id)
all_messages = history_obj.messages
return ConsultationResponseWithHistory(
response=result,
history=[msg.content for msg in all_messages]
)
except Exception as e:
logger.error(f"Consultation failed: {e}")
raise HTTPException(status_code=500, detail=f"Consultation failed: {e}")
@app.get("/get-session-history/{session_id}", response_model=HistoryResponse)
async def get_session_history(session_id: str):
if history_collection is None:
raise HTTPException(status_code=503, detail="Service not ready.")
try:
history_obj = MongoDBChatMessageHistory(history_collection, session_id)
messages = history_obj.messages
history_dicts = [message_to_dict(msg) for msg in messages]
return HistoryResponse(history=history_dicts)
except Exception as e:
logger.error(f"Failed to get history for {session_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve history: {e}")
@app.delete("/delete-session/{session_id}", response_model=DeleteResponse)
async def delete_session(session_id: str):
if history_collection is None:
raise HTTPException(status_code=503, detail="Service not ready.")
try:
history_obj = MongoDBChatMessageHistory(history_collection, session_id)
history_obj.clear()
logger.info(f"Deleted session {session_id}")
return DeleteResponse(status="deleted", session_id=session_id)
except Exception as e:
logger.error(f"Failed to delete session {session_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to delete session: {e}")
@app.get("/health")
async def health_check():
return {"status": "healthy", "message": "Groq/Mongo legal consultation service running."}
# -------------------------------------------------------------------
# RUN SERVER
# -------------------------------------------------------------------
if __name__ == "__main__":
uvicorn.run(
"app:app",
host="0.0.0.0",
port=int(os.getenv("PORT", 7860)),
reload=False
)