ahvn.utils.db.base module

class ahvn.utils.db.base.SQLResponse(cursor_result)[source]

Bases: object

Enhanced result wrapper for SQLAlchemy CursorResult with convenient data access methods.

__init__(cursor_result)[source]

property raw

Access the underlying SQLAlchemy CursorResult.

Returns:

The underlying SQLAlchemy CursorResult.

Return type:

CursorResult

Warning

When a connection is closed, the cursor result may no longer be available.

property columns: List[str]

Get column names from the result.

Returns:

The list of column names. If the result is not available, returns an empty list.

Return type:

List[str]

property row_count: int

Get the number of affected rows.

Returns:

The number of affected rows. If the result is not available, returns -1.

Return type:

int

property lastrowid: int | None

Get the last inserted row ID.

Returns:

The last inserted row ID. If the result is not available, returns None.

Return type:

Optional[int]

fetchall()[source]

Iterate over all rows, yielding each row as a dictionary.

Yields:

Dict[str, Any] – The next row as a dictionary.

Return type:

Generator[Dict[str, Any], None, None]

__len__()[source]

Get the number of rows in the result.

Returns:

The number of rows in the result.

Return type:

int

to_list(row_fmt='dict')[source]

Convert the result to a list of rows, formatted as dictionaries (the default) or tuples.

Parameters:

row_fmt (Literal['dict', 'tuple']) – The format for the rows.

Returns:

The result as a list of tuples or dictionaries.

Return type:

Union[List[Tuple], List[Dict[str, Any]]]

close()[source]

Close the result cursor.
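
To make the accessors above concrete, here is a minimal sketch of a comparable wrapper. It is built on the standard-library sqlite3 cursor rather than a SQLAlchemy CursorResult, and the class name MiniResponse and its internals are hypothetical; it is not ahvn's implementation.

```python
import sqlite3
from typing import Any, Dict, Iterator, List


class MiniResponse:
    """Hypothetical stand-in for SQLResponse, wrapping a sqlite3 cursor."""

    def __init__(self, cursor: sqlite3.Cursor) -> None:
        # cursor.description is None for statements that return no rows.
        self.columns: List[str] = [d[0] for d in (cursor.description or [])]
        self._rows = cursor.fetchall() if cursor.description else []

    def fetchall(self) -> Iterator[Dict[str, Any]]:
        # Yield one dictionary per row, like the generator-style fetchall().
        for row in self._rows:
            yield dict(zip(self.columns, row))

    def to_list(self, row_fmt: str = "dict") -> list:
        if row_fmt == "tuple":
            return [tuple(row) for row in self._rows]
        return list(self.fetchall())

    def __len__(self) -> int:
        return len(self._rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users (name) VALUES (?)", [("Alice",), ("Bob",)])

resp = MiniResponse(conn.execute("SELECT id, name FROM users ORDER BY id"))
dict_rows = resp.to_list()
tuple_rows = resp.to_list(row_fmt="tuple")
```

Because fetchall() is a generator, callers that need random access would wrap it in list(...) or call to_list() instead.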

class ahvn.utils.db.base.SQLErrorResponse(error_type, short_message, full_message, query=None, params=None)[source]

Bases: object

Structured error response for database operation failures.

This class provides a clean, structured way to return error information from database operations, making it easier for LLMs and tools to handle and present errors to users.

__init__(error_type, short_message, full_message, query=None, params=None)[source]

Initialize a SQL error response.

Parameters:
  • error_type (str) – The type/category of error (e.g., “TableNotFound”, “SyntaxError”).

  • short_message (str) – A brief, human-readable error message.

  • full_message (str) – The complete error message with traceback.

  • query (str, optional) – The SQL query that caused the error.

  • params (Union[Dict, List, Tuple], optional) – The parameters used with the query.

to_string(include_full=False)[source]

Format the error as a user-friendly string.

Parameters:

include_full (bool) – Whether to include the full original error message. Defaults to False.

Returns:

Formatted error message.

Return type:

str
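
The following sketch shows one way a structured error object with these five fields could be formatted. MiniErrorResponse is a hypothetical stand-in, and the output layout is an assumption made for illustration; the actual string produced by to_string() is not specified in this reference.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class MiniErrorResponse:
    """Hypothetical stand-in with the same fields as SQLErrorResponse."""

    error_type: str
    short_message: str
    full_message: str
    query: Optional[str] = None
    params: Optional[Any] = None

    def to_string(self, include_full: bool = False) -> str:
        # The layout below is assumed, chosen only for illustration.
        lines = [f"[{self.error_type}] {self.short_message}"]
        if self.query is not None:
            lines.append(f"Query: {self.query}")
        if self.params is not None:
            lines.append(f"Params: {self.params!r}")
        if include_full:
            lines.append(f"Details: {self.full_message}")
        return "\n".join(lines)


err = MiniErrorResponse(
    error_type="TableNotFound",
    short_message="no such table: nonexistent",
    full_message="sqlite3.OperationalError: no such table: nonexistent",
    query="SELECT * FROM nonexistent",
)
brief = err.to_string()
verbose = err.to_string(include_full=True)
```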

class ahvn.utils.db.base.DatabaseErrorHandler(db=None)[source]

Bases: object

Extensible handler for database errors with type-specific processing.

This class provides a clean way to handle different types of database errors, extract relevant information, and provide helpful suggestions to users.

Parameters:

db (Database | None)

__init__(db=None)[source]

Initialize the error handler.

Parameters:

db (Database, optional) – Database instance for context-aware suggestions.

register_handler(exception_type, handler_func)[source]

Register a custom handler for a specific exception type.

Parameters:
  • exception_type (type) – The exception type to handle.

  • handler_func (callable) – Function that takes (exception, query, params) and returns (error_type, short_message).

handle(e, query=None, params=None)[source]

Handle a database exception and extract structured error information.

Returns:

(error_type, short_message, full_message) extracted from the exception.

Return type:

tuple
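
The register/handle contract described above can be sketched as a small registry keyed by exception type. MiniErrorHandler and on_operational_error are hypothetical names, and the lookup strategy (walking the exception's class hierarchy) is an assumption rather than documented ahvn behaviour.

```python
import sqlite3
from typing import Callable, Dict, Optional, Tuple

Handler = Callable[[Exception, Optional[str], object], Tuple[str, str]]


class MiniErrorHandler:
    """Hypothetical registry mapping exception types to handler functions."""

    def __init__(self) -> None:
        self._handlers: Dict[type, Handler] = {}

    def register_handler(self, exception_type: type, handler_func: Handler) -> None:
        self._handlers[exception_type] = handler_func

    def handle(
        self, e: Exception, query: Optional[str] = None, params: object = None
    ) -> Tuple[str, str, str]:
        # Walk the MRO so a handler registered for a base class also applies
        # to subclasses (an assumed lookup strategy).
        for cls in type(e).__mro__:
            if cls in self._handlers:
                error_type, short_message = self._handlers[cls](e, query, params)
                return error_type, short_message, repr(e)
        # Fallback when no handler matches.
        return type(e).__name__, str(e), repr(e)


def on_operational_error(e, query, params):
    # Takes (exception, query, params) and returns (error_type, short_message).
    if "no such table" in str(e):
        return "TableNotFound", str(e)
    return "OperationalError", str(e)


handler = MiniErrorHandler()
handler.register_handler(sqlite3.OperationalError, on_operational_error)

query = "SELECT * FROM nonexistent"
try:
    sqlite3.connect(":memory:").execute(query)
except sqlite3.OperationalError as exc:
    info = handler.handle(exc, query=query)
```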

class ahvn.utils.db.base.Database(database=None, provider=None, pool=None, connect=False, **kwargs)[source]

Bases: object

Universal Database Connector

Provides a clean, intuitive interface for database operations across different providers (SQLite, PostgreSQL, DuckDB, MySQL) with standard connection management:

  1. Basic Usage:

    ```python
    db = Database(provider="sqlite", database=":memory:")
    result = db.execute("SELECT * FROM users")
    ```

  2. Context Manager (recommended for transactions):

    ```python
    with Database(provider="pg", database="mydb") as db:
        db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Alice"})
        db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Alice"})
        # Automatically commits on success, rolls back on exception
    ```

  3. Manual Transaction Control:

    ```python
    db = Database(provider="sqlite", database="mydb")
    try:
        db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"}, autocommit=False)
        db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"}, autocommit=False)
        db.commit()
    except Exception:
        db.rollback()
    finally:
        db.close_conn()
    ```

The class automatically handles:

  • Database creation (PostgreSQL auto-creation if the database doesn’t exist)

  • Connection lifecycle management

  • SQL transpilation between different database dialects

__init__(database=None, provider=None, pool=None, connect=False, **kwargs)[source]

Initialize database connection.

Parameters:
  • database (Optional[str]) – Database name or path (‘:memory:’ for in-memory)

  • provider (Optional[str]) – Database provider (‘sqlite’, ‘pg’, ‘duckdb’, etc.)

  • pool (Optional[Dict[str, Any]]) – Pool configuration to override provider defaults (e.g., {‘pool_size’: 10})

  • connect (bool) – Whether to establish a connection immediately (default: False)

  • **kwargs – Additional connection parameters

clone()[source]

Create an independent Database instance with the same configuration.

Each clone has its own connection, making it safe for parallel operations where each worker needs an independent database connection.

Warning

For in-memory databases (:memory:), cloned instances do NOT share data. Each clone gets its own separate in-memory database. Use file-based databases for parallel operations requiring shared state.

Returns:

A new independent Database instance.

Return type:

Database

Example

```python
from concurrent.futures import ThreadPoolExecutor

# Parallel-safe pattern
def worker(db_template, task_id):
    db = db_template.clone()  # Each worker gets its own connection
    try:
        result = db.execute("SELECT * FROM tasks WHERE id = :id", params={"id": task_id})
        return result.to_list()
    finally:
        db.close()

# Use with threading/multiprocessing
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(worker, db, i) for i in range(10)]
```
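
The in-memory caveat in the warning above comes from how SQLite itself treats ":memory:" connections. The sketch below demonstrates it with the standard-library sqlite3 module directly, not with ahvn; table_names is a hypothetical helper.

```python
import sqlite3

# Two separate connections to ":memory:" open two unrelated databases.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")

first.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY)")


def table_names(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


first_tables = table_names(first)    # contains "tasks"
second_tables = table_names(second)  # empty: the table is not visible here
```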

connect()[source]

Establish a database connection.

The connection pool (configured per dialect) handles:

  • Stale connection detection via pool_pre_ping

  • Connection recycling to prevent timeouts

  • Thread-safe connection management

Returns:

The SQLAlchemy connection object

Return type:

Connection

close_conn(commit=True)[source]

Close the database connection and return it to the pool.

Parameters:

commit (bool) – Whether to commit pending transaction before closing.

property connected

Check if database is currently connected.

property conn

Get the current connection, establishing one if needed.

in_transaction()[source]

Check if currently in a transaction.

Returns:

True if in transaction, False otherwise

Return type:

bool

commit()[source]

Commit the current transaction.

rollback()[source]

Rollback the current transaction.

__enter__()[source]

Context manager entry: establishes connection and begins transaction.

__exit__(exc_type, exc_val, exc_tb)[source]

Context manager exit: commits or rolls back transaction and closes connection.
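
The commit-or-rollback behaviour of __enter__ and __exit__ can be sketched with a small context manager over the standard-library sqlite3 module. MiniSession is a hypothetical class written for illustration; it is not how Database is implemented.

```python
import os
import sqlite3
import tempfile


class MiniSession:
    """Hypothetical context manager: commit on success, roll back on error."""

    def __init__(self, path):
        self.path = path
        self.conn = None

    def __enter__(self):
        self.conn = sqlite3.connect(self.path)
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.conn.commit()
        else:
            self.conn.rollback()
        self.conn.close()
        self.conn = None
        return False  # do not swallow the exception


# A file-based database, so data survives across connections.
path = os.path.join(tempfile.mkdtemp(), "demo.db")

with MiniSession(path) as conn:
    conn.execute("CREATE TABLE users (name TEXT)")
    conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Alice"})

try:
    with MiniSession(path) as conn:
        conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Bob"})
        raise RuntimeError("simulated failure")  # triggers the rollback path
except RuntimeError:
    pass

with MiniSession(path) as conn:
    names = [row[0] for row in conn.execute("SELECT name FROM users")]
```

Only the first insert is visible afterwards, because the second block exited with an exception and was rolled back.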

orm_execute(query, autocommit=False, **kwargs)[source]

Execute a SQLAlchemy ORM query or statement.

Parameters:
  • query – SQLAlchemy ORM statement or ClauseElement

  • autocommit (Optional[bool]) – Whether to run in autocommit mode (default: False - no commits after execution)

  • **kwargs – Additional keyword arguments for query execution

Returns:

  • SQLResponse: Enhanced result wrapper with convenient data access

  • None: For operations that don’t return results (e.g., INSERT, UPDATE, DELETE)

Return type:

SQLResponse

Examples

    # Using SQLAlchemy ORM statements
    from sqlalchemy import select, insert, update, delete
    from sqlalchemy.sql import text

    # Select statement
    stmt = select(users_table).where(users_table.c.id == 1)
    result = db.orm_execute(stmt)

    # Insert statement
    stmt = insert(users_table).values(name="Alice")
    db.orm_execute(stmt, autocommit=True)

    # Update statement
    stmt = update(users_table).where(users_table.c.id == 1).values(name="Bob")
    db.orm_execute(stmt, autocommit=True)

    # Delete statement
    stmt = delete(users_table).where(users_table.c.id == 1)
    db.orm_execute(stmt, autocommit=True)

    # DDL operations
    from sqlalchemy import MetaData, Table
    metadata = MetaData()
    table = Table('users', metadata, autoload_with=engine)
    table.drop(engine)  # This should work without literal_binds

execute(query, transpile=None, autocommit=False, params=None, safe=False, **kwargs)[source]

Execute a raw SQL query against the database.

Parameters:
  • query (str) – The SQL query to execute (raw SQL string)

  • transpile (Optional[str]) – Source dialect to transpile from (if different from target)

  • autocommit (Optional[bool]) – Whether to run in autocommit mode (default: False - no commits after execution)

  • params (Union[Dict[str, Any], List[Dict[str, Any]], Tuple, None]) – Query parameters (dict for named, tuple/list for positional)

  • safe (bool) – If True, returns SQLErrorResponse on error instead of raising exception (default: False)

  • **kwargs – Additional keyword arguments for query execution

Returns:

  • SQLResponse: Enhanced result wrapper with convenient data access

  • SQLErrorResponse: Structured error response (only if safe=True)

  • None: For operations that don’t return results (e.g., INSERT, UPDATE, DELETE)

Return type:

SQLResponse

Examples

    # Simple query (uses temporary connection with autocommit)
    result = db.execute("SELECT * FROM users")
    rows = list(result.fetchall())

    # Parameterized query
    result = db.execute("SELECT * FROM users WHERE id = :id", params={"id": 1})

    # Parameterized insert
    db.execute(
        "INSERT INTO users (name) VALUES (:name)",
        params={"name": "Alice"}
    )

    # Transactional operation
    with db:
        db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"})
        db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"})

    # Cross-database SQL (transpile from PostgreSQL to the current database dialect, i.e., SQLite)
    result = db.execute("SELECT * FROM users LIMIT 10", transpile="postgresql")

    # Safe mode - returns error instead of raising
    result = db.execute("SELECT * FROM nonexistent", safe=True)
    if isinstance(result, SQLErrorResponse):
        print(result.to_string())

Note

For SQLAlchemy ORM operations, use orm_execute() method instead.
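
The params argument accepts a dict for named placeholders and a list of dicts for multiple parameter sets. As a rough illustration of why bound parameters are preferable to string interpolation, the sketch below uses the standard-library sqlite3 module, which accepts the same ":name" placeholder style; it does not call ahvn, and how execute() forwards params internally is not covered here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# One statement, one dict of named parameters.
conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": "Alice"})

# A hostile-looking value is stored as plain data because it is bound,
# not spliced into the SQL text.
tricky = "Bob'); DROP TABLE users; --"
conn.execute("INSERT INTO users (name) VALUES (:name)", {"name": tricky})

# A list of dicts supplies several parameter sets for the same statement.
conn.executemany(
    "INSERT INTO users (name) VALUES (:name)",
    [{"name": "Carol"}, {"name": "Dave"}],
)

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]
```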

db_tabs()[source]

List all table names in the database.

Returns:

List of table names

Return type:

List[str]

db_views()[source]

List all view names in the database.

Returns:

List of view names

Return type:

List[str]

tab_cols(tab_name, full_info=False)[source]

List column information for a specific table.

Parameters:
  • tab_name (str) – Name of the table

  • full_info (bool) – If True, return full column information; if False, return only column names

Returns:

  • When full_info=True: List of column dictionaries with full metadata

  • When full_info=False: List[str] of column names

Return type:

List[Dict] or List[str], depending on full_info
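
The two return shapes can be illustrated with SQLite's own schema introspection. This is a sketch under stated assumptions: tab_cols_sketch is a hypothetical function using PRAGMA table_info through the standard-library sqlite3 module, and the metadata keys shown are SQLite's, which may differ from the keys ahvn returns.

```python
import sqlite3


def tab_cols_sketch(conn, tab_name, full_info=False):
    """Hypothetical analogue of tab_cols() for a sqlite3 connection."""
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier.
    quoted = '"' + tab_name.replace('"', '""') + '"'
    cursor = conn.execute(f"PRAGMA table_info({quoted})")
    keys = [d[0] for d in cursor.description]
    rows = [dict(zip(keys, row)) for row in cursor.fetchall()]
    # Full metadata dictionaries, or just the column names.
    return rows if full_info else [row["name"] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")

col_names = tab_cols_sketch(conn, "users")
col_info = tab_cols_sketch(conn, "users", full_info=True)
pk_names = [col["name"] for col in col_info if col["pk"]]
```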

tab_pks(tab_name)[source]

List primary key column names for a specific table.

Parameters:

tab_name (str) – Name of the table

Returns:

List of primary key column names

Return type:

List[str]

tab_fks(tab_name)[source]

List foreign key information for a specific table.

Parameters:

tab_name (str) – Name of the table

Returns:

List of foreign key information with keys:
  • col_name: Column name in the current table

  • tab_ref: Referenced table name

  • col_ref: Referenced column name

  • name: Foreign key constraint name

Return type:

List[Dict[str, str]]

row_count(tab_name)[source]

Get row count for a specific table.

Parameters:

tab_name (str) – Name of the table

Returns:

Number of rows in the table

Return type:

int

col_type(tab_name, col_name)[source]

Get column type for a specific column in a table.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

Returns:

Column type

Return type:

str

col_distincts(tab_name, col_name)[source]

Get distinct values for a specific column.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

Returns:

List of distinct values

Return type:

List[Any]

col_enums(tab_name, col_name)[source]

Get all enumerated values for a specific column (including duplicates).

This method returns all values from a column, including duplicates. For unique values only, use col_distincts() instead.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

Returns:

List of all enumerated values (may contain duplicates)

Return type:

List[Any]

col_freqs(tab_name, col_name)[source]

Get value frequencies for a specific column.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

Returns:

List of value-frequency pairs

Return type:

List[Dict[str, Any]]

col_freqk(tab_name, col_name, topk=20)[source]

Get top-k value frequencies for a specific column.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

  • topk (int) – Number of top values to return. Defaults to 20.

Returns:

List of top-k value-frequency pairs

Return type:

List[Dict[str, Any]]
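
A top-k frequency query of this kind is typically a GROUP BY with a descending count and a LIMIT. The sketch below is an assumption about the shape of such a query, written against the standard-library sqlite3 module; col_freqk_sketch and the dictionary keys "value" and "freq" are hypothetical, since the keys used by ahvn are not stated here.

```python
import sqlite3


def col_freqk_sketch(conn, tab_name, col_name, topk=20):
    """Hypothetical analogue of col_freqk() for a sqlite3 connection."""
    # Identifiers are interpolated, so they must come from trusted input.
    sql = (
        f'SELECT "{col_name}" AS value, COUNT(*) AS freq '
        f'FROM "{tab_name}" GROUP BY "{col_name}" '
        "ORDER BY freq DESC, value ASC LIMIT ?"
    )
    cursor = conn.execute(sql, (topk,))
    keys = [d[0] for d in cursor.description]
    return [dict(zip(keys, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (city TEXT)")
conn.executemany(
    "INSERT INTO users (city) VALUES (?)",
    [("Oslo",), ("Rome",), ("Oslo",), ("Lima",), ("Rome",), ("Oslo",)],
)

top2 = col_freqk_sketch(conn, "users", "city", topk=2)
```

Dropping the LIMIT clause would give the full frequency list, which is the role col_freqs() plays.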

col_nonnulls(tab_name, col_name)[source]

Get list of non-null values for a specific column.

Parameters:
  • tab_name (str) – Name of the table

  • col_name (str) – Name of the column

Returns:

List of non-null values

Return type:

List[Any]
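
The difference between col_enums(), col_distincts(), and col_nonnulls() can be pictured as three plain SQL queries. The sketch below runs them with the standard-library sqlite3 module; the SQL is an assumption about what each helper corresponds to, and how ahvn orders results or treats NULL in each helper is not stated in this reference.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (city TEXT)")
conn.executemany(
    "INSERT INTO users (city) VALUES (?)",
    [("Oslo",), ("Rome",), ("Oslo",), (None,)],
)


def column(sql):
    # Hypothetical helper: run a one-column query and flatten the result.
    return [row[0] for row in conn.execute(sql)]


# Every value, duplicates included (compare col_enums).
all_values = column("SELECT city FROM users ORDER BY rowid")
# Unique values only (compare col_distincts); SQLite sorts NULL first.
distinct_values = column("SELECT DISTINCT city FROM users ORDER BY city")
# Values with NULL filtered out (compare col_nonnulls).
nonnull_values = column("SELECT city FROM users WHERE city IS NOT NULL ORDER BY rowid")
```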

clear_tab(tab_name)[source]

Clear all data from a specific table without deleting the table itself.

Uses SQLAlchemy ORM to ensure compatibility across all database backends.

Parameters:

tab_name (str) – Name of the table to clear

Raises:

Exception – If the clearing operation fails

Return type:

None

drop_tab(tab_name)[source]

Drop a specific table from the database.

Uses SQLAlchemy ORM to ensure compatibility across all database backends.

Parameters:

tab_name (str) – Name of the table to drop

Raises:

Exception – If the drop operation fails

Return type:

None
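
The distinction between clearing and dropping a table is the distinction between DELETE FROM and DROP TABLE. The sketch below shows the effect of each with the standard-library sqlite3 module; it illustrates the SQL semantics only, not the SQLAlchemy-based code path that clear_tab() and drop_tab() use.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (msg TEXT)")
conn.execute("INSERT INTO logs (msg) VALUES ('hello')")


def table_names(conn):
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [row[0] for row in rows]


# Clearing removes the rows but keeps the table definition.
conn.execute("DELETE FROM logs")
rows_after_clear = conn.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
tables_after_clear = table_names(conn)

# Dropping removes the table itself.
conn.execute("DROP TABLE logs")
tables_after_drop = table_names(conn)
```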

drop_view(view_name)[source]

Drop a specific view from the database.

Parameters:

view_name (str) – Name of the view to drop

Raises:

Exception – If the drop operation fails

Return type:

None

drop()[source]

Drop all tables in the database.

Uses SQLAlchemy metadata reflection to drop all tables.

Raises:

DatabaseError – If the database drop operation fails

Return type:

None

init(connect=True)[source]

Drop the entire database and create a new one.

This method combines drop() and database creation. After dropping, it will recreate the database and establish a new connection.

Raises:

Exception – If the database initialization fails

Return type:

None

Parameters:

connect (bool) – Whether to establish a new connection after the database is recreated. Defaults to True.

clear()[source]

Clear all data from tables in the database without deleting the tables themselves.

Uses the clear_tab method to ensure compatibility across all database backends.

Raises:

Exception – If the clearing operation fails

Return type:

None

close()[source]

Close the database connection and dispose of the engine.

Return type:

None

ahvn.utils.db.base.table_display(table, schema=None, max_rows=64, max_width=64, style='DEFAULT', **kwargs)[source]

Render a tabular display of SQL query results or iterable dictionaries using PrettyTable.

Parameters:
  • table (Union[SQLResponse, Iterable[Dict]]) – The table data to display. Can be a SQLResponse object (from a database query) or any iterable of dictionaries (e.g., list of dicts).

  • schema (Optional[List[str]], optional) – List of column names to use as the table schema. If not provided, the schema is inferred from the SQLResponse or from the first row of the iterable.

  • max_rows (int, optional) – Maximum number of rows to display (including the last row and an ellipsis row if truncated). If the table has more than max_rows + 1 rows, the output will show the first max_rows-1 rows, an ellipsis row, and the last row. Defaults to 64.

  • max_width (int, optional) – Maximum width for each column in the output table. Defaults to 64.

  • style (Literal["DEFAULT", "MARKDOWN", "PLAIN_COLUMNS", "MSWORD_FRIENDLY", "ORGMODE", "SINGLE_BORDER", "DOUBLE_BORDER", "RANDOM"], optional) – The style to use for the table (supported by PrettyTable). Defaults to “DEFAULT”.

  • **kwargs – Additional keyword arguments passed to PrettyTable.

Returns:

A string representation of the formatted table, including the number of rows in total.

Return type:

str

Raises:

ValueError – If the provided table rows do not match the schema in length.

Example

>>> result = db.execute("SELECT * FROM users")
>>> table_display(result, max_rows=5)
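
The truncation rule described for max_rows can be sketched in plain Python, independent of PrettyTable. truncate_rows is a hypothetical helper that follows the wording of the parameter description literally (truncate only when there are more than max_rows + 1 rows); the actual implementation may differ in detail.

```python
from typing import Any, Dict, Iterable, List


def truncate_rows(rows: Iterable[Dict[str, Any]], max_rows: int = 64) -> List[Dict[str, Any]]:
    """Keep the first max_rows - 1 rows, an ellipsis row, and the last row."""
    rows = list(rows)
    # Assumes max_rows >= 1. Short tables are returned unchanged.
    if len(rows) <= max_rows + 1:
        return rows
    ellipsis_row = {key: "..." for key in rows[0]}
    return rows[: max_rows - 1] + [ellipsis_row] + [rows[-1]]


data = [{"id": i} for i in range(10)]
shown = truncate_rows(data, max_rows=5)
untouched = truncate_rows(data[:6], max_rows=5)
```

With ten rows and max_rows=5, the sketch keeps rows 0 to 3, inserts one ellipsis row, and ends with row 9.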