# Source of module ``ahvn.klengine.vector_engine``

"""Universal vector KL engine implementation."""

from __future__ import annotations

__all__ = [
    "VectorKLEngine",
]
from typing import Any, Dict, Iterable, List, Optional, Generator, Tuple, TYPE_CHECKING

from ..utils.deps import deps

if TYPE_CHECKING:
    from llama_index.core.vector_stores.types import VectorStoreQuery
    from llama_index.core.schema import TextNode

from ..utils.vdb.compiler import VectorCompiler
from ..utils.klop import KLOp
from ..utils.basic.config_utils import HEAVEN_CM
from ..utils.basic.log_utils import get_logger
from ..utils.basic.debug_utils import raise_mismatch
from ..utils.basic.progress_utils import Progress

from .base import BaseKLEngine
from ..ukf.base import BaseUKF
from ..ukf.templates.basic.dummy import DummyUKFT
from ..adapter.vdb import VdbUKFAdapter
from ..klstore.vdb_store import VectorKLStore
from ..utils.vdb.base import VectorDatabase

logger = get_logger(__name__)


def get_llama_index_types():
    """Lazily load ``llama_index.core.vector_stores.types`` via the deps helper.

    Deferred import so llama_index is only pulled in when a vector query
    is actually built (see ``_search_vector``).
    """
    return deps.load("llama_index.core.vector_stores.types")


class VectorKLEngine(BaseKLEngine):
    """A vector-based search KLEngine implementation with multiple search interfaces.

    This class extends BaseKLEngine with specialized search methods:

    - Vector similarity search through the default ``_search`` method.
    - Filtered vector search via keyword-argument metadata filters on
      ``_search`` / ``_search_vector``.

    The engine is designed to work with vector data that can be searched
    using semantic similarity and filtered using various metadata
    conditions.

    Search Methods:
        _search_vector(query, topk, include, **filters):
            Perform vector similarity search with optional metadata filters.
        _search:
            Alias for ``_search_vector`` for default search behavior.

    Abstract Methods (inherited from BaseKLEngine):
        _upsert(kl): Insert or update a KL in the engine.
        _remove(key): Remove a KL from the engine by its key (id).
        _clear(): Clear all KLs from the engine.
    """

    # Default mode: search directly on the attached storage's vector database.
    inplace: bool = True
    # Overwritten in __init__ from the adapter's `recoverable` flag.
    recoverable: bool = True
