"""Module to randomly generate a JSON file with customer data for the Avacon app."""

import json
import math
import random
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

import pandas as pd

from config import DATA_DIR
from data_utils import DateTimeEncoder


def generate_readings(num_customers: int) -> List[Dict[str, Tuple[str, List[datetime], List[int]]]]:
    """Generate simulated meter readings for a specified number of customers.

    This function creates synthetic data for both natural gas and electricity meter readings.
    Each customer gets between 1 and 10 readings per meter. Consecutive gas readings are roughly
    one year apart (365 +/- 50 days), and every electricity reading is dated within +/- 10 days
    of the corresponding gas reading. The yearly consumption added to the meter value is sampled
    from a Gaussian distribution around an average German consumption value.

    Parameters
    ----------
    num_customers : int
        The number of customers for which to generate meter readings.

    Returns
    -------
    List[Dict[str, Tuple[str, List[datetime], List[int]]]]
        A list of dictionaries, where each dictionary represents a customer and contains:
        - 'electricity': A tuple of (meter number, list of reading dates, list of readings)
        - 'gas': A tuple of (meter number, list of reading dates, list of readings)
    """
    # NOTE: Of course, natural gas and electricity consumption depend on various factors, such as
    # size of the household, age of the building, insulation, etc. For the sake of this example,
    # we simply take the known average value and sample from a Gaussian distribution around
    # this value, with a standard deviation of 10% of the mean.
    # For the mean we assume the average size of a flat in Germany (90 m²) and the average
    # consumption of 140 kWh/m², which needs to be converted to m³ for the meter reading. To do
    # this, the calorific value of natural gas is used (assumed to be around 11.215 kWh/m³),
    # together with the conversion factor ("Zustandszahl") that accounts for slight pressure
    # differences.
    mean_natural_gas = 12600  # in kWh
    calorific_value = 11.215  # in kWh/m³
    conversion_factor = 0.9692
    mean_cubic = mean_natural_gas / (calorific_value * conversion_factor)

    # For electricity, we take as average consumption the one of a 2-person household in Germany.
    mean_electricity = 3500  # in kWh

    readings = []

    # For each customer, generate between 1 and 10 readings (we assume that natural gas and
    # electricity are always read at roughly the same time)
    for _ in range(num_customers):
        # The initial value shown on the customer's meters
        gas_reading = random.randint(1000, 60000)
        elt_reading = random.randint(1000, 600_000)

        # Create an Avacon-style meter number for each meter
        gas_meter_number = generate_meter_number()
        elt_meter_number = generate_meter_number()

        num_readings = random.randint(1, 10)

        # Get initial timestamp: Assuming that each reading takes place once a year around a
        # similar date, we just take today's date and subtract a number of years corresponding
        # to the number of readings
        init_date = generate_past_date_with_variance(num_readings)

        tmp_gas_dates: List[datetime] = []
        tmp_elt_dates: List[datetime] = []
        tmp_gas_readings: List[int] = []
        tmp_elt_readings: List[int] = []

        for j in range(num_readings):
            if j == 0:
                gas_date = init_date
            else:
                # Follow-up readings happen roughly one year after the previous one
                time_diff = 365 + random.randint(-50, 50)
                gas_date = tmp_gas_dates[-1] + timedelta(days=time_diff)

            # Electricity is read around a similar date as natural gas
            elt_date = gas_date + timedelta(days=random.randint(-10, 10))

            # Meters are cumulative: add one period's sampled consumption to the running value
            gas_reading += int(random.gauss(mean_cubic, mean_cubic * 0.1))
            elt_reading += int(random.gauss(mean_electricity, mean_electricity * 0.1))

            tmp_gas_dates.append(gas_date)
            tmp_elt_dates.append(elt_date)
            tmp_gas_readings.append(gas_reading)
            tmp_elt_readings.append(elt_reading)

        readings.append(
            {
                "electricity": (elt_meter_number, tmp_elt_dates, tmp_elt_readings),
                "gas": (gas_meter_number, tmp_gas_dates, tmp_gas_readings),
            }
        )

    return readings


def generate_past_date_with_variance(years_ago: int) -> datetime:
    """Return today's date shifted back by a number of years, plus a random offset in days.

    The time of day is reset to midnight. After subtracting ``years_ago`` years, a random offset
    between -50 and 50 days (inclusive) is applied.

    Parameters
    ----------
    years_ago : int
        The number of years to subtract from the current date.

    Returns
    -------
    datetime
        The resulting (naive) datetime at midnight.

    Raises
    ------
    ValueError
        If the target year is outside the range supported by ``datetime``.
    """
    # Get current date (ignoring time)
    current_date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    target_year = current_date.year - years_ago

    try:
        past_date = current_date.replace(year=target_year)
    except ValueError:
        # Today is Feb 29 and the target year is not a leap year: fall back to Feb 28. If the
        # year itself is out of range, this second call raises ValueError again.
        past_date = current_date.replace(year=target_year, day=28)

    # Shift by a random number of days between -50 and 50
    days_variance = random.randint(-50, 50)
    return past_date + timedelta(days=days_variance)


