# Origin: commit 61b238e87b2e681389f20bc4fe01e278eed445ac by Tobias Quadfasel
# (2024-08-31), "feat(add-data): Add script to insert prepped data into database",
# which created data_preparation/insert_sql.py.
"""Script to insert the preprocessed data into the Azure SQL Database.

The script uses ``pyodbc`` to connect to the Azure SQL database, loads the data
from the preprocessed ``customers.json`` file, formats them and then inserts
them into the created table schema.

NOTE(review): all foreign keys are computed from row positions, so the script
assumes the target tables are empty and that their auto-generated IDs start at
1 and follow insertion order -- confirm against the table schema.
"""

import json
import os
from datetime import datetime
from typing import Any, Dict, Iterable, List, NamedTuple, Sequence, Tuple

import pyodbc
from config import DATA_DIR

# (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude)
AddressRow = Tuple[str, str, str, str, float, float]
# (Signature, MeterType, AddressID)
MeterRow = Tuple[str, str, int]
# (FirstName, LastName, GasMeterID, EltMeterID)
CustomerRow = Tuple[str, str, int, int]
# (CustomerID, MeterID, ReadingDate, ReadingValue)
ReadingRow = Tuple[int, int, datetime, int]


class _PreparedRows(NamedTuple):
    """Rows for every table, in the order in which they must be inserted."""

    addresses: List[AddressRow]
    gas_meters: List[MeterRow]
    elt_meters: List[MeterRow]
    customers: List[CustomerRow]
    gas_readings: List[ReadingRow]
    elt_readings: List[ReadingRow]


def _insert_rows(cursor: pyodbc.Cursor, sql: str, rows: Iterable[Sequence[Any]]) -> None:
    """Execute one parameterized INSERT per row and commit them together.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    sql : str
        INSERT statement with one ``?`` placeholder per row element.
    rows : Iterable[Sequence[Any]]
        The parameter rows. Nothing is executed or committed when it is empty.

    Returns
    -------
    None
        This function doesn't return any value.
    """
    rows = list(rows)
    if not rows:
        # pyodbc's executemany rejects an empty parameter sequence.
        return
    cursor.executemany(sql, rows)
    # Commit through the cursor's own connection instead of a module-level
    # global, so the insert functions also work when imported elsewhere.
    cursor.connection.commit()


