ahvn.utils.db.base module

class ahvn.utils.db.base.SQLResponse(cursor_result)[源代码]

基类:object

Enhanced result wrapper for SQLAlchemy CursorResult with convenient data access methods.

__init__(cursor_result)[源代码]
property raw

Access the underlying SQLAlchemy CursorResult.

返回:

The underlying SQLAlchemy CursorResult.

返回类型:

CursorResult

警告

When a connection is closed, the cursor result may no longer be available.

property columns: List[str]

Get column names from the result.

返回:

The list of column names. If the result is not available, returns an empty list.

返回类型:

List[str]

property row_count: int

Get the number of affected rows.

返回:

The number of affected rows. If the result is not available, returns -1.

返回类型:

int

property lastrowid: int | None

Get the last inserted row ID.

返回:

The last inserted row ID. If the result is not available, returns None.

返回类型:

Optional[int]

fetchall()[源代码]

Fetch all rows as a list of dictionaries.

生成器:

Dict[str, Any] -- The next row as a dictionary.

返回类型:

Generator[Dict[str, Any], None, None]

__len__()[源代码]

Get the number of rows in the result.

返回:

The number of rows in the result.

返回类型:

int

to_list(row_fmt='dict')[源代码]

Convert result to list of tuples.

参数:

row_fmt (Literal['dict', 'tuple']) -- The format for the rows.

返回:

The result as a list of tuples or dictionaries.

返回类型:

Union[List[Tuple], List[Dict[str, Any]]]

close()[源代码]

Close the result cursor.

class ahvn.utils.db.base.SQLErrorResponse(error_type, short_message, full_message, query=None, params=None)[源代码]

基类:object

Structured error response for database operation failures.

This class provides a clean, structured way to return error information from database operations, making it easier for LLMs and tools to handle and present errors to users.

参数:
__init__(error_type, short_message, full_message, query=None, params=None)[源代码]

Initialize a SQL error response.

参数:
  • error_type (str) -- The type/category of error (e.g., "TableNotFound", "SyntaxError").

  • short_message (str) -- A brief, human-readable error message.

  • full_message (str) -- The complete error message with traceback.

  • query (str, optional) -- The SQL query that caused the error.

  • params (Union[Dict, List, Tuple], optional) -- The parameters used with the query.

to_string(include_full=False)[源代码]

Format the error as a user-friendly string.

参数:

include_full (bool) -- Whether to include the full original error message. Defaults to False.

返回:

Formatted error message.

返回类型:

str

class ahvn.utils.db.base.DatabaseErrorHandler(db=None)[源代码]

基类:object

Extensible handler for database errors with type-specific processing.

This class provides a clean way to handle different types of database errors, extract relevant information, and provide helpful suggestions to users.

参数:

db (Database | None)

__init__(db=None)[源代码]

Initialize the error handler.

参数:

db (Database, optional) -- Database instance for context-aware suggestions.

register_handler(exception_type, handler_func)[源代码]

Register a custom handler for a specific exception type.

参数:
  • exception_type (type) -- The exception type to handle.

  • handler_func (callable) -- Function that takes (exception, query, params) and returns (error_type, short_message).

handle(e, query=None, params=None)[源代码]

Handle a database exception and extract structured error information.

返回:

(error_type, short_message, full_message) extracted from the exception.

返回类型:

tuple

参数:
class ahvn.utils.db.base.Database(database=None, provider=None, pool=None, connect=False, **kwargs)[源代码]

基类:object

Universal Database Connector

Provides a clean, intuitive interface for database operations across different providers (SQLite, PostgreSQL, DuckDB, MySQL) with standard connection management:

  1. Basic Usage:

    `python db = Database(provider="sqlite", database=":memory:") result = db.execute("SELECT * FROM table") `

  2. Context Manager (recommended for transactions):

    ```python with Database(provider="pg", database="mydb") as db:

    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Alice"}) db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Alice"}) # Automatically commits on success, rolls back on exception

    ```

  3. Manual Transaction Control:

    ```python db = Database(provider="sqlite", database="mydb") try:

    db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"}, autocommit=False) db.execute("UPDATE users SET active = TRUE WHERE name = :name)", params={"name": "Bob"}, autocommit=False) db.commit()

    except Exception:

    db.rollback()

    finally:

    db.close_conn()

    ```

