The script `insert_sql.py` uses `pyodbc` to connect to the Azure SQL database, loads the data from the preprocessed `customers.json` file, converts each record into row tuples, and then inserts those rows into the tables of the previously created schema.
215 lines
6.4 KiB
Python
215 lines
6.4 KiB
Python
"""Script to insert the preprocessed data into the Azure SQL Database."""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
from typing import List, Tuple
|
|
|
|
import pyodbc
|
|
from config import DATA_DIR
|
|
|
|
|
|
def insert_addresses(
    cursor: pyodbc.Cursor, addresses: List[Tuple[str, str, str, str, float, float]]
) -> None:
    """Insert multiple addresses into the Addresses table using pyodbc.

    Each tuple is sent as the parameters of one parameterized INSERT. After
    all rows have been executed, the transaction is committed through the
    cursor, so the function does not rely on a module-level connection
    object being defined.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    addresses : List[Tuple[str, str, str, str, float, float]]
        A list of tuples, where each tuple represents an address with the following elements:
        (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    for address in addresses:
        cursor.execute(
            """
            INSERT INTO Addresses (StreetName, HouseNumber, City, PostalCode, Longitude, Latitude)
            VALUES (?, ?, ?, ?, ?, ?);
            """,
            address,
        )
    # pyodbc's Cursor.commit() commits on the connection that created the
    # cursor; the previous code used a global `conn` that only exists when
    # this file is run as a script.
    cursor.commit()
|
|
|
|
|
|
def insert_meters(cursor: pyodbc.Cursor, meter_data: List[Tuple[str, str, int]]) -> None:
    """Insert multiple meters into the Meters table in a database using pyodbc.

    Each tuple is sent as the parameters of one parameterized INSERT. After
    all rows have been executed, the transaction is committed through the
    cursor, so the function does not rely on a module-level connection
    object being defined.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    meter_data : List[Tuple[str, str, int]]
        A list of tuples, where each tuple represents a meter with the following elements:
        (Signature, MeterType, AddressID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    for meter in meter_data:
        cursor.execute(
            """
            INSERT INTO Meters (Signature, MeterType, AddressID)
            VALUES (?, ?, ?);
            """,
            meter,
        )
    # Commit via the cursor instead of a global `conn`, which is undefined
    # when this function is imported and called from another module.
    cursor.commit()
|
|
|
|
|
|
def insert_customers(cursor: pyodbc.Cursor, customer_data: List[Tuple[str, str, int, int]]) -> None:
    """Insert multiple customers into the Customers table in a database using pyodbc.

    Each tuple is sent as the parameters of one parameterized INSERT. After
    all rows have been executed, the transaction is committed through the
    cursor, so the function does not rely on a module-level connection
    object being defined.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    customer_data : List[Tuple[str, str, int, int]]
        A list of tuples, where each tuple represents a customer with the following elements:
        (FirstName, LastName, GasMeterID, EltMeterID).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    for customer in customer_data:
        cursor.execute(
            """
            INSERT INTO Customers (FirstName, LastName, GasMeterID, EltMeterID)
            VALUES (?, ?, ?, ?);
            """,
            customer,
        )
    # Commit via the cursor instead of a global `conn`, which is undefined
    # when this function is imported and called from another module.
    cursor.commit()
|
|
|
|
|
|
def insert_readings(
    cursor: pyodbc.Cursor, readings_data: List[Tuple[int, int, datetime, int]]
) -> None:
    """Insert multiple readings into the Readings table in a database using pyodbc.

    Each tuple is sent as the parameters of one parameterized INSERT. After
    all rows have been executed, the transaction is committed through the
    cursor, so the function does not rely on a module-level connection
    object being defined.

    Parameters
    ----------
    cursor : pyodbc.Cursor
        A pyodbc cursor object used to execute SQL commands.
    readings_data : List[Tuple[int, int, datetime, int]]
        A list of tuples, where each tuple represents a reading with the following elements:
        (CustomerID, MeterID, ReadingDate, ReadingValue).

    Returns
    -------
    None
        This function doesn't return any value.
    """
    for reading in readings_data:
        cursor.execute(
            """
            INSERT INTO Readings (CustomerID, MeterID, ReadingDate, ReadingValue)
            VALUES (?, ?, ?, ?);
            """,
            reading,
        )
    # Commit via the cursor instead of a global `conn`, which is undefined
    # when this function is imported and called from another module.
    cursor.commit()
|
|
|
|
|
|
def _reading_rows(customer_id: int, meter_id: int, series: list) -> List[Tuple[int, int, datetime, int]]:
    """Build Readings rows from one meter series of a customer record.

    ``series`` is indexed as ``series[1]`` (ISO date strings) and
    ``series[2]`` (values at the same positions). A value list shorter than
    the date list raises ``IndexError`` rather than silently dropping data.
    """
    dates, values = series[1], series[2]
    return [
        (customer_id, meter_id, datetime.fromisoformat(date_str), values[pos])
        for pos, date_str in enumerate(dates)
    ]


def _build_insert_rows(customers: list) -> Tuple[list, list, list, list, list, list]:
    """Convert the loaded customer records into row tuples for every table.

    Returns ``(unique_addresses, gas_meter_data, elt_meter_data,
    customer_data, gas_readings_data, elt_readings_data)``.

    All IDs are computed locally and assume the target tables are empty with
    identity keys starting at 1 (TODO confirm against the schema script):
    addresses are numbered in first-seen order, gas meters take IDs 1..N and
    electricity meters N+1..2N, because all gas meters are inserted before
    any electricity meter.
    """
    addr_per_customer = [
        (c["street"], c["house_number"], c["city"], c["zip_code"], c["longitude"], c["latitude"])
        for c in customers
    ]
    # dict.fromkeys de-duplicates while keeping first-seen order, which makes
    # the AddressID assignment reproducible between runs.
    unique_addresses = list(dict.fromkeys(addr_per_customer))
    # One dict lookup per customer instead of a linear list.index() scan.
    address_id_of = {addr: pos for pos, addr in enumerate(unique_addresses, start=1)}

    gas_meter_data = []
    elt_meter_data = []
    customer_data = []
    gas_readings_data = []
    elt_readings_data = []
    total = len(customers)

    for idx, c in enumerate(customers):
        customer_id = idx + 1
        gas_id = idx + 1
        elt_id = total + idx + 1
        address_id = address_id_of[addr_per_customer[idx]]

        # The customer must reference the same meter IDs that its readings
        # use; previously EltMeterID was set to the gas meter's ID.
        customer_data.append((c["given_name"], c["surname"], gas_id, elt_id))

        # Exactly one meter of each type per customer, even when a customer
        # has no readings, so the positional meter IDs above stay aligned.
        gas_meter_data.append((c["readings_gas"][0], "GAS", address_id))
        elt_meter_data.append((c["readings_elt"][0], "ELT", address_id))

        # Each series is walked by its own length, so differing gas and
        # electricity reading counts neither drop nor overrun entries.
        gas_readings_data.extend(_reading_rows(customer_id, gas_id, c["readings_gas"]))
        elt_readings_data.extend(_reading_rows(customer_id, elt_id, c["readings_elt"]))

    return (
        unique_addresses,
        gas_meter_data,
        elt_meter_data,
        customer_data,
        gas_readings_data,
        elt_readings_data,
    )


if __name__ == "__main__":
    conn_str = os.environ.get("AZURE_SQL_CONNECTION_STRING")
    if not conn_str:
        raise SystemExit("Environment variable AZURE_SQL_CONNECTION_STRING is not set.")

    try:
        conn = pyodbc.connect(conn_str)
    except pyodbc.Error as e:
        print(f"Error connecting to Azure SQL Database: {e}")
        # Nothing below can work without a connection, so stop here instead
        # of failing later with a NameError.
        raise SystemExit(1) from e
    print("Connected to Azure SQL Database successfully!")

    # try/finally guarantees the cursor and connection are closed even when
    # loading the data or one of the inserts raises.
    try:
        cursor = conn.cursor()
        try:
            # Load generated customer data
            with open(DATA_DIR / "customers.json", "r", encoding="utf-8") as file:
                customers = json.load(file)

            (
                unique_addresses,
                gas_meter_data,
                elt_meter_data,
                customer_data,
                gas_readings_data,
                elt_readings_data,
            ) = _build_insert_rows(customers)

            # Insert data into the database. The order matters: the locally
            # computed IDs assume addresses first, then all gas meters, then
            # all electricity meters, then customers, then readings.
            print("Inserting addresses into the database...")
            insert_addresses(cursor, unique_addresses)

            print("Inserting gas meter data into the database...")
            insert_meters(cursor, gas_meter_data)

            print("Inserting electricity meter data into the database...")
            insert_meters(cursor, elt_meter_data)

            print("Inserting customer data into the database...")
            insert_customers(cursor, customer_data)

            print("Inserting gas readings into the database...")
            insert_readings(cursor, gas_readings_data)

            print("Inserting electricity readings into the database...")
            insert_readings(cursor, elt_readings_data)

            print("Data successfully inserted into the database!")
        finally:
            cursor.close()
    finally:
        conn.close()
|