feat(add-data): Add script to insert prepped data into database

The script `insert_sql.py` uses `pyodbc` to connect to the Azure SQL
database, loads the data from the preprocessed `customers.json` file,
formats them and then inserts them into the created table schema.
This commit is contained in:
Tobias Quadfasel
2024-08-31 14:39:21 +02:00
parent 66d1eec875
commit 61b238e87b

View File

@@ -0,0 +1,214 @@
"""Script to insert the preprocessed data into the Azure SQL Database."""
import json
import os
from datetime import datetime
from typing import List, Tuple
import pyodbc
from config import DATA_DIR
def insert_addresses(
cursor: pyodbc.Cursor, addresses: List[Tuple[str, str, str, str, float, float]]
) -> None:
"""Insert multiple addresses into the Addresses table using pyodbc.
Parameters
----------
cursor : pyodbc.Cursor
A pyodbc cursor object used to execute SQL commands.
addresses : List[Tuple[str, str, str, str, float, float]]
A list of tuples, where each tuple represents an address with the following elements:
(StreetName, HouseNumber, City, PostalCode, Longitude, Latitude).
Returns
-------
None
This function doesn't return any value.
"""
for address in addresses:
cursor.execute(
"""
INSERT INTO Addresses (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude)
VALUES (?, ?, ?, ?, ?, ?);
""",
address,
)
conn.commit()
def insert_meters(cursor: pyodbc.Cursor, meter_data: List[Tuple[str, str, int]]) -> None:
"""Insert multiple meters into the Meters table in a database using pyodbc.
Parameters
----------
cursor : pyodbc.Cursor
A pyodbc cursor object used to execute SQL commands.
meter_data : List[Tuple[str, str, int]]
A list of tuples, where each tuple represents a meter with the following elements:
(Signature, MeterType, AddressID).
Returns
-------
None
This function doesn't return any value.
"""
for meter in meter_data:
cursor.execute(
"""
INSERT INTO Meters (Signature, MeterType, AddressID)
VALUES (?, ?, ?);
""",
meter,
)
conn.commit()
def insert_customers(cursor: pyodbc.Cursor, customer_data: List[Tuple[str, str, int, int]]) -> None:
"""Insert multiple customers into the Customers table in a database using pyodbc.
Parameters
----------
cursor : pyodbc.Cursor
A pyodbc cursor object used to execute SQL commands.
customer_data : List[Tuple[str, str, int, int]]
A list of tuples, where each tuple represents a customer with the following elements:
(FirstName, LastName, GasMeterID, EltMeterID).
Returns
-------
None
This function doesn't return any value.
"""
for customer in customer_data:
cursor.execute(
"""
INSERT INTO Customers (FirstName, LastName, GasMeterID, EltMeterID)
VALUES (?, ?, ?, ?);
""",
customer,
)
conn.commit()
def insert_readings(
cursor: pyodbc.Cursor, readings_data: List[Tuple[int, int, datetime, int]]
) -> None:
"""Insert multiple readings into the Readings table in a database using pyodbc.
Parameters
----------
cursor : pyodbc.Cursor
A pyodbc cursor object used to execute SQL commands.
readings_data : List[Tuple[int, int, datetime, int]]
A list of tuples, where each tuple represents a reading with the following elements:
(CustomerID, MeterID, ReadingDate, ReadingValue).
Returns
-------
None
This function doesn't return any value.
"""
for reading in readings_data:
cursor.execute(
"""
INSERT INTO Readings (CustomerID, MeterID, ReadingDate, ReadingValue)
VALUES (?, ?, ?, ?);
""",
reading,
)
conn.commit()
if __name__ == "__main__":
conn_str = os.environ.get("AZURE_SQL_CONNECTION_STRING")
try:
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
print("Connected to Azure SQL Database successfully!")
except pyodbc.Error as e:
print(f"Error connecting to Azure SQL Database: {e}")
# Load generated customer data
with open(DATA_DIR / "customers.json", "r") as file:
customers = json.load(file)
# Insert data into the database
# start with addresses
full_addr_list = []
for c in customers:
full_addr_list.append(
(
c["street"],
c["house_number"],
c["city"],
c["zip_code"],
c["longitude"],
c["latitude"],
)
)
unique_addresses = list(set(full_addr_list))
address_ids = []
for i in range(len(customers)):
address_ids.append(unique_addresses.index(full_addr_list[i]) + 1)
gas_meter_data = []
elt_meter_data = []
gas_readings_data = []
elt_readings_data = []
customer_data = []
gas_id = 1
elt_id = len(customers) + 1
for c_idx, c in enumerate(customers):
customer_data.append((c["given_name"], c["surname"], c_idx + 1, c_idx + 1))
for r in range(len(c["readings_gas"][1])):
if r == 0:
gas_meter_data.append((c["readings_gas"][0], "GAS", address_ids[c_idx]))
elt_meter_data.append((c["readings_elt"][0], "ELT", address_ids[c_idx]))
gas_readings_data.append(
(
c_idx + 1,
gas_id,
datetime.fromisoformat(c["readings_gas"][1][r]),
c["readings_gas"][2][r],
)
) # CustomerID, MeterID, ReadingDate, ReadingValue
elt_readings_data.append(
(
c_idx + 1,
elt_id,
datetime.fromisoformat(c["readings_elt"][1][r]),
c["readings_elt"][2][r],
)
)
gas_id += 1
elt_id += 1
print("Inserting addresses into the database...")
insert_addresses(cursor, unique_addresses)
print("Inserting gas meter data into the database...")
insert_meters(cursor, gas_meter_data)
print("Inserting electricity meter data into the database...")
insert_meters(cursor, elt_meter_data)
print("Inserting customer data into the database...")
insert_customers(cursor, customer_data)
print("Inserting gas readings into the database...")
insert_readings(cursor, gas_readings_data)
print("Inserting electricity readings into the database...")
insert_readings(cursor, elt_readings_data)
print("Data successfully inserted into the database!")
if "cursor" in locals():
cursor.close()
if "conn" in locals():
conn.close()