The class automatically handles: - Database creation (PostgreSQL auto-creation if database doesn't exist) - Connection lifecycle management - SQL transpilation between different database dialects

参数:
__init__(database=None, provider=None, pool=None, connect=False, **kwargs)[源代码]

Initialize database connection.

参数:
  • database (Optional[str]) -- Database name or path (':memory:' for in-memory)

  • provider (Optional[str]) -- Database provider ('sqlite', 'pg', 'duckdb', etc.)

  • pool (Optional[Dict[str, Any]]) -- Pool configuration to override provider defaults (e.g., {'pool_size': 10})

  • connect (bool) -- Whether to establish a connection immediately (default: False)

  • **kwargs -- Additional connection parameters

clone()[源代码]

Create an independent Database instance with the same configuration.

Each clone has its own connection, making it safe for parallel operations where each worker needs an independent database connection.

警告

For in-memory databases (:memory:), cloned instances do NOT share data. Each clone gets its own separate in-memory database. Use file-based databases for parallel operations requiring shared state.

返回:

A new independent Database instance.

返回类型:

Database

示例

```python # Parallel-safe pattern def worker(db_template, task_id):

db = db_template.clone() # Each worker gets own connection try:

result = db.execute("SELECT * FROM tasks WHERE id = :id", params={"id": task_id}) return result.to_list()

finally:

db.close()

# Use with threading/multiprocessing with ThreadPoolExecutor() as executor:

futures = [executor.submit(worker, db, i) for i in range(10)]

```

connect()[源代码]

Establish a database connection.

The connection pool (configured per dialect) handles: - Stale connection detection via pool_pre_ping - Connection recycling to prevent timeouts - Thread-safe connection management

返回:

The SQLAlchemy connection object

返回类型:

Connection

close_conn(commit=True)[源代码]

Close the database connection and return it to the pool.

参数:

commit (bool) -- Whether to commit pending transaction before closing.

property connected

Check if database is currently connected.

property conn

Get the current connection, establishing one if needed.

in_transaction()[源代码]

Check if currently in a transaction.

返回:

True if in transaction, False otherwise

返回类型:

bool

commit()[源代码]

Commit the current transaction.

rollback()[源代码]

Rollback the current transaction.

__enter__()[源代码]

Context manager entry: establishes connection and begins transaction.

__exit__(exc_type, exc_val, exc_tb)[源代码]

Context manager exit: commits or rolls back transaction and closes connection.

orm_execute(query, autocommit=False, **kwargs)[源代码]

Execute a SQLAlchemy ORM query or statement.

参数:
  • query -- SQLAlchemy ORM statement or ClauseElement

  • autocommit (Optional[bool]) -- Whether to run in autocommit mode (default: False - no commits after execution)

  • **kwargs -- Additional keyword arguments for query execution

返回:

Enhanced result wrapper with convenient data access None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE)

返回类型:

SQLResponse

示例

# Using SQLAlchemy ORM statements from sqlalchemy import select, insert, update, delete from sqlalchemy.sql import text

# Select statement stmt = select(users_table).where(users_table.c.id == 1) result = db.orm_execute(stmt)

# Insert statement stmt = insert(users_table).values(name="Alice") db.orm_execute(stmt, autocommit=True)

# Update statement stmt = update(users_table).where(users_table.c.id == 1).values(name="Bob") db.orm_execute(stmt, autocommit=True)

# Delete statement stmt = delete(users_table).where(users_table.c.id == 1) db.orm_execute(stmt, autocommit=True)

# DDL operations from sqlalchemy import MetaData, Table metadata = MetaData() table = Table('users', metadata, autoload_with=engine) table.drop(engine) # This should work without literal_binds

execute(query, transpile=None, autocommit=False, params=None, safe=False, **kwargs)[源代码]

Execute a raw SQL query against the database.

参数:
  • query (str) -- The SQL query to execute (raw SQL string)

  • transpile (Optional[str]) -- Source dialect to transpile from (if different from target)

  • autocommit (Optional[bool]) -- Whether to run in autocommit mode (default: False - no commits after execution)

  • params (Union[Dict[str, Any], List[Dict[str, Any]], Tuple, None]) -- Query parameters (dict for named, tuple/list for positional)

  • safe (bool) -- If True, returns SQLErrorResponse on error instead of raising exception (default: False)

  • **kwargs -- Additional keyword arguments for query execution

返回:

Enhanced result wrapper with convenient data access SQLErrorResponse: Structured error response (only if safe=True) None: For operations that don't return results (e.g., INSERT, UPDATE, DELETE)

返回类型:

SQLResponse

示例

# Simple query (uses temporary connection with autocommit) result = db.execute("SELECT * FROM users") rows = list(result.fetchall())

# Parameterized query result = db.execute("SELECT * FROM users WHERE id = :id", params={"id": 1})

# Parameterized insert db.execute(

"INSERT INTO users (name) VALUES (:name)", params={"name": "Alice"}

)

# Transactional operation with db:

db.execute("INSERT INTO users (name) VALUES (:name)", params={"name": "Bob"}) db.execute("UPDATE users SET active = TRUE WHERE name = :name", params={"name": "Bob"})

# Cross-database SQL (transpile from PostgreSQL to the current database dialect, i.e., SQLite) result = db.execute("SELECT * FROM users LIMIT 10", transpile="postgresql")

# Safe mode - returns error instead of raising result = db.execute("SELECT * FROM nonexistent", safe=True) if isinstance(result, SQLErrorResponse):

print(result.to_string())

备注

For SQLAlchemy ORM operations, use orm_execute() method instead.

db_tabs()[源代码]

List all table names in the database.

返回:

List of table names

返回类型:

List[str]

db_views()[源代码]

List all view names in the database.

返回:

List of view names

返回类型:

List[str]

tab_cols(tab_name, full_info=False)[源代码]

List column information for a specific table.

参数:
  • tab_name (str) -- Name of the table

  • full_info (bool) -- If True, return full column information; if False, return only column names

