Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import requests | |
| import os | |
| import time | |
| from datetime import timedelta | |
| from openai import OpenAI | |
| from pinecone import Pinecone | |
| import uuid | |
| import re | |
| import pandas as pd | |
| from google.cloud import storage | |
| from elevenlabs.client import ElevenLabs, AsyncElevenLabs | |
| from elevenlabs import play, save, Voice, stream | |
| from pymongo.mongo_client import MongoClient | |
| from utils import create_folders | |
| from gcp import download_credentials | |
| from csv import writer | |
| import asyncio | |
| import httpx | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
# --- Settings pulled from the process environment (after load_dotenv) ---
# OpenAI
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
MODEL_OPENAI = os.environ.get("MODEL_OPENAI")
# Pinecone
PINECONE_API_TOKEN = os.environ.get("PINECONE_API_TOKEN")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENV")
PINECONE_HOST = os.environ.get("PINECONE_HOST")
# MongoDB credentials
DB_USER_NAME = os.environ.get("DB_USER_NAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# Audio / video providers
API_KEY_ELEVENLABS = os.environ.get("API_KEY_ELEVENLABS")
D_ID_KEY = os.environ.get("D_ID_KEY")
IMG_XAVY = os.environ.get("IMG_XAVY")
# Google Cloud Storage
CREDENTIALS_GCP = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
NAME_BUCKET = os.environ.get("NAME_BUCKET")
# Endpoint of the self-hosted audio API used by the "XTTS" option
URL_AUDIO = os.environ.get("URL_AUDIO")

# Client used for chat completions and embeddings
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Pinecone index that backs the retrieval step
pc = Pinecone(api_key=PINECONE_API_TOKEN)
index = pc.Index(host=PINECONE_HOST)

# MongoDB handle for the chat-history collection
uri = (
    f"mongodb+srv://{DB_USER_NAME}:{DB_PASSWORD}"
    "@cluster-rob01.3fpztfw.mongodb.net/"
    "?retryWrites=true&w=majority&appName=cluster-rob01"
)
client = MongoClient(uri)
db = client["ChatCrunchyroll"]
collection = db["history_msg"]
def _save_history_msg():
    """Placeholder for persisting the chat history; it currently does nothing.

    Returns:
        Always ``None``.
    """
    return
def _add_question_vectorstore(question: str, response: str):
    """Embed *question* and upsert it into the Pinecone index.

    The vector id is a freshly generated UUID4 string. The metadata stores
    the question under ``'question'`` and the response under ``'text'``.
    """
    record = (
        str(uuid.uuid4()),
        _call_embedding(question),
        {'question': question, 'text': response},
    )
    index.upsert([record])
def _update_elements(question, chatbot, output, history_messages, url_audio, url_video, df_table_times):
    """Record the finished turn and build the components returned to the UI.

    Appends ``[question, output]`` to *chatbot* and the matching user and
    assistant entries to *history_messages* (both mutated in place).

    Returns:
        A tuple ``(chatbot, audio_component, video_component, df_table_times)``
        where the audio and video components point at the stringified URLs.
    """
    chatbot.append([question, output])
    audio_component = gr.Audio(value=str(url_audio), autoplay=False, label="Audio")
    video_component = gr.Video(value=str(url_video), autoplay=True, height=400, label="Video")
    history_messages.extend(
        {'role': role, 'content': content}
        for role, content in (('user', question), ('assistant', output))
    )
    return chatbot, audio_component, video_component, df_table_times
def _query_pinecone(embedding):
    """Return the text of the 10 nearest Pinecone matches, one per line.

    Args:
        embedding: Query vector passed to ``index.query``.

    Returns:
        The ``metadata['text']`` of every match, each followed by a newline,
        concatenated in the order the index returned them.
    """
    response = index.query(vector=embedding, top_k=10, include_metadata=True)
    return "".join(f"{match['metadata']['text']}\n" for match in response['matches'])
def _general_prompt(context, option_prompt, general_prompt):
    """Build the system prompt by injecting the retrieved context.

    Args:
        context: Text substituted for every occurrence of the literal marker
            ``CONTEXT`` in the template.
        option_prompt: ``"Default"`` loads the template from
            ``prompt_general.txt`` (with newlines removed); ``"Custom"`` uses
            *general_prompt* as the template.
        general_prompt: Template used when *option_prompt* is ``"Custom"``.

    Returns:
        The template with ``CONTEXT`` replaced by *context*.

    Raises:
        ValueError: If *option_prompt* is neither ``"Default"`` nor
            ``"Custom"`` (previously this surfaced as an UnboundLocalError).
    """
    if option_prompt == "Default":
        # NOTE(review): assumes the prompt file is UTF-8 encoded - confirm.
        with open("prompt_general.txt", "r", encoding="utf-8") as file:
            file_prompt = file.read().replace("\n", "")
    elif option_prompt == "Custom":
        file_prompt = general_prompt
    else:
        raise ValueError(
            f"Unknown option_prompt {option_prompt!r}; expected 'Default' or 'Custom'"
        )

    context_prompt = file_prompt.replace('CONTEXT', context)
    print(context_prompt)
    print("--------------------")
    return context_prompt
def _call_embedding(text: str):
    """Return the ``text-embedding-ada-002`` embedding of *text*.

    Only the embedding of the first item in the API response is returned.
    """
    embedding_response = openai_client.embeddings.create(
        model='text-embedding-ada-002',
        input=text,
    )
    first_item = embedding_response.data[0]
    return first_item.embedding
def _call_gpt(prompt: str, message: str):
    """Ask the configured chat model to answer *message* under *prompt*.

    Args:
        prompt: Content of the system message.
        message: Content of the user message.

    Returns:
        The content of the first choice in the completion.
    """
    conversation = [
        {'role': 'system', 'content': prompt},
        {'role': 'user', 'content': message},
    ]
    completion = openai_client.chat.completions.create(
        messages=conversation,
        temperature=0.2,
        model=MODEL_OPENAI,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
def _call_gpt_standalone(prompt: str):
    """Run the configured chat model with *prompt* as the only (system) message.

    Returns:
        The content of the first choice in the completion.
    """
    system_only = [{'role': 'system', 'content': prompt}]
    completion = openai_client.chat.completions.create(
        messages=system_only,
        temperature=0.2,
        model=MODEL_OPENAI,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
def _get_standalone_question(question, history_messages, option_prompt, standalone_prompt):
    """Rewrite *question* as a standalone question using the chat history.

    Args:
        question: The latest user message.
        history_messages: Sequence of ``{'role': ..., 'content': ...}`` dicts.
            The first entry is skipped, as in the original implementation.
        option_prompt: ``"Default"`` loads the template from
            ``prompt_standalone_message.txt`` (newlines removed); ``"Custom"``
            uses *standalone_prompt* as the template.
        standalone_prompt: Template used when *option_prompt* is ``"Custom"``.

    Returns:
        The model's reply to the template after ``HISTORY`` and ``QUESTION``
        have been substituted.

    Raises:
        ValueError: If *option_prompt* is neither ``"Default"`` nor
            ``"Custom"`` (previously this surfaced as an UnboundLocalError).
    """
    if option_prompt == "Default":
        # NOTE(review): assumes the prompt file is UTF-8 encoded - confirm.
        with open("prompt_standalone_message.txt", "r", encoding="utf-8") as file:
            file_prompt_standalone = file.read().replace("\n", "")
    elif option_prompt == "Custom":
        file_prompt_standalone = standalone_prompt
    else:
        raise ValueError(
            f"Unknown option_prompt {option_prompt!r}; expected 'Default' or 'Custom'"
        )

    history_lines = []
    for i, msg in enumerate(history_messages):
        if i == 0:
            continue  # Omit the first entry (the prompt)
        try:
            content = msg["content"]
            # Label each line with the role stored on the message itself.
            # Guessing the speaker from index parity mislabels every line
            # whenever the history does not start with exactly one leading
            # entry before the first user message. Parity is kept only as a
            # fallback for entries that carry no role.
            role = msg.get("role") or ("user" if i % 2 == 0 else "assistant")
        except (KeyError, TypeError, AttributeError, IndexError) as e:
            # Best effort: a malformed entry is reported and skipped.
            print(e)
            continue
        history_lines.append(f"{role}: {content}\n")
    history = "".join(history_lines)

    prompt_standalone = file_prompt_standalone.replace('HISTORY', history).replace('QUESTION', question)
    print(prompt_standalone)
    print("------------------")
    standalone_msg_q = _call_gpt_standalone(prompt_standalone)
    print(standalone_msg_q)
    print("------------------")
    return standalone_msg_q
def _create_clean_message(text: str):
    """Replace every http/https URL in *text* with the phrase 'el siguiente link'.

    A URL is matched from its scheme up to the next whitespace character.
    """
    url_pattern = re.compile(r'http[s]?://\S+')
    return url_pattern.sub('el siguiente link', text)
async def _create_audio(clean_text: str, option_audio: str):
    """Synthesize speech for *clean_text* and return a URL to the audio.

    Args:
        clean_text: Text to be spoken.
        option_audio: ``"Elevenlabs"`` generates the audio with the ElevenLabs
            client, uploads it to the GCP bucket and signs a 15-minute URL.
            ``"XTTS"`` asks the HTTP service at ``URL_AUDIO`` for a link.
            Any other value produces no audio.

    Returns:
        A tuple ``(signed_url_audio, unique_id)``. ``signed_url_audio`` is the
        string ``"None"`` when no audio could be produced; ``unique_id`` is a
        fresh UUID4 string identifying this request's artifacts.
    """
    download_credentials()
    create_folders()
    STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP)
    unique_id = str(uuid.uuid4())
    signed_url_audio = "None"

    if option_audio == "Elevenlabs":
        # Create audio file with elevenlabs
        client_elevenlabs = ElevenLabs(api_key=API_KEY_ELEVENLABS)
        voice_custom = Voice(voice_id="ZQe5CZNOzWyzPSCn5a3c")
        audio = client_elevenlabs.generate(
            text=clean_text,
            voice=voice_custom,
            model="eleven_multilingual_v2"
        )
        source_audio_file_name = f'./audios/file_audio_{unique_id}.wav'
        destination_blob_name_audio = unique_id + '.wav'
        bucket = STORAGE_CLIENT.bucket(NAME_BUCKET)
        blob = bucket.blob(destination_blob_name_audio)
        # Save -> upload -> sign is one chain: if any step fails the later
        # ones are skipped, so a signed URL is never handed out for a blob
        # that was not actually uploaded.
        try:
            save(audio, source_audio_file_name)
            blob.upload_from_filename(source_audio_file_name)
            signed_url_audio = blob.generate_signed_url(expiration=timedelta(minutes=15))
        except Exception as e:  # best effort: report and fall back to "None"
            print(e)
    elif option_audio == "XTTS":
        params = {'text': clean_text, 'language': 'es'}
        headers = {'accept': 'application/json'}
        # Makes a request to the instance with the audio api
        async with httpx.AsyncClient() as http_client:
            try:
                response = await http_client.get(URL_AUDIO, params=params, headers=headers, timeout=120)
            except Exception as e:
                print(f'There is a problem with the audio. Check that instance. ERROR: {e}')
            else:
                # Only inspect the response when the request succeeded;
                # otherwise `response` would be unbound here.
                if response.status_code == 200:
                    try:
                        signed_url_audio = response.json()['link_audio']
                    except (ValueError, KeyError, TypeError) as e:
                        print(f'There is a problem with the audio. Check that instance. ERROR: {e}')
                else:
                    print(f'There is a problem with the audio. Check that instance. ERROR: {response.status_code}')
    return signed_url_audio, unique_id
def _create_video(link_audio: str, unique_id: str):
    """Create a D-ID talking-head video for *link_audio* and return its URL.

    The talk is created through the D-ID API, polled until it is done,
    downloaded to ``./videos/video_final_{unique_id}.mp4``, uploaded to the
    GCP bucket as ``{unique_id}.mp4`` and exposed through a 15-minute signed
    URL.

    Args:
        link_audio: URL of the audio track the avatar should speak.
        unique_id: Identifier used to name the local file and the blob.

    Returns:
        The signed URL of the uploaded video, or the string ``"None"`` when
        any step fails (talk rejected, polling timed out, download or upload
        failed).
    """
    download_credentials()
    create_folders()
    STORAGE_CLIENT = storage.Client.from_service_account_json(CREDENTIALS_GCP)
    bucket = STORAGE_CLIENT.bucket(NAME_BUCKET)

    request_timeout = 30      # seconds allowed for each individual HTTP call
    max_wait_seconds = 180    # upper bound on waiting for the video render
    poll_interval = 0.5       # pause between status checks

    # Create video talk with the audio file created by the audio step
    url_did = "https://api.d-id.com/talks"
    payload = {
        "script": {
            "type": "audio",
            "provider": {
                "type": "microsoft",
                "voice_id": "en-US-JennyNeural"
            },
            "ssml": "false",
            "audio_url": link_audio
        },
        "config": {
            "fluent": "false",
            "pad_audio": "0.0",
            "stitch": True
        },
        "source_url": IMG_XAVY
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Basic {D_ID_KEY}"
    }

    try:
        request_create_talk = requests.post(url_did, json=payload, headers=headers, timeout=request_timeout)
        resp_create_talk = request_create_talk.json()
        try:
            talk_id = resp_create_talk['id']
        except (KeyError, TypeError):
            # Without a talk id there is nothing to poll.
            print(f'The video talk could not be created: {resp_create_talk}')
            return "None"

        # Poll until the video is ready, the talk fails, or the wait budget
        # is exhausted (the loop used to run forever in the last two cases).
        url_get_talk_id = f"https://api.d-id.com/talks/{talk_id}"
        deadline = time.monotonic() + max_wait_seconds
        while True:
            request_video_url = requests.get(url_get_talk_id, headers=headers, timeout=request_timeout)
            resp_video_url = request_video_url.json()
            status = resp_video_url.get('status') if isinstance(resp_video_url, dict) else None
            if status == 'done':
                break
            if status is None or status in ('error', 'rejected'):
                print(f'The video talk failed: {resp_video_url}')
                return "None"
            if time.monotonic() >= deadline:
                print(f'The video talk was not ready after {max_wait_seconds} seconds')
                return "None"
            # Sleep until the video is ready
            time.sleep(poll_interval)

        result_url_video = resp_video_url.get('result_url')
        if not result_url_video:
            print(f'The video talk finished without a result url: {resp_video_url}')
            return "None"

        # Saves the video into a file to later upload it to the GCP
        source_video_file_name = f'./videos/video_final_{unique_id}.mp4'
        request_video = requests.get(result_url_video, timeout=request_timeout)
        if request_video.status_code != 200:
            print(f'The video could not be downloaded. ERROR: {request_video.status_code}')
            return "None"
        with open(source_video_file_name, 'wb') as outfile:
            outfile.write(request_video.content)
    except (requests.RequestException, ValueError, OSError) as e:
        # Network failure, non-JSON body or local write failure.
        print(e)
        return "None"

    # Save video file to the GCP and sign it; the URL is only produced when
    # the upload succeeded.
    destination_blob_name_video = unique_id + '.mp4'
    blob = bucket.blob(destination_blob_name_video)
    signed_url_video = "None"
    try:
        blob.upload_from_filename(source_video_file_name)
        signed_url_video = blob.generate_signed_url(expiration=timedelta(minutes=15))
    except Exception as e:  # best effort: report and fall back to "None"
        print(e)
    return signed_url_video
def _timed_call(func, *args):
    """Call ``func(*args)`` and return ``(result, elapsed_seconds)``."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


def get_answer(question: str, chatbot: list[tuple[str, str]], history_messages, comp_audio, comp_video, df_table, option_audio, option_prompt, general_prompt, standalone_prompt):
    """
    Gets the answer of the chatbot

    Runs the pipeline standalone question -> embedding -> Pinecone lookup ->
    prompt -> chat completion, then creates audio and video for the answer
    and appends one row of per-step timings to *df_table*. When the chat
    already holds 8 exchanges a fixed farewell is used instead of calling the
    model.

    Args:
        question: The latest user message.
        chatbot: Chat pairs shown in the UI (mutated in place).
        history_messages: Role/content history (mutated in place).
        comp_audio: Unused; kept for the callback signature.
        comp_video: Unused; kept for the callback signature.
        df_table: Table of timings; converted with ``pd.DataFrame``.
            NOTE(review): assumed to have the 11 columns written below -
            confirm against the UI definition.
        option_audio: Audio backend passed to ``_create_audio``.
        option_prompt: ``"Default"`` or ``"Custom"`` prompt selection.
        general_prompt: Custom answer template.
        standalone_prompt: Custom standalone-question template.

    Returns:
        The tuple produced by ``_update_elements``: the chatbot, the audio
        component, the video component and the timings table component.
    """
    # Steps skipped by the farewell branch report 0.0 seconds, so the timing
    # row can still be written (these names used to be undefined there).
    time_get_standalone_question = 0.0
    time_call_embedding = 0.0
    time_query_pinecone = 0.0
    time_general_prompt = 0.0
    time_call_gpt = 0.0

    if len(chatbot) == 8:
        message_output = 'Un placer haberte ayudado, hasta luego!'
    else:
        # create standalone question or message
        standalone_msg_q, time_get_standalone_question = _timed_call(
            _get_standalone_question, question, history_messages, option_prompt, standalone_prompt)
        # create embedding of standalone question or message
        output_embedding, time_call_embedding = _timed_call(_call_embedding, standalone_msg_q)
        # get nearest embeddings
        best_results, time_query_pinecone = _timed_call(_query_pinecone, output_embedding)
        # create context/general prompt
        final_context_prompt, time_general_prompt = _timed_call(
            _general_prompt, best_results, option_prompt, general_prompt)
        # final response (to user)
        message_output, time_call_gpt = _timed_call(_call_gpt, final_context_prompt, question)
        if "Respuesta:" in message_output:
            # str.replace returns a new string; the result must be kept.
            message_output = message_output.replace("Respuesta:", "").strip()

    # clean message output
    processed_message, time_create_clean_message = _timed_call(_create_clean_message, message_output)
    # create audio
    (url_audio, unique_id), time_create_audio = _timed_call(
        asyncio.run, _create_audio(processed_message, option_audio))
    # create video with d-id no streaming
    url_video, time_create_video = _timed_call(_create_video, url_audio, unique_id)

    step_times = [
        time_get_standalone_question,
        time_call_embedding,
        time_query_pinecone,
        time_general_prompt,
        time_call_gpt,
        time_create_clean_message,
        time_create_audio,
        time_create_video,
    ]
    final_time = sum(step_times)

    df_table = pd.DataFrame(df_table)
    df_table.loc[len(df_table.index)] = [question, message_output, *step_times, final_time]
    new_df_table = gr.DataFrame(df_table, interactive=False, visible=True)
    print(history_messages)
    return _update_elements(question, chatbot, message_output, history_messages, url_audio, url_video, new_df_table)
def init_greeting(chatbot, history_messages):
    """Seed an empty chat with the assistant's greeting.

    When *chatbot* already has entries nothing is changed. Otherwise the
    greeting is appended to *history_messages* as an assistant message and to
    *chatbot* as ``[None, greeting]`` (both mutated in place).

    Returns:
        The tuple ``(chatbot, history_messages)``.
    """
    if len(chatbot) > 0:
        return chatbot, history_messages

    greeting = 'Hola 👋, soy tu asistente de recomendación de series y películas animadas en Crunchyroll. ¿En qué puedo ayudarte hoy?'
    history_messages.append({'role': 'assistant', 'content': greeting})
    chatbot.append([None, greeting])
    return chatbot, history_messages
def export_dataframe(df):
    """Write the timings table to a CSV file and expose it for download.

    The first row of *df* is dropped before writing.
    NOTE(review): presumably that row is a placeholder - confirm against the
    UI definition.

    Args:
        df: Anything ``pd.DataFrame`` accepts.

    Returns:
        A visible ``gr.File`` component pointing at
        ``./csv_times/csv_times.csv``.
    """
    csv_path = "./csv_times/csv_times.csv"
    # to_csv fails when the target directory does not exist, so create it.
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
    final_df = pd.DataFrame(df).iloc[1:]
    final_df.to_csv(csv_path, index=False, encoding='utf-8')
    return gr.File(value=csv_path, visible=True)