import os import json import gradio as gr import zipfile import tempfile import requests import urllib.parse import io from langchain_community.vectorstores import Chroma from huggingface_hub import HfApi, login from PyPDF2 import PdfReader from langchain_huggingface import HuggingFaceEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_groq import ChatGroq from dotenv import load_dotenv from langchain.docstore.document import Document from langchain.schema import Document from chunk_python_code import chunk_python_code_with_metadata from vectorstore import get_chroma_vectorstore from download_gitlab_repo import download_and_upload_kadiAPY_repo_to_huggingfacespace from process_repo_zipfile.py import extract_files_and_filepath_from_dir # Load environment variables from .env file load_dotenv() # Load configuration from JSON file with open('config.json') as config_file: config = json.load(config_file) with open("config2.json", "r") as file: config2 = json.load(file) PERSIST_DOC_DIRECTORY = config["persist_doc_directory"] PERSIST_CODE_DIRECTORY =config["persist_code_directory"] CHUNK_SIZE = config["chunk_size"] CHUNK_OVERLAP = config["chunk_overlap"] EMBEDDING_MODEL_NAME = config["embedding_model"] LLM_MODEL_NAME = config["llm_model"] LLM_TEMPERATURE = config["llm_temperature"] GITLAB_API_URL = config["gitlab_api_url"] HF_SPACE_NAME = config["hf_space_name"] DATA_DIR = config["data_dir"] GROQ_API_KEY = os.environ["GROQ_API_KEY"] HF_TOKEN = os.environ["HF_Token"] login(HF_TOKEN) api = HfApi() def load_project_id(json_file): with open(json_file, 'r') as f: data = json.load(f) return data['project_id'] def download_gitlab_repo(): print("Start the upload_gitRepository function") project_id = load_project_id('repository_ids.json') encoded_project_id = urllib.parse.quote_plus(project_id) # Define the URL to download the repository archive archive_url = f"{GITLAB_API_URL}/projects/{encoded_project_id}/repository/archive.zip" # Download the repository archive response = requests.get(archive_url) archive_bytes = io.BytesIO(response.content) # Retrieve the original file name from the response headers content_disposition = response.headers.get('content-disposition') if content_disposition: filename = content_disposition.split('filename=')[-1].strip('\"') else: filename = 'archive.zip' # Fallback to a default name if not found # Check if the file already exists in the repository existing_files = api.list_repo_files(repo_id=HF_SPACE_NAME, repo_type='space') target_path = f"{DATA_DIR}/{filename}" print(f"Target Path: '{target_path}'") print(f"Existing Files: {[repr(file) for file in existing_files]}") if target_path in existing_files: print(f"File '{target_path}' already exists in the repository. Skipping upload...") else: # Upload the ZIP file to the new folder in the Hugging Face space repository print("Uploading File to directory:") print(f"Archive Bytes: {repr(archive_bytes.getvalue())[:100]}") # Show a preview of bytes print(f"Target Path in Repo: '{target_path}'") api.upload_file( path_or_fileobj=archive_bytes, path_in_repo=target_path, repo_id=HF_SPACE_NAME, repo_type='space' ) print("Upload complete") def split_python_code_into_chunks(texts, file_paths): chunks = [] for text, file_path in zip(texts, file_paths): document_chunks = chunk_python_code_with_metadata(text, file_path) chunks.extend(document_chunks) return chunks # Split text into chunks def split_into_chunks(texts, references, chunk_size, chunk_overlap): text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) chunks = [] for text, reference in zip(texts, references): chunks.extend([ Document( page_content=chunk, metadata={ "source": reference, "usage": "doc" } ) for chunk in text_splitter.split_text(text) ]) return chunks # Setup Vectorstore def embed_documents_into_vectorstore(chunks, model_name, persist_directory): print("Start setup_vectorstore_function") embedding_model = HuggingFaceEmbeddings(model_name=model_name) vectorstore = get_chroma_vectorstore(embedding_model, persist_directory) vectorstore.add_documents(chunks) return vectorstore # Setup LLM def setup_llm(model_name, temperature, api_key): llm = ChatGroq(model=model_name, temperature=temperature, api_key=api_key) return llm def format_kadi_apy_library_context(docs): doc_context = [] for doc in docs: # Extract metadata information class_info = doc.metadata.get("class", "Unknown Class") type_info = doc.metadata.get("type", "Unknown Type") source_info = doc.metadata.get("source", "Unknown Type") print(":}\n\n", doc.page_content) formatted_doc = f"# source: {source_info}\n# class: {class_info}\n# type: {type_info}\n{doc.page_content}\n\n\n" doc_context.append(formatted_doc) return doc_context def format_kadi_api_doc_context(docs): doc_context = [] for doc in docs: source_info = doc.metadata.get("source", "Unknown Type") print(":}\n\n", doc.page_content) formatted_doc = f"# source: {source_info}\n{doc.page_content}\n\n\n" doc_context.append(formatted_doc) return doc_context def rag_workflow(query): """ RAGChain class to perform the complete RAG workflow. """ # Assume 'llm' and 'vector_store' are already initialized instances rag_chain = RAGChain(llm, vector_store) # Step 1: Predict which library usage is relevant library_usage_prediction = rag_chain.predict_library_usage(query) print(f"Predicted library usage: {library_usage_prediction}") # Step 2: Retrieve contexts (documents and code snippets) doc_contexts, code_contexts = rag_chain.retrieve_contexts(query, library_usage_prediction) print("Retrieved Document Contexts:", doc_contexts) print("Retrieved Code Contexts:", code_contexts) # Step 3: Format the contexts formatted_doc_context, formatted_code_context = rag_chain.format_context(doc_contexts, code_contexts) print("Formatted Document Contexts:", formatted_doc_context) print("Formatted Code Contexts:", formatted_code_context) # Step 4: Generate the final response response = rag_chain.generate_response(query, formatted_doc_context, formatted_code_context) print("Generated Response:", response) return response def get_chroma_vectorstore2(embedding_model): # Define the persist_directory path vectorstore_path = "/home/user/data" # Ensure the directory exists os.makedirs(vectorstore_path, exist_ok=True) # Creates it if it doesn't exist print(f"Using persist_directory: {vectorstore_path}") # Initialize the Chroma vectorstore with the specified persist_directory vectorstore = Chroma(persist_directory=vectorstore_path, embedding_function=embedding_model) return vectorstore def initialize(): global vector_store, chunks, llm download_and_upload_kadiAPY_repo_to_huggingfacespace() code_texts, code_references = extract_files_and_filepath_from_dir(DATA_DIR, ['kadi_apy'], []) doc_texts, kadiAPY_doc_references = extract_files_and_filepath_from_dir(DATA_DIR, ['docs/source/'], []) print("LEEEEEEEEEEEENGTH of code_texts: ", len(code_texts)) print("LEEEEEEEEEEEENGTH of doc_files: ", len(doc_texts)) code_chunks = split_python_code_into_chunks(code_texts, code_references) doc_chunks = split_into_chunks(doc_texts, kadiAPY_doc_references, CHUNK_SIZE, CHUNK_OVERLAP) print(f"Total number of code_chunks: {len(code_chunks)}") print(f"Total number of doc_chunks: {len(doc_chunks)}") filename = "test" vector_store = embed_documents_into_vectorstore(doc_chunks + code_chunks, EMBEDDING_MODEL_NAME, f"{DATA_DIR}/{filename}") llm = setup_llm(LLM_MODEL_NAME, LLM_TEMPERATURE, GROQ_API_KEY) from langchain_community.document_loaders import TextLoader initialize() # Gradio utils def check_input_text(text): if not text: gr.Warning("Please input a question.") raise TypeError return True def add_text(history, text): history = history + [(text, None)] yield history, "" import gradio as gr def bot_kadi(history): user_query = history[-1][0] response = rag_workflow(user_query) history[-1] = (user_query, response) yield history def main(): with gr.Blocks() as demo: gr.Markdown("## KadiAPY - AI Coding-Assistant") gr.Markdown("AI assistant for KadiAPY based on RAG architecture powered by LLM") with gr.Tab("KadiAPY - AI Assistant"): with gr.Row(): with gr.Column(scale=10): chatbot = gr.Chatbot([], elem_id="chatbot", label="Kadi Bot", bubble_full_width=False, show_copy_button=True, height=600) user_txt = gr.Textbox(label="Question", placeholder="Type in your question and press Enter or click Submit") with gr.Row(): with gr.Column(scale=1): submit_btn = gr.Button("Submit", variant="primary") with gr.Column(scale=1): clear_btn = gr.Button("Clear", variant="stop") gr.Examples( examples=[ "Who is working on Kadi4Mat?", "How do i install the Kadi-Apy library?", "How do i install the Kadi-Apy library for development?", "I need a method to upload a file to a record", ], inputs=user_txt, outputs=chatbot, fn=add_text, label="Try asking...", cache_examples=False, examples_per_page=3, ) user_txt.submit(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot]) submit_btn.click(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot]) clear_btn.click(lambda: None, None, chatbot, queue=False) demo.launch() if __name__ == "__main__": main()