"""
SQL Compiler for KLOp JSON IR.
This module provides functionality to compile KLOp JSON IR expressions
into SQLAlchemy query expressions for database backends.
"""
from __future__ import annotations
__all__ = ["SQLCompiler"]
from typing import Any, Dict, Optional, TYPE_CHECKING
from ..basic.log_utils import get_logger
from ..basic.debug_utils import error_str
from ..deps import deps
if TYPE_CHECKING:
from sqlalchemy.sql.elements import ClauseElement
def get_sa():
return deps.load("sqlalchemy")
def get_sa_elements():
return deps.load("sqlalchemy.sql.elements")
logger = get_logger(__name__)
[文档]
class SQLCompiler:
"""Compiler that converts KLOp JSON IR to SQLAlchemy expressions."""
@staticmethod
def _parse_op(orms: Dict[str, Any], entity: str, key: str, op: str, val: Any) -> ClauseElement:
"""Build expression for a specific operator.
Args:
orms: Mapping of entity names to SQLAlchemy model classes
entity: The current entity name for dimension mapping
key: Current field context for operator expressions
op: Operator type (==, !=, <, >, <=, >=, LIKE, ILIKE, IN)
val: Value for the operator
Returns:
SQLAlchemy expression for the operator
Raises:
ValueError: If field is None or operator is unknown
"""
attr = getattr(orms[entity], orms[entity].alias(key))
if op == "==":
return attr == val
elif op == "!=":
return attr != val
elif op == "<":
return attr < val
elif op == "<=":
return attr <= val
elif op == ">":
return attr > val
elif op == ">=":
return attr >= val
elif op == "LIKE":
return attr.like(str(val))
elif op == "ILIKE":
return attr.ilike(str(val))
elif op == "IN":
if not isinstance(val, (list, tuple, set)):
raise ValueError("IN operator requires a list, tuple, or set of values")
return attr.in_([v for v in val])
else:
raise ValueError(f"Unknown/Incorrectly placed operator: '{op}'. " f"Supported operators: ==, !=, <, <=, >, >=, LIKE, ILIKE, IN")
@staticmethod
def _parse_nf(orms: Dict[str, Any], entity: str, nf_entity: str, nf: Dict[str, Any]) -> ClauseElement:
"""Build expression for NF (normalized form) operator.
Args:
orms: Mapping of entity names to SQLAlchemy model classes
entity: The current entity name for dimension mapping
nf_entity: The entity name for the NF check (dimension table)
nf: Dictionary containing NF attributes and their values
Values can be simple values or nested operators (dict)
Returns:
SQLAlchemy expression for the NF operator
Example:
>>> # Basic NF: {"slot": "TOPIC", "value": "math"}
>>> # NF with LIKE: {"slot": "TOPIC", "value": {"LIKE": "%math%"}}
"""
# Build WHERE conditions for dimension table
# Each nf key-value pair becomes a condition
conditions = []
for nf_key, nf_val in nf.items():
# Handle nested operators in NF values (e.g., LIKE, GT, etc.)
if isinstance(nf_val, dict):
# Recursively parse nested operator
parsed = SQLCompiler._parse(orms=orms, entity=nf_entity, field=nf_key, expr=nf_val)
conditions.append(parsed)
else:
# Simple equality check
# Use alias() to handle reserved field names (e.g., "value" -> "value_")
attr = getattr(orms[nf_entity], orms[nf_entity].alias(nf_key))
conditions.append(attr == nf_val)
return get_sa().exists(
get_sa()
.select(get_sa().distinct(orms[nf_entity].id))
.where(
orms[entity].id == orms[nf_entity].ukf_id,
*conditions,
)
)
@staticmethod
def _parse_json(orms: Dict[str, Any], entity: str, field: str, path: str, val: Any) -> ClauseElement:
"""Build expression for JSON path queries.
Uses database-specific JSON operators to query nested fields.
Supports PostgreSQL JSONB, DuckDB JSON, and other JSON-capable databases.
Args:
orms: Mapping of entity names to SQLAlchemy model classes
entity: The current entity name for dimension mapping
field: The JSON column name (e.g., "content_resources")
path: Dot-notation path within JSON (e.g., "stats.views")
val: Value to match. Can be:
- Simple value: exact match
- Operator dict: comparison/pattern matching
- Ellipsis (...): field existence check
- {"NOT": ...}: field non-existence check
Returns:
SQLAlchemy expression for JSON queries
Examples:
PostgreSQL JSONB:
# Comparison: content_resources->'stats'->>'views' > 2000
if isinstance(val, dict) and len(val) == 1:
op, op_val = next(iter(val.items()))
# Build path expression with proper type handling
path_expr = attr
for part in path_parts[:-1]:
path_expr = path_expr.op("->")(literal(part))
path_expr = path_expr.op("->>")(literal(path_parts[-1]))
content_resources @> '{"type": "categorical"}'
DuckDB JSON:
json_extract(content_resources, '$.stats.views') > 2000
"""
from sqlalchemy import cast, String, Integer, Float, Boolean, literal
from sqlalchemy.dialects import postgresql
attr = getattr(orms[entity], orms[entity].alias(field))
# Convert dot notation to path array: "stats.views" -> ["stats", "views"]
path_parts = path.split(".")
# Handle Ellipsis: existence check
if val is ...:
# PostgreSQL: content_resources ? 'key' or content_resources #> '{path}' IS NOT NULL
# DuckDB: json_extract(...) IS NOT NULL
if path_parts:
# Build path expression and check if it's not null
path_expr = attr
for part in path_parts[:-1]:
path_expr = path_expr.op("->")(part)
path_expr = path_expr.op("->>")(path_parts[-1])
return path_expr.isnot(None)
else:
return attr.isnot(None)
# Handle value matching
if isinstance(val, dict) and len(val) == 1:
op, op_val = next(iter(val.items()))
# Handle NOT(...) for non-existence check
if op == "NOT":
if op_val is ...:
# Field non-existence: IS NULL
path_expr = attr
for part in path_parts[:-1]:
path_expr = path_expr.op("->")(literal(part))
path_expr = path_expr.op("->>")(literal(path_parts[-1]))
return path_expr.is_(None)
# NOT(value) - negation of the inner expression
inner = SQLCompiler._parse_json(orms, entity, field, path, op_val)
return get_sa().not_(inner)
# Handle comparison operators
if op in ("==", "!=", "<", "<=", ">", ">=", "LIKE", "ILIKE", "IN"):
# Extract the value at path as text, then cast if needed
path_expr = attr
for part in path_parts[:-1]:
path_expr = path_expr.op("->")(literal(part))
path_expr = path_expr.op("->>")(literal(path_parts[-1])) # ->> extracts as text
# Cast based on value type for comparisons
if op in (">", ">=", "<", "<="):
if isinstance(op_val, int):
path_expr = cast(path_expr, Integer)
elif isinstance(op_val, float):
path_expr = cast(path_expr, Float)
# Apply operator
if op == "==":
return path_expr == str(op_val) if not isinstance(op_val, (int, float)) else path_expr == op_val
elif op == "!=":
return path_expr != str(op_val) if not isinstance(op_val, (int, float)) else path_expr != op_val
elif op == "<":
return path_expr < op_val
elif op == "<=":
return path_expr <= op_val
elif op == ">":
return path_expr > op_val
elif op == ">=":
return path_expr >= op_val
elif op == "LIKE":
return path_expr.like(str(op_val))
elif op == "ILIKE":
return path_expr.ilike(str(op_val))
elif op == "IN":
# For IN, compare text values
return path_expr.in_([str(v) for v in op_val])
# Handle AND/OR for complex nested expressions
if op == "AND":
exprs = [SQLCompiler._parse_json(orms, entity, field, path, v) for v in op_val]
exprs = [expr for expr in exprs if expr is not None]
return get_sa().and_(*exprs) if exprs else None
if op == "OR":
exprs = [SQLCompiler._parse_json(orms, entity, field, path, v) for v in op_val]
exprs = [expr for expr in exprs if expr is not None]
return get_sa().or_(*exprs) if exprs else None
# Simple value match: content_resources->>'path' = 'value'
path_expr = attr
for part in path_parts[:-1]:
path_expr = path_expr.op("->")(literal(part))
path_expr = path_expr.op("->>")(literal(path_parts[-1]))
# Cast booleans to strings for comparison
if isinstance(val, bool):
val_str = "true" if val else "false"
return path_expr == val_str
return path_expr == str(val) if not isinstance(val, (int, float)) else cast(path_expr, type(val).__name__) == val
@staticmethod
def _parse(
orms: Dict[str, Any],
entity: str = "main",
field: Optional[str] = None,
expr: Dict[str, Any] = None,
) -> ClauseElement:
"""Recursively build SQLAlchemy expressions from filter nodes.
Args:
orms: Mapping of entity names to SQLAlchemy model classes
entity: The current entity name for dimension mapping
field: Current field context for operator expressions
expr: The filter expression dictionary to parse
Returns:
SQLAlchemy expression object or None if no valid expression
Raises:
ValueError: If the expr structure is invalid
"""
if not expr:
return None
if len(expr) > 1:
raise NotImplementedError("Complex expressions with multiple root keys not supported.")
op, val = next(iter(expr.items()))
try:
if op in ("AND", "OR"):
exprs = [SQLCompiler._parse(orms=orms, entity=entity, field=field, expr=v) for v in val]
exprs = [expr for expr in exprs if expr is not None]
if len(exprs) == 0:
# AND([]) = TRUE (all zero conditions satisfied) -> no filter
# OR([]) = FALSE (none of zero alternatives true) -> always false
if op == "AND":
return None # No filter = match all
else: # OR
# Return a condition that's always false
return get_sa().literal(False)
if len(exprs) == 1:
return exprs[0]
return get_sa().and_(*exprs) if op == "AND" else get_sa().or_(*exprs)
if op == "NOT":
return get_sa().not_(SQLCompiler._parse(orms=orms, entity=entity, field=field, expr=val))
if op.startswith("FIELD:"):
if field is not None:
raise ValueError(f"Nested FIELD: {op} inside {field} not allowed.")
return SQLCompiler._parse(orms=orms, entity=entity, field=op.split("FIELD:")[1], expr=val)
if op == "NF":
return SQLCompiler._parse_nf(orms=orms, entity=entity, nf_entity=field, nf=val)
if op == "JSON":
# JSON operator: {"JSON": {key1: value1, key2: value2, ...}}
# Similar to NF but for nested JSON field queries
if field is None:
raise ValueError("JSON operator requires a field context (FIELD:).")
if not isinstance(val, dict):
raise ValueError("JSON operator requires a dict value")
# Build AND expression for all key-value pairs
conditions = []
for json_key, json_val in val.items():
# Use _parse_json for each key-value pair
# json_val could be a simple value, Ellipsis, or a parsed expression dict
condition = SQLCompiler._parse_json(orms=orms, entity=entity, field=field, path=json_key, val=json_val)
if condition is not None:
conditions.append(condition)
if not conditions:
return None
if len(conditions) == 1:
return conditions[0]
return get_sa().and_(*conditions)
if field is None:
raise ValueError(f"Operator '{op}' requires a field context (FIELD:).")
return SQLCompiler._parse_op(orms=orms, entity=entity, key=field, op=op, val=val)
except Exception as e:
raise ValueError(f"Error processing expression key '{op}'.\n{expr}\n{error_str(e)}")
[文档]
@staticmethod
def compile(
orms: Dict[str, Any],
expr: Optional[Dict[str, Any]] = None,
**kwargs,
) -> ClauseElement:
"""Convert a KLOp JSON IR to SQLAlchemy query expressions.
Args:
orms: Mapping of entity names to SQLAlchemy model classes
expr: The parsed filter expression dictionary (optional)
**kwargs: Filter conditions as key-value pairs
Returns:
SQLAlchemy expression object or None
Raises:
ValueError: If filter structure is invalid
"""
from ..klop import KLOp
exprs = list()
if expr:
exprs.append(SQLCompiler._parse(orms=orms, expr=expr))
if kwargs:
exprs.append(SQLCompiler._parse(orms=orms, expr=KLOp.expr(**kwargs)))
if len(exprs) == 0:
return None
if len(exprs) == 1:
return exprs[0]
return get_sa().and_(*exprs)