"""Vector database type definitions and conversion utilities for UKF models."""
# Public names exported by ``from <this module> import *``.
# NOTE(review): "VdbTagsType" and "VdbRelatedType" are listed here, but no
# class with either name is defined in the visible part of this file --
# confirm they exist, otherwise a star-import will raise AttributeError.
__all__ = [
"BaseVdbType",
"VdbIdType",
"VdbTextType",
"VdbIntegerType",
"VdbBooleanType",
"VdbDurationType",
"VdbTimestampType",
"VdbJsonType",
"VdbVectorType",
"VdbTagsType",
"VdbSynonymsType",
"VdbRelatedType",
"VdbAuthsType",
]
from ..basic.hash_utils import fmt_hash
from ..basic.serialize_utils import dumps_json, loads_json, AhvnJsonEncoder, AhvnJsonDecoder
import datetime
import calendar
try:
import pyarrow as pa
HAS_PYARROW = True
except ImportError:
pa = None
HAS_PYARROW = False
[docs]
class BaseVdbType:
    """Base class for vector database field types with UKF conversion.

    The base implementation converts nothing: values pass through unchanged
    in both directions. Subclasses override ``from_ukf`` / ``to_ukf`` to
    translate values and ``pyarrow_type`` to describe their storage type.
    """

    def __init__(self, **kwargs):
        """Store arbitrary keyword options on ``self.kwargs``."""
        self.kwargs = kwargs

    def from_ukf(self, ukf_value, backend=None):
        """Convert a UKF value to its database form (identity here).

        Args:
            ukf_value: The value to convert.
            backend: Backend name; ignored by the base class.

        Returns:
            ``ukf_value`` unchanged.
        """
        return ukf_value

    def to_ukf(self, vdb_value, backend=None):
        """Convert a database value back to its UKF form (identity here).

        Args:
            vdb_value: The value read from the database.
            backend: Backend name; ignored by the base class.

        Returns:
            ``vdb_value`` unchanged.
        """
        return vdb_value

    def pyarrow_type(self, **kwargs):
        """Return the pyarrow data type for this field.

        Returns:
            ``None`` when pyarrow is not installed.

        Raises:
            NotImplementedError: When pyarrow is installed; the base class
                has no storage type of its own and subclasses must override
                this method.
        """
        if not HAS_PYARROW:
            return None
        # The message names the method that actually has to be overridden
        # (it previously referred to a non-existent ``to_pyarrow``).
        raise NotImplementedError("Subclasses must implement pyarrow_type")
[docs]
class VdbIdType(BaseVdbType):
    """Identifier field: formatted via ``fmt_hash`` for storage, ``int`` in UKF."""

    def from_ukf(self, ukf_value, backend=None):
        """Return ``fmt_hash(ukf_value)``, or ``None`` when the value is ``None``."""
        if ukf_value is None:
            return None
        return fmt_hash(ukf_value)

    def to_ukf(self, vdb_value, backend=None):
        """Return ``int(vdb_value)``, or ``None`` when the value is ``None``."""
        if vdb_value is None:
            return None
        return int(vdb_value)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.string()
[docs]
class VdbTextType(BaseVdbType):
    """Text field: values are coerced with ``str`` in both directions."""

    def __init__(self, length=None, **kwargs):
        """Record an optional ``length`` and forward other options to the base.

        Args:
            length: Stored on ``self.length``; not used by the conversions
                defined in this class.
            **kwargs: Passed to ``BaseVdbType.__init__``.
        """
        super().__init__(**kwargs)
        self.length = length

    def from_ukf(self, ukf_value, backend=None):
        """Return ``str(ukf_value)``, or ``None`` when the value is ``None``."""
        if ukf_value is None:
            return None
        return str(ukf_value)

    def to_ukf(self, vdb_value, backend=None):
        """Return ``str(vdb_value)``, or ``None`` when the value is ``None``."""
        if vdb_value is None:
            return None
        return str(vdb_value)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.string()
[docs]
class VdbIntegerType(BaseVdbType):
    """Integer field: values are coerced with ``int`` in both directions."""

    def from_ukf(self, ukf_value, backend=None):
        """Return ``int(ukf_value)``, or ``None`` when the value is ``None``."""
        if ukf_value is None:
            return None
        return int(ukf_value)

    def to_ukf(self, vdb_value, backend=None):
        """Return ``int(vdb_value)``, or ``None`` when the value is ``None``."""
        if vdb_value is None:
            return None
        return int(vdb_value)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.int64()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.int64()