[文档] def __init__( self, storage: VectorKLStore, inplace: bool = True, include: Optional[List[str]] = None, exclude: Optional[List[str]] = None, filters: Dict[str, Any] = None, name: Optional[str] = None, condition: Optional[Any] = None, *args, **kwargs, ): """Initialize the VectorKLEngine. Args: storage: attach VectorKLEngine to a VectorKLStore (required). inplace: If True, search directly on storage vector database; if False, create a copied collection with included fields. include (if inplace=False): List of BaseUKF field names to include. If None, includes all fields. Default is None. exclude (if inplace=False): List of BaseUKF field names to exclude. If None, excludes no fields. Default is None. Notice that exclude is applied after include, so if a field is in both include and exclude, it will be excluded. It is recommended to use only one of include or exclude. filters: global filters that will be applied to all searches. name: Name of the KLEngine instance. If None, defaults to "{storage.name}_vec_idx". condition: Optional upsert/insert condition to apply to the KLEngine. KLs that do not satisfy the condition will be ignored. If None, all KLs are accepted. *args: Additional positional arguments passed to VectorKLEngine. **kwargs: Additional keyword arguments passed to VectorKLEngine. 
""" if inplace and not isinstance(storage, VectorKLStore): raise ValueError("When inplace=True, storage must be a VectorKLStore instance") super().__init__(storage=storage, inplace=inplace, name=name or f"{storage.name}_vec_idx", condition=condition, *args, **kwargs) self.exprs = None if not filters else KLOp.expr(**filters) if self.inplace: self.vdb = self.storage.vdb self.adapter = self.storage.adapter else: provider = kwargs.get("provider") or HEAVEN_CM.get("vdb.default_provider") collection = kwargs.get("collection") or self.name or HEAVEN_CM.get(f"vdb.providers.{provider}.collection") encoder = kwargs.get("encoder") embedder = kwargs.get("embedder") connection_args = {k: v for k, v in kwargs.items() if k not in ["collection", "provider", "encoder", "embedder", "include", "exclude"]} self.vdb = VectorDatabase(collection=collection, provider=provider, encoder=encoder, embedder=embedder, **connection_args) self.vdb.connect() # Connect to the vector database self.adapter = VdbUKFAdapter(backend=self.vdb.backend, name=self.name, include=include, exclude=exclude) self._init() self.recoverable = self.adapter.recoverable
def _init(self): if self.inplace: return # Initialize the collection with a dummy node, then clear it # This ensures the collection schema is properly set up dummy = DummyUKFT(name="<dummy>", content="This is a dummy node to initialize the collection.") self.vdb.vdb.add(self._batch_convert([dummy])) self.vdb.flush() # Remove the dummy node self.remove(dummy.id) self.vdb.flush() def _batch_convert(self, kls: Iterable[BaseUKF]) -> List[TextNode]: kls_list = list(kls) non_dummy_kls = [kl for kl in kls_list if not isinstance(kl, DummyUKFT)] non_dymmy_key_embeddings = self.vdb.batch_k_encode_embed(non_dummy_kls) if non_dummy_kls else [] non_dummy_mapping = dict(zip([kl.id for kl in non_dummy_kls], non_dymmy_key_embeddings)) dummy_emb = self.vdb.k_embed("<dummy>") nodes = [] for kl in kls_list: if isinstance(kl, DummyUKFT): key, embedding = "<dummy>", dummy_emb else: key, embedding = non_dummy_mapping[kl.id] nodes.append(self.adapter.from_ukf(kl=kl, key=key, embedding=embedding)) return nodes def _search_vector( self, query: str = None, topk: int = 20, fetchk: Optional[int] = 100, include: Optional[Iterable[str]] = None, *args, **kwargs ) -> List[Dict[str, Any]]: """\ Perform a vector similarity search using metadata filters. This method applies structured filters to search through the knowledge items using vector similarity combined with metadata filtering. Args: query (str): The text query to search for using vector similarity. If None, only filter-based search is performed without vector search. topk (int): Number of top results to return. Default is 20. fetchk (Optional[int]): Number of top results to fetch from the vector database before applying filters or reranking. include (Optional[Iterable[str]]): The keys to include in the search results. Supported keys include: - 'id': The unique identifier of the KL (BaseUKF.id). - 'kl': The KL object itself (BaseUKF). - 'score': The similarity score from vector search. - 'filter': The applied metadata filter for debugging. 
- 'vsq': The VectorStoreQuery object for debugging. - 'key': The vector search key (_key field). - 'embedding': The vector embedding (_vec field). Defaults to None, which resolves to ['id', 'kl', 'score']. *args: Additional positional arguments. **kwargs: Filter conditions as keyword arguments. Returns: List[Dict[str, Any]]: The search results matching the applied filters and query. Raises: ValueError: If filter fields are not in schema when not inplace. """ fields = set(self.adapter.fields) for field_name in kwargs.keys(): raise_mismatch( fields, got=field_name, name="search filter field", mode="raise", comment="Check `include`, `exclude` or BaseUKF definition.", ) _supported_includes = ["id", "kl", "score", "filter", "vsq", "key", "embedding", "qkey", "qembedding"] include_set = set(include) if include is not None else {"id", "kl", "score"} for inc in include_set: raise_mismatch( _supported_includes, got=inc, name="search `include` type", mode="warn", comment="It will be ignored in the return results.", thres=1.0, ) # Build metadata filters metadata_filters = VectorCompiler.compile(expr=self.exprs, **kwargs) if query is None: query_key, query_embedding = None, None else: query_key, query_embedding = self.vdb.q_encode_embed(query) fetchk = max(fetchk or topk, topk, 0) # TODO: throw in fetchk somewhere in VDB query query_stmt: VectorStoreQuery = get_llama_index_types().VectorStoreQuery( query_embedding=query_embedding, similarity_top_k=topk, filters=metadata_filters, ) try: results = self.vdb.vdb.query(query_stmt) except Exception as e: logger.error(f"Vector database query failed: {e}") return list() nodes = results.nodes or [] similarities = results.similarities or [None] * len(nodes) return [ { "id": int(node.node_id) if isinstance(node.node_id, (int, str)) and str(node.node_id).isdigit() else node.node_id, **({"score": float(similarity) if similarity is not None else 0.0} if "score" in include_set else {}), **({"filter": metadata_filters} if "filter" in 
include_set else {}), **({"vsq": query_stmt} if "vsq" in include_set else {}), **({"key": query_key} if "key" in include_set else {}), **({"embedding": query_embedding} if "embedding" in include_set else {}), **({"qkey": query_key} if "qkey" in include_set else {}), **({"qembedding": query_embedding} if "qembedding" in include_set else {}), **({"kl": self.adapter.to_ukf(entity=node)} if self.recoverable and ("kl" in include_set) else {}), } for node, similarity in zip(nodes, similarities) ] def _search( self, query: str = None, topk: int = 20, fetchk: Optional[int] = 100, include: Optional[Iterable[str]] = None, *args, **kwargs ) -> List[Dict[str, Any]]: """Alias for _search_vector for default search behavior.""" return self._search_vector(query=query, topk=topk, fetchk=fetchk, include=include, *args, **kwargs) def _get(self, key: int, default: Any = ...) -> Optional[BaseUKF]: if not self.recoverable: return default return VectorKLStore._get(self, key, default=default) def _has(self, key: int) -> bool: return VectorKLStore._has(self, key) def _upsert(self, kl: BaseUKF, **kwargs): if self.inplace: return VectorKLStore._upsert(self, kl, **kwargs) def _insert(self, kl, **kwargs): if self.inplace: return VectorKLStore._insert(self, kl, **kwargs) def _batch_upsert(self, kls, progress: Progress = None, **kwargs): if self.inplace: return VectorKLStore._batch_upsert(self, kls, progress=progress, **kwargs) def _batch_insert(self, kls, progress: Progress = None, **kwargs): if self.inplace: return VectorKLStore._batch_insert(self, kls, progress=progress, **kwargs) def _remove(self, key: int, **kwargs): if self.inplace: return VectorKLStore._remove(self, key, **kwargs) def _batch_remove(self, keys, progress: Progress = None, **kwargs): if self.inplace: return VectorKLStore._batch_remove(self, keys, progress=progress, **kwargs) def __len__(self) -> int: if self.inplace: return len(self.storage) else: return len(self.vdb._get_all_nodes()) def _itervalues(self) -> 
Generator[BaseUKF, None, None]: if self.inplace: yield from self.storage._itervalues() else: for node in self.vdb._get_all_nodes(): yield self.adapter.to_ukf(entity=node) def _clear(self): if self.inplace: return self.vdb.clear()
[文档] def close(self): """\ Closes the engine. """ if self.vdb is not None: self.vdb.close() self.vdb = None
[文档] def k_encode(self, kl: BaseUKF) -> str: """Encode a BaseUKF using the VDB's key encoder.""" return self.vdb.k_encode(kl)
[文档] def k_embed(self, encoded_kl: str) -> List[float]: """Embed an encoded BaseUKF using the VDB's key embedder.""" return self.vdb.k_embed(encoded_kl)
[文档] def batch_k_encode(self, kls: Iterable[BaseUKF]) -> List[str]: """Encode a batch of BaseUKFs using the VDB's key encoder.""" return self.vdb.batch_k_encode(kls)
[文档] def batch_k_embed(self, encoded_kls: List[str]) -> List[List[float]]: """Embed a batch of encoded BaseUKFs using the VDB's key embedder.""" return self.vdb.batch_k_embed(encoded_kls)
[文档] def q_encode(self, query: str) -> str: """Encode a query string using the VDB's query encoder.""" return self.vdb.q_encode(query)
[文档] def q_embed(self, encoded_query: str) -> List[float]: """Embed an encoded query string using the VDB's query embedder.""" return self.vdb.q_embed(encoded_query)
[文档] def batch_q_encode(self, queries: Iterable[str]) -> List[str]: """Encode a batch of query strings using the VDB's query encoder.""" return self.vdb.batch_q_encode(queries)
[文档] def batch_q_embed(self, encoded_queries: List[str]) -> List[List[float]]: """Embed a batch of encoded query strings using the VDB's query embedder.""" return self.vdb.batch_q_embed(encoded_queries)
[文档] def k_encode_embed(self, kl: BaseUKF) -> Tuple[str, List[float]]: """Encode and embed a BaseUKF using the VDB's key encoder and embedder.""" return self.vdb.k_encode_embed(kl)
[文档] def batch_k_encode_embed(self, kls: Iterable[BaseUKF]) -> List[Tuple[str, List[float]]]: """Encode and embed a batch of BaseUKFs using the VDB's key encoder and embedder.""" return self.vdb.batch_k_encode_embed(kls)
[文档] def q_encode_embed(self, query: str) -> Tuple[str, List[float]]: """Encode and embed a query string using the VDB's query encoder and embedder.""" return self.vdb.q_encode_embed(query)
[文档] def batch_q_encode_embed(self, queries: Iterable[str]) -> List[Tuple[str, List[float]]]: """Encode and embed a batch of query strings using the VDB's query encoder and embedder.""" return self.vdb.batch_q_encode_embed(queries)
@property def embedding_field(self): """Get the vector field name used by this engine.""" return getattr(self.adapter, "embedding_field", "_vec") @property def adapter(self): """Get the adapter used by this engine.""" return self._adapter if hasattr(self, "_adapter") else None @adapter.setter def adapter(self, value): """Set the adapter used by this engine.""" self._adapter = value