Docosphere / utils.py
adi-123's picture
Update utils.py
d1299a1 verified
raw
history blame
5.07 kB
import os, tempfile, streamlit as st
from typing import List, IO, Tuple
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from docx import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document as LangchainDocument
from langchain_community.vectorstores import FAISS
from cerebras.cloud.sdk import Cerebras # <-- NEW
from langchain.prompts import PromptTemplate
from langchain_together.embeddings import TogetherEmbeddings
load_dotenv()
# ---------- Helpers ---------------------------------------------------------
def get_cerebras_api_key() -> str:
key = os.environ.get("CEREBRAS_API_KEY") or st.secrets.get("CEREBRAS_API_KEY", None)
if not key:
raise EnvironmentError("CEREBRAS_API_KEY not found in env or Streamlit secrets.")
return key
# ---------- File-reading utilities -----------------------------------------
def get_pdf_text(pdf_docs: List[IO[bytes]]) -> str:
txt = ""
for pdf in pdf_docs:
for page in PdfReader(pdf).pages:
if (t := page.extract_text()):
txt += t + "\n"
return txt
def get_docx_text(docx_docs: List[IO[bytes]]) -> str:
txt = ""
for d in docx_docs:
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp:
tmp.write(d.getvalue()); tmp.flush()
try:
doc = Document(tmp.name)
txt += "\n".join(p.text for p in doc.paragraphs) + "\n"
finally:
os.unlink(tmp.name)
return txt
def get_text_chunks(text: str) -> List[str]:
return RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200).split_text(text)
# ---------- Vector-store build & save --------------------------------------
def get_vector_store(text_chunks: List[str]) -> None:
api_key = get_together_api_key()
embeddings = TogetherEmbeddings(model="BAAI/bge-base-en-v1.5", api_key=api_key)
vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
vector_store.save_local("faiss_index")
# ---------- QA chain helpers ----------------------------------------------
def get_conversational_chain() -> Tuple[Cerebras, PromptTemplate]:
# Cerebras client is instantiated here, prompt template unchanged
client = Cerebras(api_key=get_cerebras_api_key())
prompt = PromptTemplate(
template=(
"As a professional assistant, provide a detailed and formally written "
"answer to the question using the provided context.\n\nContext:\n{context}\n\n"
"Question:\n{question}\n\nAnswer:"
),
input_variables=["context", "question"]
)
return client, prompt
def self_assess(question: str) -> str:
client = Cerebras(api_key=get_cerebras_api_key())
msgs = [
{"role": "system", "content": "You are an expert assistant…"},
{"role": "user", "content": (
"If you can confidently answer the following question from your own "
"knowledge, do so; otherwise reply with 'NEED_RETRIEVAL'.\n\n"
f"Question: {question}"
)}
]
result = client.chat.completions.create(
messages=msgs,
model="llama-3.3-70b",
max_completion_tokens=1024,
temperature=0.2,
top_p=1,
stream=False
)
return result.choices[0].message.content.strip()
def process_docs_for_query(docs: List[LangchainDocument], question: str) -> str:
if not docs:
return "Sorry, I couldn’t find relevant info in the documents."
ctx = "\n\n".join(d.page_content for d in docs)
client, prompt = get_conversational_chain()
prompt_text = prompt.format(context=ctx, question=question)
result = client.chat.completions.create(
messages=[{"role": "user", "content": prompt_text}],
model="llama-3.3-70b",
max_completion_tokens=1024,
temperature=0.2,
top_p=1,
stream=False
)
return result.choices[0].message.content
# ---------- Main user-query orchestrator -----------------------------------
def user_input(user_question: str) -> None:
assessment = self_assess(user_question)
need_retrieval = assessment.upper() == "NEED_RETRIEVAL"
st.info("🔍 Searching documents…" if need_retrieval else "💡 Using model knowledge…")
try:
if need_retrieval:
# Embeddings usage remains, need to replace TogetherEmbeddings if you want Cerebras embedding alternative
api_key = get_cerebras_api_key()
# Comment or replace TogetherEmbeddings below if unsupported
embeddings = TogetherEmbeddings(model="BAAI/bge-base-en-v1.5", api_key=api_key)
vs = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
docs = vs.similarity_search(user_question)
answer = process_docs_for_query(docs, user_question)
else:
answer = assessment
st.markdown("### Answer")
st.markdown(answer)
except Exception as e:
st.error(f"⚠️ Error: {e}")