def insert_addresses(
    cursor: pyodbc.Cursor, addresses: List[Tuple[str, str, str, str, float, float]]
) -> None:
    """Insert multiple addresses into the Addresses table using pyodbc.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    addresses : List[Tuple[str, str, str, str, float, float]]
        A list of tuples, where each tuple represents an address with the following elements:
        (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    _insert_rows(
        cursor,
        """
        INSERT INTO Addresses (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude)
        VALUES (?, ?, ?, ?, ?, ?);
        """,
        addresses,
    )


def insert_meters(cursor: pyodbc.Cursor, meter_data: List[Tuple[str, str, int]]) -> None:
    """Insert multiple meters into the Meters table in a database using pyodbc.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    meter_data : List[Tuple[str, str, int]]
        A list of tuples, where each tuple represents a meter with the following elements:
        (Signature, MeterType, AddressID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    _insert_rows(
        cursor,
        """
        INSERT INTO Meters (Signature, MeterType, AddressID)
        VALUES (?, ?, ?);
        """,
        meter_data,
    )


def insert_customers(cursor: pyodbc.Cursor, customer_data: List[Tuple[str, str, int, int]]) -> None:
    """Insert multiple customers into the Customers table in a database using pyodbc.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    customer_data : List[Tuple[str, str, int, int]]
        A list of tuples, where each tuple represents a customer with the following elements:
        (FirstName, LastName, GasMeterID, EltMeterID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    _insert_rows(
        cursor,
        """
        INSERT INTO Customers (FirstName, LastName, GasMeterID, EltMeterID)
        VALUES (?, ?, ?, ?);
        """,
        customer_data,
    )


def insert_readings(
    cursor: pyodbc.Cursor, readings_data: List[Tuple[int, int, datetime, int]]
) -> None:
    """Insert multiple readings into the Readings table in a database using pyodbc.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    readings_data : List[Tuple[int, int, datetime, int]]
        A list of tuples, where each tuple represents a reading with the following elements:
        (CustomerID, MeterID, ReadingDate, ReadingValue).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    _insert_rows(
        cursor,
        """
        INSERT INTO Readings (CustomerID, MeterID, ReadingDate, ReadingValue)
        VALUES (?, ?, ?, ?);
        """,
        readings_data,
    )


def _address_row(customer: Dict[str, Any]) -> AddressRow:
    """Extract the address columns of one customer record."""
    return (
        customer["street"],
        customer["house_number"],
        customer["city"],
        customer["zip_code"],
        customer["longitude"],
        customer["latitude"],
    )


def _reading_rows(customer_id: int, meter_id: int, series: Sequence[Any]) -> List[ReadingRow]:
    """Build the Readings rows for one meter of one customer.

    Parameters
    ----------
    customer_id : int
        ID of the customer the readings belong to.
    meter_id : int
        ID of the meter the readings were taken from.
    series : Sequence[Any]
        A ``readings_gas`` / ``readings_elt`` entry: index 1 holds the ISO-format
        reading dates and index 2 the reading values (index 0 is the signature).

    Returns
    -------
    List[ReadingRow]
        One (CustomerID, MeterID, ReadingDate, ReadingValue) tuple per reading.

    Raises
    ------
    ValueError
        If the number of dates and the number of values differ.
    """
    dates, values = series[1], series[2]
    if len(dates) != len(values):
        raise ValueError(
            f"Meter {series[0]!r} has {len(dates)} reading dates but {len(values)} values."
        )
    return [
        (customer_id, meter_id, datetime.fromisoformat(date), value)
        for date, value in zip(dates, values)
    ]


def _prepare_rows(customers: List[Dict[str, Any]]) -> _PreparedRows:
    """Turn the loaded customer records into rows for every table.

    IDs are assigned by position: customer ``k`` (1-based) gets CustomerID ``k``,
    gas meter ``k`` and electricity meter ``len(customers) + k``, which matches
    inserting all gas meters before all electricity meters.

    Parameters
    ----------
    customers : List[Dict[str, Any]]
        The records loaded from ``customers.json``.

    Returns
    -------
    _PreparedRows
        The rows of all tables, ready to be passed to the insert functions.
    """
    address_rows = [_address_row(c) for c in customers]
    # dict.fromkeys de-duplicates while keeping first-seen order, so address IDs
    # are reproducible between runs and can be looked up in O(1).
    address_ids = {row: idx for idx, row in enumerate(dict.fromkeys(address_rows), start=1)}

    gas_meters: List[MeterRow] = []
    elt_meters: List[MeterRow] = []
    customer_rows: List[CustomerRow] = []
    gas_readings: List[ReadingRow] = []
    elt_readings: List[ReadingRow] = []

    elt_offset = len(customers)
    for customer_id, (c, address) in enumerate(zip(customers, address_rows), start=1):
        gas_meter_id = customer_id
        elt_meter_id = elt_offset + customer_id
        address_id = address_ids[address]

        # Every customer gets both meters, even without readings; otherwise the
        # positional meter IDs of all following customers would be shifted.
        gas_meters.append((c["readings_gas"][0], "GAS", address_id))
        elt_meters.append((c["readings_elt"][0], "ELT", address_id))
        customer_rows.append((c["given_name"], c["surname"], gas_meter_id, elt_meter_id))

        # The two series are processed independently so that differing reading
        # counts neither drop electricity readings nor raise an IndexError.
        gas_readings.extend(_reading_rows(customer_id, gas_meter_id, c["readings_gas"]))
        elt_readings.extend(_reading_rows(customer_id, elt_meter_id, c["readings_elt"]))

    return _PreparedRows(
        addresses=list(address_ids),
        gas_meters=gas_meters,
        elt_meters=elt_meters,
        customers=customer_rows,
        gas_readings=gas_readings,
        elt_readings=elt_readings,
    )


def main() -> None:
    """Connect to the database, load ``customers.json`` and insert all rows.

    Raises
    ------
    SystemExit
        If ``AZURE_SQL_CONNECTION_STRING`` is not set or the connection fails.
    """
    conn_str = os.environ.get("AZURE_SQL_CONNECTION_STRING")
    if not conn_str:
        raise SystemExit("Environment variable AZURE_SQL_CONNECTION_STRING is not set.")

    try:
        conn = pyodbc.connect(conn_str)
    except pyodbc.Error as e:
        # Nothing below can work without a connection, so stop here instead of
        # failing later on an undefined cursor.
        raise SystemExit(f"Error connecting to Azure SQL Database: {e}") from e
    print("Connected to Azure SQL Database successfully!")

    try:
        cursor = conn.cursor()
        try:
            # Load generated customer data
            with open(DATA_DIR / "customers.json", "r") as file:
                customers = json.load(file)

            rows = _prepare_rows(customers)

            # Insert order follows the foreign keys:
            # addresses -> meters -> customers -> readings.
            print("Inserting addresses into the database...")
            insert_addresses(cursor, rows.addresses)

            print("Inserting gas meter data into the database...")
            insert_meters(cursor, rows.gas_meters)

            print("Inserting electricity meter data into the database...")
            insert_meters(cursor, rows.elt_meters)

            print("Inserting customer data into the database...")
            insert_customers(cursor, rows.customers)

            print("Inserting gas readings into the database...")
            insert_readings(cursor, rows.gas_readings)

            print("Inserting electricity readings into the database...")
            insert_readings(cursor, rows.elt_readings)

            print("Data successfully inserted into the database!")
        finally:
            cursor.close()
    finally:
        conn.close()


if __name__ == "__main__":
    main()