Source code for ahvn.adapter.mdb

"""\
MongoDB adapter for converting between UKF objects and MongoDB documents.
"""

# Public API of this module: only the adapter class is exported by `import *`.
__all__ = [
    "MongoUKFAdapter",
]

from .base import BaseUKFAdapter

from ..utils.mdb.types import MONGO_FIELD_TYPES, MONGO_VIRTUAL_FIELD_TYPES
from ..ukf.base import BaseUKF
from ..utils.basic.log_utils import get_logger

from typing import Any, Dict, List, Optional

# Module-level logger obtained from the project's get_logger helper; the adapter
# uses it to report index-creation failures as warnings instead of raising.
logger = get_logger(__name__)


class MongoUKFAdapter(BaseUKFAdapter):
    """\
    MongoDB adapter for converting between UKF objects and MongoDB documents.

    Uses type-based field mapping system (like ORMUKFAdapter and VdbUKFAdapter):
    - Dynamically maps UKF fields based on BaseUKF.schema()
    - Uses MONGO_FIELD_TYPES for type conversion
    - Writes ``_id`` and ``expiration_timestamp`` explicitly on every document
    - Stores the search key and the embedding under per-adapter field names
      (``_<name>_key`` and ``_<name>_vec``)
    - Supports multi-valued fields as embedded arrays

    Example:
        ```python
        adapter = MongoUKFAdapter(name="test_adapter")

        # Convert UKF to MongoDB document
        ukf = BaseUKF(id=1, name="test", type="test_type")
        doc = adapter.from_ukf(ukf)
        # doc = {"name": "test", "type": "test_type", ..., "_id": ...,
        #        "_test_adapter_key": "test", "_test_adapter_vec": None}

        # Convert MongoDB document back to UKF
        data = adapter.to_ukf_data(doc)
        ukf_restored = BaseUKF.from_dict(data, polymorphic=True)
        ```
    """

    # Virtual field names that are skipped by the generic field-mapping loop.
    # "_key" and "_vec" are left out because this adapter stores them under
    # per-adapter names (see ``key_field`` / ``embedding_field`` in __init__).
    virtual_fields = tuple(k for k in MONGO_VIRTUAL_FIELD_TYPES if k not in {"_key", "_vec"})

    # Index specifications as lists of (key, direction) pairs. Dotted keys
    # address sub-fields of embedded documents / arrays.
    indices = [
        [("type", 1), ("name", 1), ("version", 1), ("variant", 1)],
        [("type", 1), ("workspace", 1), ("collection", 1)],
        [("type", 1), ("timestamp", -1)],
        [("creator", 1), ("owner", 1)],
        [("expiration_timestamp", 1)],
        [("tags.slot", 1), ("tags.value", 1)],
        [("auths.user_id", 1), ("auths.authority", 1)],
        [("synonyms", 1)],
        [("related.relation", 1)],
    ]

    def __init__(
        self,
        name: str,
        include: Optional[List[str]] = None,
        exclude: Optional[List[str]] = None,
        **kwargs,
    ):
        """\
        Initialize adapter with field selection.

        Args:
            name: Adapter name
            include: Fields to include (if None, includes all BaseUKF fields)
            exclude: Fields to exclude
            **kwargs: Additional parameters (reserved for future use; currently ignored)
        """
        super().__init__(name=name, include=include, exclude=exclude)
        # Per-adapter names for the search key, the embedding vector and the
        # vector index.
        self.key_field = f"_{self.name}_key"
        self.embedding_field = f"_{self.name}_vec"
        self.embedding_idx = f"_{self.name}_vec_idx"

    def parse_id(self, key: int):
        """\
        Convert a UKF id into the value stored as MongoDB ``_id``.

        Args:
            key: UKF id; it is coerced with ``int()`` before conversion.

        Returns:
            The result of ``MONGO_FIELD_TYPES["id"].from_ukf``.
        """
        return MONGO_FIELD_TYPES["id"].from_ukf(int(key))

    def create_indices(self, mdb):
        """\
        Create the indices listed in ``self.indices`` on the given backend.

        An index is only requested when every key it refers to belongs to a
        field handled by this adapter. Failures are logged as warnings and do
        not abort the remaining indices.

        Args:
            mdb: Backend object exposing ``create_index(spec, name=...)``.
        """
        for idx_spec in self.indices:
            # A dotted key such as "tags.slot" targets a sub-field of the
            # top-level field "tags"; only the top-level name can be looked up
            # in self.fields, otherwise these indices would never be created.
            if not all(key.split(".", 1)[0] in self.fields for key, _ in idx_spec):
                continue
            name = "idx_" + "_".join(key.replace(".", "_") for key, _ in idx_spec)
            try:
                mdb.create_index(idx_spec, name=name)
            except Exception as e:
                # Best effort: keep going with the remaining indices.
                logger.warning(f"Index creation failed: {e}")

    def create_vector_index(self, mdb, dim):
        """\
        Create the vector index for this adapter's embedding field.

        Args:
            mdb: Backend object exposing ``create_vector_index``.
            dim: Embedding dimensionality, forwarded unchanged.

        Returns:
            Whatever ``mdb.create_vector_index`` returns.
        """
        return mdb.create_vector_index(
            embedding_idx=self.embedding_idx,
            embedding_field=self.embedding_field,
            dim=dim,
        )

    def _mapped_fields(self):
        """\
        Yield ``(field_name, mongo_field_type)`` for every convertible field.

        A field is skipped when it is a virtual field, when it is absent from
        ``BaseUKF.schema()``, or when MONGO_FIELD_TYPES has no entry for its
        UKF type name.
        """
        ukf_schema = BaseUKF.schema()
        for field_name in self.fields:
            if field_name in self.virtual_fields:
                continue
            ukf_field_type = ukf_schema.get(field_name)
            if ukf_field_type is None:
                continue
            field_type = MONGO_FIELD_TYPES.get(ukf_field_type.name)
            if field_type:
                yield field_name, field_type

    def from_ukf(self, kl: BaseUKF, key: Optional[str] = None, embedding: Optional[List[float]] = None) -> Dict[str, Any]:
        """\
        Convert BaseUKF to MongoDB document using type-based mapping.

        Every mapped field is read from ``kl`` and converted with the
        ``from_ukf`` method of its MONGO_FIELD_TYPES entry. The id, the
        expiration timestamp, the search key and the embedding are then
        written explicitly.

        Args:
            kl: BaseUKF object to convert
            key: Optional key for vector search; defaults to ``kl.name``
            embedding: Optional embedding vector; stored as None when omitted

        Returns:
            Dict with structure:
            {
                "<field>": <converted value>,  # one entry per mapped field
                "_id": <converted kl.id>,
                "expiration_timestamp": <converted kl.expiration_timestamp>,
                "_<name>_key": "...",
                "_<name>_vec": [...] or None,
            }
        """
        document = {}
        for field_name, field_type in self._mapped_fields():
            document[field_name] = field_type.from_ukf(getattr(kl, field_name))

        # Handle virtual fields
        document["_id"] = MONGO_FIELD_TYPES["id"].from_ukf(kl.id)
        document["expiration_timestamp"] = MONGO_FIELD_TYPES["timestamp"].from_ukf(kl.expiration_timestamp)
        document[self.key_field] = kl.name if key is None else key
        document[self.embedding_field] = None if embedding is None else MONGO_FIELD_TYPES["vector"].from_ukf(embedding)
        return document

    def to_ukf_data(self, document: Dict[str, Any]) -> Dict[str, Any]:
        """\
        Convert MongoDB document to UKF initialization dict using type-based mapping.

        Every mapped field is looked up in ``document`` (a missing key yields
        None) and converted with the ``to_ukf`` method of its
        MONGO_FIELD_TYPES entry. Virtual fields are not copied back.

        Args:
            document: MongoDB document to convert

        Returns:
            Dict suitable for BaseUKF initialization
        """
        return {
            field_name: field_type.to_ukf(document.get(field_name))
            for field_name, field_type in self._mapped_fields()
        }

    def from_result(self, result: Any) -> Any:
        """\
        Convert a query result from MongoDB to the appropriate representation.

        Args:
            result: The raw result from a MongoDB query

        Returns:
            The converted representation (currently returns as-is)
        """
        # For future extensions if needed
        return result