Files
Tobias Quadfasel 61b238e87b feat(add-data): Add script to insert prepped data into database
The script `insert_sql.py` uses `pyodbc` to connect to the Azure SQL
database, loads the data from the preprocessed `customers.json` file,
formats them and then inserts them into the created table schema.
2024-08-31 14:39:21 +02:00

215 lines
6.4 KiB
Python

"""Script to insert the preprocessed data into the Azure SQL Database."""
import json
import os
from datetime import datetime
from typing import List, Tuple
import pyodbc
from config import DATA_DIR
def insert_addresses(
    cursor: pyodbc.Cursor, addresses: List[Tuple[str, str, str, str, float, float]]
) -> None:
    """Insert multiple addresses into the Addresses table using pyodbc.

    The rows are sent with a single ``executemany`` call and committed through
    the cursor itself, so the function works with any cursor handed to it and
    does not rely on a module-level connection object.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    addresses : List[Tuple[str, str, str, str, float, float]]
        A list of tuples, where each tuple represents an address with the following elements:
        (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    if addresses:
        # pyodbc raises on an empty parameter sequence, so skip the call when there are no rows.
        cursor.executemany(
            """
            INSERT INTO Addresses (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude)
            VALUES (?, ?, ?, ?, ?, ?);
            """,
            addresses,
        )
    # Cursor.commit() commits on the connection that created this cursor.
    cursor.commit()
def insert_meters(cursor: pyodbc.Cursor, meter_data: List[Tuple[str, str, int]]) -> None:
    """Insert multiple meters into the Meters table in a database using pyodbc.

    The rows are sent with a single ``executemany`` call and committed through
    the cursor itself, so the function works with any cursor handed to it and
    does not rely on a module-level connection object.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    meter_data : List[Tuple[str, str, int]]
        A list of tuples, where each tuple represents a meter with the following elements:
        (Signature, MeterType, AddressID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    if meter_data:
        # pyodbc raises on an empty parameter sequence, so skip the call when there are no rows.
        cursor.executemany(
            """
            INSERT INTO Meters (Signature, MeterType, AddressID)
            VALUES (?, ?, ?);
            """,
            meter_data,
        )
    # Cursor.commit() commits on the connection that created this cursor.
    cursor.commit()
def insert_customers(cursor: pyodbc.Cursor, customer_data: List[Tuple[str, str, int, int]]) -> None:
    """Insert multiple customers into the Customers table in a database using pyodbc.

    The rows are sent with a single ``executemany`` call and committed through
    the cursor itself, so the function works with any cursor handed to it and
    does not rely on a module-level connection object.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    customer_data : List[Tuple[str, str, int, int]]
        A list of tuples, where each tuple represents a customer with the following elements:
        (FirstName, LastName, GasMeterID, EltMeterID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    if customer_data:
        # pyodbc raises on an empty parameter sequence, so skip the call when there are no rows.
        cursor.executemany(
            """
            INSERT INTO Customers (FirstName, LastName, GasMeterID, EltMeterID)
            VALUES (?, ?, ?, ?);
            """,
            customer_data,
        )
    # Cursor.commit() commits on the connection that created this cursor.
    cursor.commit()
def insert_readings(
    cursor: pyodbc.Cursor, readings_data: List[Tuple[int, int, datetime, int]]
) -> None:
    """Insert multiple readings into the Readings table in a database using pyodbc.

    The rows are sent with a single ``executemany`` call and committed through
    the cursor itself, so the function works with any cursor handed to it and
    does not rely on a module-level connection object.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    readings_data : List[Tuple[int, int, datetime, int]]
        A list of tuples, where each tuple represents a reading with the following elements:
        (CustomerID, MeterID, ReadingDate, ReadingValue).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    if readings_data:
        # pyodbc raises on an empty parameter sequence, so skip the call when there are no rows.
        cursor.executemany(
            """
            INSERT INTO Readings (CustomerID, MeterID, ReadingDate, ReadingValue)
            VALUES (?, ?, ?, ?);
            """,
            readings_data,
        )
    # Cursor.commit() commits on the connection that created this cursor.
    cursor.commit()
def build_insert_rows(customers: List[dict]) -> Tuple[list, list, list, list, list, list]:
    """Convert the loaded customer records into row tuples for every table.

    Database IDs are computed positionally: customer ``i`` (0-based) gets
    CustomerID ``i + 1``, gas meter ``i + 1`` and electricity meter
    ``len(customers) + i + 1`` (gas meters are inserted before electricity
    meters). NOTE(review): this assumes empty tables whose identity columns
    start at 1 -- confirm against the table schema.

    Parameters
    ----------
    customers : List[dict]
        Records as loaded from ``customers.json``. Each record is read for the keys
        ``street``, ``house_number``, ``city``, ``zip_code``, ``longitude``, ``latitude``,
        ``given_name``, ``surname``, ``readings_gas`` and ``readings_elt``; the two
        readings entries are indexed as ``[signature, dates, values]``.

    Returns
    -------
    Tuple[list, list, list, list, list, list]
        ``(unique_addresses, gas_meter_data, elt_meter_data, customer_data,
        gas_readings_data, elt_readings_data)`` in the tuple layouts expected by
        the ``insert_*`` functions.
    """
    n_customers = len(customers)

    address_per_customer = [
        (
            c["street"],
            c["house_number"],
            c["city"],
            c["zip_code"],
            c["longitude"],
            c["latitude"],
        )
        for c in customers
    ]
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # address IDs are reproducible between runs (a plain set is not ordered).
    unique_addresses = list(dict.fromkeys(address_per_customer))
    address_id_of = {addr: idx for idx, addr in enumerate(unique_addresses, start=1)}

    gas_meter_data = []
    elt_meter_data = []
    gas_readings_data = []
    elt_readings_data = []
    customer_data = []

    for c_idx, c in enumerate(customers):
        customer_id = c_idx + 1
        gas_id = customer_id
        # Electricity meters are inserted after all gas meters.
        elt_id = n_customers + customer_id
        address_id = address_id_of[address_per_customer[c_idx]]

        # The customer must reference the same meter IDs that its readings use.
        customer_data.append((c["given_name"], c["surname"], gas_id, elt_id))

        # Always emit exactly one meter of each type per customer so the
        # positional IDs stay aligned even when a customer has no readings.
        gas_meter_data.append((c["readings_gas"][0], "GAS", address_id))
        elt_meter_data.append((c["readings_elt"][0], "ELT", address_id))

        # Each meter is walked over its own dates/values, so differing numbers
        # of gas and electricity readings are handled independently.
        for date_str, value in zip(c["readings_gas"][1], c["readings_gas"][2]):
            # CustomerID, MeterID, ReadingDate, ReadingValue
            gas_readings_data.append(
                (customer_id, gas_id, datetime.fromisoformat(date_str), value)
            )
        for date_str, value in zip(c["readings_elt"][1], c["readings_elt"][2]):
            elt_readings_data.append(
                (customer_id, elt_id, datetime.fromisoformat(date_str), value)
            )

    return (
        unique_addresses,
        gas_meter_data,
        elt_meter_data,
        customer_data,
        gas_readings_data,
        elt_readings_data,
    )


if __name__ == "__main__":
    conn_str = os.environ.get("AZURE_SQL_CONNECTION_STRING")
    if not conn_str:
        raise SystemExit("Environment variable AZURE_SQL_CONNECTION_STRING is not set.")

    try:
        conn = pyodbc.connect(conn_str)
    except pyodbc.Error as e:
        print(f"Error connecting to Azure SQL Database: {e}")
        # Nothing below can work without a connection, so stop here.
        raise SystemExit(1)

    try:
        cursor = conn.cursor()
        try:
            print("Connected to Azure SQL Database successfully!")

            # Load generated customer data
            with open(DATA_DIR / "customers.json", "r", encoding="utf-8") as file:
                customers = json.load(file)

            (
                unique_addresses,
                gas_meter_data,
                elt_meter_data,
                customer_data,
                gas_readings_data,
                elt_readings_data,
            ) = build_insert_rows(customers)

            # Insert data into the database; order matters because meters
            # reference addresses, customers reference meters and readings
            # reference both customers and meters.
            print("Inserting addresses into the database...")
            insert_addresses(cursor, unique_addresses)
            print("Inserting gas meter data into the database...")
            insert_meters(cursor, gas_meter_data)
            print("Inserting electricity meter data into the database...")
            insert_meters(cursor, elt_meter_data)
            print("Inserting customer data into the database...")
            insert_customers(cursor, customer_data)
            print("Inserting gas readings into the database...")
            insert_readings(cursor, gas_readings_data)
            print("Inserting electricity readings into the database...")
            insert_readings(cursor, elt_readings_data)
            print("Data successfully inserted into the database!")
        finally:
            cursor.close()
    finally:
        # Release the connection even when loading or inserting fails.
        conn.close()