ahvn.utils.db.base module¶
- class ahvn.utils.db.base.SQLResponse(cursor_result)[source]¶
Bases: object
Enhanced result wrapper for SQLAlchemy CursorResult with convenient data access methods.
- property raw¶
Access the underlying SQLAlchemy CursorResult.
- Returns:
The underlying SQLAlchemy CursorResult.
- Return type:
CursorResult
Warning
When a connection is closed, the cursor result may no longer be available.
- property columns: List[str]¶
Get column names from the result.
- Returns:
The list of column names. If the result is not available, returns an empty list.
- Return type:
List[str]
- property row_count: int¶
Get the number of affected rows.
- Returns:
The number of affected rows. If the result is not available, returns -1.
- Return type:
int
- property lastrowid: int | None¶
Get the last inserted row ID.
- Returns:
The last inserted row ID. If the result is not available, returns None.
- Return type:
Optional[int]
- __len__()[source]¶
Get the number of rows in the result.
- Returns:
The number of rows in the result.
- Return type:
int
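The fallback values documented above (an empty column list, a row count of -1, a last row ID of None) can be illustrated with a small stand-in built on the standard-library `sqlite3` module. The class name `CursorView` and its internals are hypothetical; this is a sketch of the documented behavior under that assumption, not the implementation of SQLResponse.

```python
import sqlite3
from typing import List, Optional


class CursorView:
    """Hypothetical stand-in for a result wrapper; not ahvn's SQLResponse."""

    def __init__(self, cursor: Optional[sqlite3.Cursor]):
        self._cursor = cursor
        # Materialize rows eagerly so len() still works once the cursor is exhausted.
        has_rows = cursor is not None and cursor.description is not None
        self._rows = cursor.fetchall() if has_rows else []

    @property
    def columns(self) -> List[str]:
        # description is None for statements that return no rows.
        if self._cursor is None or self._cursor.description is None:
            return []
        return [col[0] for col in self._cursor.description]

    @property
    def row_count(self) -> int:
        # -1 signals that no result is available.
        return self._cursor.rowcount if self._cursor is not None else -1

    @property
    def lastrowid(self) -> Optional[int]:
        return self._cursor.lastrowid if self._cursor is not None else None

    def __len__(self) -> int:
        return len(self._rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
inserted = CursorView(conn.execute("INSERT INTO users (name) VALUES ('Alice')"))
selected = CursorView(conn.execute("SELECT id, name FROM users"))
missing = CursorView(None)  # simulates a result that is no longer available
```

Note the difference between `row_count` (rows affected by a write) and `len()` (rows returned by a read): the insert above affects one row but returns none.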
- class ahvn.utils.db.base.SQLErrorResponse(error_type, short_message, full_message, query=None, params=None)[source]¶
Bases: object
Structured error response for database operation failures.
This class provides a clean, structured way to return error information from database operations, making it easier for LLMs and tools to handle and present errors to users.
- __init__(error_type, short_message, full_message, query=None, params=None)[source]¶
Initialize a SQL error response.
- Parameters:
error_type (str) – The type/category of error (e.g., “TableNotFound”, “SyntaxError”).
short_message (str) – A brief, human-readable error message.
full_message (str) – The complete error message with traceback.
query (str, optional) – The SQL query that caused the error.
params (Union[Dict, List, Tuple], optional) – The parameters used with the query.
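As an illustration of returning a structured error instead of raising, the sketch below mirrors the five documented fields in a plain dataclass and fills them from a failed `sqlite3` query. The names `ErrorInfo` and `run_safely` are hypothetical, and deriving `error_type` from the exception class name is an assumption; the source does not say how SQLErrorResponse categorizes errors.

```python
import sqlite3
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union

Params = Union[Dict[str, Any], List[Any], Tuple[Any, ...]]


@dataclass
class ErrorInfo:
    """Hypothetical mirror of the documented fields; not ahvn's class."""

    error_type: str
    short_message: str
    full_message: str
    query: Optional[str] = None
    params: Optional[Params] = None


def run_safely(conn: sqlite3.Connection, query: str, params: Params = ()):
    """Return rows on success, or an ErrorInfo instead of raising."""
    try:
        return conn.execute(query, params).fetchall()
    except sqlite3.Error as exc:
        return ErrorInfo(
            error_type=type(exc).__name__,        # assumption: class name as category
            short_message=str(exc),
            full_message=traceback.format_exc(),  # complete message with traceback
            query=query,
            params=params,
        )


conn = sqlite3.connect(":memory:")
outcome = run_safely(conn, "SELECT * FROM nonexistent WHERE id = :id", {"id": 1})
```

A caller can then branch on `isinstance(outcome, ErrorInfo)` and show `short_message` to a user while logging `full_message`.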
- class ahvn.utils.db.base.DatabaseErrorHandler(db=None)[source]¶
Bases: object
Extensible handler for database errors with type-specific processing.
This class provides a clean way to handle different types of database errors, extract relevant information, and provide helpful suggestions to users.
- Parameters:
db (Database | None)
- class ahvn.utils.db.base.Database(database=None, provider=None, pool=None, connect=False, **kwargs)[source]¶
Bases: object
Universal Database Connector
Provides a clean, intuitive interface for database operations across different providers (SQLite, PostgreSQL, DuckDB, MySQL) with standard connection management:
- Basic Usage:
```python
db = Database(provider="sqlite", database=":memory:")
result = db.execute("SELECT * FROM table")
```
- Context Manager (recommended for transactions):
```python
with Database(provider="pg", database="mydb") as db:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Alice"})
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Alice"})
    # Automatically commits on success, rolls back on exception
```
- Manual Transaction Control:
```python
db = Database(provider="sqlite", database="mydb")
try:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"}, autocommit=False)
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"}, autocommit=False)
    db.commit()
except Exception:
    db.rollback()
finally:
    db.close_conn()
```
The class automatically handles:
- Database creation (PostgreSQL auto-creation if the database doesn't exist)
- Connection lifecycle management
- SQL transpilation between different database dialects
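The commit-on-success, rollback-on-exception behavior described for the context manager can be sketched with the standard-library `sqlite3` module. `ManagedConnection` is a hypothetical class written for this illustration, and the temporary file path is likewise an assumption; the sketch shows the pattern, not how Database implements it.

```python
import os
import sqlite3
import tempfile


class ManagedConnection:
    """Hypothetical wrapper: commit on success, roll back on error, always close."""

    def __init__(self, path: str):
        self._path = path
        self.conn = None

    def __enter__(self) -> sqlite3.Connection:
        self.conn = sqlite3.connect(self._path)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        try:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        finally:
            self.conn.close()
        return False  # let the original exception propagate


# A file-based database, so data persists across separate connections.
path = os.path.join(tempfile.mkdtemp(), "demo.db")

with ManagedConnection(path) as conn:
    conn.execute("CREATE TABLE users (name TEXT)")
    conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Alice"})

try:
    with ManagedConnection(path) as conn:
        conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Bob"})
        raise RuntimeError("simulated failure")  # triggers the rollback branch
except RuntimeError:
    pass

with ManagedConnection(path) as conn:
    names = [row[0] for row in conn.execute("SELECT name FROM users")]
```

Only the first insert survives: the second block raised before exiting, so its pending insert was rolled back.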
- __init__(database=None, provider=None, pool=None, connect=False, **kwargs)[source]¶
Initialize database connection.
- Parameters:
database (Optional[str]) – Database name or path (':memory:' for in-memory)
provider (Optional[str]) – Database provider ('sqlite', 'pg', 'duckdb', etc.)
pool (Optional[Dict[str, Any]]) – Pool configuration to override provider defaults (e.g., {'pool_size': 10})
connect (bool) – Whether to establish a connection immediately (default: False)
**kwargs – Additional connection parameters
- clone()[source]¶
Create an independent Database instance with the same configuration.
Each clone has its own connection, making it safe for parallel operations where each worker needs an independent database connection.
Warning
For in-memory databases (:memory:), cloned instances do NOT share data. Each clone gets its own separate in-memory database. Use file-based databases for parallel operations requiring shared state.
- Returns:
A new independent Database instance.
- Return type:
Database
Example
```python
from concurrent.futures import ThreadPoolExecutor

# Parallel-safe pattern
def worker(db_template, task_id):
    db = db_template.clone()  # Each worker gets its own connection
    try:
        result = db.execute("SELECT * FROM tasks WHERE id = :id", params={"id": task_id})
        return result.to_list()
    finally:
        db.close()

# Use with threading/multiprocessing
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(worker, db, i) for i in range(10)]
```
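The warning about in-memory databases reflects how SQLite itself behaves: each connection to `:memory:` opens a separate, empty database. A minimal sketch with the standard-library `sqlite3` module (the helper `table_names` is hypothetical) makes the isolation visible:

```python
import sqlite3

first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")  # a separate, empty database

first.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY)")
first.execute("INSERT INTO tasks (id) VALUES (1)")


def table_names(conn: sqlite3.Connection) -> list:
    """List the tables visible to one connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


first_tables = table_names(first)
second_tables = table_names(second)
```

The table created through `first` is invisible to `second`, which is why workers that need shared state should point at a file-based database.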
- connect()[source]¶
Establish a database connection.
The connection pool (configured per dialect) handles:
- Stale connection detection via pool_pre_ping
- Connection recycling to prevent timeouts
- Thread-safe connection management
- Returns:
The SQLAlchemy connection object
- Return type:
Connection
- close_conn(commit=True)[source]¶
Close the database connection and return it to the pool.
- Parameters:
commit (bool) – Whether to commit the pending transaction before closing.
- property connected¶
Check if database is currently connected.
- property conn¶
Get the current connection, establishing one if needed.
- in_transaction()[source]¶
Check if currently in a transaction.
- Returns:
True if in transaction, False otherwise
- Return type:
bool
- __exit__(exc_type, exc_val, exc_tb)[source]¶
Context manager exit: commits or rolls back transaction and closes connection.
- orm_execute(query, autocommit=False, **kwargs)[source]¶
Execute a SQLAlchemy ORM query or statement.
- Parameters:
- Returns:
SQLResponse: Enhanced result wrapper with convenient data access.
None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE).
- Return type:
Optional[SQLResponse]
Examples
```python
# Using SQLAlchemy ORM statements
from sqlalchemy import select, insert, update, delete
from sqlalchemy.sql import text

# Select statement
stmt = select(users_table).where(users_table.c.id == 1)
result = db.orm_execute(stmt)

# Insert statement
stmt = insert(users_table).values(name="Alice")
db.orm_execute(stmt, autocommit=True)

# Update statement
stmt = update(users_table).where(users_table.c.id == 1).values(name="Bob")
db.orm_execute(stmt, autocommit=True)

# Delete statement
stmt = delete(users_table).where(users_table.c.id == 1)
db.orm_execute(stmt, autocommit=True)

# DDL operations
from sqlalchemy import MetaData, Table
metadata = MetaData()
table = Table('users', metadata, autoload_with=engine)
table.drop(engine)  # This should work without literal_binds
```
- execute(query, transpile=None, autocommit=False, params=None, safe=False, **kwargs)[source]¶
Execute a raw SQL query against the database.
- Parameters:
query (str) – The SQL query to execute (raw SQL string)
transpile (Optional[str]) – Source dialect to transpile from (if different from target)
autocommit (Optional[bool]) – Whether to run in autocommit mode (default: False - no commits after execution)
params (Union[Dict[str, Any], List[Dict[str, Any]], Tuple, None]) – Query parameters (dict for named, tuple/list for positional)
safe (bool) – If True, returns SQLErrorResponse on error instead of raising exception (default: False)
**kwargs – Additional keyword arguments for query execution
- Returns:
SQLResponse: Enhanced result wrapper with convenient data access.
SQLErrorResponse: Structured error response (only if safe=True).
None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE).
- Return type:
Union[SQLResponse, SQLErrorResponse, None]
Examples
```python
# Simple query (uses temporary connection with autocommit)
result = db.execute("SELECT * FROM users")
rows = list(result.fetchall())

# Parameterized query
result = db.execute("SELECT * FROM users WHERE id = :id", params={"id": 1})

# Parameterized insert
db.execute(
    "INSERT INTO users (name) VALUES (:name)",
    params={"name": "Alice"}
)

# Transactional operation
with db:
    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"})
    db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"})

# Cross-database SQL (transpile from PostgreSQL to the current database dialect, i.e., SQLite)
result = db.execute("SELECT * FROM users LIMIT 10", transpile="postgresql")

# Safe mode - returns error instead of raising
result = db.execute("SELECT * FROM nonexistent", safe=True)
if isinstance(result, SQLErrorResponse):
    print(result.to_string())
```
Note
For SQLAlchemy ORM operations, use orm_execute() method instead.
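To show why parameters are passed separately from the SQL string, here is a minimal sketch using the standard-library `sqlite3` module rather than Database.execute(). The `?` placeholder is sqlite3's positional syntax; which positional syntax execute() accepts is not stated in the source, so treat that part as an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# Named parameters: a dict keyed by placeholder name.
conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Alice"})

# Positional parameters: a tuple matched to "?" placeholders in order.
conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

# A hostile value is bound as data, so it cannot alter the statement.
hostile = "x'; DROP TABLE users; --"
conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": hostile})

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]
```

The hostile string is stored verbatim as a name and the table is still intact, which is the main reason to prefer bound parameters over string formatting.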
- db_tabs()[source]¶
List all table names in the database.
- Returns:
List of table names
- Return type:
List[str]
- db_views()[source]¶
List all view names in the database.
- Returns:
List of view names
- Return type:
List[str]
- col_enums(tab_name, col_name)[source]¶
Get all enumerated values for a specific column (including duplicates).
This method returns all values from a column, including duplicates. For unique values only, use col_distincts() instead.
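The distinction between all values and unique values corresponds to a plain `SELECT` versus `SELECT DISTINCT`. The sketch below uses the standard-library `sqlite3` module with a hypothetical `orders` table; the exact SQL that col_enums() and col_distincts() issue is not shown in the source, so this is an assumption about the general idea only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (status TEXT)")
conn.executemany(
    "INSERT INTO orders (status) VALUES (?)",
    [("open",), ("closed",), ("open",)],
)

# Every value in the column, duplicates included.
all_values = [
    row[0] for row in conn.execute("SELECT status FROM orders ORDER BY rowid")
]

# Unique values only.
unique_values = [
    row[0] for row in conn.execute("SELECT DISTINCT status FROM orders ORDER BY status")
]
```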
- clear_tab(tab_name)[source]¶
Clear all data from a specific table without deleting the table itself.
Uses SQLAlchemy ORM to ensure compatibility across all database backends.
- drop_tab(tab_name)[source]¶
Drop a specific table from the database.
Uses SQLAlchemy ORM to ensure compatibility across all database backends.
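The difference between clearing and dropping a table can be sketched in raw SQL with the standard-library `sqlite3` module. The table names are hypothetical, and the documented methods go through SQLAlchemy rather than these literal statements, so this only illustrates the resulting state.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (msg TEXT)")
conn.execute("CREATE TABLE cache (key TEXT)")
conn.execute("INSERT INTO logs (msg) VALUES ('hello')")

conn.execute("DELETE FROM logs")  # clear: rows removed, table definition kept
conn.execute("DROP TABLE cache")  # drop: table definition removed as well

remaining_tables = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
log_rows = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
```

After both statements, `logs` still exists but is empty, while `cache` is gone entirely.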
- drop()[source]¶
Drop all tables in the database.
Uses SQLAlchemy metadata reflection to drop all tables.
- Raises:
DatabaseError – If the database drop operation fails
- Return type:
- init(connect=True)[source]¶
Drop the entire database and create a new one.
This method combines drop() and database creation. After dropping, it will recreate the database and establish a new connection.
- ahvn.utils.db.base.table_display(table, schema=None, max_rows=64, max_width=64, style='DEFAULT', **kwargs)[source]¶
Render a tabular display of SQL query results or iterable dictionaries using PrettyTable.
- Parameters:
table (Union[SQLResponse, Iterable[Dict]]) – The table data to display. Can be a SQLResponse object (from a database query) or any iterable of dictionaries (e.g., list of dicts).
schema (Optional[List[str]], optional) – List of column names to use as the table schema. If not provided, the schema is inferred from the SQLResponse or from the first row of the iterable.
max_rows (int, optional) – Maximum number of rows to display (including the last row and an ellipsis row if truncated). If the table has more than max_rows + 1 rows, the output will show the first max_rows-1 rows, an ellipsis row, and the last row. Defaults to 64.
max_width (int, optional) – Maximum width for each column in the output table. Defaults to 64.
style (Literal["DEFAULT", "MARKDOWN", "PLAIN_COLUMNS", "MSWORD_FRIENDLY", "ORGMODE", "SINGLE_BORDER", "DOUBLE_BORDER", "RANDOM"], optional) – The style to use for the table (supported by PrettyTable). Defaults to “DEFAULT”.
**kwargs – Additional keyword arguments passed to PrettyTable.
- Returns:
A string representation of the formatted table, including the number of rows in total.
- Return type:
str
- Raises:
ValueError – If the provided table rows do not match the schema in length.
Example
>>> result = db.execute("SELECT * FROM users")
>>> table_display(result, max_rows=5)
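The truncation rule described for max_rows can be sketched in plain Python, independent of PrettyTable. The function `truncate_rows` is hypothetical, as is the guard for very small max_rows values; it follows the wording of the parameter description (truncate only when there are more than max_rows + 1 rows) and may differ from the actual implementation.

```python
from typing import Dict, List


def truncate_rows(rows: List[Dict], max_rows: int = 64) -> List[Dict]:
    """Keep the first max_rows - 1 rows, an ellipsis row, and the last row."""
    # Assumption: max_rows below 2 leaves no room for a head row, so skip truncation.
    if max_rows < 2 or len(rows) <= max_rows + 1:
        return list(rows)
    ellipsis_row = {key: "..." for key in rows[0]}
    return rows[: max_rows - 1] + [ellipsis_row, rows[-1]]


rows = [{"id": i} for i in range(10)]
shown = truncate_rows(rows, max_rows=5)      # 10 rows exceed 5 + 1, so truncate
untouched = truncate_rows(rows, max_rows=9)  # 10 rows do not exceed 9 + 1
```

Under this reading the displayed body never exceeds max_rows + 1 lines, whether or not truncation occurs.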