"""Script to insert the preprocessed data into the Azure SQL Database.""" import json import os from datetime import datetime from typing import List, Tuple import pyodbc from config import DATA_DIR def insert_addresses( cursor: pyodbc.Cursor, addresses: List[Tuple[str, str, str, str, float, float]] ) -> None: """Insert multiple addresses into the Addresses table using pyodbc. Parameters ---------- cursor : pyodbc.Cursor A pyodbc cursor object used to execute SQL commands. addresses : List[Tuple[str, str, str, str, float, float]] A list of tuples, where each tuple represents an address with the following elements: (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude). Returns ------- None This function doesn't return any value. """ for address in addresses: cursor.execute( """ INSERT INTO Addresses (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude) VALUES (?, ?, ?, ?, ?, ?); """, address, ) conn.commit() def insert_meters(cursor: pyodbc.Cursor, meter_data: List[Tuple[str, str, int]]) -> None: """Insert multiple meters into the Meters table in a database using pyodbc. Parameters ---------- cursor : pyodbc.Cursor A pyodbc cursor object used to execute SQL commands. meter_data : List[Tuple[str, str, int]] A list of tuples, where each tuple represents a meter with the following elements: (Signature, MeterType, AddressID). Returns ------- None This function doesn't return any value. """ for meter in meter_data: cursor.execute( """ INSERT INTO Meters (Signature, MeterType, AddressID) VALUES (?, ?, ?); """, meter, ) conn.commit() def insert_customers(cursor: pyodbc.Cursor, customer_data: List[Tuple[str, str, int, int]]) -> None: """Insert multiple customers into the Customers table in a database using pyodbc. Parameters ---------- cursor : pyodbc.Cursor A pyodbc cursor object used to execute SQL commands. customer_data : List[Tuple[str, str, int, int]] A list of tuples, where each tuple represents a customer with the following elements: (FirstName, LastName, GasMeterID, EltMeterID). Returns ------- None This function doesn't return any value. """ for customer in customer_data: cursor.execute( """ INSERT INTO Customers (FirstName, LastName, GasMeterID, EltMeterID) VALUES (?, ?, ?, ?); """, customer, ) conn.commit() def insert_readings( cursor: pyodbc.Cursor, readings_data: List[Tuple[int, int, datetime, int]] ) -> None: """Insert multiple readings into the Readings table in a database using pyodbc. Parameters ---------- cursor : pyodbc.Cursor A pyodbc cursor object used to execute SQL commands. readings_data : List[Tuple[int, int, datetime, int]] A list of tuples, where each tuple represents a reading with the following elements: (CustomerID, MeterID, ReadingDate, ReadingValue). Returns ------- None This function doesn't return any value. """ for reading in readings_data: cursor.execute( """ INSERT INTO Readings (CustomerID, MeterID, ReadingDate, ReadingValue) VALUES (?, ?, ?, ?); """, reading, ) conn.commit() if __name__ == "__main__": conn_str = os.environ.get("AZURE_SQL_CONNECTION_STRING") try: conn = pyodbc.connect(conn_str) cursor = conn.cursor() print("Connected to Azure SQL Database successfully!") except pyodbc.Error as e: print(f"Error connecting to Azure SQL Database: {e}") # Load generated customer data with open(DATA_DIR / "customers.json", "r") as file: customers = json.load(file) # Insert data into the database # start with addresses full_addr_list = [] for c in customers: full_addr_list.append( ( c["street"], c["house_number"], c["city"], c["zip_code"], c["longitude"], c["latitude"], ) ) unique_addresses = list(set(full_addr_list)) address_ids = [] for i in range(len(customers)): address_ids.append(unique_addresses.index(full_addr_list[i]) + 1) gas_meter_data = [] elt_meter_data = [] gas_readings_data = [] elt_readings_data = [] customer_data = [] gas_id = 1 elt_id = len(customers) + 1 for c_idx, c in enumerate(customers): customer_data.append((c["given_name"], c["surname"], c_idx + 1, c_idx + 1)) for r in range(len(c["readings_gas"][1])): if r == 0: gas_meter_data.append((c["readings_gas"][0], "GAS", address_ids[c_idx])) elt_meter_data.append((c["readings_elt"][0], "ELT", address_ids[c_idx])) gas_readings_data.append( ( c_idx + 1, gas_id, datetime.fromisoformat(c["readings_gas"][1][r]), c["readings_gas"][2][r], ) ) # CustomerID, MeterID, ReadingDate, ReadingValue elt_readings_data.append( ( c_idx + 1, elt_id, datetime.fromisoformat(c["readings_elt"][1][r]), c["readings_elt"][2][r], ) ) gas_id += 1 elt_id += 1 print("Inserting addresses into the database...") insert_addresses(cursor, unique_addresses) print("Inserting gas meter data into the database...") insert_meters(cursor, gas_meter_data) print("Inserting electricity meter data into the database...") insert_meters(cursor, elt_meter_data) print("Inserting customer data into the database...") insert_customers(cursor, customer_data) print("Inserting gas readings into the database...") insert_readings(cursor, gas_readings_data) print("Inserting electricity readings into the database...") insert_readings(cursor, elt_readings_data) print("Data successfully inserted into the database!") if "cursor" in locals(): cursor.close() if "conn" in locals(): conn.close()