Source code for langchain_arangodb.chat_message_histories.arangodb

import time
from typing import Any, List, Optional, Union

from arango.database import StandardDatabase
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, messages_from_dict


[docs] class ArangoChatMessageHistory(BaseChatMessageHistory): """Chat message history stored in an ArangoDB database. This class provides persistent storage for chat message histories using ArangoDB as the backend. It supports session-based message storage with automatic collection creation and indexing. :param session_id: Unique identifier for the chat session. :type session_id: Union[str, int] :param db: ArangoDB database instance for storing chat messages. :type db: arango.database.StandardDatabase :param collection_name: Name of the ArangoDB collection to store messages. Defaults to "ChatHistory". :type collection_name: str :param window: Maximum number of messages to keep in memory (currently unused). Defaults to 3. :type window: int :param args: Additional positional arguments passed to BaseChatMessageHistory. :type args: Any :param kwargs: Additional keyword arguments passed to BaseChatMessageHistory. :type kwargs: Any .. code-block:: python from arango import ArangoClient from langchain_arangodb.chat_message_histories import ArangoChatMessageHistory # Connect to ArangoDB client = ArangoClient("http://localhost:8529") db = client.db("test", username="root", password="openSesame") # Create chat message history history = ArangoChatMessageHistory( session_id="user_123", db=db, collection_name="chat_sessions" ) # Add messages history.add_user_message("Hello! How are you?") history.add_ai_message("I'm doing well, thank you!") # Add QA message history.add_qa_message( user_input="Who is the first character?", aql_query="FOR doc IN Characters LIMIT 1 RETURN doc", result="The first character is Arya Stark." ) # Retrieve messages messages = history.messages print(f"Found {len(messages)} messages") # Retrieve messages by role human_messages = history.get_messages(role="human") ai_messages = history.get_messages(role="ai") qa_messages = history.get_messages(role="qa") # Clear session history.clear() """ def __init__( self, session_id: Union[str, int], db: StandardDatabase, collection_name: str = "ChatHistory", window: int = 3, *args: Any, **kwargs: Any, ) -> None: # Make sure session id is not null if not session_id: raise ValueError("Please ensure that the session_id parameter is provided") self._session_id = str(session_id) self._db = db self._collection_name = collection_name self._window = window # TODO: Use this if not self._db.has_collection(collection_name): self._db.create_collection(collection_name) self._collection = self._db.collection(self._collection_name) has_index = False for index in self._collection.indexes(): # type: ignore if "session_id" in index["fields"]: has_index = True break if not has_index: self._collection.add_persistent_index(["session_id"], unique=False) super().__init__(*args, **kwargs) @property def messages(self) -> List[BaseMessage]: """Retrieve the messages from ArangoDB. Retrieves all messages for the current session from the ArangoDB collection, sorted by timestamp in descending order (most recent first). :return: List of chat messages for the current session, sorted in reverse chronological order (most recent first). :rtype: List[BaseMessage] .. code-block:: python # Get all messages for the session messages = history.messages for msg in messages: print(f"{msg.type}: {msg.content}") # Check if session has any messages if history.messages: print(f"Session has {len(history.messages)} messages") else: print("No messages in this session") """ query = """ FOR doc IN @@col FILTER doc.session_id == @session_id SORT doc.time DESC RETURN UNSET(doc, ["session_id", "_id", "_rev"]) """ bind_vars = {"@col": self._collection_name, "session_id": self._session_id} cursor = self._db.aql.execute(query, bind_vars=bind_vars) # type: ignore messages = [ {"data": {"content": res["content"]}, "type": res["role"]} for res in cursor # type: ignore ] return messages_from_dict(messages) @messages.setter def messages(self, messages: List[BaseMessage]) -> None: raise NotImplementedError( "Direct assignment to 'messages' is not allowed." " Use the 'add_messages' instead." )
[docs] def get_messages( self, role: Optional[str] = None, n_messages: int = 10, excluded_fields: list[str] = ["_id", "_key", "_rev", "session_id", "time"], ) -> list: """Retrieve messages from ArangoDB, optionally filtered by role. :param role: Optional filter to retrieve messages of a specific role. :type role: Optional[str] :param n_messages: Number of messages to retrieve. :type n_messages: int :param excluded_fields: Fields to exclude from the returned messages. :type excluded_fields: list[str] .. code-block:: python # Get all types of messages, default is 10 messages messages = history.get_messages() # Get the first 20 human messages messages = history.get_messages(role="human", n_messages=20) # Get the first 20 AI messages messages = history.get_messages(role="ai", n_messages=20) """ query = f""" FOR doc IN @@col FILTER doc.session_id == @session_id {"AND doc.role == @role" if role else ""} SORT doc.time DESC LIMIT @n RETURN UNSET(doc, @excluded_fields) """ bind_vars = { "@col": self._collection_name, "session_id": self._session_id, "n": n_messages, "excluded_fields": excluded_fields, } if role is not None: bind_vars["role"] = role cursor = self._db.aql.execute(query, bind_vars=bind_vars) # type: ignore # return in chronological order return [d for d in cursor][::-1] # type: ignore
[docs] def add_message(self, message: BaseMessage) -> None: """Append the message to the record in ArangoDB. Stores a single chat message in the ArangoDB collection associated with the current session. The message is stored with its type, content, and session identifier. :param message: The chat message to add to the history. :type message: BaseMessage .. code-block:: python from langchain_core.messages import HumanMessage, AIMessage # Add user message user_msg = HumanMessage(content="What is the weather like?") history.add_message(user_msg) # Add AI response ai_msg = AIMessage(content="I don't have access to current weather data.") history.add_message(ai_msg) # Or use convenience methods history.add_user_message("Hello!") history.add_ai_message("Hi there!") """ self._db.collection(self._collection_name).insert( { "role": message.type, "content": message.content, "session_id": self._session_id, "time": time.time(), }, )
[docs] def add_qa_message(self, user_input: str, aql_query: str, result: str) -> None: """Add a QA message to the chat history. :param user_input: The user's input. :type user_input: str :param aql_query: The AQL query to execute. :type aql_query: str :param result: The result of the AQL query. :type result: str .. code-block:: python history.add_qa_message( user_input="Who is the first character?", aql_query="FOR doc IN Characters LIMIT 1 RETURN doc", result="The first character is Arya Stark." ) """ self._db.collection(self._collection_name).insert( { "role": "qa", "session_id": self._session_id, "time": time.time(), "user_input": user_input, "aql_query": aql_query, "result": result, } )
[docs] def clear(self) -> None: """Clear session memory from ArangoDB. Removes all messages associated with the current session from the ArangoDB collection. The collection itself is preserved for future use. .. code-block:: python # Add some messages history.add_user_message("Hello") history.add_ai_message("Hi!") print(f"Messages before clear: {len(history.messages)}") # Output: 2 # Clear all messages for this session history.clear() print(f"Messages after clear: {len(history.messages)}") # Output: 0 # Collection still exists for future messages history.add_user_message("Starting fresh conversation") print(f"New message count: {len(history.messages)}") # Output: 1 """ query = """ FOR doc IN @@col FILTER doc.session_id == @session_id REMOVE doc IN @@col """ bind_vars = {"@col": self._collection_name, "session_id": self._session_id} self._db.aql.execute(query, bind_vars=bind_vars) # type: ignore