Add `data_preparation/generate_customers.py`, a script that takes the `base_data.json` file generated by `get_base_data.py` and randomly samples a given number of customers. To simplify things, each customer is assigned exactly one gas and one electricity meter and each of them is read between 1 and 10 times. The full data including meters, meter readings and dates as well as customers and addresses is stored in a final JSON file named `customers.json`.
223 lines
8.4 KiB
Python
223 lines
8.4 KiB
Python
"""Module to randomly generate a JSON file with customer data for the Avacon app."""
|
|
|
|
import json
|
|
import math
|
|
import random
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Tuple
|
|
|
|
import pandas as pd
|
|
from config import DATA_DIR
|
|
from data_utils import DateTimeEncoder
|
|
|
|
|
|
def generate_readings(num_customers: int) -> List[Dict[str, Tuple[str, List[datetime], List[int]]]]:
    """Simulate yearly gas and electricity meter readings for a set of customers.

    Every customer gets one gas and one electricity meter, each with its own
    randomly generated meter number and a random initial counter value. The
    counters then grow by a Gaussian-distributed yearly consumption (standard
    deviation: 10% of the mean) for a random number of readings.

    Parameters
    ----------
    num_customers : int
        How many customers to simulate.

    Returns
    -------
    List[Dict[str, Tuple[str, List[datetime], List[int]]]]
        One dictionary per customer with the keys

        - 'electricity': (meter number, reading dates, counter values)
        - 'gas': (meter number, reading dates, counter values)
    """
    # NOTE: Real consumption depends on household size, building age, insulation and so on.
    # This simulation ignores all of that and samples around a known average instead.
    #
    # Gas: an average German flat (90 m²) at 140 kWh/m² gives 12600 kWh per year. Gas meters
    # count m³, so the energy is converted using an assumed calorific value of about
    # 11.215 kWh/m³ and a conversion factor ("Zustandszahl") that accounts for slight
    # pressure differences.
    yearly_gas_kwh = 12600  # in kWh
    kwh_per_cubic_metre = 11.215  # in kWh/m³
    state_number = 0.9692
    gas_mu = yearly_gas_kwh / (kwh_per_cubic_metre * state_number)
    gas_sigma = gas_mu * 0.1

    # Electricity: average yearly consumption of a 2-person household in Germany.
    elt_mu = 3500  # in kWh
    elt_sigma = elt_mu * 0.1

    all_customers = []
    for _ in range(num_customers):
        # Counter values before the first recorded reading
        gas_counter = random.randint(1000, 60000)
        elt_counter = random.randint(1000, 600_000)

        # Avacon-style meter numbers
        gas_meter = generate_meter_number()
        elt_meter = generate_meter_number()

        # Between 1 and 10 readings; gas and electricity are always read together
        reading_count = random.randint(1, 10)

        # Readings happen roughly once a year, so the first one is placed about
        # `reading_count` years before today.
        gas_date = generate_past_date_with_variance(reading_count)

        gas_dates: List[datetime] = []
        elt_dates: List[datetime] = []
        gas_values: List[int] = []
        elt_values: List[int] = []

        for idx in range(reading_count):
            if idx:
                # Every later reading follows the previous gas reading by about one year
                gas_date = gas_date + timedelta(days=365 + random.randint(-50, 50))

            # The electricity meter is read within a few days of the gas meter
            elt_date = gas_date + timedelta(days=random.randint(-10, 10))

            # Advance both counters by one simulated year of consumption
            gas_counter += int(random.gauss(gas_mu, gas_sigma))
            elt_counter += int(random.gauss(elt_mu, elt_sigma))

            gas_dates.append(gas_date)
            elt_dates.append(elt_date)
            gas_values.append(gas_counter)
            elt_values.append(elt_counter)

        all_customers.append(
            {
                "electricity": (elt_meter, elt_dates, elt_values),
                "gas": (gas_meter, gas_dates, gas_values),
            }
        )

    return all_customers
|
|
|
|
|
|
def generate_past_date_with_variance(
    years_ago: int, reference_date: "datetime | None" = None
) -> datetime:
    """Return a date roughly ``years_ago`` years before a reference date.

    The reference date is truncated to midnight, moved back by ``years_ago``
    calendar years and then shifted by a random offset of -50 to +50 days.

    Parameters
    ----------
    years_ago : int
        Number of calendar years to go back.
    reference_date : datetime, optional
        Date to count back from. Defaults to the current local date.

    Returns
    -------
    datetime
        The randomized past date with the time set to 00:00:00.

    Raises
    ------
    ValueError
        If the resulting year is outside the range supported by ``datetime``.
    """
    if reference_date is None:
        reference_date = datetime.now()

    # Only the date part matters, so drop the time of day
    start = reference_date.replace(hour=0, minute=0, second=0, microsecond=0)

    target_year = start.year - years_ago
    try:
        anniversary = start.replace(year=target_year)
    except ValueError:
        # 29 February has no counterpart in a non-leap year; fall back to 28 February.
        # Any other failure (e.g. year out of range) is a genuine error.
        if (start.month, start.day) != (2, 29):
            raise
        anniversary = start.replace(year=target_year, day=28)

    # Shift by a random number of days between -50 and 50
    return anniversary + timedelta(days=random.randint(-50, 50))
