缓存¶
AgentHeaven 提供了灵活的缓存系统,支持多种后端选项。本指南介绍如何有效使用缓存系统进行性能优化。
1. 基础缓存示例¶
该示例展示了如何缓存递归斐波那契函数。首次调用会计算值并存储在缓存中,后续调用会立即从缓存中获取结果。
from ahvn.cache import InMemCache
cache = InMemCache()
@cache.memoize()
def fibonacci(n: int) -> int:
if n <= 1:
return n
return fibonacci(n-2) + fibonacci(n-1)
# 第一次调用计算,第二次使用缓存
print(fibonacci(30)) # 计算 - 需要等待
print(fibonacci(30)) # 缓存 - 立即返回结果
2. 异步支持示例¶
演示异步函数的缓存。适用于 API 调用、数据库查询或任何可从缓存中受益的异步操作。
import asyncio
from ahvn.cache import InMemCache
cache = InMemCache()
@cache.memoize()
async def async_operation(x):
await asyncio.sleep(1) # 模拟异步工作,如 API 调用
return x * 2
# 使用
result = await async_operation(5) # 耗时 1 秒(首次调用)
result = await async_operation(5) # 缓存立即返回(第二次调用)
3. 缓存后端¶
3.1. 内存缓存(最快)¶
基于字典的简单缓存,存储在 RAM 中。适用于开发和不需要持久化的临时缓存。
from ahvn.cache import InMemCache
cache = InMemCache()
优点: 速度最快,无 I/O 开销
缺点: 易失(重启后丢失),受内存限制
用例: 开发、临时缓存
3.2. diskcache(带压缩的本地数据库)¶
使用 diskcache 库的文件系统持久存储。适用于需要跨会话缓存的生产工作负载。
from ahvn.cache import DiskCache
cache = DiskCache("/tmp/cache_dir", size_limit=32*1024*1024*1024) # 32GB
优点: 持久化、大容量、线程安全
缺点: 比内存慢
用例: 生产工作负载、跨会话缓存
3.3. 数据库缓存(可扩展)¶
SQL 数据库支持的缓存,支持 SQLite、PostgreSQL 和 MySQL。适用于多用户应用和可扩展部署。
from ahvn.cache import DatabaseCache
# SQLite - 轻量级基于文件的数据库
sqlite_cache = DatabaseCache(provider="sqlite", database="cache.db")
# PostgreSQL - 企业级数据库解决方案
pg_cache = DatabaseCache(
provider="postgresql",
database="mydb",
)
优点: 可扩展、可查询、并发访问
缺点: 数据库开销
用例: 多用户应用、大数据集
3.4. JSON 缓存(可调试)¶
每个缓存项存储为单独的 JSON 文件。在开发期间调试和检查非常有用。
from ahvn.cache import JsonCache
cache = JsonCache("/tmp/json_cache")
优点: 人类可读、易于调试
缺点: 较慢、文件系统开销
用例: 开发、调试、检查
3.5. 无缓存(开发)¶
始终未命中缓存,强制重新计算。适用于测试和调试缓存行为,不影响性能。
from ahvn.cache import NoCache
cache = NoCache()
优点: 无缓存,始终重新计算
缺点: 无性能优势
用例: 测试、调试缓存问题
3.6. 回调缓存(事件驱动)¶
事件驱动的缓存,在缓存操作时不存储数据而是触发回调。非常适合监控、日志记录或通过回调和 feed 函数实现自定义缓存行为。
from ahvn.cache import CallbackCache
# 定义缓存设置事件的回调
def log_cache_set(key, value):
print(f"缓存设置: {key} = {value}")
def monitor_memory(key, value):
# 自定义内存监控逻辑
print(f"设置 {key} 后的内存使用情况")
# 定义缓存获取事件的 feed 函数
def fast_computation(func, **kwargs):
"""为特定输入提供快速替代方案"""
if kwargs.get('x', 0) < 100:
return kwargs['x'] * 2 # 快速计算
return ... # 让原函数处理
def database_lookup(func, **kwargs):
"""检查外部数据库中的缓存结果"""
# 自定义数据库查找逻辑
return ... # 继续下一个 feed 或原函数
# 创建带有回调和 feed 的 CallbackCache
cache = CallbackCache(
callbacks=[log_cache_set, monitor_memory],
feeds=[fast_computation, database_lookup]
)
@cache.memoize()
def expensive_function(x):
print(f"为 {x} 计算")
return x * x + complex_calculation(x)
# 使用示例
result = expensive_function(5) # 使用 fast_computation feed
result = expensive_function(200) # 回退到 expensive_function
3.6.1. 回调函数¶
回调在缓存设置操作时触发,接收缓存键和值:
def my_callback(key: int, value: dict):
"""
处理缓存设置事件。
参数:
key: 缓存键(整数哈希)
value: 包含 func、inputs、output 和 metadata 的缓存项
"""
print(f"函数: {value['func']}")
print(f"输入: {value['inputs']}")
print(f"输出: {value['output']}")
# 自定义逻辑:写入日志、更新指标等
# 多个回调按顺序执行
cache = CallbackCache(callbacks=[callback1, callback2, callback3])
回调 API: 每个回调必须接受 (key: int, value: dict) 参数,其中 value 包含:
func: 函数名或可调用对象inputs: 函数字参数字典output: 函数返回值metadata: 额外缓存元数据
3.6.2. Feed 函数¶
Feed 在缓存获取操作时提供替代计算或数据源,按顺序处理直到某个返回非 Ellipsis 值:
def custom_feed(func, **kwargs):
"""
提供替代计算或数据查找。
参数:
func: 原始函数(可调用对象或字符串名称)
**kwargs: 调用者提供的函数字参数
返回:
Any: 返回计算值,或 ... 继续下一个 feed
"""
# 示例:为特定输入使用预计算结果
if func.__name__ == 'fibonacci' and kwargs['n'] <= 10:
return [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55][kwargs['n']]
# 示例:数据库查找
if kwargs.get('user_id'):
cached = database.get_user_cache(kwargs['user_id'])
if cached:
return cached
return ... # 继续下一个 feed 或原始函数
# Feed 函数按顺序处理
cache = CallbackCache(feeds=[fast_lookup, database_check, expensive_computation])
Feed API: 每个 feed 函数必须接受 (func, **kwargs) 并返回:
实际值: 停止 feed 处理并返回此值
Ellipsis (
...): 继续下一个 feed 或原始函数
3.6.3. 错误处理¶
回调和 feed 都包含内置错误处理:
def failing_callback(key, value):
raise ValueError("回调错误!")
def failing_feed(func, **kwargs):
raise RuntimeError("Feed 错误!")
# 错误被记录并跳过,处理继续
cache = CallbackCache(
callbacks=[failing_callback, working_callback],
feeds=[failing_feed, working_feed]
)
回调错误: 被记录、跳过,其他回调继续
Feed 错误: 被记录、跳过,调用下一个 feed 或原始函数
不中断: 尽管有个别失败,系统保持稳定
优点: 事件驱动、高度可定制、零存储开销
缺点: 无实际缓存,需要自定义实现存储
用例: 监控、日志记录、自定义缓存层、A/B 测试
4. CacheEntry 类型¶
CacheEntry 类是 AgentHeaven 缓存系统中使用的基本数据结构。它封装了缓存函数调用的所有信息,包括函数名、输入参数、输出值、预期值和可选元数据。
4.1. 基本结构¶
CacheEntry 包含以下字段:
from ahvn.cache import CacheEntry
# 直接实例化(很少需要 - 通常由缓存后端创建)
entry = CacheEntry(
func="my_function", # 函数名(字符串)
inputs={"x": 5, "y": 10}, # 输入参数作为字典
output=50, # 实际输出值
expected=..., # 预期输出(... 表示未设置)
metadata={"timestamp": "..."} # 可选元数据
)
# 访问缓存键和值
print(entry.key) # 输入 + 函数名的 MD5 哈希
print(entry.value) # 如果设置了预期则返回预期,否则返回输出
4.2. 创建 CacheEntry 对象¶
4.2.1. 从函数参数创建¶
使用 from_args() 从函数参数创建 CacheEntry:
from ahvn.cache import CacheEntry
def my_function(x, y, z=10):
return x + y + z
# 从函数和参数创建条目
entry = CacheEntry.from_args(
func=my_function, # 传递可调用对象或字符串名称
output=25, # 函数的输出
x=5, # 函数参数作为 kwargs
y=10,
z=10,
exclude=["z"] # 可选排除某些参数从键中
)
print(entry.func) # "my_function"
print(entry.inputs) # {"x": 5, "y": 10} (z 被排除)
print(entry.output) # 25
4.2.2. 从字典创建¶
从字典表示创建 CacheEntry:
from ahvn.cache import CacheEntry
# 从字典反序列化
data = {
"func": "compute",
"inputs": {"n": 100},
"output": 5050,
"expected": 5050,
"metadata": {"cached_at": "2025-10-21"}
}
entry = CacheEntry.from_dict(data)
print(entry.func) # "compute"
print(entry.inputs) # {"n": 100}
4.3. 使用 CacheEntry¶
4.3.1. 转换为字典¶
将 CacheEntry 序列化为字典:
entry = CacheEntry.from_args(func="add", output=15, x=5, y=10)
data = entry.to_dict()
# 返回: {
# "func": "add",
# "inputs": {"x": 5, "y": 10},
# "output": 15,
# "metadata": {}
# }
4.3.2. 克隆并更新¶
创建带有更新的 CacheEntry 副本:
original = CacheEntry.from_args(func="compute", output=100, n=10)
# 克隆并更新
modified = original.clone(
output=200, # 更新输出
metadata={"v": 2} # 更新元数据
)
print(original.output) # 100 (未更改)
print(modified.output) # 200 (新值)
4.3.3. 注解¶
用预期输出和元数据注解 CacheEntry:
# 从实际计算创建条目
entry = CacheEntry.from_args(
func="fibonacci",
output=55,
n=10
)
# 用预期输出注解(用于验证/测试)
annotated = entry.annotate(
expected=55, # 设置预期输出
metadata={"source": "ground_truth"} # 添加元数据
)
print(annotated.expected) # 55
print(annotated.annotated) # True (有预期值)
print(entry.annotated) # False (原始未更改)
# 如果省略预期,则使用输出作为预期
auto_annotated = entry.annotate()
print(auto_annotated.expected) # 55 (从输出复制)
4.4. 属性¶
4.4.1. 缓存键¶
key 属性返回标识缓存条目的唯一整数哈希:
entry = CacheEntry.from_args(func="add", output=15, x=5, y=10)
print(entry.key) # 整数 MD5 哈希
# 相同输入 = 相同键(kwargs 的顺序无关)
entry2 = CacheEntry.from_args(func="add", output=15, y=10, x=5)
print(entry.key == entry2.key) # True
4.4.2. 值¶
value 属性返回预期输出(如果设置),否则返回实际输出:
entry = CacheEntry.from_args(func="compute", output=100, n=10)
print(entry.value) # 100 (未设置预期,返回输出)
annotated = entry.annotate(expected=99)
print(annotated.value) # 99 (返回预期,不是输出)
4.4.3. 注解状态¶
检查 CacheEntry 是否已用预期值注解:
entry = CacheEntry.from_args(func="test", output=42, x=1)
print(entry.annotated) # False
annotated = entry.annotate(expected=42)
print(annotated.annotated) # True
4.5. 用例¶
4.5.1. 缓存检查¶
访问存储的缓存条目用于调试或分析:
from ahvn.cache import DiskCache
cache = DiskCache("/tmp/my_cache")
@cache.memoize()
def compute(x, y):
return x * y + x + y
# 计算并缓存
result = compute(5, 10)
# 手动检索和检查缓存条目
# (实现取决于缓存后端)
# 大多数后端在内部存储 CacheEntry 对象
4.5.2. 自定义缓存后端¶
实现自定义缓存后端时使用 CacheEntry 进行序列化:
from ahvn.cache import BaseCache, CacheEntry
class MyCustomCache(BaseCache):
def __init__(self):
self.store = {}
def set(self, key: int, value: Dict[str, Any]):
# 将字典转换为 CacheEntry 以进行验证
entry = CacheEntry.from_dict(value)
# 存储序列化形式
self.store[key] = entry.to_dict()
def get(self, key: int) -> Optional[Dict[str, Any]]:
return self.store.get(key)
4.5.3. 测试和验证¶
使用 CacheEntry 注解进行测试驱动缓存:
from ahvn.cache import CacheEntry, InMemCache
# 为测试创建预期结果
expected_entries = [
CacheEntry.from_args(func="add", expected=15, x=5, y=10),
CacheEntry.from_args(func="add", expected=25, x=10, y=15),
]
# 根据预期验证缓存结果
cache = InMemCache()
@cache.memoize()
def add(x, y):
return x + y
for entry in expected_entries:
actual = add(**entry.inputs)
assert actual == entry.expected, f"不匹配: {actual} != {entry.expected}"
5. 高级用法¶
5.1. 生成器缓存¶
缓存整个生成器输出,适用于流数据处理或昂贵的数据转换。
@cache.memoize()
def data_stream(n):
"""缓存整个生成器输出"""
for i in range(n):
yield expensive_computation(i)
# 使用 - 首次运行后整个流被缓存
for item in data_stream(1000):
process(item)
注意,只有在生成器完全完成时才会缓存生成器输出。如果迭代被以下情况中断:break 语句、异常、错误、提前终止或部分消费。部分输出将不会被缓存。只有在整个生成器成功完成后才会创建缓存项。
# 示例:部分消费不会缓存
for item in data_stream(1000):
if item > 100: # 提前终止
break # 生成器输出**不**被缓存
process(item)
# 示例:异常不会缓存
try:
for item in data_stream(1000):
if item == 500:
raise ValueError("处理错误")
process(item)
except ValueError:
pass # 由于异常,生成器输出**不**被缓存
# 示例:完整完成会缓存
for item in data_stream(1000):
process(item) # 整个生成器完成 - 成功缓存
5.2. 批量记忆化¶
通过缓存批量操作高效处理多个输入,减少批量计算的开销。
@cache.batch_memoize()
def process_batch(items):
"""缓存批量操作"""
return [expensive_operation(item) for item in items]
# 使用
results = process_batch([1, 2, 3, 4, 5]) # 计算 - 需要时间
results = process_batch([1, 2, 3, 4, 5]) # 缓存 - 立即返回结果
5.3. 参数排除¶
从缓存键中排除特定参数,适用于调试标志、时间戳或其他非功能性参数。
@cache.memoize(exclude=["debug"])
def compute_with_debug(x, debug=False):
"""调试参数从缓存键中排除"""
if debug:
print(f"为 {x} 计算")
return x * x
# 尽管调试值不同,这些使用相同的缓存项
result1 = compute_with_debug(5, debug=True)
result2 = compute_with_debug(5, debug=False)
5.4. 手动缓存操作¶
直接控制缓存项以实现自定义缓存策略、手动失效或高级元数据管理。
from ahvn.cache.base import CacheEntry
# 创建自定义缓存项
entry = CacheEntry(
func="my_function",
inputs={"x": 5},
output=25,
metadata={"version": "1.0", "timestamp": "2024-01-01"}
)
# 手动存储用于自定义缓存
cache.set(entry)
# 手动检索用于检查
result = cache.get("my_function", {"x": 5})
# 清除特定项用于有针对性的失效
cache.remove("my_function", {"x": 5})
# 清除整个缓存用于完全重置
cache.clear()
5.5. 缓存注解¶
为缓存项添加元数据和注解,增强调试、监控和缓存管理能力。
@cache.memoize(annotation={"purpose": "user_profile", "team": "backend"})
def get_user_profile(user_id: int) -> dict:
"""用团队注解缓存用户配置用于监控"""
return database.fetch_user(user_id)
@cache.memoize(annotation={"priority": "high", "ttl": "1h"})
def get_realtime_data(sensor_id: str) -> dict:
"""高优先级传感器数据,1小时 TTL"""
return api.fetch_sensor_data(sensor_id)
# 在缓存检查期间访问注解
for entry in cache:
if entry.metadata.get("annotation", {}).get("team") == "backend":
print(f"后端缓存: {entry.func} 与 {entry.inputs}")
注解作为元数据与缓存项一起存储,可用于:
过滤: 按注解选择性失效或检查缓存项
监控: 按目的或团队跟踪缓存使用模式
TTL 管理: 用基于注解的过期覆盖默认 TTL
调试: 按功能目的识别缓存项
# 按注解过滤缓存项
backend_entries = [
entry for entry in cache
if entry.metadata.get("annotation", {}).get("team") == "backend"
]
# 清除特定注解的缓存项
for entry in cache:
if entry.metadata.get("annotation", {}).get("purpose") == "user_profile":
cache.remove(entry.func, entry.inputs)
5.6. 缓存检查¶
浏览和分析缓存内容用于调试、监控或缓存管理目的。
# 遍历所有缓存项
for entry in cache:
print(f"函数: {entry.func}")
print(f"输入: {entry.inputs}")
print(f"输出: {entry.output}")
print(f"元数据: {entry.metadata}")
print("---")
6. 集成示例¶
6.1. LLM 集成¶
缓存 LLM 响应以避免冗余 API 调用并降低成本。非常适合常见问题或重复提示。
from ahvn.llm import LLM
from ahvn.cache import DiskCache
# 用持久存储跨会话缓存 LLM 响应
cache = DiskCache("/tmp/llm_cache")
llm = LLM(preset="chat", cache=cache)
# 首次调用 - 计算,进行 API 调用
response1 = llm.oracle("What is Python?")
# 第二次调用 - 来自缓存,立即响应
response2 = llm.oracle("What is Python?") # 立即 - 无 API 调用
6.2. KLStore 集成¶
使用任何缓存后端高效缓存知识对象,为知识库操作提供快速检索。
from ahvn.klstore import CacheKLStore
from ahvn.cache import DatabaseCache
# 用数据库后端缓存 KLStore 操作
cache = DatabaseCache(provider="sqlite", database="kl_cache.db")
kl_store = CacheKLStore(cache=cache)
# 用自动缓存存储 KL 对象
kl_store.store(kl_object)
# 用缓存查找检索
retrieved = kl_store.retrieve(kl_id) # 如果可用则快速缓存命中
拓展阅读¶
提示: 相关功能参见:
CacheKLStore - 由简单缓存支持的 KLStore