pir_pipeline.utils.SQLAlchemyUtils.SQLAlchemyUtils¶
- class pir_pipeline.utils.SQLAlchemyUtils.SQLAlchemyUtils(user: str, password: str, host: str, port: int, database: str)¶
Bases: SQLUtils
Utilities for interacting with SQL via SQLAlchemy
- __init__(user: str, password: str, host: str, port: int, database: str)¶
Initialize a SQLAlchemyUtils object
- Args:
user (str): Database username
password (str): Database password
host (str): Database host
port (int): Database port
database (str): Database name
Methods
__init__(user, password, host, port, database): Initialize a SQLAlchemyUtils object
close_connection(): Close database connection(s)
create_db(): Create the target database
drop_db(): Drop the target database
gen_engine(**kwargs): Generate database engine
get_columns(table[, where]): Get column names from the specified table
get_records(query[, records]): Return records from the database
get_scalar(query, records): Return a scalar from a query
insert_records(records, table): Insert records into the target table
make_connection(): Make a connection to the target database(s)
to_dict(records, columns): Convert a list of tuples to a list of dictionaries
update_records(table, set, where[, records]): Update records in the database
validate_table(table): Assert that the table provided is among the list of valid tables.
Attributes
DB_CONFIG: Database configuration
database: Return the database name
engine: Return the database engine
tables: Return the tables in the database
- property DB_CONFIG¶
Database configuration
- close_connection()¶
Close database connection(s)
- create_db() → Self¶
Create the target database
- Returns:
Self: Object of class SQLAlchemyUtils
- property database¶
Return the database name
- drop_db() → Self¶
Drop the target database
- Returns:
Self: Object of class SQLAlchemyUtils
- property engine¶
Return the database engine
- gen_engine(**kwargs) → Self¶
Generate database engine
- Returns:
Self: Object of class SQLAlchemyUtils
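Because create_db, drop_db, and gen_engine each return the object itself, calls can be chained. The sketch below uses a hypothetical stand-in class (ChainableSketch, not part of pir_pipeline) to show the pattern only. It makes no claim about what the real methods do internally or in which order they should be called.

```python
class ChainableSketch:
    """Hypothetical stand-in that mimics methods returning Self."""

    def __init__(self) -> None:
        # Record the calls so the chaining order is visible.
        self.steps: list[str] = []

    def create_db(self) -> "ChainableSketch":
        self.steps.append("create_db")
        return self  # returning self is what enables chaining

    def gen_engine(self, **kwargs) -> "ChainableSketch":
        self.steps.append("gen_engine")
        return self


# Each call returns the same object, so the calls can be strung together.
obj = ChainableSketch().create_db().gen_engine()
print(obj.steps)
```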
- get_columns(table: str, where: str = '') → list[str]¶
Get column names from the specified table
- Args:
table (str): Table name
where (str): Additional where condition by which to filter columns returned.
- Returns:
list[str]: A list of column names
- get_records(query: str | Select, records: dict | list[dict] = None) → DataFrame¶
Return records from the database
- Args:
query (str | Select): A query to execute
records (dict | list[dict], optional): Records to use for bound parameters. Defaults to None.
- Returns:
pd.DataFrame: Records returned by the query
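The idea of passing records as bound parameters can be illustrated without this class. The sketch below swaps in Python's standard-library sqlite3 module and an in-memory database. The table name, column names, and values are hypothetical, and whether get_records accepts the same `:name` placeholder style for string queries is an assumption, not something this page confirms.

```python
import sqlite3

# In-memory database used only for illustration (not the pipeline's database).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE question (id INTEGER, name TEXT)")

# A list of dicts supplies one set of named parameters per inserted row.
conn.executemany(
    "INSERT INTO question (id, name) VALUES (:id, :name)",
    [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}],
)

# A single dict binds the :id placeholder; the value is never spliced
# into the SQL string itself.
rows = conn.execute(
    "SELECT id, name FROM question WHERE id = :id", {"id": 2}
).fetchall()
conn.close()
print(rows)
```

Binding values this way, rather than formatting them into the query text, keeps the query and its data separate and avoids SQL injection.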
- get_scalar(query: Select, records: dict)¶
Return a scalar from a query
- Args:
query (str|Select): A query to execute
records (dict): A dictionary containing parameters to match
- insert_records(records: list[dict], table: str)¶
Insert records into the target table
- Args:
records (list[dict]): A list of records for insertion
table (str): Table name
- make_connection()¶
Make a connection to the target database(s)
- property tables¶
Return the tables in the database
- to_dict(records: list[tuple], columns: list[str]) → list[dict]¶
Convert a list of tuples to a list of dictionaries
- Args:
records (list[tuple]): Records, i.e. values in the resultant dictionaries.
columns (list[str]): Columns, i.e. keys in the resultant dictionaries.
- Returns:
list[dict]: List of dictionaries.
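The conversion described for to_dict can be sketched as a standalone function. This is a minimal illustration under the assumption that values are paired with column names by position; the function name to_dict_sketch and the sample data are hypothetical, and the library's actual implementation may differ.

```python
def to_dict_sketch(records: list[tuple], columns: list[str]) -> list[dict]:
    """Pair each tuple's values with the column names, in order."""
    # zip() matches the first column to the first value, and so on.
    return [dict(zip(columns, record)) for record in records]


rows = [(1, "alpha"), (2, "beta")]
result = to_dict_sketch(rows, ["id", "name"])
print(result)  # [{'id': 1, 'name': 'alpha'}, {'id': 2, 'name': 'beta'}]
```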
- update_records(table: Table, set: dict[str], where: BinaryExpression | BooleanClauseList, records: list[dict] = [])¶
Update records in the database
- Args:
table (Table): Table name
set (dict[str]): Dictionary indicating how to update values.
where (BinaryExpression | BooleanClauseList): Clause indicating which values to update.
records (list[dict], optional): List of records to use in update. Defaults to [].
- validate_table(table: str)¶
Assert that the table provided is among the list of valid tables.
- Args:
table (str): Table name
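The check described for validate_table might look like the following sketch. The function name validate_table_sketch, the explicit valid_tables argument, and the table names are assumptions for illustration; the real method presumably reads the valid tables from the object itself.

```python
def validate_table_sketch(table: str, valid_tables: list[str]) -> None:
    """Raise AssertionError if the table is not among the valid tables."""
    assert table in valid_tables, f"Invalid table: {table!r}"


valid = ["response", "question"]  # hypothetical table names

# A known table passes silently.
validate_table_sketch("response", valid)

# An unknown table raises AssertionError with a descriptive message.
message = ""
try:
    validate_table_sketch("missing", valid)
except AssertionError as exc:
    message = str(exc)
print(message)  # Invalid table: 'missing'
```

Assert statements are skipped when Python runs with the -O flag, so a check of this form is a development safeguard rather than a security boundary.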