Source code for langchain_arangodb.vectorstores.arangodb_vector

from __future__ import annotations

from enum import Enum
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
    overload,
)

import farmhash
import numpy as np
from arango.aql import Cursor
from arango.database import StandardDatabase
from arango.exceptions import ArangoServerError, ViewGetError
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
from langchain_core.vectorstores.utils import maximal_marginal_relevance
from packaging import version

from langchain_arangodb.vectorstores.utils import DistanceStrategy

DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
DISTANCE_MAPPING = {
    DistanceStrategy.EUCLIDEAN_DISTANCE: "l2",
    DistanceStrategy.COSINE: "cosine",
}


[docs] class SearchType(str, Enum): """Enumerator of the search types.""" VECTOR = "vector" HYBRID = "hybrid"
DEFAULT_SEARCH_TYPE = SearchType.VECTOR # Constants for RRF DEFAULT_RRF_CONSTANT = 60 # Standard constant for RRF DEFAULT_SEARCH_LIMIT = 100 # Default limit for initial search results # Full-text search analyzer options DEFAULT_ANALYZER = "text_en" # Default analyzer for full-text search SUPPORTED_ANALYZERS = [ "text_en", "text_de", "text_es", "text_fi", "text_fr", "text_it", "text_nl", "text_no", "text_pt", "text_ru", "text_sv", "text_zh", ]
[docs] class ArangoVector(VectorStore): """ArangoDB vector store implementation for LangChain. This class provides a vector store implementation using ArangoDB as the backend. It supports both vector similarity search and hybrid search (vector + keyword) capabilities. :param embedding: The embedding function to use for converting text to vectors. Must implement the `langchain.embeddings.base.Embeddings` interface. :type embedding: langchain.embeddings.base.Embeddings :param embedding_dimension: The dimensionality of the embedding vectors. Must match the output dimension of the embedding function. :type embedding_dimension: int :param database: The ArangoDB database instance to use for storage and retrieval. :type database: arango.database.StandardDatabase :param collection_name: The name of the ArangoDB collection to store documents. Defaults to "documents". :type collection_name: str :param search_type: The type of search to perform. Can be either SearchType.VECTOR for pure vector similarity search or SearchType.HYBRID for combining vector and keyword search. Defaults to SearchType.VECTOR. :type search_type: SearchType :param embedding_field: The field name in the document to store the embedding vector Defaults to "embedding". :type embedding_field: str :param text_field: The field name in the document to store the text content. Defaults to "text". :type text_field: str :param vector_index_name: The name of the vector index to create in ArangoDB. This index enables efficient vector similarity search. Defaults to "vector_index". :type vector_index_name: str :param distance_strategy: The distance metric to use for vector similarity. Can be either DistanceStrategy.COSINE or DistanceStrategy.EUCLIDEAN_DISTANCE. Defaults to DistanceStrategy.COSINE. :type distance_strategy: DistanceStrategy :param num_centroids: The number of centroids to use for the vector index. Higher values can improve search accuracy but increase memory usage. Defaults to 1. :type num_centroids: int :param relevance_score_fn: Optional function to normalize the relevance score. If not provided, uses the default normalization for the distance strategy. :type relevance_score_fn: Optional[Callable[[float], float]] :param keyword_index_name: The name of the ArangoDB View created to enable Full-Text-Search capabilities. Only used if search_type is set to SearchType.HYBRID. Defaults to "keyword_index". :type keyword_index_name: str :param keyword_analyzer: The text analyzer to use for keyword search. Must be one of the supported analyzers in ArangoDB. Defaults to "text_en". :type keyword_analyzer: str :param rrf_constant: The constant used in Reciprocal Rank Fusion (RRF) for hybrid search. Higher values give more weight to lower-ranked results. Defaults to 60. :type rrf_constant: int :param rrf_search_limit: The maximum number of results to consider in RRF scoring. Defaults to 100. :type rrf_search_limit: int """ def __init__( self, embedding: Embeddings, embedding_dimension: int, database: StandardDatabase, collection_name: str = "documents", search_type: SearchType = DEFAULT_SEARCH_TYPE, embedding_field: str = "embedding", text_field: str = "text", vector_index_name: str = "vector_index", distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, num_centroids: int = 1, relevance_score_fn: Optional[Callable[[float], float]] = None, keyword_index_name: str = "keyword_index", keyword_analyzer: str = DEFAULT_ANALYZER, rrf_constant: int = DEFAULT_RRF_CONSTANT, rrf_search_limit: int = DEFAULT_SEARCH_LIMIT, ): if search_type not in [SearchType.VECTOR, SearchType.HYBRID]: raise ValueError("search_type must be 'vector' or 'hybrid'") if distance_strategy not in [ DistanceStrategy.COSINE, DistanceStrategy.EUCLIDEAN_DISTANCE, DistanceStrategy.JACCARD, DistanceStrategy.DOT_PRODUCT, DistanceStrategy.MAX_INNER_PRODUCT, ]: m = "distance_strategy must be one of: 'COSINE', 'EUCLIDEAN_DISTANCE', 'JACCARD', 'DOT_PRODUCT', 'MAX_INNER_PRODUCT'" # noqa: E501 raise ValueError(m) self.embedding = embedding self.embedding_dimension = int(embedding_dimension) self.db = database self.async_db = self.db.begin_async_execution(return_result=False) self.search_type = search_type self.collection_name = collection_name self.embedding_field = embedding_field self.text_field = text_field self.vector_index_name = vector_index_name self._distance_strategy = distance_strategy self.num_centroids = num_centroids self.override_relevance_score_fn = relevance_score_fn # Hybrid search parameters self.keyword_index_name = keyword_index_name self.keyword_analyzer = keyword_analyzer self.rrf_constant = rrf_constant self.rrf_search_limit = rrf_search_limit if not self.db.has_collection(collection_name): self.db.create_collection(collection_name) self.collection = self.db.collection(self.collection_name) @property def embeddings(self) -> Embeddings: return self.embedding
[docs] def retrieve_vector_index(self) -> Union[dict[str, Any], None]: """Retrieve the vector index from the collection.""" indexes = self.collection.indexes() # type: ignore for index in indexes: # type: ignore if index["name"] == self.vector_index_name: return index return None
[docs] def create_vector_index(self) -> None: """Create the vector index on the collection.""" self.collection.add_index( # type: ignore { "name": self.vector_index_name, "type": "vector", "fields": [self.embedding_field], "params": { "metric": DISTANCE_MAPPING[self._distance_strategy], "dimension": self.embedding_dimension, "nLists": self.num_centroids, }, } )
[docs] def delete_vector_index(self) -> None: """Delete the vector index from the collection.""" index = self.retrieve_vector_index() if index is not None: self.collection.delete_index(index["id"])
[docs] def retrieve_keyword_index(self) -> Optional[dict[str, Any]]: """Retrieve the keyword index from the collection.""" try: return self.db.view(self.keyword_index_name) # type: ignore except ViewGetError: return None
[docs] def create_keyword_index(self) -> None: """Create the keyword index on the collection.""" if self.retrieve_keyword_index(): return collection = self.db.collection(self.collection_name) collection.add_index( { "type": "inverted", "name": self.keyword_index_name, "fields": [ {"name": self.text_field, "analyzer": self.keyword_analyzer} ], } ) view_properties = { "indexes": [ {"collection": self.collection_name, "index": self.keyword_index_name} ] } self.db.create_view(self.keyword_index_name, "search-alias", view_properties)
[docs] def delete_keyword_index(self) -> None: """Delete the keyword index from the collection.""" view = self.retrieve_keyword_index() if view: self.db.delete_view(self.keyword_index_name) self.db.collection(self.collection_name).delete_index( self.keyword_index_name, ignore_missing=True )
[docs] def add_embeddings( self, texts: Iterable[str], embeddings: List[List[float]], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, batch_size: int = 500, use_async_db: bool = False, insert_text: bool = True, **kwargs: Any, ) -> List[str]: """Add embeddings to the vectorstore.""" texts = list(texts) if ids is None: ids = [str(farmhash.Fingerprint64(text.encode("utf-8"))) for text in texts] # type: ignore if not metadatas: metadatas = [{} for _ in texts] if len(ids) != len(texts) != len(embeddings) != len(metadatas): m = "Length of ids, texts, embeddings and metadatas must be the same." raise ValueError(m) db = self.async_db if use_async_db else self.db collection = db.collection(self.collection_name) data = [] for _key, text, embedding, metadata in zip(ids, texts, embeddings, metadatas): doc: dict[str, Any] = {self.text_field: text} if insert_text else {} doc.update( { **metadata, "_key": _key, self.embedding_field: embedding, } ) data.append(doc) if len(data) == batch_size: collection.import_bulk(data, on_duplicate="update", **kwargs) data = [] collection.import_bulk(data, on_duplicate="update", **kwargs) return ids
[docs] def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None, ids: Optional[List[str]] = None, **kwargs: Any, ) -> List[str]: """Add texts to the vector store. This method embeds the provided texts using the embedding function and stores them in ArangoDB along with their embeddings and metadata. :param texts: An iterable of text strings to add to the vector store. :type texts: Iterable[str] :param metadatas: Optional list of metadata dictionaries to associate with each text. Each dictionary can contain arbitrary key-value pairs that will be stored alongside the text and embedding. :type metadatas: Optional[List[dict]] :param ids: Optional list of unique identifiers for each text. If not provided, IDs will be generated using a hash of the text content. :type ids: Optional[List[str]] :param kwargs: Additional keyword arguments passed to add_embeddings. :type kwargs: Any :return: List of document IDs that were added to the vector store. :rtype: List[str] .. code-block:: python # Add simple texts texts = ["hello world", "hello arango", "test document"] ids = vector_store.add_texts(texts) print(f"Added {len(ids)} documents") # Add texts with metadata texts = ["Machine learning tutorial", "Python programming guide"] metadatas = [ {"category": "AI", "difficulty": "beginner"}, {"category": "Programming", "difficulty": "intermediate"} ] ids = vector_store.add_texts(texts, metadatas=metadatas) # Add texts with custom IDs texts = ["Document 1", "Document 2"] custom_ids = ["doc_001", "doc_002"] ids = vector_store.add_texts(texts, ids=custom_ids) """ embeddings = self.embedding.embed_documents(list(texts)) return self.add_embeddings( texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs )
@overload def similarity_search( self, query: str, k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, embedding: Optional[List[float]] = None, filter_clause: str = "", search_type: Optional[SearchType] = None, vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: bool = True, **kwargs: Any, ) -> Iterator[Document]: ... @overload def similarity_search( self, query: str, k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, embedding: Optional[List[float]] = None, filter_clause: str = "", search_type: Optional[SearchType] = None, vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, **kwargs: Any, ) -> List[Document]: ... @overload def similarity_search_with_score( self, query: str, k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, embedding: Optional[List[float]] = None, filter_clause: str = "", search_type: Optional[SearchType] = None, vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: bool = True, ) -> Iterator[tuple[Document, float]]: ... @overload def similarity_search_with_score( self, query: str, k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, embedding: Optional[List[float]] = None, filter_clause: str = "", search_type: Optional[SearchType] = None, vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> List[tuple[Document, float]]: ...
[docs] def similarity_search_with_score( # type: ignore[override] self, query: str, k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, embedding: Optional[List[float]] = None, filter_clause: str = "", search_type: Optional[SearchType] = None, vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]]: """Search for similar documents and return their similarity scores. Similar to similarity_search but returns a tuple of (Document, score) for each result. The score represents the similarity between the query and the document. :param query: The text query to search for. :type query: str :param k: The number of most similar documents to return. Defaults to 4. :type k: int :param return_fields: Set of additional document fields to return in results. The _key and text fields are always returned. :type return_fields: set[str] :param use_approx: Whether to use approximate nearest neighbor search. Enables faster but potentially less accurate results. Defaults to True. :type use_approx: bool :param embedding: Optional pre-computed embedding for the query. If not provided, the query will be embedded using the embedding function. :type embedding: Optional[List[float]] :param filter_clause: Optional AQL filter clause to apply to the search. Can be used to filter results based on document properties. :type filter_clause: str :param search_type: Override the default search type for this query. Can be either SearchType.VECTOR or SearchType.HYBRID. :type search_type: Optional[SearchType] :param vector_weight: Weight to apply to vector similarity scores in hybrid search. Only used when search_type is SearchType.HYBRID. Defaults to 1.0. :type vector_weight: float :param keyword_weight: Weight to apply to keyword search scores in hybrid search. Only used when search_type is SearchType.HYBRID. Defaults to 1.0. :type keyword_weight: float :param keyword_search_clause: Optional AQL filter clause to apply Full Text Search. If empty, a default search clause will be used. :type keyword_search_clause: str :param metadata_clause: Optional AQL clause to return additional metadata once the top k results are retrieved. :type metadata_clause: str :param stream: If True, returns an iterator that yields results one at a time. This reduces memory usage for large k values. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :return: List of tuples containing (Document, score) pairs if stream is None or False, Iterator if stream=True. :rtype: Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]] .. code-block:: python # Batch mode (default) results = vector_store.similarity_search_with_score("query", k=100) for doc, score in results: print(f"Score: {score}, Content: {doc.page_content[:50]}") # Streaming mode (memory efficient) for doc, score in vector_store.similarity_search_with_score( "query", k=10000, stream=True ): process_document(doc, score) """ search_type = search_type or self.search_type embedding = embedding or self.embedding.embed_query(query) if search_type == SearchType.VECTOR: kwargs = { "embedding": embedding, "k": k, "return_fields": return_fields, "use_approx": use_approx, "filter_clause": filter_clause, "metadata_clause": metadata_clause, } if stream: kwargs["stream"] = stream return self.similarity_search_by_vector_with_score(**kwargs) # type: ignore[arg-type] else: kwargs = { "query": query, "embedding": embedding, "k": k, "return_fields": return_fields, "use_approx": use_approx, "filter_clause": filter_clause, "vector_weight": vector_weight, "keyword_weight": keyword_weight, "keyword_search_clause": keyword_search_clause, "metadata_clause": metadata_clause, } if stream: kwargs["stream"] = stream return self.similarity_search_by_vector_and_keyword_with_score(**kwargs) # type: ignore[arg-type]
@overload def similarity_search_by_vector( self, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", metadata_clause: str = "", stream: bool = True, **kwargs: Any, ) -> Iterator[Document]: ... @overload def similarity_search_by_vector( self, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, **kwargs: Any, ) -> List[Document]: ...
[docs] def similarity_search_by_vector( # type: ignore[override] self, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, **kwargs: Any, ) -> Union[List[Document], Iterator[Document]]: """Return docs most similar to embedding vector. :param embedding: Embedding to look up documents similar to. :type embedding: List[float] :param k: Number of Documents to return. Defaults to 4. :type k: int :param return_fields: Fields to return in the result. For example, {"foo", "bar"} will return the "foo" and "bar" fields of the document, in addition to the _key & text field. Defaults to an empty set. :type return_fields: set[str] :param use_approx: Whether to use approximate vector search via ANN. Defaults to True. If False, exact vector search will be used. :type use_approx: bool :param filter_clause: Filter clause to apply to the query. :type filter_clause: str :param metadata_clause: Optional AQL clause to return additional metadata once the top k results are retrieved. If specified, the metadata will be added to the Document.metadata field. :type metadata_clause: str :param stream: If True, returns an iterator that yields results one at a time. This reduces memory usage for large k values. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :param kwargs: Additional keyword arguments. :type kwargs: Any :return: List of Documents if stream is None or False, Iterator if stream=True. :rtype: Union[List[Document], Iterator[Document]] .. code-block:: python # Batch mode (default) docs = vector_store.similarity_search_by_vector(embedding, k=100) # Streaming mode (memory efficient) for doc in vector_store.similarity_search_by_vector( embedding, k=10000, stream=True ): process_document(doc) """ kwargs = { "embedding": embedding, "k": k, "return_fields": return_fields, "use_approx": use_approx, "filter_clause": filter_clause, "metadata_clause": metadata_clause, } if stream: kwargs["stream"] = stream results = self.similarity_search_by_vector_with_score(**kwargs) if stream is True: return (doc for doc, _ in results) else: return [doc for doc, _ in results]
@overload def similarity_search_by_vector_and_keyword( self, query: str, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: bool = True, ) -> Iterator[Document]: ... @overload def similarity_search_by_vector_and_keyword( self, query: str, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> List[Document]: ...
[docs] def similarity_search_by_vector_and_keyword( self, query: str, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> Union[List[Document], Iterator[Document]]: """Return docs most similar to query using hybrid search. :param query: Query text to search for. :type query: str :param embedding: Embedding vector for the query. :type embedding: List[float] :param k: Number of Documents to return. Defaults to 4. :type k: int :param return_fields: Fields to return in the result. For example, {"foo", "bar"} will return the "foo" and "bar" fields of the document, in addition to the _key & text field. Defaults to an empty set. :type return_fields: set[str] :param use_approx: Whether to use approximate vector search via ANN. Defaults to True. If False, exact vector search will be used. :type use_approx: bool :param filter_clause: Filter clause to apply to the query. :type filter_clause: str :param vector_weight: Weight to apply to vector similarity scores in hybrid search. Defaults to 1.0. :type vector_weight: float :param keyword_weight: Weight to apply to keyword search scores in hybrid search. Defaults to 1.0. :type keyword_weight: float :param keyword_search_clause: Optional AQL filter clause to apply Full Text Search. If empty, a default search clause will be used. :type keyword_search_clause: str :param metadata_clause: Optional AQL clause to return additional metadata once the top k results are retrieved. If specified, the metadata will be added to the Document.metadata field. :type metadata_clause: str :param stream: If True, returns an iterator that yields results one at a time. This reduces memory usage for large k values. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :return: List of Documents if stream is None or False, Iterator if stream=True. :rtype: Union[List[Document], Iterator[Document]] .. code-block:: python # Batch mode (default) docs = vector_store.similarity_search_by_vector_and_keyword( query, embedding, k=100 ) # Streaming mode (memory efficient) for doc in vector_store.similarity_search_by_vector_and_keyword( query, embedding, k=10000, stream=True ): process_document(doc) """ kwargs = { "query": query, "embedding": embedding, "k": k, "return_fields": return_fields, "use_approx": use_approx, "filter_clause": filter_clause, "vector_weight": vector_weight, "keyword_weight": keyword_weight, "keyword_search_clause": keyword_search_clause, "metadata_clause": metadata_clause, } if stream: kwargs["stream"] = stream results = self.similarity_search_by_vector_and_keyword_with_score(**kwargs) # type: ignore[arg-type] if stream is True: return (doc for doc, _ in results) else: return [doc for doc, _ in results]
[docs] def similarity_search_by_vector_with_score( self, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]]: """Return docs most similar to embedding vector with scores. :param embedding: Embedding to look up documents similar to. :type embedding: List[float] :param k: Number of Documents to return. Defaults to 4. :type k: int :param return_fields: Fields to return in the result. For example, {"foo", "bar"} will return the "foo" and "bar" fields of the document, in addition to the _key & text field. Defaults to an empty set. :type return_fields: set[str] :param use_approx: Whether to use approximate vector search via ANN. Defaults to True. If False, exact vector search will be used. :type use_approx: bool :param filter_clause: Filter clause to apply to the query. :type filter_clause: str :param metadata_clause: Optional AQL clause to return additional metadata once the top k results are retrieved. If specified, the metadata will be added to the Document.metadata field. :type metadata_clause: str :param stream: If True, returns an iterator that yields results one at a time. This reduces memory usage for large k values. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :return: List of tuples containing (Document, score) pairs if stream is None or False, Iterator if stream=True. :rtype: Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]] .. code-block:: python # Batch mode (default) results = vector_store.similarity_search_by_vector_with_score( embedding, k=100 ) # Streaming mode (memory efficient) for doc, score in vector_store.similarity_search_by_vector_with_score( embedding, k=10000, stream=True ): process_document(doc, score) """ aql_query, bind_vars = self._build_vector_search_query( embedding=embedding, k=k, return_fields=return_fields, use_approx=use_approx, filter_clause=filter_clause, metadata_clause=metadata_clause, ) cursor_result = self.db.aql.execute(aql_query, bind_vars=bind_vars, stream=True) assert cursor_result is not None, ( "AQL execute should not return None with stream=True" ) cursor = cast(Cursor, cursor_result) if stream is True: return self._process_search_query(cursor, stream=stream) else: return self._process_search_query(cursor)
[docs] def similarity_search_by_vector_and_keyword_with_score( self, query: str, embedding: List[float], k: int = 4, return_fields: set[str] = set(), use_approx: bool = True, filter_clause: str = "", vector_weight: float = 1.0, keyword_weight: float = 1.0, keyword_search_clause: str = "", metadata_clause: str = "", stream: Optional[bool] = None, ) -> Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]]: """Run hybrid similarity search combining vector and keyword search with scores. :param query: Query text to search for. :type query: str :param embedding: Embedding vector for the query. :type embedding: List[float] :param k: Number of results to return. Defaults to 4. :type k: int :param return_fields: Fields to return in the result. For example, {"foo", "bar"} will return the "foo" and "bar" fields of the document, in addition to the _key & text field. Defaults to an empty set. :type return_fields: set[str] :param use_approx: Whether to use approximate vector search via ANN. Defaults to True. If False, exact vector search will be used. :type use_approx: bool :param filter_clause: Filter clause to apply to the query. :type filter_clause: str :param vector_weight: Weight to apply to vector similarity scores in hybrid search. Only used when search_type is SearchType.HYBRID. Defaults to 1.0. :type vector_weight: float :param keyword_weight: Weight to apply to keyword search scores in hybrid search. Only used when search_type is SearchType.HYBRID. Defaults to 1.0. :type keyword_weight: float :param keyword_search_clause: Optional AQL filter clause to apply Full Text Search. If empty, a default search clause will be used. :type keyword_search_clause: str :param metadata_clause: Optional AQL clause to return additional metadata once the top k results are retrieved. If specified, the metadata will be added to the Document.metadata field. :type metadata_clause: str :param stream: If True, returns an iterator that yields results one at a time. This reduces memory usage for large k values. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :return: List of tuples containing (Document, score) pairs if stream is None or False, Iterator if stream=True. :rtype: Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]] .. code-block:: python # Batch mode (default) results = vector_store.similarity_search_by_vector_and_keyword_with_score( query, embedding, k=100 ) # Streaming mode (memory efficient) for doc, score in ( vector_store.similarity_search_by_vector_and_keyword_with_score( query, embedding, k=10000, stream=True ) ): process_document(doc, score) """ aql_query, bind_vars = self._build_hybrid_search_query( query=query, k=k, embedding=embedding, return_fields=return_fields, use_approx=use_approx, filter_clause=filter_clause, vector_weight=vector_weight, keyword_weight=keyword_weight, keyword_search_clause=keyword_search_clause, metadata_clause=metadata_clause, ) cursor_result = self.db.aql.execute(aql_query, bind_vars=bind_vars, stream=True) assert cursor_result is not None, ( "AQL execute should not return None with stream=True" ) cursor = cast(Cursor, cursor_result) if stream is True: return self._process_search_query(cursor, stream=stream) else: return self._process_search_query(cursor)
[docs] def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]: """Delete by vector ID or other criteria. :param ids: List of ids to delete. :type ids: Optional[List[str]] :param kwargs: Other keyword arguments that can be used to delete vectors. :type kwargs: Any :return: True if deletion is successful, None if no ids are provided, or raises an exception if an error occurs. :rtype: Optional[bool] """ if not ids: return None for result in self.collection.delete_many(ids, **kwargs): # type: ignore if isinstance(result, ArangoServerError): raise result return True
[docs] def get_by_ids(self, ids: Sequence[str], /) -> list[Document]: """Get documents by their IDs. :param ids: List of ids to get. :type ids: Sequence[str] :return: List of Documents with the given ids. :rtype: list[Document] """ docs = [] doc: dict[str, Any] for doc in self.collection.get_many(ids): # type: ignore _key = doc.pop("_key") page_content = doc.pop(self.text_field) docs.append(Document(page_content=page_content, id=_key, metadata=doc)) return docs
def _select_relevance_score_fn(self) -> Callable[[float], float]: if self.override_relevance_score_fn is not None: return self.override_relevance_score_fn # Default strategy is to rely on distance strategy provided # in vectorstore constructor if self._distance_strategy in [ DistanceStrategy.COSINE, DistanceStrategy.EUCLIDEAN_DISTANCE, ]: return lambda x: x else: raise ValueError( "No supported normalization function" f" for distance_strategy of {self._distance_strategy}." "Consider providing relevance_score_fn to ArangoVector constructor." )
[docs] @classmethod def from_texts( cls: Type[ArangoVector], texts: List[str], embedding: Embeddings, metadatas: Optional[List[dict]] = None, database: Optional[StandardDatabase] = None, collection_name: str = "documents", search_type: SearchType = DEFAULT_SEARCH_TYPE, embedding_field: str = "embedding", text_field: str = "text", vector_index_name: str = "vector_index", distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY, num_centroids: int = 1, ids: Optional[List[str]] = None, overwrite_index: bool = False, insert_text: bool = True, keyword_index_name: str = "keyword_index", keyword_analyzer: str = DEFAULT_ANALYZER, rrf_constant: int = DEFAULT_RRF_CONSTANT, rrf_search_limit: int = DEFAULT_SEARCH_LIMIT, **kwargs: Any, ) -> ArangoVector: """Create an ArangoVector instance from a list of texts. This is a convenience method that creates a new ArangoVector instance, embeds the provided texts, and stores them in ArangoDB. :param texts: List of text strings to add to the vector store. :type texts: List[str] :param embedding: The embedding function to use for converting text to vectors. :type embedding: langchain.embeddings.base.Embeddings :param metadatas: Optional list of metadata dictionaries to associate with each text. :type metadatas: Optional[List[dict]] :param database: The ArangoDB database instance to use. :type database: Optional[arango.database.StandardDatabase] :param collection_name: The name of the ArangoDB collection to use. Defaults to "documents". :type collection_name: str :param search_type: The type of search to perform. Can be either SearchType.VECTOR or SearchType.HYBRID. Defaults to SearchType.VECTOR. :type search_type: SearchType :param embedding_field: The field name to store embeddings. Defaults to "embedding". :type embedding_field: str :param text_field: The field name to store text content. Defaults to "text". :type text_field: str :param vector_index_name: The name of the vector index. Defaults to "vector_index". :type vector_index_name: str :param distance_strategy: The distance metric to use. Can be DistanceStrategy.COSINE or DistanceStrategy.EUCLIDEAN_DISTANCE. Defaults to DistanceStrategy.COSINE. :type distance_strategy: DistanceStrategy :param num_centroids: Number of centroids for vector index. Defaults to 1. :type num_centroids: int :param ids: Optional list of unique identifiers for each text. :type ids: Optional[List[str]] :param overwrite_index: Whether to delete and recreate existing indexes. Defaults to False. :type overwrite_index: bool :param insert_text: Whether to store the text content in the database. Required for hybrid search. Defaults to True. :type insert_text: bool :param keyword_index_name: Name of the keyword search index. Defaults to "keyword_index". :type keyword_index_name: str :param keyword_analyzer: Text analyzer for keyword search. Defaults to "text_en". :type keyword_analyzer: str :param rrf_constant: Constant for RRF scoring in hybrid search. Defaults to 60. :type rrf_constant: int :param rrf_search_limit: Maximum results for RRF scoring. Defaults to 100. :type rrf_search_limit: int :param kwargs: Additional keyword arguments passed to the constructor. :type kwargs: Any :return: A new ArangoVector instance with the texts embedded and stored. :rtype: ArangoVector .. code-block:: python from arango import ArangoClient from langchain_arangodb.vectorstores import ArangoVector from langchain_community.embeddings import OpenAIEmbeddings # Connect to ArangoDB client = ArangoClient("http://localhost:8529") db = client.db("test", username="root", password="openSesame") # Create vector store from texts texts = ["hello world", "hello arango", "test document"] metadatas = [{"source": "doc1"}, {"source": "doc2"}, {"source": "doc3"}] vector_store = ArangoVector.from_texts( texts=texts, embedding=OpenAIEmbeddings(), metadatas=metadatas, database=db, collection_name="test_collection" ) # Create hybrid search store hybrid_store = ArangoVector.from_texts( texts=["Machine learning algorithms", "Deep neural networks"], embedding=OpenAIEmbeddings(), database=db, search_type=SearchType.HYBRID, collection_name="hybrid_docs", overwrite_index=True # Clean start ) """ if not database: raise ValueError("Database must be provided.") if not insert_text and search_type == SearchType.HYBRID: raise ValueError("insert_text must be True when search_type is HYBRID") embeddings = embedding.embed_documents(list(texts)) embedding_dimension = len(embeddings[0]) store = cls( embedding, embedding_dimension=embedding_dimension, database=database, collection_name=collection_name, search_type=search_type, embedding_field=embedding_field, text_field=text_field, vector_index_name=vector_index_name, distance_strategy=distance_strategy, num_centroids=num_centroids, keyword_index_name=keyword_index_name, keyword_analyzer=keyword_analyzer, rrf_constant=rrf_constant, rrf_search_limit=rrf_search_limit, **kwargs, ) if overwrite_index: store.delete_vector_index() if search_type == SearchType.HYBRID: store.delete_keyword_index() store.add_embeddings( texts, embeddings, metadatas=metadatas, ids=ids, insert_text=insert_text ) return store
[docs] @classmethod def from_existing_collection( cls: Type[ArangoVector], collection_name: str, text_properties_to_embed: List[str], embedding: Embeddings, database: StandardDatabase, embedding_field: str = "embedding", text_field: str = "text", vector_index_name: str = "vector_index", batch_size: int = 1000, aql_return_text_query: str = "", insert_text: bool = False, skip_existing_embeddings: bool = False, search_type: SearchType = DEFAULT_SEARCH_TYPE, keyword_index_name: str = "keyword_index", keyword_analyzer: str = DEFAULT_ANALYZER, rrf_constant: int = DEFAULT_RRF_CONSTANT, rrf_search_limit: int = DEFAULT_SEARCH_LIMIT, **kwargs: Any, ) -> ArangoVector: """Create an ArangoVector instance from an existing ArangoDB collection. This method reads documents from an existing collection, extracts specified text properties, embeds them, and creates a new vector store. :param collection_name: Name of the existing ArangoDB collection. :type collection_name: str :param text_properties_to_embed: List of document properties containing text to embed. These properties will be concatenated to create the text for embedding. :type text_properties_to_embed: List[str] :param embedding: The embedding function to use for converting text to vectors. :type embedding: Embeddings :param database: The ArangoDB database instance to use. :type database: StandardDatabase :param embedding_field: The field name to store embeddings. Defaults to "embedding". :type embedding_field: str :param text_field: The field name to store text content. Defaults to "text". Only used if `insert_text` is True. :type text_field: str :param vector_index_name: The name of the vector index. Defaults to "vector_index". :type vector_index_name: str :param batch_size: Number of documents to process in each batch. Defaults to 1000. :type batch_size: int :param aql_return_text_query: Custom AQL query to extract text from properties. Defaults to "RETURN doc[p]". :type aql_return_text_query: str :param insert_text: Whether to store the concatenated text in the database. Required for hybrid search. Defaults to False. :type insert_text: bool :param skip_existing_embeddings: Whether to skip documents that already have embeddings. Defaults to False. :type skip_existing_embeddings: bool :param search_type: The type of search to perform. Can be either SearchType.VECTOR or SearchType.HYBRID. Defaults to SearchType.VECTOR. :type search_type: SearchType :param keyword_index_name: Name of the keyword search index. Defaults to "keyword_index". :type keyword_index_name: str :param keyword_analyzer: Text analyzer for keyword search. Defaults to "text_en". :type keyword_analyzer: str :param rrf_constant: Constant for RRF scoring in hybrid search. Defaults to 60. :type rrf_constant: int :param rrf_search_limit: Maximum results for RRF scoring. Defaults to 100. :type rrf_search_limit: int :param kwargs: Additional keyword arguments passed to the constructor. :type kwargs: Any :return: A new ArangoVector instance with embeddings created from the collection. :rtype: ArangoVector """ if not text_properties_to_embed: m = "Parameter `text_properties_to_embed` must not be an empty list" raise ValueError(m) if text_field in text_properties_to_embed: m = "Parameter `text_field` must not be in `text_properties_to_embed`" raise ValueError(m) if not insert_text and search_type == SearchType.HYBRID: raise ValueError("insert_text must be True when search_type is HYBRID") if not aql_return_text_query: aql_return_text_query = "RETURN doc[p]" filter_clause = "" if skip_existing_embeddings: filter_clause = f"FILTER doc.{embedding_field} == null" aql_query = f""" FOR doc IN @@collection {filter_clause} LET texts = ( FOR p IN @properties FILTER doc[p] != null {aql_return_text_query} ) RETURN {{ key: doc._key, text: CONCAT_SEPARATOR(" ", texts), }} """ bind_vars = { "@collection": collection_name, "properties": text_properties_to_embed, } cursor: Cursor = database.aql.execute( aql_query, bind_vars=bind_vars, # type: ignore batch_size=batch_size, stream=True, ) store: ArangoVector | None = None while not cursor.empty(): batch = cursor.batch() batch_list = list(batch) # type: ignore texts = [doc["text"] for doc in batch_list] ids = [doc["key"] for doc in batch_list] store = cls.from_texts( texts=texts, embedding=embedding, database=database, collection_name=collection_name, embedding_field=embedding_field, text_field=text_field, vector_index_name=vector_index_name, ids=ids, insert_text=insert_text, search_type=search_type, keyword_index_name=keyword_index_name, keyword_analyzer=keyword_analyzer, rrf_constant=rrf_constant, rrf_search_limit=rrf_search_limit, **kwargs, ) batch.clear() # type: ignore if cursor.has_more(): cursor.fetch() if store is None: raise ValueError(f"No documents found in collection in {collection_name}") return store
def _iter_cursor(self, cursor: Cursor) -> Iterator[tuple[Document, float]]: """Iterate over search query cursor and yield results. :param cursor: AQL cursor from executed query. :type cursor: Cursor :return: Iterator of (Document, score) tuples. :rtype: Iterator[tuple[Document, float]] """ data: dict[str, Any] score: float while not cursor.empty(): for result in cursor: data, score, metadata = ( result["data"], result["score"], result["metadata"], ) if not data: continue _key = data.pop("_key") page_content = data.pop(self.text_field) doc = Document( page_content=page_content, id=_key, metadata={**data, **metadata}, ) yield (doc, score) if cursor.has_more(): cursor.fetch() def _process_search_query( self, cursor: Cursor, stream: Optional[bool] = None ) -> Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]]: """Process search query cursor and return results. :param cursor: AQL cursor from executed query. :type cursor: Cursor :param stream: If True, yields results one at a time. If None or False, returns all results as a list. Defaults to None (batch mode). :type stream: Optional[bool] :return: List of (Document, score) tuples if stream is None or False, Iterator if stream=True. :rtype: Union[List[tuple[Document, float]], Iterator[tuple[Document, float]]] """ if stream is True: return self._iter_cursor(cursor) else: return list(self._iter_cursor(cursor)) def _get_score_query_and_sort_order(self, use_approx: bool) -> Tuple[str, str]: """Get the score query and sort order for the given distance strategy. :param use_approx: Whether to use approximate nearest neighbor search. :type use_approx: bool :return: A tuple containing the score query and sort order. :rtype: Tuple[str, str] """ if self._distance_strategy == DistanceStrategy.COSINE: score_func = "APPROX_NEAR_COSINE" if use_approx else "COSINE_SIMILARITY" scoring_query = f"{score_func}(doc.{self.embedding_field}, @embedding)" sort_order = "DESC" elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: score_func = "APPROX_NEAR_L2" if use_approx else "L2_DISTANCE" scoring_query = f"{score_func}(doc.{self.embedding_field}, @embedding)" sort_order = "ASC" elif self._distance_strategy == DistanceStrategy.JACCARD: use_approx = False score_func = "JACCARD" scoring_query = f"{score_func}(doc.{self.embedding_field}, @embedding)" sort_order = "DESC" elif self._distance_strategy in [ DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.DOT_PRODUCT, ]: scoring_query = """ SUM( FOR i IN 0..LENGTH(doc.embedding)-1 RETURN doc.embedding[i] * @embedding[i] ) """ sort_order = "DESC" else: raise ValueError(f"Unsupported metric: {self._distance_strategy}") return scoring_query, sort_order def _ensure_vector_index(self) -> None: """Ensure the vector index exists.""" if self._distance_strategy in [ DistanceStrategy.JACCARD, DistanceStrategy.DOT_PRODUCT, DistanceStrategy.MAX_INNER_PRODUCT, ]: m = f"Unsupported metric: {self._distance_strategy} is not supported for approximate search" # noqa: E501 raise ValueError(m) if version.parse(self.db.version()) < version.parse("3.12.4"): # type: ignore m = "Approximate Nearest Neighbor search requires ArangoDB >= 3.12.4." raise ValueError(m) if not self.retrieve_vector_index(): self.create_vector_index() def _build_vector_search_query( self, embedding: List[float], k: int, return_fields: set[str], use_approx: bool, filter_clause: str, metadata_clause: str, ) -> Tuple[str, dict[str, Any]]: scoring_query, sort_order = self._get_score_query_and_sort_order(use_approx) if use_approx: self._ensure_vector_index() return_fields.update({"_key", self.text_field}) return_fields_list = list(return_fields) if self._distance_strategy in [ DistanceStrategy.JACCARD, DistanceStrategy.COSINE, DistanceStrategy.EUCLIDEAN_DISTANCE, DistanceStrategy.DOT_PRODUCT, ]: aql_query = f""" FOR doc IN @@collection {filter_clause if not use_approx else ""} LET score = {scoring_query} SORT score {sort_order} LIMIT {k} {filter_clause if use_approx else ""} LET data = KEEP(doc, {return_fields_list}) LET metadata = {f"({metadata_clause})" if metadata_clause else "{}"} RETURN {{data, score, metadata}} """ elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: aql_query = f""" LET scored = ( FOR doc IN @@collection {filter_clause} LET score = {scoring_query} SORT score {sort_order} LIMIT {k} RETURN {{doc, score}} ) LET maxScore = MAX(scored[*].score) FOR item IN scored FILTER item.score == maxScore LET data = KEEP(item.doc, {return_fields_list}) LET metadata = {f"({metadata_clause})" if metadata_clause else "{}"} RETURN {{data, score: item.score, metadata}} """ else: raise ValueError(f"Unsupported metric: {self._distance_strategy}") bind_vars = { "@collection": self.collection_name, "embedding": embedding, } return aql_query, bind_vars def _build_hybrid_search_query( self, query: str, k: int, embedding: List[float], return_fields: set[str], use_approx: bool, filter_clause: str, vector_weight: float, keyword_weight: float, keyword_search_clause: str, metadata_clause: str, ) -> Tuple[str, dict[str, Any]]: """Build the hybrid search query using RRF.""" scoring_query, sort_order = self._get_score_query_and_sort_order(use_approx) if not self.retrieve_keyword_index(): self.create_keyword_index() if use_approx: self._ensure_vector_index() return_fields.update({"_key", self.text_field}) return_fields_list = list(return_fields) if not keyword_search_clause: keyword_search_clause = f""" SEARCH ANALYZER( doc.{self.text_field} IN TOKENS(@query, @analyzer), @analyzer ) """ if self._distance_strategy in [ DistanceStrategy.JACCARD, DistanceStrategy.COSINE, DistanceStrategy.EUCLIDEAN_DISTANCE, DistanceStrategy.DOT_PRODUCT, ]: vector_search_query = f""" LET vector_results = ( FOR doc IN @@collection {filter_clause if not use_approx else ""} LET score = {scoring_query} SORT score {sort_order} LIMIT {k} {filter_clause if use_approx else ""} WINDOW {{ preceding: "unbounded", following: 0 }} AGGREGATE rank = COUNT(1) LET rrf_score = {vector_weight} / ({self.rrf_constant} + rank) RETURN {{ key: doc._key, score: rrf_score }} ) """ elif self._distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT: vector_search_query = f""" LET scored = ( FOR doc IN @@collection {filter_clause} LET score = SUM( FOR i IN 0..LENGTH(doc.embedding)-1 RETURN doc.embedding[i] * @embedding[i] ) SORT score {sort_order} LIMIT {k} RETURN {{doc, score}} ) LET maxScore = MAX(scored[*].score) LET vector_results = ( FOR item IN scored FILTER item.score == maxScore LET rank = 1 LET rrf_score = {vector_weight} / ({self.rrf_constant} + rank) RETURN {{ key: item.doc._key, score: rrf_score }} ) """ else: raise ValueError(f"Unsupported metric: {self._distance_strategy}") aql_query = f""" {vector_search_query} LET keyword_results = ( FOR doc IN @@view {keyword_search_clause} {filter_clause} LET score = BM25(doc) SORT score DESC LIMIT {k} WINDOW {{ preceding: "unbounded", following: 0 }} AGGREGATE rank = COUNT(1) LET rrf_score = {keyword_weight} / ({self.rrf_constant} + rank) RETURN {{ key: doc._key, score: rrf_score }} ) FOR result IN APPEND(vector_results, keyword_results) COLLECT key = result.key AGGREGATE score = SUM(result.score) SORT score DESC LIMIT {self.rrf_search_limit} LET data = FIRST( FOR doc IN @@collection FILTER doc._key == key LIMIT 1 RETURN KEEP(doc, {return_fields_list}) ) LET metadata = {f"({metadata_clause})" if metadata_clause else "{}"} RETURN {{ data, score, metadata }} """ bind_vars = { "@collection": self.collection_name, "@view": self.keyword_index_name, "embedding": embedding, "query": query, "analyzer": self.keyword_analyzer, } return aql_query, bind_vars
[docs] def find_entity_clusters( self, threshold: float = 0.8, k: int = 4, use_approx: bool = True, use_subset_relations: bool = False, merge_similar_entities: bool = False, ) -> Union[List[Dict[str, Any]], Dict[str, List[Dict[str, Any]]]]: """ Find similar documents within the collection for entity resolution. This method compares documents within the collection to each other and returns entities grouped with their most similar documents. Each entity is returned with a list of the top k most similar entities based on the chosen similarity function. similarity function: [COSINE, EUCLIDEAN_DISTANCE, JACCARD, APPROX_NEAR_COSINE, APPROX_NEAR_L2] NOTE: for JACCARD, use_approx is automatically set to False :param threshold: Minimum similarity score for documents to be considered similar. Defaults to 0.8. :type threshold: float :param k: Number of similar documents to return for each entity. Defaults to 4. :type k: int :param use_approx: Whether to use approximate nearest neighbor search. Defaults to True. :type use_approx: bool :param use_subset_relations: Whether to analyze subset relations. Defaults to False. :type use_subset_relations: bool :param merge_similar_entities: Whether to merge similar entities based on subset relationships. Only effective when use_subset_relations=True. When True, merges subset groups into their superset groups to create consolidated, non-overlapping clusters. Defaults to False. :type merge_similar_entities: bool :return: Return format depends on parameters: - Basic clustering (use_subset_relations=False and merge_similar_entities=False): List[Dict] with format: {'entity': entity_key, 'similar': [list_of_keys]} - With subset analysis (use_subset_relations=True, merge_similar_entities=False): Dict with keys: 'similar_entities', 'subset_relationships' - With merging (use_subset_relations=True, merge_similar_entities=True): Dict with keys: 'similar_entities', 'subset_relationships', 'merged_entities' :rtype: Union[List[Dict[str, Any]], Dict[str, List[Dict[str, Any]]]] """ target_collection = self.collection_name if self._distance_strategy == DistanceStrategy.COSINE: score_func = "APPROX_NEAR_COSINE" if use_approx else "COSINE_SIMILARITY" sort_order = "DESC" elif self._distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE: score_func = "APPROX_NEAR_L2" if use_approx else "L2_DISTANCE" sort_order = "ASC" elif self._distance_strategy == DistanceStrategy.JACCARD: use_approx = False score_func = "JACCARD" sort_order = "DESC" else: raise ValueError(f"Unsupported metric: {self._distance_strategy}") if use_approx: if version.parse(self.db.version()) < version.parse("3.12.4"): # type: ignore msg = "ANN search requires ArangoDB >= 3.12.4" m = msg raise ValueError(m) if not self.retrieve_vector_index(): self.create_vector_index() filter_key_clause = "FILTER doc1._key < doc2._key" aql_query = f""" FOR doc1 IN @@collection LET similar = ( FOR doc2 IN @@collection {"" if use_approx else filter_key_clause} LET score = {score_func}(doc1.{self.embedding_field}, doc2.{self.embedding_field}) SORT score {sort_order} LIMIT @k {filter_key_clause if use_approx else ""} FILTER score >= @threshold RETURN doc2._key ) FILTER LENGTH(similar) > 0 RETURN {{entity: doc1._key, similar}} """ bind_vars: MutableMapping[str, Any] = { "@collection": target_collection, "threshold": threshold, "k": k, } cursor = self.db.aql.execute(aql_query, bind_vars=bind_vars, stream=True) results = list(cast(Iterable[Dict[str, Any]], cursor)) if not results: if not use_subset_relations: return [] return {"similar_entities": [], "subset_relationships": []} if not use_subset_relations: if merge_similar_entities: import warnings warnings.warn( "merge_similar_entities=True requires use_subset_relations=True. " "Ignoring merge_similar_entities parameter.", UserWarning, ) return results # SUBSET RELATIONS - only execute when use_subset_relations=True combined_query = """ // Step 1: Calculate subset relationships LET subsetResults = ( FOR group1 IN @results FOR group2 IN @results FILTER group1.entity != group2.entity AND LENGTH(group1.similar) < LENGTH(group2.similar) LET group1Keys = group1.similar LET group2Keys = group2.similar LET missingKeys = MINUS(group1Keys, group2Keys) FILTER LENGTH(missingKeys) == 0 RETURN { subsetGroup: group1.entity, supersetGroup: group2.entity } ) // Step 2: Calculate merged entities ONLY if merge_similar_entities=true LET mergedResults = @merge_similar_entities && LENGTH(subsetResults) > 0 ? ( FOR group IN @results LET isSubset = LENGTH( FOR rel IN subsetResults FILTER rel.subsetGroup == group.entity RETURN 1 ) > 0 FILTER NOT isSubset LET entitiesToMerge = ( FOR rel IN subsetResults FILTER rel.supersetGroup == group.entity RETURN rel.subsetGroup ) LET mergedSimilar = UNION_DISTINCT(group.similar, entitiesToMerge) RETURN { entity: group.entity, merged_entities: mergedSimilar } ) : [] // Return results based on what was requested RETURN { subset_relationships: subsetResults, merged_entities: mergedResults } """ bind_vars_combined: MutableMapping[str, Any] = { "results": results, "merge_similar_entities": merge_similar_entities, } # Execute combined query combined_result = self.db.aql.execute( combined_query, bind_vars=bind_vars_combined, stream=True ) merged_result = list(cast(Iterable[Dict[str, Any]], combined_result))[0] # Return results based on merge_similar_entities parameter if merge_similar_entities: return { "similar_entities": results, "subset_relationships": merged_result["subset_relationships"], "merged_entities": merged_result["merged_entities"], } else: return { "similar_entities": results, "subset_relationships": merged_result["subset_relationships"], }