from datetime import datetime, timezone
import os
import shlex
import socket
import subprocess
import time
from shutil import which

from llama_index.core import (
    Settings,
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.core.llms.mock import MockLLM
from llama_index.core.node_parser import MarkdownNodeParser
from llama_index.core.postprocessor import SimilarityPostprocessor
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from gnews import GNews
from googlenewsdecoder import gnewsdecoder


def get_current_time() -> str:
    """Return the current UTC time formatted for filenames/log lines."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")


def check_server(address: str, port: int) -> bool:
    """Check whether a TCP server is reachable at ``address:port``.

    Args:
        address: Hostname or IP address to connect to.
        port: TCP port number.

    Returns:
        True if a connection succeeds, False otherwise. The socket is
        closed either way.
    """
    # Create a TCP socket
    s = socket.socket()
    print(f"Attempting to connect to {address} on port {port}", flush=True)
    try:
        s.connect((address, port))
        print(f"Connected to {address} on port {port}", flush=True)
        return True
    except socket.error as e:
        print(f"Connection to {address} on port {port} failed:{e}", flush=True)
        return False
    finally:
        s.close()


def get_valid_filename(filename: str) -> str:
    """Replace characters that are invalid in (Windows) filenames with '-'."""
    invalid_chars = frozenset('<>:"/\\|?*')
    return "".join("-" if c in invalid_chars else c for c in filename)


###############################################################################
# Example usage: restart_exec(llama_server_path, "server is listening on")
def restart_exec(shell_exec_path_with_args: str, success_text: str) -> bool:
    """Start a long-running executable and wait until it logs ``success_text``.

    The command is launched through the shell from the executable's own
    folder; stdout/stderr are streamed and echoed until either the success
    text appears (the process is left running) or the process exits.

    Args:
        shell_exec_path_with_args: Full command line; the first token is
            expected to be the absolute path of the executable.
        success_text: Substring that signals a successful start.

    Returns:
        True once ``success_text`` was seen, False if the process exited
        before printing it.
    """
    print(
        f"restart_exec(): shell_exec_path_with_args: [{shell_exec_path_with_args}]",
        flush=True,
    )
    exec_path = shlex.split(shell_exec_path_with_args)[
        0
    ]  # supposed to have absolute path!
    exec_name = os.path.basename(exec_path)
    exec_folder = os.path.dirname(exec_path)

    # Start a new llama-server process from its own folder.
    exec_proc = subprocess.Popen(
        shell_exec_path_with_args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        shell=True,
        cwd=f"{exec_folder}",
    )
    # FIX: make BOTH pipes non-blocking. The original only did stdout, so the
    # stderr readline below could block indefinitely when the child wrote
    # nothing to stderr, stalling the stdout monitoring loop.
    os.set_blocking(exec_proc.stdout.fileno(), False)  # type: ignore
    os.set_blocking(exec_proc.stderr.fileno(), False)  # type: ignore

    started = False
    total_lines = []
    while exec_proc.poll() is None:
        output = exec_proc.stdout.readline()  # type: ignore
        error = exec_proc.stderr.readline()  # type: ignore
        # Concatenate whatever arrived on either stream (stdout first).
        parts = []
        if output and output.strip():
            parts.append(output.strip())
        if error and error.strip():
            parts.append(error.strip())
        current_line = "".join(parts)
        if current_line:
            print(f"restart_exec(): Current CMD Line: {current_line}")
            total_lines.append(current_line)
            if success_text in current_line:
                print(
                    f"(っ◕‿◕)っ ➔ Found the success text [{success_text}] at [{get_current_time()}].",
                    flush=True,
                )
                started = True
                break
        else:
            # Nothing new on either pipe: avoid a hot busy-wait loop.
            time.sleep(0.05)

    if not started:
        print(
            f"Failed to start the {exec_name}. returncode: {exec_proc.returncode}",
            flush=True,
        )
        # FIX: a backslash inside an f-string expression is a SyntaxError
        # before Python 3.12 — build the joined tail outside the f-string.
        last_logs = "\n".join(total_lines[-10:])
        print(f"Last logs:\n{last_logs}", flush=True)
    return started


def use_huggingface_cli_to_download_model(
    model_name: str,
    local_dir: str,
    repo_id: str = "tnt306/finetuned-Qwen2.5-7B-Instruct-1M",
) -> None:
    """Download ``{model_name}.gguf`` from a Hugging Face repo via huggingface-cli.

    Args:
        model_name: GGUF file stem to download (``.gguf`` is appended).
        local_dir: Directory to place the downloaded file in.
        repo_id: Hugging Face model repository to download from. Defaults to
            the previously hard-coded repo for backward compatibility.

    Raises:
        KeyError: If the ``HF_TOKEN`` environment variable is not set.
        Exception: If ``huggingface-cli`` is not on PATH.
        subprocess.CalledProcessError: If login or download fails.
    """
    hf_token = os.environ["HF_TOKEN"]  # KeyError here means the token is missing.
    print(
        f"use_huggingface_cli_to_download_model(): model_name: [{model_name}]; local_dir: [{local_dir}]; "
        f"HF_TOKEN here: [{hf_token[0:5]}...{hf_token[-5:]}]",
        flush=True,
    )
    huggingface_cli = which("huggingface-cli")
    if not huggingface_cli:
        raise Exception(
            f"Model {model_name} not found but huggingface-cli is not installed. Can't download."
        )
    # FIX: check=True so a failed login/download raises instead of being
    # silently ignored (the original discarded the CompletedProcess).
    subprocess.run(
        [huggingface_cli, "login", "--token", hf_token],
        cwd=local_dir,
        check=True,
    )
    print(f"Downloading: Start! [{get_current_time()}]", flush=True)
    subprocess.run(
        [
            huggingface_cli,
            "download",
            "--repo-type=model",
            repo_id,
            f"{model_name}.gguf",
            "--local-dir",
            local_dir,
        ],
        cwd=local_dir,
        check=True,
    )
    print(f"Downloading: End! [{get_current_time()}]", flush=True)


###############################################################################
def get_latest_news(topic: str, number_of_news: int = 3) -> str:
    """
    Given the topic, get the latest news from the internet about the topic.

    Args:
        topic (str): The topic to search for.
        number_of_news (int): the number of pieces of news to search for. Default: 3.

    Returns:
        str: The latest news about the topic.
    """
    print(
        f"(っ◕‿◕)っ Tool - get_latest_news(): topic = {topic}; number_of_news = {number_of_news}",
        flush=True,
    )
    google_news = GNews(language="en", country="US", period="1d", max_results=10)
    result = google_news.get_news(topic)
    returned_news = ""
    # FIX: guard against a None/empty result list from GNews.
    for idx, news in enumerate((result or [])[:number_of_news]):  # type: ignore
        # Google News returns redirect URLs; try to resolve the real article URL.
        # FIX: gnewsdecoder does network I/O and can raise — fall back to the
        # redirect URL instead of aborting the whole tool call.
        real_url = news["url"]
        try:
            decoded_url = gnewsdecoder(news["url"], interval=1)
            if decoded_url.get("status"):
                real_url = decoded_url["decoded_url"]
        except Exception as e:
            print(f"get_latest_news(): URL decode failed: {e}", flush=True)
        returned_news += (
            f"# News Article {idx}\n"
            f"Title: {news['title']}\n"
            f"Source: {news['publisher']['title']}\n"
            f"URL: [Click here]({real_url})\n"
            "\n"
        )
    print(
        f"(っ◕‿◕)っ Tool - get_latest_news(): returned_news = \n{returned_news}\n",
        flush=True,
    )
    return returned_news


###############################################################################
def _configure_embedding_settings() -> None:
    """Shared llama-index setup for LLM-free, CPU-only embedding work."""
    Settings.llm = MockLLM()  # No need for real LLM.
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="BAAI/bge-small-en-v1.5", device="cpu"
    )
    # Keep tiktoken's cache next to this file so runs don't re-download it.
    os.environ["TIKTOKEN_CACHE_DIR"] = (
        f"{os.path.dirname(os.path.realpath(__file__))}/tiktoken_cache"
    )


def create_vector_store_index(
    folder_path: str, persist_dir: str = "vector_store_index"
) -> None:
    """Build a vector index over the documents in ``folder_path`` and persist it.

    Args:
        folder_path: Directory of documents to index.
        persist_dir: Directory to persist the index into.
    """
    _configure_embedding_settings()
    Settings.chunk_size = 1024  # default
    Settings.chunk_overlap = 20  # default
    documents = SimpleDirectoryReader(folder_path).load_data()
    parser = MarkdownNodeParser(include_metadata=True)
    vector_store_index = VectorStoreIndex.from_documents(
        documents, transformations=[parser]
    )
    vector_store_index.storage_context.persist(persist_dir=persist_dir)


def get_query_from_vector_store_index(
    persist_dir: str = "vector_store_index", top_k: int = 2
):
    """Load a persisted index and return a ``get_rag_context(query)`` closure.

    Args:
        persist_dir: Directory the index was persisted into.
        top_k: Number of chunks to retrieve per query.

    Returns:
        A function ``get_rag_context(query, separator)`` that returns the
        retrieved chunk texts joined by ``separator``.
    """
    _configure_embedding_settings()
    storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
    index = load_index_from_storage(storage_context)
    retriever = VectorIndexRetriever(
        index=index,
        similarity_top_k=top_k,
    )
    query_engine = RetrieverQueryEngine(
        retriever=retriever,
        # Drop weakly-matching chunks below 0.5 similarity.
        node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.5)],
    )

    def get_rag_context(query: str, separator: str = "\n\n") -> str:
        """Retrieve the top-k chunk texts for ``query`` joined by ``separator``."""
        print(f"(っ◕‿◕)っ get_rag_context(): query = {query}", flush=True)
        response = query_engine.query(query)
        res = separator.join([node.text for node in response.source_nodes])
        print(f"(っ◕‿◕)っ get_rag_context(): res = {res}", flush=True)
        return res

    return get_rag_context