[docs]
class VdbBooleanType(BaseVdbType):
    """Boolean field: values are coerced with ``bool`` in both directions."""

    def from_ukf(self, ukf_value, backend=None):
        """Return ``bool(ukf_value)``, or ``None`` when the value is ``None``."""
        if ukf_value is None:
            return None
        return bool(ukf_value)

    def to_ukf(self, vdb_value, backend=None):
        """Return ``bool(vdb_value)``, or ``None`` when the value is ``None``."""
        if vdb_value is None:
            return None
        return bool(vdb_value)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.bool_()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.bool_()
[docs]
class VdbDurationType(BaseVdbType):
    """Duration field: a ``timedelta`` in UKF, whole seconds in the database."""

    def from_ukf(self, ukf_value, backend=None):
        """Return the duration as an integer number of seconds.

        ``int()`` truncates any fractional part of ``total_seconds()``.
        ``None`` is passed through.
        """
        if ukf_value is None:
            return None
        seconds = ukf_value.total_seconds()
        return int(seconds)

    def to_ukf(self, vdb_value, backend=None):
        """Rebuild a ``datetime.timedelta`` from a stored seconds value.

        ``None`` is passed through.
        """
        if vdb_value is None:
            return None
        whole_seconds = int(vdb_value)
        return datetime.timedelta(seconds=whole_seconds)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable.

        NOTE(review): ``from_ukf`` produces an ``int`` while this reports a
        string column -- confirm whether the backend coerces the value or
        whether this should be an integer type.
        """
        if not HAS_PYARROW:
            return None
        return pa.string()
[docs]
class VdbTimestampType(BaseVdbType):
    """Timestamp field: a ``datetime`` in UKF, integer epoch seconds in the database."""

    def from_ukf(self, ukf_value, backend=None):
        """Return the datetime as integer seconds since the Unix epoch (UTC).

        The value is normalized through ``utctimetuple()`` and
        ``calendar.timegm``. ``None`` is passed through.
        """
        if ukf_value is None:
            return None
        epoch_seconds = calendar.timegm(ukf_value.utctimetuple())
        return int(epoch_seconds)

    def to_ukf(self, vdb_value, backend=None):
        """Return a timezone-aware UTC ``datetime`` for a stored epoch value.

        ``None`` is passed through.
        """
        if vdb_value is None:
            return None
        utc = datetime.timezone.utc
        return datetime.datetime.fromtimestamp(vdb_value, tz=utc)

    def pyarrow_type(self, **kwargs):
        """Return a microsecond UTC timestamp type, or ``None`` without pyarrow.

        NOTE(review): ``from_ukf`` yields epoch *seconds* as an ``int`` while
        this column type has microsecond resolution -- confirm how the
        backend reconciles the two.
        """
        if not HAS_PYARROW:
            return None
        return pa.timestamp("us", tz="UTC")
[docs]
class VdbJsonType(BaseVdbType):
    """JSON field whose stored representation depends on the backend.

    Backends listed in ``_native_backends`` receive the transformed value
    directly; every other backend (including unknown ones and ``None``)
    receives or provides a JSON string.
    """

    # Backends that take JSON structures without string serialization.
    _native_backends = {"milvuslite"}
    # Backends that need the value serialized to a string.
    _string_backends = {"lancedb", "chroma"}

    def from_ukf(self, ukf_value, backend=None):
        """Encode a UKF value for storage.

        Args:
            ukf_value: The value to encode; ``None`` is passed through.
            backend: Backend name selecting native vs. string output.

        Returns:
            The ``AhvnJsonEncoder.transform`` result for native backends,
            otherwise that result serialized by ``dumps_json(..., indent=None)``.
        """
        if ukf_value is None:
            return None
        is_native = backend in self._native_backends
        encoded = AhvnJsonEncoder.transform(ukf_value)
        if is_native:
            return encoded
        # String backends and unrecognized backends share the same path.
        return dumps_json(encoded, indent=None)

    def to_ukf(self, vdb_value, backend=None):
        """Decode a stored value back to its UKF form.

        Args:
            vdb_value: The stored value; ``None`` is passed through.
            backend: Backend name selecting native vs. string input.

        Returns:
            The ``AhvnJsonDecoder.transform`` result for native backends,
            otherwise that result passed through ``loads_json``.
        """
        if vdb_value is None:
            return None
        is_native = backend in self._native_backends
        decoded = AhvnJsonDecoder.transform(vdb_value)
        if is_native:
            return decoded
        # String backends and unrecognized backends share the same path.
        return loads_json(decoded)

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.string()
[docs]
class VdbVectorType(BaseVdbType):
    """Vector/embedding type for vector databases.

    Values are converted to a list of Python floats in both directions.
    """

    # Fallback width used when neither ``dim`` nor ``self.dimension`` is set.
    _DEFAULT_DIM = 768

    def __init__(self, dimension=None, **kwargs):
        """Record the vector dimension and forward other options to the base.

        Args:
            dimension: Number of components; used as the default list size
                by ``pyarrow_type``. May be ``None``.
            **kwargs: Passed to ``BaseVdbType.__init__``.
        """
        super().__init__(**kwargs)
        self.dimension = dimension

    def from_ukf(self, ukf_value, backend=None):
        """Return the value as a list of floats, or ``None`` for ``None``."""
        return None if ukf_value is None else [float(x) for x in ukf_value]

    def to_ukf(self, vdb_value, backend=None):
        """Return the value as a list of floats, or ``None`` for ``None``."""
        return None if vdb_value is None else [float(x) for x in vdb_value]

    def pyarrow_type(self, dim=None, **kwargs):
        """Return a fixed-size ``float32`` list type for this vector.

        Args:
            dim: List size. When omitted, the ``dimension`` given to the
                constructor is used; if that is also ``None`` the size
                falls back to 768 (the previous hard-coded default).

        Returns:
            ``pa.list_(pa.float32(), dim)``, or ``None`` when pyarrow is
            unavailable.
        """
        if not HAS_PYARROW:
            return None
        if dim is None:
            # Previously the configured dimension was ignored and 768 was
            # always used unless the caller repeated the size here.
            configured = getattr(self, "dimension", None)
            dim = configured if configured is not None else self._DEFAULT_DIM
        return pa.list_(pa.float32(), dim)
[docs]
class VdbSynonymsType(BaseVdbType):
    """Synonyms field whose stored representation depends on the backend.

    Backends in ``_native_backends`` store a sorted list; every other
    backend (including unknown ones and ``None``) stores the sorted items
    joined by newlines.
    """

    # Backends that store the list as-is.
    _native_backends = {"milvuslite"}
    # Backends that need a single string.
    _string_backends = {"lancedb", "chroma"}

    def from_ukf(self, ukf_value, backend=None):
        """Encode a collection of synonyms for storage.

        Returns:
            ``None`` for ``None``; a sorted list for native backends;
            otherwise the sorted items joined with ``"\\n"``.
        """
        if ukf_value is None:
            return None
        ordered = sorted(ukf_value)
        if backend in self._native_backends:
            return ordered
        # String backends and unrecognized backends share the same path.
        # NOTE(review): an item containing a newline would not survive the
        # split in ``to_ukf`` -- confirm items never contain one.
        return "\n".join(ordered)

    def to_ukf(self, vdb_value, backend=None):
        """Decode a stored value into a ``set`` of synonyms.

        Returns:
            An empty set for ``None`` or an empty string; ``set(vdb_value)``
            for native backends; otherwise the newline-separated items.
        """
        if vdb_value is None:
            return set()
        if backend in self._native_backends:
            return set(vdb_value)
        if not vdb_value:
            return set()
        return set(vdb_value.split("\n"))

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.string()
[docs]
class VdbAuthsType(BaseVdbType):
    """Auths field whose stored representation depends on the backend.

    Backends in ``_native_backends`` store a sorted list; every other
    backend (including unknown ones and ``None``) stores the sorted items
    joined by newlines.
    """

    # Backends that store the list as-is.
    _native_backends = {"milvuslite"}
    # Backends that need a single string.
    _string_backends = {"lancedb", "chroma"}

    def from_ukf(self, ukf_value, backend=None):
        """Encode a collection of auth entries for storage.

        Returns:
            ``None`` for ``None``; a sorted list for native backends;
            otherwise the sorted items joined with ``"\\n"``.
        """
        if ukf_value is None:
            return None
        ordered = sorted(ukf_value)
        if backend in self._native_backends:
            return ordered
        # String backends and unrecognized backends share the same path.
        # NOTE(review): the join requires every item to be a ``str`` --
        # confirm against the UKF definition of auths.
        return "\n".join(ordered)

    def to_ukf(self, vdb_value, backend=None):
        """Decode a stored value into a ``set`` of auth entries.

        Returns:
            An empty set for ``None`` or an empty string; ``set(vdb_value)``
            for native backends; otherwise the newline-separated items.
        """
        if vdb_value is None:
            return set()
        if backend in self._native_backends:
            return set(vdb_value)
        if not vdb_value:
            return set()
        return set(vdb_value.split("\n"))

    def pyarrow_type(self, **kwargs):
        """Return ``pa.string()``, or ``None`` when pyarrow is unavailable."""
        if not HAS_PYARROW:
            return None
        return pa.string()