|
|
|
|
|
|
def weighted_random_int(min_value: int = 1, max_value: int = 120) -> int:
    """Generate a random integer that is skewed towards lower values.

    A uniform random number is pushed through an exponential/logarithmic
    transformation so that small results are more likely than large ones.
    Both ends of the range can be returned.

    Parameters
    ----------
    min_value : int, optional
        The minimum value of the range (inclusive). Default is 1.
    max_value : int, optional
        The maximum value of the range (inclusive). Default is 120.

    Returns
    -------
    int
        A random integer between min_value and max_value (both inclusive),
        with a distribution skewed towards lower numbers.

    Raises
    ------
    ValueError
        If max_value is smaller than min_value.
    """
    if max_value < min_value:
        raise ValueError(
            f"max_value ({max_value}) must not be smaller than min_value ({min_value})"
        )

    # Number of distinct integers in the inclusive range
    span = max_value - min_value + 1

    r = random.random()  # uniform in [0, 1)

    # exp(r * log(span + 1)) lies in [1, span + 1), so its floor covers 1..span.
    # Using log(span) here would make max_value unreachable because r never equals 1.
    step = math.floor(math.exp(r * math.log(span + 1)))

    # The clamp guards against floating-point rounding at the very top of the range
    return min(min_value + step - 1, max_value)
|
|
|
|
|
|
def generate_meter_number() -> str:
    """Create a random meter number of the form "X.YYY.ZZZ.Q".

    X and Q are single digits from 1 to 9; YYY and ZZZ are numbers from
    100 to 999.

    Returns
    -------
    str
        The dot-separated meter number, e.g. "3.482.907.5".
    """
    # The four parts are drawn left to right: X, YYY, ZZZ, Q
    leading_digit = random.randint(1, 9)
    first_group = random.randint(100, 999)
    second_group = random.randint(100, 999)
    trailing_digit = random.randint(1, 9)

    parts = (leading_digit, first_group, second_group, trailing_digit)
    return ".".join(str(part) for part in parts)
|
|
|
|
|
|
if __name__ == "__main__":
    # Generate data for app: sample customers from the base data, attach simulated
    # meter readings and write everything to customers.json.

    # Number of customers to generate
    num_customers = 1000

    # Load base data file
    # NOTE: All of this information is publicly available, see readme for sources of information
    # that were used
    # The encoding is given explicitly so the result does not depend on the platform default.
    with open(DATA_DIR / "base_data.json", "r", encoding="utf-8") as file:
        base_data = json.load(file)

    zip_results = base_data["zips"]

    # Create a population-weighted sample of zip-code entries (drawn with replacement)
    df = pd.DataFrame(zip_results)
    df["weight"] = df["population"] / df["population"].sum()
    selected_zips = df.sample(n=num_customers, weights="weight", replace=True)

    # Generate customer names
    print("Generating customers data...")
    c_given_names = random.choices(base_data["given_names"], k=num_customers)
    c_surnames = random.choices(base_data["surnames"], k=num_customers)

    # Generate addresses
    print("Generating address data...")
    c_streets = random.choices(base_data["streets"], k=num_customers)

    # For street numbers, we just generate a random number between 1 and 120, weighted such that the
    # lower numbers are more likely to occur
    house_numbers = [weighted_random_int(1, 120) for _ in range(num_customers)]

    # Finally, create meter readings
    print("Generating meter readings...")
    readings = generate_readings(num_customers)

    # Create a final list of customers and store as JSON
    print("Creating final JSON file...")
    customers = []
    for i, (given_name, surname, street, house_number, customer_readings) in enumerate(
        zip(c_given_names, c_surnames, c_streets, house_numbers, readings)
    ):
        # Look the sampled row up once instead of once per field
        zip_row = selected_zips.iloc[i]
        customers.append(
            {
                "given_name": given_name,
                "surname": surname,
                "street": street,
                "house_number": house_number,
                "city": zip_row["city"],
                "zip_code": zip_row["zip_codes"],
                "longitude": float(zip_row["longitude"]),
                "latitude": float(zip_row["latitude"]),
                "readings_elt": customer_readings["electricity"],
                "readings_gas": customer_readings["gas"],
            }
        )

    # ensure_ascii=False writes non-ASCII characters verbatim, so the file must be opened
    # as UTF-8 explicitly. DateTimeEncoder presumably serializes the datetime objects in
    # the readings - see data_utils.
    with open(DATA_DIR / "customers.json", "w", encoding="utf-8") as file:
        json.dump(customers, file, indent=4, ensure_ascii=False, cls=DateTimeEncoder)
|