pir_pipeline.utils.SQLAlchemyUtils.SQLAlchemyUtils

class pir_pipeline.utils.SQLAlchemyUtils.SQLAlchemyUtils(user: str, password: str, host: str, port: int, database: str)

Bases: SQLUtils

Utilities for interacting with SQL via SQLAlchemy

__init__(user: str, password: str, host: str, port: int, database: str)

Initialize a SQLAlchemyUtils object

Args:

user (str): Database username
password (str): Database password
host (str): Database host
port (int): Database port
database (str): Database name
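The reference does not say how these five arguments are combined. A minimal sketch, assuming they end up in a SQLAlchemy-style database URL of the form dialect://user:password@host:port/database, could look like the following. The helper name build_url and the "mysql+pymysql" dialect are hypothetical and are not taken from pir_pipeline.

```python
from urllib.parse import quote_plus


def build_url(
    user: str,
    password: str,
    host: str,
    port: int,
    database: str,
    dialect: str = "mysql+pymysql",  # assumption: the real dialect/driver is not documented here
) -> str:
    """Assemble a SQLAlchemy-style URL from the constructor arguments."""
    # Percent-encode the credentials so characters such as "@" or "/"
    # cannot be mistaken for URL delimiters.
    return (
        f"{dialect}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


url = build_url("pir_user", "p@ss/word", "localhost", 3306, "pir")
print(url)
```

Encoding the credentials matters mainly for passwords that contain reserved characters; without it the host portion of the URL may be parsed incorrectly.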

Methods

__init__(user, password, host, port, database)

Initialize a SQLAlchemyUtils object

close_connection()

Close database connection(s)

create_db()

Create the target database

drop_db()

Drop the target database

gen_engine(**kwargs)

Generate database engine

get_columns(table[, where])

Get column names from the specified table

get_records(query[, records])

Return records from the database

get_scalar(query, records)

Return a scalar from a query

insert_records(records, table)

Insert records into the target table

make_connection()

Make a connection to the target database(s)

to_dict(records, columns)

Convert a list of tuples to a list of dictionaries

update_records(table, set, where[, records])

Update records in the database

validate_table(table)

Assert that the table provided is among the list of valid tables.

Attributes

DB_CONFIG

Database configuration

database

Return the database name

engine

Return the database engine

tables

Return the tables in the database

property DB_CONFIG

Database configuration

close_connection()

Close database connection(s)

create_db() -> Self

Create the target database

Returns:

Self: Object of class SQLAlchemyUtils

property database

Return the database name

drop_db() -> Self

Drop the target database

Returns:

Self: Object of class SQLAlchemyUtils

property engine

Return the database engine

gen_engine(**kwargs) -> Self

Generate database engine

Returns:

Self: Object of class SQLAlchemyUtils
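Because create_db, drop_db, and gen_engine are all documented as returning Self, calls can presumably be chained. The toy class below only illustrates that pattern; ChainableSketch is a hypothetical stand-in that records method names and never touches a database.

```python
from __future__ import annotations


class ChainableSketch:
    """Hypothetical stand-in showing why returning self enables chaining."""

    def __init__(self) -> None:
        self.calls = []  # names of the methods invoked, in order

    def drop_db(self) -> ChainableSketch:
        self.calls.append("drop_db")
        return self  # returning the instance lets the next call follow directly

    def create_db(self) -> ChainableSketch:
        self.calls.append("create_db")
        return self

    def gen_engine(self, **kwargs) -> ChainableSketch:
        self.calls.append("gen_engine")
        return self


# Reset a database and prepare an engine in a single expression.
sketch = ChainableSketch().drop_db().create_db().gen_engine(echo=True)
print(sketch.calls)
```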

get_columns(table: str, where: str = '') -> list[str]

Get column names from the specified table

Args:

table (str): Table name
where (str): Additional where condition by which to filter columns returned.

Returns:

list[str]: A list of column names

get_records(query: str | Select, records: dict | list[dict] = None) -> DataFrame

Return records from the database

Args:

query (str | Select): A query to execute
records (dict | list[dict], optional): Records to use for bound parameters. Defaults to None.

Returns:

pd.DataFrame: Records returned by the query
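To illustrate what "records to use for bound parameters" means, the sketch below runs a query with a named placeholder and passes the value separately in a dict. It uses the standard-library sqlite3 module as a stand-in rather than SQLAlchemy and pandas, so it returns plain tuples instead of a DataFrame. The table and column names are hypothetical.

```python
import sqlite3

# In-memory database with a hypothetical table, used only for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE response (question_id TEXT, year INTEGER)")
conn.executemany(
    "INSERT INTO response VALUES (?, ?)",
    [("q1", 2021), ("q2", 2022), ("q3", 2022)],
)

# The query carries a named placeholder (:year); the value travels in a dict,
# so it is never spliced into the SQL string itself.
params = {"year": 2022}
rows = conn.execute(
    "SELECT question_id FROM response WHERE year = :year ORDER BY question_id",
    params,
).fetchall()
conn.close()

print(rows)
```

SQLAlchemy's text() construct uses the same :name placeholder style, which is likely what a string query passed to get_records would rely on, though the reference does not confirm this.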

get_scalar(query: Select, records: dict)

Return a scalar from a query

Args:

query (str | Select): A query to execute
records (dict): A dictionary containing parameters to match

insert_records(records: list[dict], table: str)

Insert records into the target table

Args:

records (list[dict]): A list of records for insertion
table (str): Table name
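The records argument is a list of dictionaries, one per row. As a rough illustration of that shape, the sketch below inserts such a list with the standard-library sqlite3 module; it is a stand-in and not the SQLAlchemy-based implementation. The table and column names are hypothetical.

```python
import sqlite3

# One dict per row; the keys correspond to column names.
records = [
    {"question_id": "q1", "year": 2021},
    {"question_id": "q2", "year": 2022},
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE response (question_id TEXT, year INTEGER)")

# executemany runs the statement once per dict, matching keys to placeholders.
conn.executemany(
    "INSERT INTO response (question_id, year) VALUES (:question_id, :year)",
    records,
)
conn.commit()

inserted = conn.execute("SELECT COUNT(*) FROM response").fetchone()[0]
conn.close()
print(inserted)
```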

make_connection()

Make a connection to the target database(s)

property tables

Return the tables in the database

to_dict(records: list[tuple], columns: list[str]) -> list[dict]

Convert a list of tuples to a list of dictionaries

Args:

records (list[tuple]): Records, i.e. values in the resultant dictionaries.
columns (list[str]): Columns, i.e. keys in the resultant dictionaries.

Returns:

list[dict]: List of dictionaries.
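The conversion described here can be sketched in a few lines. The function below is a hypothetical equivalent written from the description above, not the library's own code: it pairs each tuple's values with the column names.

```python
def to_dict_sketch(records: list[tuple], columns: list[str]) -> list[dict]:
    """Pair each record's values with the column names (illustrative only)."""
    # zip stops at the shorter input, so extra values or extra columns
    # are silently dropped in this sketch.
    return [dict(zip(columns, row)) for row in records]


rows = [(1, "Head Start"), (2, "Early Head Start")]
converted = to_dict_sketch(rows, ["id", "program"])
print(converted)
```

Whether the real method raises an error when a tuple's length differs from the number of columns is not stated in this reference.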

update_records(table: Table, set: dict[str], where: BinaryExpression | BooleanClauseList, records: list[dict] = [])

Update records in the database

Args:

table (Table): Table name
set (dict[str]): Dictionary indicating how to update values.
where (BinaryExpression | BooleanClauseList): Clause indicating which values to update.
records (list[dict], optional): List of records to use in update. Defaults to [].

validate_table(table: str)

Assert that the table provided is among the list of valid tables.

Args:

table (str): Table name
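A check of this kind can be sketched as a simple membership assertion. The list of valid tables and the function name below are hypothetical; the real class presumably derives the valid names from its tables attribute.

```python
# Hypothetical table names, used only for this illustration.
VALID_TABLES = ("response", "question", "program")


def validate_table_sketch(table: str, valid_tables=VALID_TABLES) -> None:
    """Raise AssertionError if the table name is not in the allowed list."""
    assert table in valid_tables, f"Invalid table: {table!r}"


validate_table_sketch("response")  # passes silently

try:
    validate_table_sketch("not_a_table")
except AssertionError as err:
    print(err)
```

Checking the name before building a statement helps keep arbitrary strings out of queries that interpolate table names.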