In [None]:
# Load environment variables from whichever .env file find_dotenv() locates.
# NOTE(review): presumably this supplies the API keys for the model clients
# created further down — confirm against the project's .env template.
import dotenv
dotenv.load_dotenv(dotenv.find_dotenv())

In [None]:
import json
from typing import Annotated, List
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition


# Define the State class that every node in the graph receives and updates
class State(TypedDict):
    """State dictionary handed to each node of the graph.

    The single key, ``messages``, is typed as a ``list`` and annotated with
    ``add_messages``. That annotation tells the graph how to merge updates to
    this key: new messages are appended to the existing list instead of
    replacing it.
    """

    messages: Annotated[list, add_messages]

# Create the graph builder; StateGraph is constructed with State, so nodes and
# edges added to `graph_builder` later operate on that state definition.
graph_builder = StateGraph(State)

In [None]:
def create_llm(use_model):
    """Create the language model selected by ``use_model``.

    Args:
        use_model: Name of the model to instantiate. Supported values are
            ``'gpt-4o-mini'`` (OpenAI chat model) and ``'zephyr-7b-alpha'``
            (Hugging Face endpoint).

    Returns:
        A ``ChatOpenAI`` instance for ``'gpt-4o-mini'`` or a
        ``HuggingFaceEndpoint`` instance for ``'zephyr-7b-alpha'``.

    Raises:
        ValueError: If ``use_model`` is not one of the supported names.
            (Previously this case fell through to ``return llm`` with ``llm``
            unbound and surfaced as an UnboundLocalError.)
    """
    if use_model == 'gpt-4o-mini':
        # Imported lazily so only the selected backend's package has to be installed.
        from langchain_openai import ChatOpenAI
        print(f'As llm, using OpenAI model: {use_model}')
        llm = ChatOpenAI(
            model_name="gpt-4o-mini",
            temperature=0)
    elif use_model == 'zephyr-7b-alpha':
        from langchain_huggingface import HuggingFaceEndpoint
        print(f'As llm, using HF-Endpint: {use_model}')
        llm = HuggingFaceEndpoint(
            repo_id=f"huggingfaceh4/{use_model}",
            temperature=0.1,
            max_new_tokens=512
        )
    else:
        # Fail loudly with the offending value rather than with an unbound local.
        raise ValueError(
            f"Unsupported model {use_model!r}; "
            "expected 'gpt-4o-mini' or 'zephyr-7b-alpha'"
        )
    return llm

In [None]:
# Define tools to bind to llm
def create_wiki_tool(verbose):
    """Build the Wikipedia lookup tool.

    The tool is a ``WikipediaQueryRun`` backed by a ``WikipediaAPIWrapper``
    configured with ``top_k_results=1`` and ``doc_content_chars_max=5000``.

    Args:
        verbose: When truthy, run one sample query ("quantum mechanics")
            through the tool and print the query and its response.

    Returns:
        The constructed ``WikipediaQueryRun`` tool.
    """
    print('Creating wiki tool')
    # Local imports: the dependency is only needed once a tool is actually built.
    from langchain_community.tools import WikipediaQueryRun
    from langchain_community.utilities import WikipediaAPIWrapper

    wiki_tool = WikipediaQueryRun(
        api_wrapper=WikipediaAPIWrapper(top_k_results=1, doc_content_chars_max=5000)
    )
    if not verbose:
        return wiki_tool

    # Smoke-test the tool with a fixed query so the caller can see it working.
    probe = "quantum mechanics"
    print(f"Testing wiki tool, with search key: {probe}")
    reply = wiki_tool.run({"query": probe})
    print(f"Response: {reply}")
    return wiki_tool

def get_tools(verbose):
    """Collect the tools that get bound to the language model.

    Args:
        verbose: Forwarded to ``create_wiki_tool``.

    Returns:
        A one-element list containing the Wikipedia tool.
    """
    print('Gathering tools')
    return [create_wiki_tool(verbose=verbose)]

In [None]:
def chatbot(state: State):
    """Graph node: ask the tool-enabled model for the next message.

    Note that the tool list and the model are rebuilt on every call.

    Args:
        state: Current graph state; only ``state["messages"]`` is read.

    Returns:
        A dict with a single ``"messages"`` key holding a one-element list
        with the model's reply.
    """
    available_tools = get_tools(verbose=False)
    # Alternative backend: create_llm(use_model='zephyr-7b-alpha')
    model = create_llm(use_model='gpt-4o-mini')
    reply = model.bind_tools(available_tools).invoke(state["messages"])
    return {"messages": [reply]}

