Source code for ahvn.utils.basic.config_utils

__all__ = [
    "dmerge",
    "dget",
    "dset",
    "dunset",
    "dsetdef",
    "dflat",
    "dunflat",
    "ConfigManager",
    "HEAVEN_CM",
    "hpj",
    "encrypt_config",
]

from .log_utils import get_logger

logger = get_logger(__name__)
from .misc_utils import lflat
from .debug_utils import raise_mismatch
from .path_utils import pj, get_file_dir

from typing import Any, Dict, List, Optional, Literal, Generator, Iterable
from copy import deepcopy

__rnd_sep = "#@#@#"


def _split_key_path(key_path: str) -> List[str]:
    """\
    Split a key path string into a list of keys, handling escaped dots.
    """
    return [key.replace(__rnd_sep, ".") for key in key_path.replace("\\.", __rnd_sep).split(".") if key]


[docs] def dmerge(iterable: Iterable[Dict[str, Any]], start: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """\ Merge multiple dictionaries into a single dictionary, with later dictionaries overwriting earlier ones. Nested dictionaries are merged recursively while all other non-dictionary values are overwritten. Warning: The merging of dictionaries is not order-preserving. The order of keys in the resulting dictionary may not match the order of keys in the input dictionaries. Args: iterable (Iterable[Dict[str, Any]]): An iterable of dictionaries to merge. start (Optional[Dict[str, Any]]): An optional starting dictionary to merge into. Returns: Dict[str, Any]: The merged dictionary. Examples: >>> d1 = {'a': 1, 'b': {'c': 2, 'f': 5}} >>> d2 = {'b': {'d': 3, 'f': 0}, 'e': 4} >>> dmerge([d1, d2]) {'a': 1, 'b': {'c': 2, 'f': 0, 'd': 3}, 'e': 4} >>> dmerge([d2, d1]) {'b': {'d': 3, 'f': 5, 'c': 2}, 'e': 4, 'a': 1} >>> dmerge([d1, d2], start={'a': 0, 'g': 6}) {'a': 1, 'g': 6, 'b': {'c': 2, 'f': 0, 'd': 3}, 'e': 4} """ if start is None: start = dict() else: start = deepcopy(start) for d in iterable: if not d: continue if "_OVERWRITE_" in d and d["_OVERWRITE_"]: start = deepcopy({k: v for k, v in d.items() if k != "_OVERWRITE_"}) continue for k, v in d.items(): if (k in start) and isinstance(v, dict): start[k] = dmerge([v], start=start[k]) else: start[k] = v return start
[docs] def dget(d: Dict[str, Any], key_path: Optional[str] = None, default: Optional[Any] = None) -> Any: """\ Get a value from a dictionary using a dot-separated key path. If the key path does not exist, return the default value. Args: d (Dict[str, Any]): The dictionary to search. key_path (Optional[str]): The dot-separated key path to the value. default (Optional[Any]): The default value to return if the key path does not exist. Returns: Any: The value at the specified key path or the default value if not found. Examples: >>> dget({'a': {'b': {'c': 42}}}, 'a.b.c') 42 >>> dget({'a': {'b': {'c': 42}}}, 'a.b.d', default='not found') 'not found' >>> dget({'a': {'b': {'c': [1, 2, 3]}}}, 'a.b.c[1]') 2 """ if key_path is None: return d keys = _split_key_path(key_path) for key in keys: if d is None: return default if key.endswith("]"): k, idx = key[:-1].rsplit("[", 1) idx = int(idx) if (k not in d) or (not isinstance(d[k], list)) or (idx >= len(d[k])) or (idx < -len(d[k])): return default d = d[k][idx] elif key not in d: return default else: d = d[key] return d
[docs] def dset(d: Dict[str, Any], key_path: str, value: Optional[Any] = None) -> bool: """\ Set a value in a dictionary using a dot-separated key path. If the key path does not exist, it will be created. Args: d (Dict[str, Any]): The dictionary to modify. key_path (str): The dot-separated key path to the value. value (Optional[Any]): The value to set at the specified key path. Returns: bool: True if the value was set successfully, False if the key path is invalid. Examples: >>> d = {} >>> dset(d, 'a.b.c', 42) True >>> d {'a': {'b': {'c': 42}}} """ if key_path is None: if not isinstance(value, dict): return False d.update(value) return True keys = _split_key_path(key_path) for key in keys[:-1]: if key.endswith("]"): k, idx = key[:-1].rsplit("[", 1) idx = int(idx) if (k not in d) or (not isinstance(d[k], list)) or (idx >= len(d[k])) or (idx < -len(d[k])): return False d = d[k][idx] elif key not in d: d[key] = dict() d = d[key] else: d = d[key] last_key = keys[-1] if last_key.endswith("]"): k, idx = last_key[:-1].rsplit("[", 1) idx = int(idx) if k not in d: d[k] = list() if (not isinstance(d[k], list)) or (idx < -len(d[k])): return False if idx >= len(d[k]): d[k].extend([None] * (idx - len(d[k]) + 1)) d[k][idx] = value else: d[last_key] = value return True
[docs] def dunset(d: Dict[str, Any], key_path: str) -> bool: """\ Unset a value in a dictionary using a dot-separated key path. If the key path does not exist, it will be ignored. Args: d (Dict[str, Any]): The dictionary to modify. key_path (str): The dot-separated key path to the value to unset. Returns: bool: True if the value was unset successfully, False if the key path is invalid. Examples: >>> d = {'a': {'b': {'c': 42}}} >>> dunset(d, 'a.b.c') True >>> d {'a': {'b': {}}} """ if key_path is None: d.clear() return True keys = _split_key_path(key_path) for key in keys[:-1]: if key.endswith("]"): k, idx = key[:-1].rsplit("[", 1) idx = int(idx) if (k not in d) or (not isinstance(d[k], list)) or (idx >= len(d[k])) or (idx < -len(d[k])): return False d = d[k][idx] elif key not in d: return False else: d = d[key] last_key = keys[-1] if last_key.endswith("]"): k, idx = last_key[:-1].rsplit("[", 1) idx = int(idx) if (k not in d) or (not isinstance(d[k], list)) or (idx >= len(d[k])) or (idx < -len(d[k])): return False if idx < 0: idx += len(d[k]) del d[k][idx] else: if last_key in d: del d[last_key] else: return False return True
[docs] def dsetdef(d: Dict[str, Any], key_path: str, default: Optional[Any] = None) -> bool: """\ Set a default value in a dictionary using a dot-separated key path if the key path does not exist. Notice that if key_path exists but its value is None, the default value will also be set. Args: d (Dict[str, Any]): The dictionary to modify. key_path (str): The dot-separated key path to the value. default (Optional[Any]): The default value to set at the specified key path if it does not exist. Returns: bool: True if the default value was set successfully, False if the key path is invalid or already exists. Examples: >>> d = {} >>> dsetdef(d, 'a.b.c', 42) True >>> d {'a': {'b': {'c': 42}}} >>> dsetdef(d, 'a.b.c', 100) False >>> d {'a': {'b': {'c': 42}}} """ if dget(d, key_path, default=None) is not None: return False return dset(d, key_path, default)
[docs] def dflat(d: Dict[str, Any], prefix: str = "", enum: bool = False) -> Generator[str, None, None]: """\ Flatten a nested dictionary into a flat dictionary with dot-separated keys. Args: d (Dict[str, Any]): The dictionary to flatten. prefix (str): The prefix to prepend to the keys. Defaults to an empty string. enum (bool): If True, apart from leaf nodes, also include intermediate nodes in the flattened output. Defaults to False. Yields: Generator[str, None, None]: A generator yielding key-value pairs in the flattened format. Examples: >>> dict(dflat({'a': {'b': {'c': 42, 'd': [1, 2, 3]}}, 'e': 5})) {'a.b.c': 42, 'a.b.d[0]': 1, 'a.b.d[1]': 2, 'a.b.d[2]': 3, 'e': 5} >>> dict(dflat({'a': {'b': {'c': 42, 'd': [1, 2, 3]}}, 'e': 5}, enum=True)) {'a': {'b': {'c': 42, 'd': [1, 2, 3]}}, 'a.b': {'c': 42, 'd': [1, 2, 3]}, 'a.b.c': 42, 'a.b.d': [1, 2, 3], 'a.b.d[0]': 1, 'a.b.d[1]': 2, 'a.b.d[2]': 3, 'e': 5} """ def _dlist(d: Dict[str, Any], prefix: str = "", enum: bool = False) -> Generator[str, None, None]: for k, v in d.items(): ck = k.replace(".", "\\.") if enum: yield ((".".join([prefix, ck]) if prefix else ck), v) if isinstance(v, dict): yield from _dlist(v, prefix=".".join([prefix, ck]) if prefix else ck, enum=enum) elif isinstance(v, list): for i, item in enumerate(v): yield from _dlist({f"{ck}[{i}]": item}, prefix=prefix, enum=enum) elif not enum: yield ((".".join([prefix, ck]) if prefix else ck), v) yield from _dlist(d, prefix=prefix, enum=enum)
[docs] def dunflat(d: Dict[str, Any]) -> Dict[str, Any]: """\ Unflatten a flat dictionary with dot-separated keys into a nested dictionary. Args: d (Dict[str, Any]): The flat dictionary to unflatten. Returns: Dict[str, Any]: The nested dictionary. Examples: >>> d = {'a.b.c': 42} >>> dunflat(d) {'a': {'b': {'c': 42}}} """ merged = dict() for k, v in d.items(): dset(merged, k, v) return merged
# Copy functions from file_utils.py to avoid circular imports import os import shutil def _touch_dir(path: str, clear: bool = False) -> str: path = os.path.abspath(path) if clear and os.path.exists(path): shutil.rmtree(path) os.makedirs(path, exist_ok=True) return path def _exists_file(path: str) -> bool: path = os.path.abspath(path) return os.path.exists(path) and os.path.isfile(path) def _exists_dir(path: str) -> bool: path = os.path.abspath(path) return os.path.exists(path) and os.path.isdir(path) # Copy functions from serialize_utils.py to avoid circular imports def _load_yaml(path: str) -> Dict[str, Any]: import yaml path = os.path.abspath(path) if not _exists_file(path): return dict() with open(path, "r", encoding="utf-8", errors="ignore") as fp: return yaml.safe_load(fp) def _save_yaml(obj: Any, path: str, sort_keys: bool = False, indent: int = 4): import yaml path = os.path.abspath(path) if os.path.dirname(path): os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w", encoding="utf-8", errors="ignore") as fp: yaml.safe_dump(obj, fp, sort_keys=sort_keys, indent=indent, allow_unicode=True)
[docs] class ConfigManager: name: str = "ahvn" package: str = "ahvn"
[docs] def __init__(self, name: Optional[str] = None, package: Optional[str] = None, cwd: Optional[str] = None): super().__init__() self._config: Optional[Dict[str, Any]] = None self._system_config: Optional[Dict[str, Any]] = None self._global_config: Optional[Dict[str, Any]] = None self._local_config: Optional[Dict[str, Any]] = None self.name = name or self.__class__.name self.package = package or self.__class__.package self.root = pj("~", f".{self.name}", abs=True) self.set_cwd(cwd)
[docs] def set_cwd(self, cwd: Optional[str] = None): """\ Set the current working directory for the configuration manager. Args: cwd (str): The new current working directory. """ self.cwd = pj(cwd or os.getcwd(), abs=True) self.local = self._find_local() if self.local == self.root: logger.warning( f"Local configuration directory is the same as the root directory: {self.local}. This may cause issues with configuration management. ConfigManager will not load the configuration. Use `set_cwd` to set a different working directory. Otherwise using this class may lead to unexpected errors." ) else: self._config = None self._system_config = None self._global_config = None self._local_config = None
def _find_local(self) -> str: """\ Find the local configuration directory from `self.cwd`, according to the GitHub repository finding rules. That is, it will look for the `.<name>` directory in the current working directory or its parents until reaching the root user directory (`~`). If the `.<name>` directory is found, it will be returned. If not found, it will return `<self.cwd>/.<name>`. Returns: str: The path to the local configuration directory. """ path = pj(self.cwd, abs=True) while True: config_candidate = pj(path, f".{self.name}", abs=True) if config_candidate == self.root: break if _exists_dir(config_candidate): return config_candidate parent_path = get_file_dir(path) if parent_path == path: break path = parent_path return pj(self.cwd, f".{self.name}", abs=True) @property def local_dir(self) -> str: """\ Get the local configuration directory. Returns: str: The path to the local configuration directory. """ return get_file_dir(self.local, abs=True) @property def local_config_path(self) -> str: """\ Get the path to the local configuration file. Returns: str: The path to the local configuration file. """ return pj(self.local, "config.yaml", abs=True) @property def global_config_path(self) -> str: """\ Get the path to the global configuration file. Returns: str: The path to the global configuration file. """ return pj(self.root, "config.yaml", abs=True) @property def system_config_path(self) -> str: """\ Get the path to the system configuration file. Returns: str: The path to the system configuration file. """ return self.resource("configs", "default_config.yaml")
[docs] def config_path(self, level: Literal[None, "local", "global", "system"] = None) -> str: """\ Get the path to the configuration file for a specific level. Args: level (Literal[None, 'local', 'global', 'system']): The configuration level to get the path for. If None, returns the local configuration path. Returns: str: The path to the configuration file for the specified level. """ if level is None or level == "local": return self.local_config_path if level == "global": return self.global_config_path if level == "system": return self.system_config_path raise_mismatch(["local", "global", "system", None], got=level, name="config level")
[docs] def resource(self, *args: List[str]) -> str: """\ Get the path to a resource file in the package `resources` directory. Args: *args (List[str]): The path components to the resource file. Returns: str: The absolute path to the resource file. """ import importlib.resources with importlib.resources.as_file(importlib.resources.files(self.package).joinpath("resources", *args)) as path: return pj(str(path.resolve()), abs=True)
@property def config(self) -> Dict[str, Any]: if self._config is None: self._config = dmerge([self.global_config, self.local_config]) return self._config @property def system_config(self) -> Dict[str, Any]: if self._system_config is None: self._system_config = _load_yaml(self.system_config_path) return self._system_config @property def global_config(self) -> Dict[str, Any]: if self._global_config is None: self._global_config = _load_yaml(self.global_config_path) return self._global_config @property def local_config(self) -> Dict[str, Any]: if self._local_config is None: self._local_config = _load_yaml(self.local_config_path) return self._local_config
[docs] def load(self): self._system_config = _load_yaml(self.system_config_path) self._global_config = _load_yaml(self.global_config_path) self._local_config = _load_yaml(self.local_config_path) self._config = dmerge([self.global_config, self.local_config])
[docs] def save(self, level: Literal[None, "local", "global"] = None): if level in [None, "global"]: _save_yaml(self.global_config, self.global_config_path) if level in [None, "local"]: _save_yaml(self.local_config, self.local_config_path) self.load()
[docs] def init(self, reset: bool = False) -> bool: """\ Initialize the local configuration manager by loading the default configuration and creating the necessary directories. Args: reset (bool): If True, reset the local configuration to the default values. """ if (not reset) and _exists_dir(self.local) and _exists_file(self.local_config_path): logger.info(f"Local configuration already exists at {self.local_config_path}. Use `reset=True` to overwrite it.") self.load() return False _touch_dir(self.local, clear=False) _save_yaml(dict(), self.local_config_path) self.load() return True
[docs] def setup(self, reset: bool = False) -> bool: """\ Setup the global configuration manager by initializing it and loading the configuration and creating the necessary directories. Args: reset (bool): If True, reset the global configuration to the default values. """ if (not reset) and _exists_dir(self.root) and _exists_file(self.global_config_path): logger.info(f"Global configuration already exists at {self.global_config_path}. Use `reset=True` to overwrite it.") self.load() return False _touch_dir(self.root, clear=False) _save_yaml(self.system_config, self.global_config_path) self.load() return True
[docs] def get( self, key_path: str = None, default: Optional[Any] = None, level: Literal[None, "local", "global", "system"] = None, ) -> Any: """\ Get a value from the configuration using a dot-separated key path. Args: key_path (str): The dot-separated key path to the value. default (Any): The default value to return if the key path does not exist. level (Literal[None,'local','global']): The configuration level to use. If None, uses local configuration. Returns: Any: The value at the specified key path or the default value if not found. """ if level is None: return dget(self.config, key_path, default=default) if level == "local": return dget(self.local_config, key_path, default=default) if level == "global": return dget(self.global_config, key_path, default=default) if level == "system": return dget(self.system_config, key_path, default=default) raise_mismatch(["local", "global", "system", None], got=level, name="config level to perform 'get' operation on")
[docs] def set(self, key_path: str, value: Optional[Any] = None, level: Literal["local", "global"] = None) -> bool: """\ Set a value in the configuration using a dot-separated key path. Args: key_path (str): The dot-separated key path to the value. value (Any): The value to set at the specified key path. level (Literal['local','global']): The configuration level to use. If None, uses local configuration. Returns: bool: True if the value was set successfully, False if the key path is invalid. """ if level == "local": changed = dset(self.local_config, key_path, value) if changed: self.save(level="local") return changed if level == "global": changed = dset(self.global_config, key_path, value) if changed: self.save(level="global") return changed raise_mismatch(["local", "global"], got=level, name="config level to perform 'set' operation on")
[docs] def unset(self, key_path: str, level: Literal["local", "global"] = None) -> bool: """\ Unset a value in the configuration using a dot-separated key path. Args: key_path (str): The dot-separated key path to the value to unset. level (Literal['local','global']): The configuration level to use. If None, uses local configuration. Returns: bool: True if the value was unset successfully, False if the key path is invalid. """ if level == "local": changed = dunset(self.local_config, key_path) if changed: self.save(level="local") return changed if level == "global": changed = dunset(self.global_config, key_path) if changed: self.save(level="global") return changed raise_mismatch(["local", "global"], got=level, name="config level to perform 'unset' operation on")
HEAVEN_CM = ConfigManager(name="ahvn", package="ahvn") import re
[docs] def hpj(*args: List[str], abs: bool = False, cm: Optional[ConfigManager] = None) -> str: """\ Join a list of strings into a path. Platform-agnostic. Spaces and trailing slashes are stripped from each argument. The following characters will be expanded: - '~' to the user's home directory. - '&' the `resources` directory of AgentHeaven. It is only recommended to use it at the beginning of the path. - '>' the local root folder (without `.ahvn/`) of the current AgentHeaven repository. It is only recommended to use it at the beginning of the path. Args: *args: Components of the path to join. Each argument should be a string. abs (bool, optional): If True, returns the absolute path. Defaults to False. cm (Optional[ConfigManager]): The configuration manager to use for resource and local directory resolution. Defaults to `HEAVEN_CM`. Returns: str: The joined, normalized path. Expands '~' to the user's home directory. Examples: >>> hpj("A", "B/C", " D/ ") 'A/B/C/D' >>> hpj("A", "B/C", " D/ ", abs=True) '<path_to_cwd>/A/B/C' >>> hpj("~", "A", "B/C", " D/ ", abs=True) '<path_to_user_dir>/A/B/C' >>> hpj("&", "B", "C") '<path_to_ahvn>/resources/B/C' >>> hpj("& B", "C") '<path_to_ahvn>/resources/B/C' >>> hpj("&/B", "C") '<path_to_ahvn>/resources/B/C' >>> hpj("& /B", "C") # Not recommended '/B/C' >>> hpj("> B/C", " D/ ") '<path_to_repo>/B/C/D' """ args = [arg.strip() for arg in args if (arg is not None) and arg.strip()] if len(args) == 0: return None if cm is None: cm = HEAVEN_CM args = lflat(re.split(r"(&)(/)?", arg) for arg in args) args = [cm.resource() if arg == "&" else arg for arg in args] args = [arg.rstrip(" /").strip() for arg in args if arg and arg.rstrip(" /").strip()] args = lflat(re.split(r"(>)(/)?", arg) for arg in args) args = [cm.local_dir if arg == ">" else arg for arg in args] args = [arg.rstrip(" /").strip() for arg in args if arg and arg.rstrip(" /").strip()] path = os.path.expanduser(os.path.join(*[arg for arg in args if arg])) return os.path.normpath(path if not abs else os.path.abspath(path))
[docs] def encrypt_config(config: Dict[str, Any], encrypt_keys: Optional[List[str]] = None) -> Dict[str, Any]: """\ Encrypt sensitive information in the LLM configuration dictionary. Args: config (Dict[str, Any]): The LLM configuration dictionary to encrypt. encrypt_keys (Optional[List[str]]): List of keys to encrypt. If None, uses the keys specified in the global config under "core.encrypt_keys". Returns: Dict[str, Any]: The encrypted LLM configuration dictionary. """ _encrypt_keys = set(HEAVEN_CM.get("core.encrypt_keys", list())) if encrypt_keys is None else set(encrypt_keys) config = deepcopy(config) for k in config: if k in _encrypt_keys: config[k] = "******" return config