返回:

List of column dictionaries with full metadata When full_info=False: List[str] of column names

返回类型:

When full_info=True

tab_pks(tab_name)[源代码]

List primary key column names for a specific table.

参数:

tab_name (str) -- Name of the table

返回:

List of primary key column names

返回类型:

List[str]

tab_fks(tab_name)[源代码]

List foreign key information for a specific table.

参数:

tab_name (str) -- Name of the table

返回:

List of foreign key information with keys:
  • col_name: Column name in the current table

  • tab_ref: Referenced table name

  • col_ref: Referenced column name

  • name: Foreign key constraint name

返回类型:

List[Dict[str, str]]

row_count(tab_name)[源代码]

Get row count for a specific table.

参数:

tab_name (str) -- Name of the table

返回:

Number of rows in the table

返回类型:

int

col_type(tab_name, col_name)[源代码]

Get column type for a specific column in a table.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

返回:

Column type

返回类型:

str

col_distincts(tab_name, col_name)[源代码]

Get distinct values for a specific column.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

返回:

List of distinct values

返回类型:

List[Any]

col_enums(tab_name, col_name)[源代码]

Get all enumerated values for a specific column (including duplicates).

This method returns all values from a column, including duplicates. For unique values only, use col_distincts() instead.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

返回:

List of all enumerated values (may contain duplicates)

返回类型:

List[Any]

col_freqs(tab_name, col_name)[源代码]

Get value frequencies for a specific column.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

返回:

List of value-frequency pairs

返回类型:

List[Dict[str, Any]]

col_freqk(tab_name, col_name, topk=20)[源代码]

Get top-k value frequencies for a specific column.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

  • k -- Number of top values to return

  • topk (int)

返回:

List of top-k value-frequency pairs

返回类型:

List[Dict[str, Any]]

col_nonnulls(tab_name, col_name)[源代码]

Get list of non-null values for a specific column.

参数:
  • tab_name (str) -- Name of the table

  • col_name (str) -- Name of the column

返回:

List of non-null values

返回类型:

List[Any]

clear_tab(tab_name)[源代码]

Clear all data from a specific table without deleting the table itself.

Uses SQLAlchemy ORM to ensure compatibility across all database backends.

参数:

tab_name (str) -- Name of the table to clear

抛出:

Exception -- If the clearing operation fails

返回类型:

None

drop_tab(tab_name)[源代码]

Drop a specific table from the database.

Uses SQLAlchemy ORM to ensure compatibility across all database backends.

参数:

tab_name (str) -- Name of the table to drop

抛出:

Exception -- If the drop operation fails

返回类型:

None

drop_view(view_name)[源代码]

Drop a specific view from the database.

参数:

view_name (str) -- Name of the view to drop

抛出:

Exception -- If the drop operation fails

返回类型:

None

drop()[源代码]

Drop all tables in the database.

Uses SQLAlchemy metadata reflection to drop all tables.

抛出:

DatabaseError -- If the database drop operation fails

返回类型:

None

init(connect=True)[源代码]

Drop the entire database and create a new one.

This method combines drop() and database creation. After dropping, it will recreate the database and establish a new connection.

抛出:

Exception -- If the database initialization fails

返回类型:

None

参数:

connect (bool)

clear()[源代码]

Clear all data from tables in the database without deleting the tables themselves.

Uses the clear_tab method to ensure compatibility across all database backends.

抛出:

Exception -- If the clearing operation fails

返回类型:

None

close()[源代码]

Close the database connection and dispose of the engine.

返回类型:

None

ahvn.utils.db.base.table_display(table, schema=None, max_rows=64, max_width=64, style='DEFAULT', **kwargs)[源代码]

Render a tabular display of SQL query results or iterable dictionaries using PrettyTable.

参数:
  • table (Union[SQLResponse, Iterable[Dict]]) -- The table data to display. Can be a SQLResponse object (from a database query) or any iterable of dictionaries (e.g., list of dicts).

  • schema (Optional[List[str]], optional) -- List of column names to use as the table schema. If not provided, the schema is inferred from the SQLResponse or from the first row of the iterable.

  • max_rows (int, optional) -- Maximum number of rows to display (including the last row and an ellipsis row if truncated). If the table has more than max_rows + 1 rows, the output will show the first max_rows-1 rows, an ellipsis row, and the last row. Defaults to 64.

  • max_width (int, optional) -- Maximum width for each column in the output table. Defaults to 64.

  • style (Literal["DEFAULT", "MARKDOWN", "PLAIN_COLUMNS", "MSWORD_FRIENDLY", "ORGMODE", "SINGLE_BORDER", "DOUBLE_BORDER", "RANDOM"], optional) -- The style to use for the table (supported by PrettyTable). Defaults to "DEFAULT".

  • **kwargs -- Additional keyword arguments passed to PrettyTable.

返回:

A string representation of the formatted table, including the number of rows in total.

返回类型:

str

抛出:

ValueError -- If the provided table rows do not match the schema in length.

示例

>>> result = db.execute("SELECT * FROM users")
>>> table_display(result, max_rows=5)