Source code for ahvn.resources.ahvn_kb

__all__ = [
    "HEAVEN_KB",
    "setup_heaven_kb",
]

from ahvn.cache import JsonCache
from ahvn.klstore.cache_store import CacheKLStore
from ahvn.klengine.scan_engine import ScanKLEngine
from ahvn.klbase.base import KLBase
from ahvn.ukf.templates.basic.prompt import PromptUKFT

from ahvn.utils.basic.config_utils import hpj, HEAVEN_CM
from ahvn.utils.basic.log_utils import get_logger

logger = get_logger(__name__)


class AhvnKLBase(KLBase):
    def __init__(self):
        super().__init__(name="ahvn")
        self.add_storage(
            CacheKLStore(
                name="_prompts",
                cache=JsonCache(hpj("& ukfs/prompts")),
            )
        )
        self.add_engine(
            ScanKLEngine(
                name="prompts",
                storage=self.storages["_prompts"],
            )
        )

    def get_prompt(self, name: str, **kwargs) -> PromptUKFT:
        results = self.search(engine="prompts", name=name, **kwargs)
        if not results:
            raise ValueError(f"Prompt '{name}' not found in HEAVEN_KB.")
        if len(results) > 1:
            raise ValueError(f"Multiple prompts named '{name}' found in HEAVEN_KB. Please refine your search facets by adding `**kwargs`.")
        return results[0]["kl"]


HEAVEN_KB = AhvnKLBase()


[docs] def setup_heaven_kb(): logger.info("Re-generating HEAVEN_KB...") HEAVEN_KB.clear() from ahvn.utils.exts.autotask import build_autotask_base_prompt from ahvn.utils.exts.autocode import build_autocode_base_prompt from ahvn.utils.exts.autofunc import build_autofunc_base_prompt base_prompt = PromptUKFT.from_path( "& prompts/system", default_entry="prompt.jinja", name="prompt", ) autotask_base_prompt = build_autotask_base_prompt(output_schema=None) autotask_text_prompt = build_autotask_base_prompt(output_schema={"mode": "base"}) autotask_repr_prompt = build_autotask_base_prompt(output_schema={"mode": "repr"}) autotask_json_prompt = build_autotask_base_prompt(output_schema={"mode": "json", "args": {"indent": 4}}) autotask_code_prompt = build_autotask_base_prompt(output_schema={"mode": "code"}) autocode_prompt = build_autocode_base_prompt() autofunc_prompt = build_autofunc_base_prompt() HEAVEN_KB.batch_upsert( [ base_prompt, autotask_base_prompt, autotask_text_prompt, autotask_repr_prompt, autotask_json_prompt, autotask_code_prompt, autocode_prompt, autofunc_prompt, ], storages=["_prompts"], )
# Temporary trigger for initial setup if (len(HEAVEN_KB.storages["_prompts"]) == 0) or (HEAVEN_CM.get("core.debug")): setup_heaven_kb() if __name__ == "__main__": setup_heaven_kb() # Debug for r in HEAVEN_KB.search(engine="prompts", name="autocode"): print(r["kl"].name) exit(0)