pir_pipeline.ingestion.PIRIngestor.PIRIngestor

class pir_pipeline.ingestion.PIRIngestor.PIRIngestor(workbook: str | PathLike, sql: SQLAlchemyUtils)

Bases: object

Class for ingesting PIR data

__init__(workbook: str | PathLike, sql: SQLAlchemyUtils)

Initialize a PIRIngestor object

Args:

workbook (str|os.PathLike): File path to an Excel Workbook

Methods

__init__(workbook, sql)

Initialize a PIRIngestor object

append_sections()

Append the loaded section data to generate the response table

clean_pir_data()

Align the PIR data with PIR database schemas

close_excel_files()

Close all Excel files

duplicated_question_error(df, columns)

Resolve duplicated questions

extract_sheets()

Load the workbook, and extract sheets and year.

get_section(question_number)

Extract the section from a question number

hash_columns(row)

Return the sha1 hash of a series of columns

ingest()

Ingestion entry point

insert_data()

Insert data into the target database

load_data()

Load data from each sheet of the workbook

make_snake_name(name)

Convert a name to snake case

merge_response_question()

Merge response and question data frames

stringify(value)

Convert values to string

validate_data()

Validate the cleaned PIR data

append_sections() Self

Append the loaded section data to generate the response table

Returns:

Self: PIRIngestor Object

clean_pir_data() Self

Align the PIR data with PIR database schemas

Returns:

Self: PIRIngestor object

close_excel_files()

Close all Excel files

duplicated_question_error(df: DataFrame, columns: list[str]) DataFrame

Resolve duplicated questions

This function is run when a duplicate question appears in the data. In that case, the data are sorted, grouped, and the first record in each group is taken.

Args:

df (pd.DataFrame): A data frame containing duplicates columns (list[str]): A list of columns to group on

Returns:

pd.DataFrame: A deduplicated data frame

extract_sheets() Self

Load the workbook, and extract sheets and year.

Returns:

Self: PIRIngestor object

get_section(question_number: str) str

Extract the section from a question number

Args:

question_number (str): A string question number

Returns:

str: The section in which the question appears

hash_columns(row: str | Series) str

Return the sha1 hash of a series of columns

Args:

row (pd.Series): A series of columns to hash

Returns:

str: Hashed columns

ingest() Self

Ingestion entry point

Returns:

Self: PIRIngestor object

insert_data() Self

Insert data into the target database

Returns:

Self: PIRIngestor object

load_data() Self

Load data from each sheet of the workbook

Returns:

Self: PIRIngestor object

make_snake_name(name: str) str

Convert a name to snake case

Args:

name (str): A name to convert

Returns:

str: Snake-cased name

merge_response_question() Self

Merge response and question data frames

Merges the response and question data frames to: 1) get question text onto the response data frame for question_id generation, 2) ensure that all questions that appear in the response data frame appear in the question data frame.

Returns:

Self: PIRIngestor object

stringify(value: Any) str

Convert values to string

In the case of dates, this function applies the Month/Day/Year format.

Args:

value (Any): Value to convert to string

Returns:

str: Original value converted to string

validate_data() Self

Validate the cleaned PIR data

Returns:

Self: PIRIngestor object