pir_pipeline.ingestion.PIRIngestor.PIRIngestor
- class pir_pipeline.ingestion.PIRIngestor.PIRIngestor(workbook: str | PathLike, sql: SQLAlchemyUtils)
Bases: object
Class for ingesting PIR data
- __init__(workbook: str | PathLike, sql: SQLAlchemyUtils)
Initialize a PIRIngestor object
- Args:
workbook (str|os.PathLike): File path to an Excel Workbook
Methods
__init__(workbook, sql): Initialize a PIRIngestor object
append_sections(): Append the loaded section data to generate the response table
clean_pir_data(): Align the PIR data with PIR database schemas
close_excel_files(): Close all Excel files
duplicated_question_error(df, columns): Resolve duplicated questions
extract_sheets(): Load the workbook, and extract sheets and year.
get_section(question_number): Extract the section from a question number
hash_columns(row): Return the sha1 hash of a series of columns
ingest(): Ingestion entry point
insert_data(): Insert data into the target database
load_data(): Load data from each sheet of the workbook
make_snake_name(name): Convert a name to snake case
merge_response_question(): Merge response and question data frames
stringify(value): Convert values to string
validate_data(): Validate the cleaned PIR data
- append_sections() → Self
Append the loaded section data to generate the response table
- Returns:
Self: PIRIngestor object
- clean_pir_data() → Self
Align the PIR data with PIR database schemas
- Returns:
Self: PIRIngestor object
- close_excel_files()
Close all Excel files
- duplicated_question_error(df: DataFrame, columns: list[str]) → DataFrame
Resolve duplicated questions
This function is run when a duplicate question appears in the data. In that case, the data are sorted, grouped, and the first record in each group is taken.
- Args:
df (pd.DataFrame): A data frame containing duplicates
columns (list[str]): A list of columns to group on
- Returns:
pd.DataFrame: A deduplicated data frame
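The sort, group, and take-first behavior described above can be sketched with pandas as follows. This is a minimal illustration, not the package's implementation: the function name `dedupe_sketch`, the example column names, and the choice to sort by the grouping columns followed by all remaining columns are assumptions.

```python
import pandas as pd


def dedupe_sketch(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """Keep the first record of each group defined by `columns`."""
    # Assumption: sort by the grouping columns first, then by every other
    # column, so the record that ends up "first" is deterministic.
    sort_order = columns + [c for c in df.columns if c not in columns]
    ordered = df.sort_values(sort_order)
    # head(1) keeps the first whole row per group, unlike first(), which
    # picks the first non-null value column by column.
    deduplicated = ordered.groupby(columns, sort=False, dropna=False).head(1)
    return deduplicated.reset_index(drop=True)


# Hypothetical data: question "A.1" appears twice.
frame = pd.DataFrame(
    {
        "question_number": ["A.1", "A.1", "B.2"],
        "question_name": ["x", "x", "y"],
        "value": [2, 1, 3],
    }
)
result = dedupe_sketch(frame, ["question_number", "question_name"])
```

Using `head(1)` rather than `first()` is deliberate in this sketch, since the description says a record (a full row) is taken from each group.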
- extract_sheets() → Self
Load the workbook, and extract sheets and year.
- Returns:
Self: PIRIngestor object
- get_section(question_number: str) → str
Extract the section from a question number
- Args:
question_number (str): A string question number
- Returns:
str: The section in which the question appears
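As a rough illustration of section extraction, the sketch below assumes that a question number begins with an alphabetic section prefix (for example "A.1"). That format, and the name `get_section_sketch`, are assumptions for illustration; the actual parsing rules are not documented here.

```python
import re


def get_section_sketch(question_number: str) -> str:
    """Return the leading alphabetic prefix of a question number."""
    # Assumption: the section is the run of letters at the start of the
    # question number, ignoring leading whitespace.
    match = re.match(r"\s*([A-Za-z]+)", question_number)
    if match is None:
        raise ValueError(f"No section found in {question_number!r}")
    return match.group(1).upper()
```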
- hash_columns(row: str | Series) → str
Return the sha1 hash of a series of columns
- Args:
row (pd.Series): A series of columns to hash
- Returns:
str: Hashed columns
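One way to hash a set of column values with SHA-1 is sketched below using only the standard library. How the real method combines the values is not stated, so the `"|"` separator, the UTF-8 encoding, and the name `hash_columns_sketch` are assumptions.

```python
import hashlib
from typing import Iterable, Union


def hash_columns_sketch(row: Union[str, Iterable[object]], sep: str = "|") -> str:
    """Return the SHA-1 hex digest of one string or of several joined values."""
    # A plain string is hashed as a single value rather than per character.
    values = [row] if isinstance(row, str) else list(row)
    # Assumption: values are stringified and joined with a separator so that
    # ["ab", "c"] and ["a", "bc"] do not produce the same digest.
    joined = sep.join(str(v) for v in values)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()
```

Because a pandas Series iterates over its values, a row selected from a data frame can be passed to this sketch directly.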
- ingest() → Self
Ingestion entry point
- Returns:
Self: PIRIngestor object
- insert_data() → Self
Insert data into the target database
- Returns:
Self: PIRIngestor object
- load_data() → Self
Load data from each sheet of the workbook
- Returns:
Self: PIRIngestor object
- make_snake_name(name: str) → str
Convert a name to snake case
- Args:
name (str): A name to convert
- Returns:
str: Snake-cased name
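A common way to snake-case a name is shown below. The exact rules used by `make_snake_name` are not documented here, so the handling of camelCase boundaries and punctuation in this sketch, and the name `make_snake_name_sketch`, are assumptions.

```python
import re


def make_snake_name_sketch(name: str) -> str:
    """Convert a name such as 'Program Details' to 'program_details'."""
    # Insert an underscore at lowercase-or-digit to uppercase boundaries.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse every run of non-alphanumeric characters into one underscore.
    name = re.sub(r"[^A-Za-z0-9]+", "_", name)
    # Drop leading and trailing underscores, then lowercase.
    return name.strip("_").lower()
```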
- merge_response_question() → Self
Merge response and question data frames
Merges the response and question data frames to (1) get question text onto the response data frame for question_id generation, and (2) ensure that every question that appears in the response data frame also appears in the question data frame.
- Returns:
Self: PIRIngestor object
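The two goals of the merge can be sketched with a left join plus pandas' merge indicator. This is only an illustration under stated assumptions: the helper name `merge_sketch`, the column names `question_number`, `question_text`, and `answer`, and the choice of join key are all hypothetical.

```python
import pandas as pd


def merge_sketch(
    response: pd.DataFrame, question: pd.DataFrame, keys: list[str]
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Attach question columns to responses and backfill missing questions."""
    # Goal 1: a left join carries question text onto every response row.
    merged = response.merge(question, on=keys, how="left", indicator=True)
    # Goal 2: rows flagged "left_only" reference questions that are absent
    # from the question frame, so add their keys to it.
    missing = merged.loc[merged["_merge"] == "left_only", keys].drop_duplicates()
    question = pd.concat([question, missing], ignore_index=True)
    return merged.drop(columns="_merge"), question


# Hypothetical frames: "B.2" has a response but no question record.
response_df = pd.DataFrame(
    {"question_number": ["A.1", "B.2"], "answer": ["5", "7"]}
)
question_df = pd.DataFrame(
    {"question_number": ["A.1"], "question_text": ["Total enrollment"]}
)
merged_df, question_out = merge_sketch(response_df, question_df, ["question_number"])
```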
- stringify(value: Any) → str
Convert values to string
In the case of dates, this function applies the Month/Day/Year format.
- Args:
value (Any): Value to convert to string
- Returns:
str: Original value converted to string
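The date handling described above can be sketched as follows. The zero-padded `%m/%d/%Y` pattern and the fallback to `str()` for all other values are assumptions, as is the name `stringify_sketch`; how the real method treats missing values is not documented here.

```python
from datetime import date, datetime
from typing import Any


def stringify_sketch(value: Any) -> str:
    """Convert a value to a string, formatting dates as Month/Day/Year."""
    # datetime is a subclass of date, so one check covers both types.
    if isinstance(value, date):
        return value.strftime("%m/%d/%Y")
    # Assumption: every other value uses its default string conversion.
    return str(value)
```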
- validate_data() → Self
Validate the cleaned PIR data
- Returns:
Self: PIRIngestor object