ahvn.utils.db.base module
- class ahvn.utils.db.base.SQLResponse(cursor_result)[source]
Bases: object
Enhanced result wrapper for SQLAlchemy CursorResult with convenient data access methods.
- property raw
Access the underlying SQLAlchemy CursorResult.
- Returns:
The underlying SQLAlchemy CursorResult.
- Return type:
CursorResult
Warning
Once the connection is closed, the cursor result may no longer be available.
- property columns: List[str]
Get column names from the result.
- Returns:
The list of column names. If the result is not available, returns an empty list.
- Return type:
List[str]
- property row_count: int
Get the number of affected rows.
- Returns:
The number of affected rows. If the result is not available, returns -1.
- Return type:
int
- property lastrowid: int | None
Get the last inserted row ID.
- Returns:
The last inserted row ID. If the result is not available, returns None.
- Return type:
Optional[int]
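The `columns`, `row_count`, and `lastrowid` properties correspond to information that a DB-API cursor also exposes. As a rough illustration only, the sketch below uses the standard-library `sqlite3` module rather than `SQLResponse` itself (an assumption made so the example runs without `ahvn` installed); the `users` table is hypothetical.

```python
import sqlite3

# Stand-in for a real database: stdlib sqlite3, not ahvn's SQLResponse.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, active INTEGER DEFAULT 0)"
)

# INSERT: lastrowid is the primary key of the newly inserted row.
cur = conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
inserted_id = cur.lastrowid

conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

# UPDATE: rowcount is the number of rows the statement modified.
cur = conn.execute("UPDATE users SET active = 1")
updated_rows = cur.rowcount

# SELECT: column names come from the cursor description.
cur = conn.execute("SELECT id, name FROM users")
column_names = [col[0] for col in cur.description]

conn.close()
```

With `SQLResponse`, the equivalent values would presumably be read from `row_count`, `lastrowid`, and `columns` on the object returned by a query.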
- class ahvn.utils.db.base.SQLErrorResponse(error_type, short_message, full_message, query=None, params=None)[source]
Bases: object
Structured error response for database operation failures.
This class provides a clean, structured way to return error information from database operations, making it easier for LLMs and tools to handle errors and present them to users.
- __init__(error_type, short_message, full_message, query=None, params=None)[source]
Initialize a SQL error response.
- Parameters:
error_type (str) -- The type/category of error (e.g., "TableNotFound", "SyntaxError").
short_message (str) -- A brief, human-readable error message.
full_message (str) -- The complete error message with traceback.
query (str, optional) -- The SQL query that caused the error.
params (Union[Dict, List, Tuple], optional) -- The parameters used with the query.
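To illustrate the idea of returning a structured error instead of raising, here is a minimal sketch. `ErrorInfo` and `run_safely` are hypothetical names that only mirror the constructor arguments listed above; this is not the library's implementation, and `sqlite3` stands in for the real database layer.

```python
import sqlite3
import traceback
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ErrorInfo:
    """Hypothetical stand-in mirroring the SQLErrorResponse constructor arguments."""

    error_type: str
    short_message: str
    full_message: str
    query: Optional[str] = None
    params: Optional[Any] = None


def run_safely(conn: sqlite3.Connection, query: str, params: Optional[dict] = None):
    """Return fetched rows on success, or an ErrorInfo instead of raising."""
    try:
        return conn.execute(query, params or ()).fetchall()
    except sqlite3.Error as exc:
        return ErrorInfo(
            error_type=type(exc).__name__,
            short_message=str(exc),
            full_message=traceback.format_exc(),
            query=query,
            params=params,
        )


conn = sqlite3.connect(":memory:")
outcome = run_safely(conn, "SELECT * FROM nonexistent")
conn.close()
```

A caller can then branch on `isinstance(outcome, ErrorInfo)` rather than wrapping every call in `try`/`except`, which is the pattern the `safe=True` option of `Database.execute()` is described as supporting.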
- class ahvn.utils.db.base.DatabaseErrorHandler(db=None)[source]
Bases: object
Extensible handler for database errors with type-specific processing.
This class provides a clean way to handle different types of database errors, extract relevant information, and provide helpful suggestions to users.
- Parameters:
db (Database | None)
- class ahvn.utils.db.base.Database(database=None, provider=None, pool=None, connect=False, **kwargs)[source]
Bases: object
Universal Database Connector
Provides a clean, intuitive interface for database operations across different providers (SQLite, PostgreSQL, DuckDB, MySQL) with standard connection management:
- Basic Usage:
```python
db = Database(provider="sqlite", database=":memory:")
result = db.execute("SELECT * FROM users")
```
- Context Manager (recommended for transactions):
```python
with Database(provider="pg", database="mydb") as db:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Alice"})
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Alice"})
    # Automatically commits on success, rolls back on exception
```
- Manual Transaction Control:
```python
db = Database(provider="sqlite", database="mydb")
try:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"}, autocommit=False)
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"}, autocommit=False)
    db.commit()
except Exception:
    db.rollback()
finally:
    db.close_conn()
```
The class automatically handles:
- Database creation (PostgreSQL auto-creation if the database doesn't exist)
- Connection lifecycle management
- SQL transpilation between different database dialects
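The commit-on-success, roll-back-on-exception behaviour described for the context manager can be sketched with the standard-library `sqlite3` module, whose connection object follows a similar pattern. This is an analogy under stated assumptions, not `Database` itself; the `users` table is hypothetical.

```python
import sqlite3

# Stand-in for Database: a plain sqlite3 connection used as a context manager.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, active INTEGER DEFAULT 0)")
conn.commit()

# Success path: the pending INSERT is committed when the block exits normally.
with conn:
    conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Alice"})

# Failure path: the exception causes the pending INSERT to be rolled back.
try:
    with conn:
        conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Bob"})
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY name")]
conn.close()
```

Only the first insert survives. One difference worth noting: `sqlite3` leaves the connection open after the `with` block, whereas `Database.__exit__` is documented as also closing the connection.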
- __init__(database=None, provider=None, pool=None, connect=False, **kwargs)[source]
Initialize database connection.
- Parameters:
database (Optional[str]) -- Database name or path (':memory:' for in-memory)
provider (Optional[str]) -- Database provider ('sqlite', 'pg', 'duckdb', etc.)
pool (Optional[Dict[str, Any]]) -- Pool configuration to override provider defaults (e.g., {'pool_size': 10})
connect (bool) -- Whether to establish a connection immediately (default: False)
**kwargs -- Additional connection parameters
- clone()[source]
Create an independent Database instance with the same configuration.
Each clone has its own connection, making it safe for parallel operations where each worker needs an independent database connection.
Warning
For in-memory databases (:memory:), cloned instances do NOT share data. Each clone gets its own separate in-memory database. Use file-based databases for parallel operations requiring shared state.
- Returns:
A new independent Database instance.
- Return type:
Database
Example
```python
from concurrent.futures import ThreadPoolExecutor

# Parallel-safe pattern
def worker(db_template, task_id):
    db = db_template.clone()  # Each worker gets its own connection
    try:
        result = db.execute("SELECT * FROM tasks WHERE id = :id", params={"id": task_id})
        return result.to_list()
    finally:
        db.close()

# Use with threading/multiprocessing
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(worker, db, i) for i in range(10)]
```
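The warning about in-memory databases reflects how SQLite itself behaves: every connection to `:memory:` opens a fresh, private database. The sketch below demonstrates this with the standard-library `sqlite3` module (a stand-in, not `Database.clone()`); the helper `table_names` and the file name `shared.db` are hypothetical.

```python
import os
import sqlite3
import tempfile


def table_names(conn):
    """List the tables visible to this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


# Two connections to ":memory:" are two separate, unrelated databases.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")
first.execute("CREATE TABLE tasks (id INTEGER)")
memory_tables_seen_by_second = table_names(second)
first.close()
second.close()

# Two connections to the same file see the same committed data.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "shared.db")
    writer = sqlite3.connect(path)
    reader = sqlite3.connect(path)
    writer.execute("CREATE TABLE tasks (id INTEGER)")
    writer.commit()
    file_tables_seen_by_reader = table_names(reader)
    writer.close()
    reader.close()
```

The second in-memory connection sees no tables, while the file-backed reader sees `tasks`, which is why a file-based database is the safer choice when cloned workers must share state.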
- connect()[source]
Establish a database connection.
The connection pool (configured per dialect) handles:
- Stale connection detection via pool_pre_ping
- Connection recycling to prevent timeouts
- Thread-safe connection management
- Returns:
The SQLAlchemy connection object
- Return type:
Connection
- close_conn(commit=True)[source]
Close the database connection and return it to the pool.
- Parameters:
commit (bool) -- Whether to commit the pending transaction before closing.
- property connected
Check whether the database is currently connected.
- property conn
Get the current connection, establishing one if needed.
- in_transaction()[source]
Check whether a transaction is currently in progress.
- Returns:
True if in a transaction, False otherwise
- Return type:
bool
- __exit__(exc_type, exc_val, exc_tb)[source]
Context manager exit: commits or rolls back the transaction and closes the connection.
- orm_execute(query, autocommit=False, **kwargs)[source]
Execute a SQLAlchemy ORM query or statement.
- Returns:
SQLResponse: Enhanced result wrapper with convenient data access
None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE)
- Return type:
SQLResponse | None
Example
```python
# Using SQLAlchemy ORM statements
# (users_table is an existing sqlalchemy Table; engine is a SQLAlchemy Engine)
from sqlalchemy import select, insert, update, delete
from sqlalchemy.sql import text

# Select statement
stmt = select(users_table).where(users_table.c.id == 1)
result = db.orm_execute(stmt)

# Insert statement
stmt = insert(users_table).values(name="Alice")
db.orm_execute(stmt, autocommit=True)

# Update statement
stmt = update(users_table).where(users_table.c.id == 1).values(name="Bob")
db.orm_execute(stmt, autocommit=True)

# Delete statement
stmt = delete(users_table).where(users_table.c.id == 1)
db.orm_execute(stmt, autocommit=True)

# DDL operations
from sqlalchemy import MetaData, Table
metadata = MetaData()
table = Table('users', metadata, autoload_with=engine)
table.drop(engine)  # This should work without literal_binds
```
- execute(query, transpile=None, autocommit=False, params=None, safe=False, **kwargs)[source]
Execute a raw SQL query against the database.
- Parameters:
query (str) -- The SQL query to execute (raw SQL string)
transpile (Optional[str]) -- Source dialect to transpile from (if different from target)
autocommit (Optional[bool]) -- Whether to run in autocommit mode (default: False, no commits after execution)
params (Union[Dict[str, Any], List[Dict[str, Any]], Tuple, None]) -- Query parameters (dict for named, tuple/list for positional)
safe (bool) -- If True, returns SQLErrorResponse on error instead of raising an exception (default: False)
**kwargs -- Additional keyword arguments for query execution
- Returns:
SQLResponse: Enhanced result wrapper with convenient data access
SQLErrorResponse: Structured error response (only if safe=True)
None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE)
- Return type:
SQLResponse | SQLErrorResponse | None
Example
```python
# Simple query (uses temporary connection with autocommit)
result = db.execute("SELECT * FROM users")
rows = list(result.fetchall())

# Parameterized query
result = db.execute("SELECT * FROM users WHERE id = :id", params={"id": 1})

# Parameterized insert
db.execute(
    "INSERT INTO users (name) VALUES (:name)",
    params={"name": "Alice"}
)

# Transactional operation
with db:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"})
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"})

# Cross-database SQL (transpile from PostgreSQL to the current database dialect, i.e., SQLite)
result = db.execute("SELECT * FROM users LIMIT 10", transpile="postgresql")

# Safe mode - returns error instead of raising
result = db.execute("SELECT * FROM nonexistent", safe=True)
if isinstance(result, SQLErrorResponse):
    print(result.to_string())
```
Note
For SQLAlchemy ORM operations, use the orm_execute() method instead.
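The `:name` placeholder style used with `params` binds values as data rather than splicing them into the SQL text. A minimal sketch of why that matters, using the standard-library `sqlite3` module as a stand-in for `Database.execute()` (an assumption made so the example is runnable on its own; the `users` table is hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# A list of dicts runs the same statement once per parameter set.
conn.executemany(
    "INSERT INTO users (name) VALUES (:name)",
    [{"name": "Alice"}, {"name": "Bob"}],
)

# A value containing SQL syntax is compared as a plain string,
# so it matches nothing instead of altering the query.
hostile = "Alice' OR '1'='1"
matches_hostile = conn.execute(
    "SELECT name FROM users WHERE name = :name", {"name": hostile}
).fetchall()

matches_alice = conn.execute(
    "SELECT name FROM users WHERE name = :name", {"name": "Alice"}
).fetchall()
conn.close()
```

Building the same query with string formatting would have returned every row for the hostile value, which is the main reason to prefer `params` over interpolating values into `query`.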
- col_enums(tab_name, col_name)[source]
Get all enumerated values for a specific column (including duplicates).
This method returns all values from a column, including duplicates. For unique values only, use col_distincts() instead.
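The difference between "all values" and "unique values only" can be shown in plain SQL. The sketch below assumes the two methods behave roughly like `SELECT col` and `SELECT DISTINCT col`; that mapping is an assumption, not something the documentation states, and `sqlite3` with a hypothetical `orders` table stands in for the real database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (status TEXT)")
conn.executemany(
    "INSERT INTO orders (status) VALUES (?)",
    [("open",), ("closed",), ("open",)],
)

# All values, duplicates included (the col_enums-style view of a column).
all_values = [
    row[0] for row in conn.execute("SELECT status FROM orders ORDER BY rowid")
]

# Unique values only (the col_distincts-style view of a column).
unique_values = [
    row[0]
    for row in conn.execute("SELECT DISTINCT status FROM orders ORDER BY status")
]
conn.close()
```

Here `all_values` keeps both occurrences of `"open"`, while `unique_values` lists each status once.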
- clear_tab(tab_name)[source]
Clear all data from a specific table without deleting the table itself.
Uses SQLAlchemy ORM to ensure compatibility across all database backends.
- drop_tab(tab_name)[source]
Drop a specific table from the database.
Uses SQLAlchemy ORM to ensure compatibility across all database backends.
- drop()[source]
Drop all tables in the database.
Uses SQLAlchemy metadata reflection to drop all tables.
- Raises:
DatabaseError -- If the database drop operation fails
- init(connect=True)[source]
Drop the entire database and create a new one.
This method combines drop() and database creation. After dropping, it recreates the database and establishes a new connection.
- ahvn.utils.db.base.table_display(table, schema=None, max_rows=64, max_width=64, style='DEFAULT', **kwargs)[source]
Render a tabular display of SQL query results or iterable dictionaries using PrettyTable.
- Parameters:
table (Union[SQLResponse, Iterable[Dict]]) -- The table data to display. Can be a SQLResponse object (from a database query) or any iterable of dictionaries (e.g., list of dicts).
schema (Optional[List[str]], optional) -- List of column names to use as the table schema. If not provided, the schema is inferred from the SQLResponse or from the first row of the iterable.
max_rows (int, optional) -- Maximum number of rows to display (including the last row and an ellipsis row if truncated). If the table has more than max_rows + 1 rows, the output will show the first max_rows-1 rows, an ellipsis row, and the last row. Defaults to 64.
max_width (int, optional) -- Maximum width for each column in the output table. Defaults to 64.
style (Literal["DEFAULT", "MARKDOWN", "PLAIN_COLUMNS", "MSWORD_FRIENDLY", "ORGMODE", "SINGLE_BORDER", "DOUBLE_BORDER", "RANDOM"], optional) -- The style to use for the table (supported by PrettyTable). Defaults to "DEFAULT".
**kwargs -- Additional keyword arguments passed to PrettyTable.
- Returns:
A string representation of the formatted table, including the total number of rows.
- Return type:
str
- Raises:
ValueError -- If the provided table rows do not match the schema in length.
Example
>>> result = db.execute("SELECT * FROM users")
>>> table_display(result, max_rows=5)
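The truncation rule described for `max_rows` can be made concrete with a small sketch. `truncate_rows` and `ELLIPSIS_ROW` are hypothetical names, and the logic follows the parameter description literally (more than `max_rows + 1` rows triggers truncation); the actual implementation may differ in details.

```python
from typing import Dict, List

# Hypothetical placeholder row standing in for the ellipsis line.
ELLIPSIS_ROW = {"id": "...", "name": "..."}


def truncate_rows(rows: List[Dict], max_rows: int) -> List[Dict]:
    """Keep the first max_rows - 1 rows, an ellipsis row, and the last row."""
    if len(rows) <= max_rows + 1:
        # Short enough: show everything unchanged.
        return list(rows)
    return list(rows[: max_rows - 1]) + [ELLIPSIS_ROW] + [rows[-1]]


rows = [{"id": i, "name": f"user{i}"} for i in range(10)]
shown = truncate_rows(rows, max_rows=5)
shown_ids = [row["id"] for row in shown]

# A table with exactly max_rows + 1 rows is left untouched.
untouched = truncate_rows(rows[:6], max_rows=5)
```

With ten rows and `max_rows=5`, the sketch keeps rows 0 to 3, the ellipsis row, and row 9.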