def weighted_random_int(min_value: int = 1, max_value: int = 120) -> int:
    """Generate a random integer with a logarithmic distribution.

    This function produces random integers between min_value and max_value (inclusive), with a
    distribution skewed towards lower numbers. It uses a logarithmic transformation of a uniform
    random number to achieve this weighted distribution.

    Parameters
    ----------
    min_value : int, optional
        The minimum value of the range (inclusive). Default is 1.
    max_value : int, optional
        The maximum value of the range (inclusive). Default is 120.

    Returns
    -------
    int
        A random integer between min_value and max_value, with a distribution skewed towards
        lower numbers.

    Raises
    ------
    ValueError
        If max_value is smaller than min_value.
    """
    if max_value < min_value:
        raise ValueError("max_value must be greater than or equal to min_value")

    span = max_value - min_value + 1  # number of distinct integers in the range
    r = random.random()  # uniform in [0.0, 1.0)

    # exp(r * log(span + 1)) lies in [1, span + 1), so after shifting and flooring every integer
    # in [min_value, max_value] is reachable. Using log(span) here would make max_value
    # unreachable.
    value = math.exp(r * math.log(span + 1)) + min_value - 1

    # Round down; clamp to guard against floating-point rounding at the upper edge
    return min(int(math.floor(value)), max_value)


def generate_meter_number() -> str:
    """Generate a random meter number in a specific format.

    This function creates a meter number string in the format "X.YYY.ZZZ.Q", where X and Q are
    single digits (1-9), and YYY and ZZZ are three-digit numbers (100-999).

    Returns
    -------
    str
        A randomly generated meter number string in the format "X.YYY.ZZZ.Q".
    """
    return (
        f"{random.randint(1, 9)}.{random.randint(100, 999)}."
        f"{random.randint(100, 999)}.{random.randint(1, 9)}"
    )


def main() -> None:
    """Generate the customer data set for the app and store it as ``customers.json``.

    Reads ``base_data.json`` from ``DATA_DIR`` (using the keys "zips", "given_names", "surnames"
    and "streets"), samples customers weighted by the population of each zip entry, attaches
    random names, addresses and meter readings, and writes the result to ``customers.json`` in
    ``DATA_DIR``.
    """
    # Number of customers to generate
    num_customers = 1000

    # Load base data file
    # NOTE: All of this information is publicly available, see readme for sources of information
    # that were used
    # Explicit UTF-8: the data contains non-ASCII characters and must not depend on the locale.
    with open(DATA_DIR / "base_data.json", "r", encoding="utf-8") as file:
        base_data = json.load(file)
    zip_results = base_data["zips"]

    # Create a population-weighted sample of customers
    df = pd.DataFrame(zip_results)
    df["weight"] = df["population"] / df["population"].sum()
    selected_zips = df.sample(n=num_customers, weights="weight", replace=True)

    # Generate customer names
    print("Generating customers data...")
    c_given_names = random.choices(base_data["given_names"], k=num_customers)
    c_surnames = random.choices(base_data["surnames"], k=num_customers)

    # Generate addresses
    print("Generating address data...")
    c_streets = random.choices(base_data["streets"], k=num_customers)

    # For street numbers, we just generate a random number between 1 and 120, weighted such that
    # the lower numbers are more likely to occur
    house_numbers = [weighted_random_int(1, 120) for _ in range(num_customers)]

    # Finally, create meter readings
    print("Generating meter readings...")
    readings = generate_readings(num_customers)

    # Create a final list of customers and store as JSON
    print("Creating final JSON file...")
    customers = []
    for i in range(num_customers):
        # Look up the sampled zip row once per customer instead of once per field
        zip_row = selected_zips.iloc[i]
        customers.append(
            {
                "given_name": c_given_names[i],
                "surname": c_surnames[i],
                "street": c_streets[i],
                "house_number": house_numbers[i],
                "city": zip_row["city"],
                "zip_code": zip_row["zip_codes"],
                "longitude": float(zip_row["longitude"]),
                "latitude": float(zip_row["latitude"]),
                "readings_elt": readings[i]["electricity"],
                "readings_gas": readings[i]["gas"],
            }
        )

    with open(DATA_DIR / "customers.json", "w", encoding="utf-8") as file:
        json.dump(customers, file, indent=4, ensure_ascii=False, cls=DateTimeEncoder)


if __name__ == "__main__":
    # Generate data for app
    main()