In [None]:
# Simpler versions of ToolNode and tools_condition
# Disabled on purpose: flip the guard to True to define these hand-rolled
# stand-ins and use them in place of the prebuilt ToolNode / tools_condition.
if False:
    class BasicToolNode:
        """A node that runs the tools requested in the last AIMessage."""

        def __init__(self, tools: list) -> None:
            """Index the given tools by their ``name`` for lookup at call time."""
            self.tools_by_name = {tool.name: tool for tool in tools}

        def __call__(self, inputs: dict):
            """Invoke every tool call found on the last message.

            Args:
                inputs: Mapping with a ``"messages"`` list; the last entry is
                    expected to expose ``tool_calls``.

            Returns:
                ``{"messages": [...]}`` with one ``ToolMessage`` per tool call,
                whose content is the JSON-encoded tool result.

            Raises:
                ValueError: If ``inputs`` carries no messages.
            """
            if messages := inputs.get("messages", []):
                message = messages[-1]
            else:
                raise ValueError("No message found in input")
            outputs = []
            for tool_call in message.tool_calls:
                tool_result = self.tools_by_name[tool_call["name"]].invoke(
                    tool_call["args"]
                )
                outputs.append(
                    ToolMessage(
                        content=json.dumps(tool_result),
                        name=tool_call["name"],
                        tool_call_id=tool_call["id"],
                    )
                )
            return {"messages": outputs}

    def basic_tools_conditions(state: State):
        """
        Use in the conditional_edge to route to the ToolNode if the last message
        has tool calls. Otherwise, route to the end.

        Accepts either a bare list of messages or a state mapping with a
        ``"messages"`` key; raises ValueError when no message is available.
        """
        if isinstance(state, list):
            ai_message = state[-1]
        elif messages := state.get("messages", []):
            ai_message = messages[-1]
        else:
            raise ValueError(f"No messages found in input state to tool_edge: {state}")
        # f-prefix added: the original printed the literal text "{ai_message}".
        print(f'route_tools: {ai_message}')
        # getattr + truthiness also covers a `tool_calls` attribute that is None or empty.
        if getattr(ai_message, "tool_calls", None):
            routing_decision = "tools"
        else:
            routing_decision = END
        return routing_decision

    # The commented-out graph wiring refers to the singular spelling; keep both usable.
    basic_tools_condition = basic_tools_conditions


In [None]:
# Define nodes:
# "chatbot" calls the tool-enabled model; "tools" executes the requested tool calls.
graph_builder.add_node("chatbot", chatbot)
# Hand-rolled alternative (only available when the `if False:` cell is enabled):
# tool_node = BasicToolNode(tools=get_tools(verbose=False))
tool_node = ToolNode(tools=get_tools(verbose=False))
graph_builder.add_node("tools", tool_node)

# Define edges:
# Entry point: every run starts at the chatbot node.
graph_builder.add_edge(START, "chatbot")
# Conditional edge between the chatbot and the tool node.
# The routing function passed below is the prebuilt `tools_condition`. Going by the
# simplified re-implementation in this notebook, it routes to "tools" when the last
# message carries tool calls and to END otherwise — confirm against the LangGraph docs.
# This conditional routing defines the main agent loop.
graph_builder.add_conditional_edges(
 "chatbot",
 # basic_tools_conditions,  # hand-rolled alternative defined in this notebook
 tools_condition
 # An optional third argument maps the condition's outputs to node names.
 # It defaults to the identity function, but if you want to use a node named something else apart from "tools",
 # you can update the value of the dictionary to something else, e.g. "tools": "my_tools"
 # {"tools": "tools", END: END},
)
# Edge between the tool node and the chatbot:
# any time a tool is called, we return to the chatbot to decide the next step.
graph_builder.add_edge("tools", "chatbot")

# Compile the graph with a MemorySaver checkpointer.
# NOTE(review): presumably this is what lets state persist between turns of the
# same thread_id — confirm against the LangGraph checkpointer docs.
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

In [None]:
from IPython.display import Image, display

# Render the compiled graph as a Mermaid PNG. This is optional: it requires
# some extra dependencies, so a failure must not stop the notebook.
try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception as exc:
    # Still best-effort, but say why the picture is missing instead of hiding it.
    print(f"Skipping graph visualisation: {exc!r}")

In [None]:
# Run configuration passed to graph.stream() and graph.get_state() below.
# NOTE(review): "thread_id" presumably selects which conversation's checkpoints
# the MemorySaver reads and writes — confirm against the LangGraph docs.
config = {"configurable": {"thread_id": "1"}}

def stream_graph_updates(user_input: str):
    """Send one user message through the graph and print each streamed state.

    The graph is streamed with ``stream_mode="values"`` using the module-level
    ``config``; for every event the last entry of ``event["messages"]`` is
    pretty-printed.

    Args:
        user_input: Text of the user's message.
    """
    payload = {"messages": [("user", user_input)]}
    for state_snapshot in graph.stream(payload, config, stream_mode="values"):
        state_snapshot["messages"][-1].pretty_print()
        # Earlier variant kept for reference:
        # for value in event.values():
        #     print("Assistant:", value["messages"][-1].content)


# Interactive chat loop: read a line, show the checkpointed state, then stream
# the graph's response. Typing quit/exit/q (any case) ends the session.
while True:
    try:
        user_input = input("User: ")
        # strip() so that e.g. "quit " still ends the session.
        if user_input.strip().lower() in ("quit", "exit", "q"):
            print("Goodbye!")
            break
        snapshot = graph.get_state(config)
        print(f'Current state: {snapshot}')
        stream_graph_updates(user_input)
    except EOFError:
        # input() hit end-of-file (stdin closed / piped input exhausted):
        # nothing more can be read, so end the session cleanly.
        print("Goodbye!")
        break
    except Exception as e:
        # Re-raise with explicit chaining so the original error stays the
        # documented cause in the traceback.
        raise Exception(f'An error occured: {e}') from e