# -*- coding: utf-8 -*-
# Copyright 2016-2018 Europa-Universität Flensburg,
# Flensburg University of Applied Sciences,
# Centre for Sustainable Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# File description
"""
This file is part of the eGo toolbox.
It contains the class definition for multiple eDisGo networks.
"""
__copyright__ = (
"Flensburg University of Applied Sciences, "
"Europa-Universität Flensburg, "
"Centre for Sustainable Energy Systems"
)
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = "wolf_bunke, maltesc, mltja"
import json
import logging
import os
import pickle
from copy import deepcopy
from datetime import datetime
from datetime import timedelta as td
from time import localtime, sleep, strftime
# Import
from traceback import TracebackException
import dill
import multiprocess as mp2
import pandas as pd
if "READTHEDOCS" not in os.environ:
from edisgo.edisgo import import_edisgo_from_files
from edisgo.flex_opt.reinforce_grid import enhanced_reinforce_grid
from edisgo.network.overlying_grid import distribute_overlying_grid_requirements
from edisgo.tools.config import Config
from edisgo.tools.logger import setup_logger
from edisgo.tools.plots import mv_grid_topology
from edisgo.tools.temporal_complexity_reduction import (
get_most_critical_time_intervals,
)
from edisgo.tools.tools import (
aggregate_district_heating_components,
reduce_timeseries_data_to_given_timeindex,
)
from ego.mv_clustering import cluster_workflow, database
from ego.tools.economics import edisgo_grid_investment
from ego.tools.interface import (
ETraGoMinimalData,
get_etrago_results_per_bus,
map_etrago_heat_bus_to_district_heating_id,
rename_generator_carriers_edisgo,
)
# Logging
logger = logging.getLogger(__name__)
pickle.DEFAULT_PROTOCOL = 4
dill.settings["protocol"] = 4
[docs]class EDisGoNetworks:
"""
Performs multiple eDisGo runs and stores the resulting edisgo_grids
Parameters
----------
json_file : :obj:dict
Dictionary of the ``scenario_setting.json`` file
etrago_network: :class:`etrago.tools.io.NetworkScenario`
eTraGo network object compiled by :meth:`etrago.appl.etrago`
"""
def __init__(self, json_file, etrago_network):
# Genral Json Inputs
self._json_file = json_file
self._set_scenario_settings()
# Create reduced eTraGo network
self._etrago_network = ETraGoMinimalData(etrago_network)
del etrago_network
# Program information
self._run_finished = False
# eDisGo Result grids
self._edisgo_grids = {}
if self._csv_import:
self._load_edisgo_results()
self._successful_grids = self._successful_grids()
self._grid_investment_costs = edisgo_grid_investment(self, self._json_file)
else:
# Only clustering results
if self._only_cluster:
self._set_grid_choice()
if self._results:
self._save_edisgo_results()
self._grid_investment_costs = None
else:
# Execute Functions
self._set_grid_choice()
self._init_status()
self._run_edisgo_pool()
if self._results:
self._save_edisgo_results()
self._successful_grids = self._successful_grids()
self._grid_investment_costs = edisgo_grid_investment(
self, self._json_file
)
@property
def network(self):
"""
Container for EDisGo objects, including all results
Returns
-------
dict[int, :class:`edisgo.EDisGo`]
Dictionary of EDisGo objects, keyed by MV grid ID
"""
return self._edisgo_grids
@property
def grid_choice(self):
"""
Container for the choice of MV grids, including their weighting
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Dataframe containing the chosen grids and their weightings
'no_of_points_per_cluster', 'the_selected_network_id', 'represented_grids'
"""
return self._grid_choice
@property
def successful_grids(self):
"""
Relative number of successfully calculated MV grids
(Includes clustering weighting)
Returns
-------
int
Relative number of grids
"""
return self._successful_grids
@property
def grid_investment_costs(self):
"""
Grid investment costs
Returns
-------
None or :pandas:`pandas.DataFrame<dataframe>`
Dataframe containing annuity costs per voltage level
"""
return self._grid_investment_costs
[docs] def plot_storage_integration(self, mv_grid_id, **kwargs):
"""
Plots storage position in MV grid of integrated storages.
For more information see :func:`edisgo.tools.plots.mv_grid_topology`.
"""
mv_grid_topology(
self._edisgo_grids[mv_grid_id].network.pypsa,
self._edisgo_grids[mv_grid_id].network.config,
node_color=kwargs.get("storage_integration", None),
filename=kwargs.get("filename", None),
grid_district_geom=kwargs.get("grid_district_geom", True),
background_map=kwargs.get("background_map", True),
xlim=kwargs.get("xlim", None),
ylim=kwargs.get("ylim", None),
title=kwargs.get("title", ""),
)
[docs] def plot_grid_expansion_costs(self, mv_grid_id, **kwargs):
"""
Plots costs per MV line.
For more information see :func:`edisgo.tools.plots.mv_grid_topology`.
"""
mv_grid_topology(
self._edisgo_grids[mv_grid_id].network.pypsa,
self._edisgo_grids[mv_grid_id].network.config,
line_color="expansion_costs",
grid_expansion_costs=(
self._edisgo_grids[
mv_grid_id
].network.results.grid_expansion_costs.rename(
columns={"overnight_costs": "total_costs"}
)
),
filename=kwargs.get("filename", None),
grid_district_geom=kwargs.get("grid_district_geom", True),
background_map=kwargs.get("background_map", True),
limits_cb_lines=kwargs.get("limits_cb_lines", None),
xlim=kwargs.get("xlim", None),
ylim=kwargs.get("ylim", None),
lines_cmap=kwargs.get("lines_cmap", "inferno_r"),
title=kwargs.get("title", ""),
)
[docs] def plot_line_loading(self, mv_grid_id, **kwargs):
"""
Plots relative line loading (current from power flow analysis to
allowed current) of MV lines.
For more information see :func:`edisgo.tools.plots.mv_grid_topology`.
"""
mv_grid_topology(
self._edisgo_grids[mv_grid_id].network.pypsa,
self._edisgo_grids[mv_grid_id].network.config,
timestep=kwargs.get("timestep", None),
line_color="loading",
node_color=kwargs.get("node_color", None),
line_load=self._edisgo_grids[mv_grid_id].network.results.s_res(),
filename=kwargs.get("filename", None),
arrows=kwargs.get("arrows", None),
grid_district_geom=kwargs.get("grid_district_geom", True),
background_map=kwargs.get("background_map", True),
voltage=None, # change API
limits_cb_lines=kwargs.get("limits_cb_lines", None),
limits_cb_nodes=kwargs.get("limits_cb_nodes", None),
xlim=kwargs.get("xlim", None),
ylim=kwargs.get("ylim", None),
lines_cmap=kwargs.get("lines_cmap", "inferno_r"),
title=kwargs.get("title", ""),
)
[docs] def plot_mv_grid_topology(self, mv_grid_id, **kwargs):
"""
Plots plain MV grid topology.
For more information see :func:`edisgo.tools.plots.mv_grid_topology`.
"""
mv_grid_topology(
self._edisgo_grids[mv_grid_id].network.pypsa,
self._edisgo_grids[mv_grid_id].network.config,
filename=kwargs.get("filename", None),
grid_district_geom=kwargs.get("grid_district_geom", True),
background_map=kwargs.get("background_map", True),
xlim=kwargs.get("xlim", None),
ylim=kwargs.get("ylim", None),
title=kwargs.get("title", ""),
)
def _init_status(self):
"""
Creates a status csv file where statuses of MV grid calculations are tracked.
The file is saved to the directory 'status'. Filename indicates date and time
the file was created.
File contains the following information:
* 'MV grid id' (index)
* 'cluster_perc' - percentage of grids represented by this grid
* 'start_time' - start time of calculation
* 'end_time' - end time of calculation
"""
self._status_dir = os.path.join(self._json_file["eDisGo"]["results"], "status")
if not os.path.exists(self._status_dir):
os.makedirs(self._status_dir)
self._status_file_name = "eGo_" + strftime("%Y-%m-%d_%H%M%S", localtime())
status = self._grid_choice.copy()
status = status.set_index("the_selected_network_id")
status.index.names = ["MV grid id"]
status["cluster_perc"] = (
status["no_of_points_per_cluster"]
/ self._grid_choice["no_of_points_per_cluster"].sum()
)
status["start_time"] = "Not started yet"
status["end_time"] = "Not finished yet"
status.drop(
["no_of_points_per_cluster", "represented_grids"], axis=1, inplace=True
)
self._status_file_path = os.path.join(
self._status_dir, self._status_file_name + ".csv"
)
status.to_csv(self._status_file_path)
def _status_update(self, mv_grid_id, time, message=None, show=True):
"""
Updates status csv file where statuses of MV grid calculations are tracked.
Parameters
----------
mv_grid_id : int
MV grid ID of the ding0 grid.
time : str
Can be either 'start' to set information on when the calculation started
or 'end' to set information on when the calculation ended. In case a
message is provided through parameter `message`, the message instead of the
time is set.
message : str or None (optional)
Message to set for 'start_time' or 'end_time'. If None, the current time
is set. Default: None.
show : bool (optional)
If True, shows a logging message with the status information. Default: True.
"""
status = pd.read_csv(self._status_file_path, index_col=0)
status["start_time"] = status["start_time"].astype(str)
status["end_time"] = status["end_time"].astype(str)
if message:
now = message
else:
now = strftime("%Y-%m-%d_%H:%M", localtime())
if time == "start":
status.at[mv_grid_id, "start_time"] = now
elif time == "end":
status.at[mv_grid_id, "end_time"] = now
if show:
logger.info("\n\neDisGo status: \n\n" + status.to_string() + "\n\n")
status.to_csv(self._status_file_path)
def _update_edisgo_configs(self, edisgo_grid):
"""
This function overwrites some eDisGo configurations with eGo
settings.
The overwritten configs are:
* config['db_connection']['section']
* config['data_source']['oedb_data_source']
* config['versioned']['version']
"""
# Info and Warning handling
if not hasattr(self, "_suppress_log"):
self._suppress_log = False # Only in the first run warnings and
# info get thrown
# Versioned
ego_gridversion = self._grid_version
if ego_gridversion is None:
ego_versioned = "model_draft"
if not self._suppress_log:
logger.info(
"eGo's grid_version == None is "
+ "evaluated as data source: model_draft"
)
else:
ego_versioned = "versioned"
if not self._suppress_log:
logger.info(
(
"eGo's grid_version == '{}' is "
+ "evaluated as data source: versioned"
).format(ego_gridversion)
)
edisgo_versioned = edisgo_grid.network.config["data_source"]["oedb_data_source"]
if not ego_versioned == edisgo_versioned:
if not self._suppress_log:
logger.warning(
(
"eDisGo data source configuration ('{}') "
+ "will be overwritten with data source config. from "
+ "eGo's scenario settings (data source: '{}')"
).format(edisgo_versioned, ego_versioned)
)
edisgo_grid.network.config["data_source"][
"oedb_data_source"
] = ego_versioned
# Gridversion
ego_gridversion = self._grid_version
edisgo_gridversion = edisgo_grid.network.config["versioned"]["version"]
if not ego_gridversion == edisgo_gridversion:
if not self._suppress_log:
logger.warning(
(
"eDisGo version configuration (version: '{}') "
+ "will be overwritten with version configuration "
+ "from eGo's scenario settings (version: '{}')"
).format(edisgo_gridversion, ego_gridversion)
)
edisgo_grid.network.config["versioned"]["version"] = ego_gridversion
self._suppress_log = True
def _set_scenario_settings(self):
self._csv_import = self._json_file["eGo"]["csv_import_eDisGo"]
# eTraGo args
self._etrago_args = self._json_file["eTraGo"]
self._scn_name = self._etrago_args["scn_name"]
self._ext_storage = "storage" in self._etrago_args["extendable"]
if self._ext_storage:
logger.info("eTraGo Dataset used extendable storage")
self._pf_post_lopf = self._etrago_args["pf_post_lopf"]
# eDisGo args import
if self._csv_import:
# raise NotImplementedError
with open(os.path.join(self._csv_import, "edisgo_args.json")) as f:
edisgo_args = json.load(f)
self._json_file["eDisGo"] = edisgo_args
logger.info(
"All eDisGo settings are taken from CSV folder"
+ "(scenario settings are ignored)"
)
# This overwrites the original object...
# Imported or directly from the Settings
# eDisGo section of the settings
self._edisgo_args = self._json_file["eDisGo"]
# Reading all eDisGo settings
# TODO: Integrate into a for-loop
self._grid_version = self._edisgo_args["gridversion"]
self._solver = self._edisgo_args["solver"]
self._grid_path = self._edisgo_args["grid_path"]
self._choice_mode = self._edisgo_args["choice_mode"]
self._parallelization = self._edisgo_args["parallelization"]
self._cluster_attributes = self._edisgo_args["cluster_attributes"]
self._only_cluster = self._edisgo_args["only_cluster"]
self._max_workers = self._edisgo_args["max_workers"]
self._max_cos_phi_renewable = self._edisgo_args["max_cos_phi_renewable"]
self._results = self._edisgo_args["results"]
self._max_calc_time = self._edisgo_args["max_calc_time"]
# Some basic checks
if self._only_cluster:
logger.warning("\n\nThis eDisGo run only returns cluster results\n\n")
# Versioning
if self._grid_version is not None:
self._versioned = True
else:
self._versioned = False
def _successful_grids(self):
"""
Calculates the relative number of successfully calculated grids,
including the cluster weightings
"""
total, success, fail = 0, 0, 0
for key, value in self._edisgo_grids.items():
weight = self._grid_choice.loc[
self._grid_choice["the_selected_network_id"] == key
]["no_of_points_per_cluster"].values[0]
total += weight
if hasattr(value, "network"):
success += weight
else:
fail += weight
return success / total
def _cluster_mv_grids(self):
"""
Clusters the MV grids based on the attributes, for a given number
of MV grids
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Dataframe containing the clustered MV grids and their weightings
"""
cluster_df = cluster_workflow(config=self._json_file)
# Filter for clusters with representatives.
cluster_df = cluster_df[cluster_df["representative"].astype(bool)]
return cluster_df
def _identify_extended_storages(self):
all_mv_grids = self._check_available_mv_grids()
storages = pd.DataFrame(index=all_mv_grids, columns=["storage_p_nom"])
logger.info("Identifying extended storage")
for mv_grid in all_mv_grids:
min_extended = 0.3
stor_p_nom = self._etrago_network.storage_units.loc[
(self._etrago_network.storage_units["bus"] == str(mv_grid))
& (
self._etrago_network.storage_units["p_nom_extendable"]
== True # noqa: E712
)
& (self._etrago_network.storage_units["p_nom_opt"] > min_extended)
& (self._etrago_network.storage_units["max_hours"] <= 20.0)
]["p_nom_opt"]
if len(stor_p_nom) == 1:
stor_p_nom = stor_p_nom.values[0]
elif len(stor_p_nom) == 0:
stor_p_nom = 0.0
else:
raise IndexError
storages.at[mv_grid, "storage_p_nom"] = stor_p_nom
return storages
def _check_available_mv_grids(self):
"""
Checks all available MV grids in the given folder (from the settings)
Returns
-------
:obj:`list`
List of MV grid ID's
"""
mv_grids = []
for file in os.listdir(self._grid_path):
if os.path.isdir(os.path.join(self._grid_path, file)):
mv_grids.append(int(file))
return mv_grids
def _set_grid_choice(self):
"""
Sets the grid choice based on the settings file
"""
choice_df = pd.DataFrame(
columns=[
"no_of_points_per_cluster",
"the_selected_network_id",
"represented_grids",
]
)
if self._choice_mode == "cluster":
cluster_df = self._cluster_mv_grids()
n_clusters = self._json_file["eDisGo"]["n_clusters"]
n_clusters_found = cluster_df.shape[0]
if n_clusters == n_clusters_found:
logger.info(f"Clustering to {n_clusters} MV grids")
else:
logger.warning(
f"For {n_clusters} only for {n_clusters_found} clusters "
f"found working grids."
)
choice_df["the_selected_network_id"] = cluster_df["representative"]
choice_df["no_of_points_per_cluster"] = cluster_df["n_grids_per_cluster"]
choice_df["represented_grids"] = cluster_df["represented_grids"]
elif self._choice_mode == "manual":
man_grids = self._edisgo_args["manual_grids"]
choice_df["the_selected_network_id"] = man_grids
choice_df["no_of_points_per_cluster"] = 1
choice_df["represented_grids"] = [
[mv_grid_id] for mv_grid_id in choice_df["the_selected_network_id"]
]
logger.info("Calculating manually chosen MV grids {}".format(man_grids))
elif self._choice_mode == "all":
mv_grids = self._check_available_mv_grids()
choice_df["the_selected_network_id"] = mv_grids
choice_df["no_of_points_per_cluster"] = 1
choice_df["represented_grids"] = [
[mv_grid_id] for mv_grid_id in choice_df["the_selected_network_id"]
]
no_grids = len(mv_grids)
logger.info("Calculating all available {} MV grids".format(no_grids))
choice_df = choice_df.sort_values("no_of_points_per_cluster", ascending=False)
self._grid_choice = choice_df
def _run_edisgo_pool(self):
"""
Runs eDisGo for the chosen grids
"""
parallelization = self._parallelization
results_dir = self._results
if not os.path.exists(results_dir):
os.makedirs(results_dir)
if parallelization is True:
logger.info("Run eDisGo parallel")
mv_grids = self._grid_choice["the_selected_network_id"].tolist()
no_cpu = mp2.cpu_count()
if no_cpu > self._max_workers:
no_cpu = self._max_workers
logger.info(
"Number of workers limited to {} by user".format(self._max_workers)
)
self._edisgo_grids = set(mv_grids)
self._edisgo_grids = parallelizer(
mv_grids,
lambda *xs: xs[1].run_edisgo(xs[0]),
(self,),
self._max_calc_time,
workers=no_cpu,
)
for g in mv_grids:
if g not in self._edisgo_grids:
self._edisgo_grids[g] = "Timeout"
else:
logger.info("Run eDisGo sequencial")
no_grids = len(self._grid_choice)
count = 0
for idx, row in self._grid_choice.iterrows():
prog = "%.1f" % (count / no_grids * 100)
logger.info("{} % Calculated by eDisGo".format(prog))
mv_grid_id = int(row["the_selected_network_id"])
logger.info("MV grid {}".format(mv_grid_id))
try:
edisgo_grid = self.run_edisgo(mv_grid_id)
self._edisgo_grids[mv_grid_id] = edisgo_grid
except Exception as e:
self._edisgo_grids[mv_grid_id] = e
logger.exception("MV grid {} failed: \n".format(mv_grid_id))
count += 1
self._csv_import = self._json_file["eDisGo"]["results"]
self._save_edisgo_results()
self._load_edisgo_results()
self._run_finished = True
[docs] def run_edisgo(self, mv_grid_id):
"""
Performs a single eDisGo run
Parameters
----------
mv_grid_id : int
MV grid ID of the ding0 grid
Returns
-------
:class:`edisgo.EDisGo`
Returns the complete eDisGo container, also including results
"""
self._status_update(mv_grid_id, "start", show=False)
# ##################### general settings ####################
config = self._json_file
scenario = config["eTraGo"]["scn_name"]
engine = database.get_engine(config=config)
# results directory
results_dir = os.path.join(self._results, str(mv_grid_id))
if not os.path.exists(results_dir):
os.makedirs(results_dir)
# logger
if self._parallelization:
stream_level = None
else:
stream_level = "debug"
setup_logger(
loggers=[
# {"name": "root", "file_level": None, "stream_level": None},
# {"name": "ego", "file_level": None, "stream_level": None},
{"name": "edisgo", "file_level": "debug", "stream_level": stream_level},
],
file_name=f"run_edisgo_{mv_grid_id}.log",
log_dir=results_dir,
)
# use edisgo logger in order to have all logging information for one grid go
# to the same file
logger = logging.getLogger("edisgo.external.ego._run_edisgo")
edisgo_grid = None
time_intervals = None
# ################### task: setup grid ##################
if "1_setup_grid" in config["eDisGo"]["tasks"]:
# data is always imported for the full flex scenario, wherefore in case
# a low-flex scenario is given, the lowflex-extension is dropped for the
# data import
if scenario.split("_")[-1] == "lowflex":
scn = scenario.split("_")[0]
else:
scn = scenario
edisgo_grid = self._run_edisgo_task_setup_grid(
mv_grid_id, scn, logger, config, engine
)
edisgo_grid.save(
directory=os.path.join(results_dir, "grid_data"),
save_topology=True,
save_timeseries=True,
save_results=True,
save_electromobility=True,
save_dsm=True,
save_heatpump=True,
save_overlying_grid=False,
reduce_memory=True,
archive=True,
archive_type="zip",
parameters={"grid_expansion_results": ["equipment_changes"]},
)
if "2_specs_overlying_grid" not in config["eDisGo"]["tasks"]:
return {edisgo_grid.topology.id: results_dir}
# ################### task: specs overlying grid ##################
if "2_specs_overlying_grid" in config["eDisGo"]["tasks"]:
if edisgo_grid is None:
grid_path = os.path.join(results_dir, "grid_data.zip")
edisgo_grid = import_edisgo_from_files(
edisgo_path=grid_path,
import_topology=True,
import_timeseries=True,
import_results=True,
import_electromobility=True,
import_heat_pump=True,
import_dsm=True,
import_overlying_grid=False,
from_zip_archive=True,
)
edisgo_grid.legacy_grids = False
edisgo_grid = self._run_edisgo_task_specs_overlying_grid(
edisgo_grid, scenario, logger, config, engine
)
zip_name = "grid_data_overlying_grid"
if scenario in ["eGon2035_lowflex", "eGon100RE_lowflex"]:
zip_name += "_lowflex"
edisgo_grid.save(
directory=os.path.join(results_dir, zip_name),
save_topology=True,
save_timeseries=True,
save_results=True,
save_electromobility=True,
save_dsm=True,
save_heatpump=True,
save_overlying_grid=True,
reduce_memory=True,
archive=True,
archive_type="zip",
parameters={"grid_expansion_results": ["equipment_changes"]},
)
# ################### task: temporal complexity reduction ##################
# task temporal complexity reduction is optional
if "3_temporal_complexity_reduction" in config["eDisGo"]["tasks"]:
if edisgo_grid is None:
if scenario in ["eGon2035", "eGon100RE"]:
zip_name = "grid_data_overlying_grid.zip"
else:
zip_name = "grid_data_overlying_grid_lowflex.zip"
grid_path = os.path.join(results_dir, zip_name)
edisgo_grid = import_edisgo_from_files(
edisgo_path=grid_path,
import_topology=True,
import_timeseries=True,
import_results=True,
import_electromobility=True,
import_heat_pump=True,
import_dsm=True,
import_overlying_grid=True,
from_zip_archive=True,
)
edisgo_grid.legacy_grids = False
time_intervals = self._run_edisgo_task_temporal_complexity_reduction(
edisgo_grid, logger, config
)
# determine whether work flow ends here or continues, and if it continues
# whether time intervals need to be loaded
if "4_optimisation" not in config["eDisGo"]["tasks"]:
return {edisgo_grid.topology.id: results_dir}
# ########################## task: optimisation ##########################
if "4_optimisation" in config["eDisGo"]["tasks"]:
if edisgo_grid is None:
if scenario in ["eGon2035", "eGon100RE"]:
zip_name = "grid_data_overlying_grid.zip"
else:
zip_name = "grid_data_overlying_grid_lowflex.zip"
grid_path = os.path.join(results_dir, zip_name)
edisgo_grid = import_edisgo_from_files(
edisgo_path=grid_path,
import_topology=True,
import_timeseries=True,
import_results=True,
import_electromobility=True,
import_heat_pump=True,
import_dsm=True,
import_overlying_grid=True,
from_zip_archive=True,
)
edisgo_grid.legacy_grids = False
if time_intervals is None:
# load time intervals
time_intervals = pd.read_csv(
os.path.join(results_dir, "selected_time_intervals.csv"),
index_col=0,
)
for ti in time_intervals.index:
time_steps = time_intervals.at[ti, "time_steps"]
if time_steps is not None:
time_intervals.at[ti, "time_steps"] = pd.date_range(
start=time_steps.split("'")[1],
periods=int(time_steps.split("=")[-2].split(",")[0]),
freq="H",
)
edisgo_grid = self._run_edisgo_task_optimisation(
edisgo_grid, scenario, logger, time_intervals, results_dir
)
zip_name = "grid_data_optimisation"
if scenario in ["eGon2035_lowflex", "eGon100RE_lowflex"]:
zip_name += "_lowflex"
edisgo_grid.save(
directory=os.path.join(results_dir, zip_name),
save_topology=True,
save_timeseries=True,
save_results=True,
save_opf_results=True,
save_electromobility=False,
save_dsm=False,
save_heatpump=False,
save_overlying_grid=False,
reduce_memory=True,
archive=True,
archive_type="zip",
parameters={"grid_expansion_results": ["equipment_changes"]},
)
if "5_grid_reinforcement" not in config["eDisGo"]["tasks"]:
return {edisgo_grid.topology.id: results_dir}
# ########################## reinforcement ##########################
if "5_grid_reinforcement" in config["eDisGo"]["tasks"]:
if edisgo_grid is None:
if scenario in ["eGon2035", "eGon100RE"]:
zip_name = "grid_data_optimisation.zip"
else:
zip_name = "grid_data_optimisation_lowflex.zip"
grid_path = os.path.join(results_dir, zip_name)
edisgo_grid = import_edisgo_from_files(
edisgo_path=grid_path,
import_topology=True,
import_timeseries=True,
import_results=True,
import_electromobility=False,
import_heat_pump=False,
import_dsm=False,
import_overlying_grid=False,
from_zip_archive=True,
)
edisgo_grid.legacy_grids = False
edisgo_grid = self._run_edisgo_task_grid_reinforcement(edisgo_grid, logger)
edisgo_grid.save(
directory=os.path.join(
results_dir, f"grid_data_reinforcement_{scenario}"
),
save_topology=True,
save_timeseries=True,
save_results=True,
save_electromobility=False,
save_dsm=False,
save_heatpump=False,
save_overlying_grid=False,
reduce_memory=True,
archive=True,
archive_type="zip",
)
self._status_update(mv_grid_id, "end")
return {edisgo_grid.topology.id: results_dir}
def _run_edisgo_task_setup_grid(self, mv_grid_id, scenario, logger, config, engine):
"""
Sets up EDisGo object for future scenario (without specifications from overlying
grid).
The following data is set up:
* load time series of conventional loads
* generator park
* home storage units
* DSM data
* heat pumps including heat demand and COP time series per heat pump
* charging points with standing times, etc. as well as charging time series for
uncontrolled charging (done so that public charging points have a charging
time series) and flexibility bands for home and work charging points
A dummy time index is set that is later on overwritten by the time index used
in eTraGo.
Parameters
----------
mv_grid_id : int
MV grid ID of the ding0 grid.
scenario : str
Name of scenario to import data for. Possible options are "eGon2035"
and "eGon100RE".
logger : logger handler
config : dict
Dictionary with configuration data.
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Returns
-------
:class:`edisgo.EDisGo`
"""
logger.info(f"MV grid {mv_grid_id}: Start task 'setup_grid'.")
logger.info(f"MV grid {mv_grid_id}: Initialize MV grid.")
grid_path = os.path.join(
config["eDisGo"]["grid_path"],
str(mv_grid_id),
)
if not os.path.isdir(grid_path):
msg = f"MV grid {mv_grid_id}: No grid data found."
logger.error(msg)
raise Exception(msg)
edisgo_grid = import_edisgo_from_files(edisgo_path=grid_path)
edisgo_grid.legacy_grids = False
# overwrite configs
edisgo_grid._config = Config()
edisgo_grid.set_timeindex(pd.date_range("1/1/2011", periods=8760, freq="H"))
logger.info("Set up load time series of conventional loads.")
edisgo_grid.set_time_series_active_power_predefined(
conventional_loads_ts="oedb", engine=engine, scenario=scenario
)
edisgo_grid.set_time_series_reactive_power_control(
control="fixed_cosphi",
generators_parametrisation=None,
loads_parametrisation="default",
storage_units_parametrisation=None,
)
# overwrite p_set of conventional loads as it changes from scenario to scenario
edisgo_grid.topology.loads_df[
"p_set"
] = edisgo_grid.timeseries.loads_active_power.max()
logger.info("Set up generator park.")
edisgo_grid.import_generators(generator_scenario=scenario, engine=engine)
logger.info("Set up home storage units.")
edisgo_grid.import_home_batteries(scenario=scenario, engine=engine)
logger.info("Set up DSM data.")
edisgo_grid.import_dsm(scenario=scenario, engine=engine)
logger.info("Set up heat supply and demand data.")
edisgo_grid.import_heat_pumps(scenario=scenario, engine=engine)
logger.info("Set up electromobility data.")
edisgo_grid.import_electromobility(
data_source="oedb", scenario=scenario, engine=engine
)
# apply charging strategy so that public charging points have a charging
# time series
edisgo_grid.apply_charging_strategy(strategy="dumb")
# get flexibility bands for home and work charging points
edisgo_grid.electromobility.get_flexibility_bands(
edisgo_obj=edisgo_grid, use_case=["home", "work"]
)
logger.info("Run integrity checks.")
edisgo_grid.topology.check_integrity()
edisgo_grid.electromobility.check_integrity()
edisgo_grid.heat_pump.check_integrity()
edisgo_grid.dsm.check_integrity()
return edisgo_grid
def _run_edisgo_task_specs_overlying_grid(
self, edisgo_grid, scenario, logger, config, engine
):
"""
Gets specifications from overlying grid and integrates them into the EDisGo
object.
The following data is set up:
* set generator time series
* set up thermal storage units
* requirements overlying grid on total renewables curtailment, DSM dispatch,
electromobility charging, heat pump dispatch,
A dummy time index is set that is later on overwritten by the time index used
in eTraGo
Parameters
----------
edisgo_grid : :class:`edisgo.EDisGo`
EDisGo object.
scenario : str
Name of scenario to import data for. Possible options are "eGon2035"
and "eGon100RE".
logger : logger handler
config : dict
Dictionary with configuration data.
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Returns
-------
:class:`edisgo.EDisGo`
Returns the complete eDisGo container, also including results
"""
logger.info("Start task 'specs_overlying_grid'.")
logger.info("Get specifications from eTraGo.")
specs = get_etrago_results_per_bus(
edisgo_grid.topology.id,
self._etrago_network,
self._pf_post_lopf,
self._max_cos_phi_renewable,
)
snapshots = specs["timeindex"]
# get time steps that don't converge in overlying grid
try:
convergence = pd.read_csv(
os.path.join(config["eGo"]["csv_import_eTraGo"], "pf_solution.csv"),
index_col=0,
parse_dates=True,
)
ts_not_converged = convergence[~convergence.converged].index
except FileNotFoundError:
logger.info(
"No info on converged time steps, wherefore it is assumed that all "
"converged."
)
ts_not_converged = pd.Index([])
except Exception:
raise
# overwrite previously set dummy time index if year that was used differs from
# year used in etrago
edisgo_year = edisgo_grid.timeseries.timeindex[0].year
etrago_year = snapshots[0].year
if edisgo_year != etrago_year:
timeindex_new_full = pd.date_range(
f"1/1/{etrago_year}", periods=8760, freq="H"
)
# conventional loads
edisgo_grid.timeseries.loads_active_power.index = timeindex_new_full
edisgo_grid.timeseries.loads_reactive_power.index = timeindex_new_full
# DSM
edisgo_grid.dsm.e_max.index = timeindex_new_full
edisgo_grid.dsm.e_min.index = timeindex_new_full
edisgo_grid.dsm.p_max.index = timeindex_new_full
edisgo_grid.dsm.p_min.index = timeindex_new_full
# COP and heat demand
edisgo_grid.heat_pump.cop_df.index = timeindex_new_full
edisgo_grid.heat_pump.heat_demand_df.index = timeindex_new_full
# flexibility bands
edisgo_grid.electromobility.flexibility_bands[
"upper_power"
].index = timeindex_new_full
edisgo_grid.electromobility.flexibility_bands[
"upper_energy"
].index = timeindex_new_full
edisgo_grid.electromobility.flexibility_bands[
"lower_energy"
].index = timeindex_new_full
# TimeSeries.timeindex
edisgo_grid.timeseries.timeindex = snapshots
logger.info("Set generator time series.")
# rename carrier to match with carrier names in overlying grid
rename_generator_carriers_edisgo(edisgo_grid)
# active power
edisgo_grid.set_time_series_active_power_predefined(
dispatchable_generators_ts=specs["dispatchable_generators_active_power"],
fluctuating_generators_ts=specs["renewables_potential"],
)
# reactive power
if self._pf_post_lopf:
# ToDo (low priority) Use eTraGo time series to set reactive power
# (scale by nominal power)
edisgo_grid.set_time_series_manual(
generators_q=specs["generators_reactive_power"].loc[:, []],
)
pass
else:
edisgo_grid.set_time_series_reactive_power_control(
control="fixed_cosphi",
generators_parametrisation="default",
loads_parametrisation=None,
storage_units_parametrisation=None,
)
# ToDo (medium priority) for now additional optimised storage capacity is
# ignored as capacities are very small and optimisation does not offer storage
# positioning
# if specs["storage_units_p_nom"] > 0.3:
# logger.info("Set up large battery storage units.")
# edisgo_grid.add_component(
# comp_type="storage_unit",
# bus=edisgo_grid.topology.mv_grid.station.index[0],
# p_nom=specs["storage_units_p_nom"],
# max_hours=specs["storage_units_max_hours"],
# type="large_storage",
# )
logger.info("Set up thermal storage units.")
# decentral
hp_decentral = edisgo_grid.topology.loads_df[
edisgo_grid.topology.loads_df.sector == "individual_heating"
]
if hp_decentral.empty and specs["thermal_storage_rural_capacity"] > 0:
logger.warning(
"There are thermal storage units for individual heating but no "
"heat pumps."
)
if not hp_decentral.empty and specs["thermal_storage_rural_capacity"] > 0:
tes_cap_min_cumsum = (
edisgo_grid.topology.loads_df.loc[hp_decentral.index, "p_set"]
.sort_index()
.cumsum()
)
hps_selected = tes_cap_min_cumsum[
tes_cap_min_cumsum <= specs["thermal_storage_rural_capacity"]
].index
# distribute thermal storage capacity to all selected heat pumps depending
# on heat pump size
tes_cap = (
edisgo_grid.topology.loads_df.loc[hps_selected, "p_set"]
* specs["thermal_storage_rural_capacity"]
/ edisgo_grid.topology.loads_df.loc[hps_selected, "p_set"].sum()
)
edisgo_grid.heat_pump.thermal_storage_units_df = pd.DataFrame(
data={
"capacity": tes_cap,
"efficiency": specs["thermal_storage_rural_efficiency"],
}
)
# district heating
hp_dh = edisgo_grid.topology.loads_df[
edisgo_grid.topology.loads_df.sector.isin(
["district_heating", "district_heating_resistive_heater"]
)
]
# check if there are as many district heating systems in eTraGo as in eDisGo
if hp_dh.empty:
if len(specs["feedin_district_heating"].columns) != 0:
logger.warning(
f"There are {len(hp_dh.area_id.unique())} district heating "
f"systems in eDisGo and "
f"{len(specs['feedin_district_heating'].columns)} in eTraGo."
)
else:
if len(hp_dh.area_id.unique()) != len(
specs["feedin_district_heating"].columns
):
logger.warning(
f"There are {len(hp_dh.area_id.unique())} district heating "
f"systems in eDisGo and "
f"{len(specs['feedin_district_heating'].columns)} in eTraGo."
)
# check that installed PtH capacity is equal in eTraGo as in eDisGo
if abs(hp_dh.p_set.sum() - specs["heat_pump_central_p_nom"]) > 1e-3:
logger.warning(
f"Installed capacity of PtH units in district heating differs "
f"between eTraGo ({specs['heat_pump_central_p_nom']} MW) and "
f"eDisGo ({hp_dh.p_set.sum()} MW)."
)
if not specs["feedin_district_heating"].empty:
# map district heating ID to heat bus ID from eTraGo
if scenario.split("_")[-1] == "lowflex":
scn = scenario.split("_")[0]
else:
scn = scenario
map_etrago_heat_bus_to_district_heating_id(specs, scn, engine)
for dh_id in hp_dh.district_heating_id.unique():
if dh_id in specs["thermal_storage_central_capacity"].index:
if specs["thermal_storage_central_capacity"].at[dh_id] > 0:
# get PtH unit name to allocate thermal storage unit to
comp_name = hp_dh[hp_dh.district_heating_id == dh_id].index[
0
]
edisgo_grid.heat_pump.thermal_storage_units_df = pd.concat(
[
edisgo_grid.heat_pump.thermal_storage_units_df,
pd.DataFrame(
data={
"capacity": specs[
"thermal_storage_central_capacity"
].at[dh_id],
"efficiency": specs[
"thermal_storage_central_efficiency"
],
},
index=[comp_name],
),
]
)
logger.info("Set requirements from overlying grid.")
# all time series from overlying grid are also kept for low flex scenarios
# in order to afterwards check difference in dispatch between eTraGo and eDisGo
# curtailment
# scale curtailment by ratio of nominal power in eDisGo and eTraGo
for carrier in specs["renewables_curtailment"].columns:
p_nom_total = specs["renewables_p_nom"][carrier]
p_nom_mv_lv = edisgo_grid.topology.generators_df[
edisgo_grid.topology.generators_df["type"] == carrier
].p_nom.sum()
specs["renewables_curtailment"][carrier] *= p_nom_mv_lv / p_nom_total
# check that curtailment does not exceed feed-in (for all converged time steps)
vres_gens = edisgo_grid.topology.generators_df[
edisgo_grid.topology.generators_df["type"].isin(
specs["renewables_curtailment"].columns
)
].index
pot_vres_gens = edisgo_grid.timeseries.generators_active_power.loc[
:, vres_gens
].sum(axis=1)
pot_vres_gens.loc[ts_not_converged] = 0.0
total_curtailment = specs["renewables_curtailment"].loc[:].sum(axis=1)
total_curtailment.loc[ts_not_converged] = 0.0
diff = pot_vres_gens - total_curtailment
if (diff < 0).any():
# if curtailment is much larger than feed-in, throw an error
if (diff < -1e-3).any():
raise ValueError("Curtailment exceeds feed-in!")
# if curtailment is only slightly larger than feed-in, this is due to
# numerical errors and therefore corrected
else:
ts_neg_curtailment = diff[(diff < 0)].index
total_curtailment.loc[ts_neg_curtailment] += diff.loc[
ts_neg_curtailment
]
edisgo_grid.overlying_grid.renewables_curtailment = total_curtailment
# battery storage
# scale storage time series by ratio of nominal power in eDisGo and eTraGo
p_nom_total = specs["storage_units_p_nom"]
p_nom_mv_lv = edisgo_grid.topology.storage_units_df.p_nom.sum()
edisgo_grid.overlying_grid.storage_units_active_power = (
specs["storage_units_active_power"] * p_nom_mv_lv / p_nom_total
)
edisgo_grid.overlying_grid.storage_units_soc = specs["storage_units_soc"]
# DSM
edisgo_grid.overlying_grid.dsm_active_power = specs["dsm_active_power"]
# BEV
edisgo_grid.overlying_grid.electromobility_active_power = specs[
"electromobility_active_power"
]
# PtH
# scale heat pump time series by ratio of nominal power in eDisGo and eTraGo
p_nom_total = specs["heat_pump_rural_p_nom"]
p_nom_mv_lv = edisgo_grid.topology.loads_df[
edisgo_grid.topology.loads_df.sector.isin(
["individual_heating", "individual_heating_resistive_heater"]
)
].p_set.sum()
edisgo_grid.overlying_grid.heat_pump_decentral_active_power = (
specs["heat_pump_rural_active_power"] * p_nom_mv_lv / p_nom_total
)
p_nom_total = specs["heat_pump_central_p_nom"]
p_nom_mv_lv = edisgo_grid.topology.loads_df[
edisgo_grid.topology.loads_df.sector.isin(
["district_heating", "district_heating_resistive_heater"]
)
].p_set.sum()
edisgo_grid.overlying_grid.heat_pump_central_active_power = (
specs["heat_pump_central_active_power"] * p_nom_mv_lv / p_nom_total
)
# Other feed-in into district heating
edisgo_grid.overlying_grid.feedin_district_heating = specs[
"feedin_district_heating"
]
# Thermal storage units SoC
edisgo_grid.overlying_grid.thermal_storage_units_decentral_soc = specs[
"thermal_storage_rural_soc"
]
edisgo_grid.overlying_grid.thermal_storage_units_central_soc = specs[
"thermal_storage_central_soc"
]
# Delete some flex data in case of low flex scenario
if scenario in ["eGon2035_lowflex", "eGon100RE_flex"]:
# delete DSM and flexibility bands to save disk space
edisgo_grid.dsm = edisgo_grid.dsm.__class__()
edisgo_grid.electromobility.flexibility_bands = {
"upper_power": pd.DataFrame(),
"lower_energy": pd.DataFrame(),
"upper_energy": pd.DataFrame(),
}
logger.info("Run integrity check.")
edisgo_grid.check_integrity()
return edisgo_grid
def _run_edisgo_task_temporal_complexity_reduction(
self, edisgo_grid, logger, config
):
"""
Runs the temporal complexity reduction to select most critical time periods.
Parameters
----------
edisgo_grid : :class:`edisgo.EDisGo`
EDisGo object.
logger : logger handler
config : dict
Dictionary with configuration data.
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Returns
-------
:class:`edisgo.EDisGo`
"""
logger.info("Start task 'temporal complexity reduction'.")
# get non-converging time steps
try:
convergence = pd.read_csv(
os.path.join(config["eGo"]["csv_import_eTraGo"], "pf_solution.csv"),
index_col=0,
parse_dates=True,
)
ts_not_converged = convergence[~convergence.converged].index
except FileNotFoundError:
logger.info(
"No info on converged time steps, wherefore it is assumed that all "
"converged."
)
ts_not_converged = []
except Exception:
raise
# set time series data at time steps with non-convergence issues to zero
if len(ts_not_converged) > 0:
logger.info(
f"{len(ts_not_converged)} time steps did not converge in overlying "
f"grid. Time series data at time steps with non-convergence issues is "
f"set to zero."
)
# set data in TimeSeries object to zero
attributes = edisgo_grid.timeseries._attributes
for attr in attributes:
ts = getattr(edisgo_grid.timeseries, attr)
if not ts.empty:
ts.loc[ts_not_converged, :] = 0
setattr(edisgo_grid.timeseries, attr, ts)
# set data in OverlyingGrid object to zero
attributes = edisgo_grid.overlying_grid._attributes
for attr in attributes:
ts = getattr(edisgo_grid.overlying_grid, attr)
if not ts.empty and "soc" not in attr:
if isinstance(ts, pd.Series):
ts.loc[ts_not_converged] = 0
else:
ts.loc[ts_not_converged, :] = 0
setattr(edisgo_grid.overlying_grid, attr, ts)
# distribute overlying grid data
logger.info("Distribute overlying grid data.")
edisgo_grid = distribute_overlying_grid_requirements(edisgo_grid)
# get critical time intervals
results_dir = os.path.join(self._results, str(edisgo_grid.topology.id))
time_intervals = get_most_critical_time_intervals(
edisgo_grid,
percentage=1.0,
time_steps_per_time_interval=168,
time_step_day_start=4,
save_steps=True,
path=results_dir,
use_troubleshooting_mode=True,
overloading_factor=0.95,
voltage_deviation_factor=0.95,
)
# drop time intervals with non-converging time steps
if len(ts_not_converged) > 0:
# check overloading time intervals
for ti in time_intervals.index:
# check if there is one time step in time interval that did not converge
non_converged_ts_in_ti = [
_
for _ in ts_not_converged
if _ in time_intervals.at[ti, "time_steps_overloading"]
]
if len(non_converged_ts_in_ti) > 0:
# if any time step did not converge, set time steps to None
time_intervals.at[ti, "time_steps_overloading"] = None
# check voltage issues time intervals
for ti in time_intervals.index:
# check if there is one time step in time interval that did not converge
non_converged_ts_in_ti = [
_
for _ in ts_not_converged
if _ in time_intervals.at[ti, "time_steps_voltage_issues"]
]
if len(non_converged_ts_in_ti) > 0:
# if any time step did not converge, set time steps to None
time_intervals.at[ti, "time_steps_voltage_issues"] = None
# select time intervals
if not time_intervals.loc[:, "time_steps_overloading"].dropna().empty:
tmp = time_intervals.loc[:, "time_steps_overloading"].dropna()
time_interval_1 = tmp.iloc[0]
time_interval_1_ind = tmp.index[0]
else:
time_interval_1 = pd.Index([])
time_interval_1_ind = None
if not time_intervals.loc[:, "time_steps_voltage_issues"].dropna().empty:
tmp = time_intervals.loc[:, "time_steps_voltage_issues"].dropna()
time_interval_2 = tmp.iloc[0]
time_interval_2_ind = tmp.index[0]
else:
time_interval_2 = pd.Index([])
time_interval_2_ind = None
# check if time intervals overlap
overlap = [_ for _ in time_interval_1 if _ in time_interval_2]
if len(overlap) > 0:
logger.info(
"Selected time intervals overlap. Trying to find another "
"time interval in voltage_issues intervals."
)
# check if time interval without overlap can be found
for ti in time_intervals.loc[:, "time_steps_voltage_issues"].dropna().index:
overlap = [
_
for _ in time_interval_1
if _ in time_intervals.at[ti, "time_steps_voltage_issues"]
]
if len(overlap) == 0:
time_interval_2 = time_intervals.at[ti, "time_steps_voltage_issues"]
time_interval_2_ind = ti
break
overlap = [_ for _ in time_interval_1 if _ in time_interval_2]
if len(overlap) > 0:
logger.info(
"Selected time intervals overlap. Trying to find another "
"time interval in overloading intervals."
)
# check if time interval without overlap can be found
for ti in time_intervals.loc[:, "time_steps_overloading"].dropna().index:
overlap = [
_
for _ in time_interval_2
if _ in time_intervals.at[ti, "time_steps_overloading"]
]
if len(overlap) == 0:
time_interval_1 = time_intervals.at[ti, "time_steps_overloading"]
time_interval_1_ind = ti
break
overlap = [_ for _ in time_interval_1 if _ in time_interval_2]
if len(overlap) > 0:
logger.info(
"Overlap of selected time intervals cannot be avoided. "
"Time intervals are therefore concatenated."
)
time_interval_1 = (
time_interval_1.append(time_interval_2).unique().sort_values()
)
time_interval_2 = None
# save to csv
percentage = pd.Series()
percentage["time_interval_1"] = (
None
if time_interval_1_ind is None
else time_intervals.at[
time_interval_1_ind, "percentage_max_overloaded_components"
]
)
percentage["time_interval_2"] = (
None
if time_interval_2_ind is None
else time_intervals.at[
time_interval_2_ind, "percentage_buses_max_voltage_deviation"
]
)
pd.DataFrame(
{
"time_steps": [time_interval_1, time_interval_2],
"percentage": percentage,
},
index=["time_interval_1", "time_interval_2"],
).to_csv(os.path.join(results_dir, "selected_time_intervals.csv"))
return time_interval_1, time_interval_2
def _run_edisgo_task_optimisation(
self,
edisgo_grid,
scenario,
logger,
time_intervals,
results_dir,
reduction_factor=0.3,
):
"""
Runs the dispatch optimisation.
Parameters
----------
edisgo_grid : :class:`edisgo.EDisGo`
EDisGo object.
scenario : str
Name of scenario to define flexible components. Possible options are
"eGon2035", "eGon2035_lowflex", "eGon100RE", and "eGon100RE_lowflex".
logger : logger handler
time_intervals : pd.DataFrame
Dataframe with information on time intervals to consider in the optimisation
in column "time_steps".
results_dir : str
Directory where to store OPF results.
reduction_factor : float
Reduction factor to use in spatial complexity reduction. Per default this
is set to 0.3.
Returns
-------
:class:`edisgo.EDisGo`
"""
logger.info("Start task 'optimisation'.")
# prepare district heating data
# make sure district heating ID is string of integer not float
columns_rename = [
str(int(float(_)))
for _ in edisgo_grid.overlying_grid.feedin_district_heating.columns
]
if len(columns_rename) > 0:
edisgo_grid.overlying_grid.feedin_district_heating.columns = columns_rename
cols = edisgo_grid.overlying_grid.thermal_storage_units_central_soc.columns
columns_rename = [str(int(float(_))) for _ in cols]
if len(columns_rename) > 0:
edisgo_grid.overlying_grid.thermal_storage_units_central_soc.columns = (
columns_rename
)
# aggregate PtH units in same district heating network and subtract feed-in
# from other heat sources from heat demand in district heating network
aggregate_district_heating_components(
edisgo_grid,
feedin_district_heating=edisgo_grid.overlying_grid.feedin_district_heating,
)
# apply operating strategy so that inflexible heat pumps (without heat
# storage units) have a time series
edisgo_grid.apply_heat_pump_operating_strategy()
timeindex = pd.Index([])
for ti in time_intervals.index:
time_steps = time_intervals.at[ti, "time_steps"]
if time_steps is None:
continue
else:
timeindex = timeindex.append(pd.Index(time_steps))
# copy edisgo object
edisgo_copy = deepcopy(edisgo_grid)
# temporal complexity reduction
reduce_timeseries_data_to_given_timeindex(edisgo_copy, time_steps)
# spatial complexity reduction
edisgo_copy.spatial_complexity_reduction(
mode="kmeansdijkstra",
cluster_area="feeder",
reduction_factor=reduction_factor,
reduction_factor_not_focused=False,
)
# OPF
# flexibilities in full flex: DSM, decentral and central PtH units,
# curtailment, EVs, storage units
# flexibilities in low flex: curtailment, storage units
psa_net = edisgo_copy.to_pypsa()
if scenario in ["eGon2035", "eGon100RE"]:
flexible_loads = edisgo_copy.dsm.p_max.columns
# flexible_hps = (
# edisgo_copy.heat_pump.thermal_storage_units_df.index.values
# )
flexible_cps = psa_net.loads.loc[
psa_net.loads.index.str.contains("home")
| (psa_net.loads.index.str.contains("work"))
].index.values
else:
flexible_loads = []
# flexible_hps = []
flexible_cps = []
flexible_hps = edisgo_copy.heat_pump.heat_demand_df.columns.values
flexible_storage_units = (
edisgo_copy.topology.storage_units_df.index.values
)
edisgo_copy.pm_optimize(
flexible_cps=flexible_cps,
flexible_hps=flexible_hps,
flexible_loads=flexible_loads,
flexible_storage_units=flexible_storage_units,
s_base=1,
opf_version=4,
silence_moi=False,
method="soc",
)
# save OPF results
zip_name = f"opf_results_{ti}"
if scenario in ["eGon2035_lowflex", "eGon100RE_lowflex"]:
zip_name += "_lowflex"
edisgo_copy.save(
directory=os.path.join(results_dir, zip_name),
save_topology=True,
save_timeseries=False,
save_results=False,
save_opf_results=True,
reduce_memory=True,
archive=True,
archive_type="zip",
)
# write flexibility dispatch results to spatially unreduced edisgo
# object
edisgo_grid.timeseries._loads_active_power.loc[
time_steps, :
] = edisgo_copy.timeseries.loads_active_power
edisgo_grid.timeseries._loads_reactive_power.loc[
time_steps, :
] = edisgo_copy.timeseries.loads_reactive_power
edisgo_grid.timeseries._generators_active_power.loc[
time_steps, :
] = edisgo_copy.timeseries.generators_active_power
edisgo_grid.timeseries._generators_reactive_power.loc[
time_steps, :
] = edisgo_copy.timeseries.generators_reactive_power
try:
edisgo_grid.timeseries._storage_units_active_power
except AttributeError:
edisgo_grid.timeseries.storage_units_active_power = pd.DataFrame(
index=edisgo_grid.timeseries.timeindex
)
edisgo_grid.timeseries._storage_units_active_power.loc[
time_steps,
edisgo_copy.timeseries.storage_units_active_power.columns,
] = edisgo_copy.timeseries.storage_units_active_power
try:
edisgo_grid.timeseries._storage_units_reactive_power
except AttributeError:
edisgo_grid.timeseries.storage_units_reactive_power = pd.DataFrame(
index=edisgo_grid.timeseries.timeindex
)
edisgo_grid.timeseries._storage_units_reactive_power.loc[
time_steps,
edisgo_copy.timeseries.storage_units_reactive_power.columns,
] = edisgo_copy.timeseries.storage_units_reactive_power
# write OPF results back
edisgo_grid.opf_results.overlying_grid = pd.concat(
[
edisgo_grid.opf_results.overlying_grid,
edisgo_copy.opf_results.overlying_grid,
]
)
edisgo_grid.opf_results.battery_storage_t.p = pd.concat(
[
edisgo_grid.opf_results.battery_storage_t.p,
edisgo_copy.opf_results.battery_storage_t.p,
]
)
edisgo_grid.opf_results.battery_storage_t.e = pd.concat(
[
edisgo_grid.opf_results.battery_storage_t.e,
edisgo_copy.opf_results.battery_storage_t.e,
]
)
edisgo_grid.timeseries.timeindex = timeindex
return edisgo_grid
def _run_edisgo_task_grid_reinforcement(self, edisgo_grid, logger):
"""
Runs the grid reinforcement.
Parameters
----------
edisgo_grid : :class:`edisgo.EDisGo`
EDisGo object.
logger : logger handler
Returns
-------
:class:`edisgo.EDisGo`
"""
logger.info("Start task 'grid_reinforcement'.")
# overwrite configs with new configs
edisgo_grid._config = Config()
edisgo_grid = enhanced_reinforce_grid(
edisgo_grid,
activate_cost_results_disturbing_mode=True,
separate_lv_grids=True,
separation_threshold=2,
copy_grid=False,
)
return edisgo_grid
def _save_edisgo_results(self):
results_dir = self._results
if not os.path.exists(results_dir):
os.makedirs(results_dir)
with open(os.path.join(results_dir, "edisgo_args.json"), "w") as fp:
json.dump(self._edisgo_args, fp)
self._grid_choice.to_csv(os.path.join(results_dir, "grid_choice.csv"))
def _load_edisgo_results(self):
"""
Loads eDisGo data for all specified grids
Returns
--------
dict[]
"""
# Load the grid choice from CSV
results_dir = self._results
self._grid_choice = pd.read_csv(
os.path.join(results_dir, "grid_choice.csv"), index_col=0
)
self._grid_choice["represented_grids"] = self._grid_choice.apply(
lambda x: eval(x["represented_grids"]), axis=1
)
for idx, row in self._grid_choice.iterrows():
mv_grid_id = int(row["the_selected_network_id"])
try:
edisgo_grid = import_edisgo_from_files(
edisgo_path=os.path.join(self._csv_import, str(mv_grid_id)),
import_topology=True,
import_timeseries=False,
import_results=True,
import_electromobility=False,
from_zip_archive=True,
dtype="float32",
parameters={
"powerflow_results": ["pfa_p", "pfa_q"],
"grid_expansion_results": ["grid_expansion_costs"],
},
)
self._edisgo_grids[mv_grid_id] = edisgo_grid
logger.info("Imported MV grid {}".format(mv_grid_id))
except: # noqa: E722
self._edisgo_grids[mv_grid_id] = "This grid failed to reimport"
logger.warning("MV grid {} could not be loaded".format(mv_grid_id))
class _ETraGoData:
"""
Container for minimal eTraGo network.
This minimal network only contains information relevant for eDisGo.
Parameters
----------
etrago_network : :pypsa:`PyPSA.Network<network>`
"""
def __init__(self, etrago_network):
def filter_by_carrier(
etrago_network_obj, component, carrier, like=True, timeseries=True
):
def filter_df_by_carrier(df):
if isinstance(carrier, str):
if like:
return df[df.carrier.str.contains(carrier)]
else:
return df[df.carrier == carrier]
elif isinstance(carrier, list):
return df[df.carrier.isin(carrier)]
elif carrier is None:
return df
if timeseries:
attribute_to_save = {
"links": "p0",
"generators": "p",
"stores": "p",
"storage_units": "p",
}
attribute_to_save = attribute_to_save[component]
df_to_filter = getattr(
getattr(etrago_network_obj, component + "_t"), attribute_to_save
)
df = df_to_filter.loc[
:,
filter_df_by_carrier(getattr(etrago_network_obj, component)).index,
]
else:
columns_to_save = {
"links": ["carrier", "p_nom"],
"generators": ["carrier", "p_nom"],
"stores": ["carrier", "e_nom"],
"storage_units": ["carrier", "p_nom", "max_hours"],
}
columns_to_save = columns_to_save[component]
df_to_filter = getattr(etrago_network_obj, component)
df = filter_df_by_carrier(df_to_filter)
df = df[columns_to_save]
unique_carriers = filter_df_by_carrier(
getattr(etrago_network_obj, component)
).carrier.unique()
logger.debug(
f"{component}, {carrier}, {timeseries}, {df.shape}, {unique_carriers}"
)
return df
logger.debug(
f"Carriers in links " f"{etrago_network.network.links.carrier.unique()}"
)
logger.debug(
f"Carriers in generators "
f"{etrago_network.network.generators.carrier.unique()}"
)
logger.debug(
f"Carriers in stores " f"{etrago_network.network.stores.carrier.unique()}"
)
logger.debug(
f"Carriers in storage_units "
f"{etrago_network.network.storage_units.carrier.unique()}"
)
self.snapshots = etrago_network.network.snapshots
self.bev_charger = filter_by_carrier(
etrago_network.network, "links", "BEV", timeseries=False
)
self.bev_charger_t = filter_by_carrier(
etrago_network.network, "links", "BEV", timeseries=True
)
self.dsm = filter_by_carrier(
etrago_network.network, "links", "dsm", timeseries=False
)
self.dsm_t = filter_by_carrier(
etrago_network.network, "links", "dsm", timeseries=True
)
self.rural_heat_t = filter_by_carrier(
etrago_network.network, "links", "rural_heat_pump", timeseries=True
)
self.rural_heat_store = filter_by_carrier(
etrago_network.network, "stores", "rural_heat_store", timeseries=False
)
self.central_heat_t = filter_by_carrier(
etrago_network.network,
"links",
["central_heat_pump", "central_resistive_heater"],
timeseries=True,
)
self.central_heat_store = filter_by_carrier(
etrago_network.network, "stores", "central_heat_store", timeseries=False
)
self.central_gas_chp_t = filter_by_carrier(
etrago_network.network, "links", "central_gas_chp_t", timeseries=True
)
#
self.generators = filter_by_carrier(
etrago_network.network, "generators", None, timeseries=False
)
self.generators_t = filter_by_carrier(
etrago_network.network, "generators", None, timeseries=True
)
self.battery_storage_units = filter_by_carrier(
etrago_network.network, "storage_units", "battery", timeseries=False
)
self.battery_storage_units_t = filter_by_carrier(
etrago_network.network, "storage_units", "battery", timeseries=True
)
[docs]def parallelizer(
ding0_id_list,
func,
func_arguments,
max_calc_time,
workers=mp2.cpu_count(),
worker_lifetime=1,
):
"""
Use python multiprocessing toolbox for parallelization
Several grids are analyzed in parallel based on your custom function that
defines the specific application of eDisGo.
Parameters
----------
ding0_id_list : list of int
List of ding0 grid data IDs (also known as HV/MV substation IDs)
func : any function
Your custom function that shall be parallelized
func_arguments : tuple
Arguments to custom function ``func``
workers: int
Number of parallel process
worker_lifetime : int
Bunch of grids sequentially analyzed by a worker
Notes
-----
Please note, the following requirements for the custom function which is to
be executed in parallel
#. It must return an instance of the type :class:`~.edisgo.EDisGo`.
#. The first positional argument is the MV grid district id (as int). It is
prepended to the tuple of arguments ``func_arguments``
Returns
-------
containers : dict of :class:`~.edisgo.EDisGo`
Dict of EDisGo instances keyed by its ID
"""
def collect_pool_results(result):
"""
Store results from parallelized calculation in structured manner
Parameters
----------
result: :class:`~.edisgo.EDisGo`
"""
results.update(result)
def error_callback(key):
# message='Failed'
# func_arguments[0]._status_update(key, 'end', message)
return lambda o: results.update({key: o})
results = {}
max_calc_time_seconds = max_calc_time * 3600
def initializer():
import pickle
pickle.DEFAULT_PROTOCOL = 4
import dill
dill.settings["protocol"] = 4
pool = mp2.Pool(workers, initializer=initializer, maxtasksperchild=worker_lifetime)
result_objects = {}
for ding0_id in ding0_id_list:
edisgo_args = (ding0_id, *func_arguments)
result_objects[ding0_id] = pool.apply_async(
func=func,
args=edisgo_args,
callback=collect_pool_results,
error_callback=error_callback(ding0_id),
)
errors = {}
successes = {}
start = datetime.now()
end = (start + td(hours=max_calc_time)).isoformat(" ")
logger.info("Jobs started. They will time out at {}.".format(end[: end.index(".")]))
current = datetime.now()
time_spent = 0
while result_objects and ((current - start).seconds <= max_calc_time_seconds):
done = []
tick = (current - start).seconds * 100 / max_calc_time_seconds
if tick - time_spent >= 1 or tick > 100:
hours_to_go = (current - start).seconds / 3600
logger.info(
"{:.2f}% ({:.2f}/{}h) spent".format(tick, hours_to_go, max_calc_time)
)
logger.info("Jobs time out in {:.2f}h.".format(max_calc_time - hours_to_go))
time_spent = tick
for grid_id, result in result_objects.items():
if result.ready():
logger.info(
"MV grid {} ready. Trying to `get` the result.".format(grid_id)
)
done.append(grid_id)
if not result.successful():
try:
# We already know that this was not successful, so the
# `get` is only here to re-raise the exception that
# occurred.
result.get()
except Exception as e:
logger.warning(
"MV grid {} failed due to {e!r}: '{e}'.".format(
grid_id, e=e
)
)
errors[grid_id] = e
else:
logger.info("MV grid {} calculated successfully.".format(grid_id))
successes[grid_id] = result.get()
logger.info("Done `get`ting the result for MV grid {}.".format(grid_id))
for grid_id in done:
del result_objects[grid_id]
sleep(1)
current = datetime.now()
# Now we know that we either reached the timeout, (x)or that all
# calculations are done. We just have collect what exactly is the case.
# This is done by `get`ting the results with a timeout of 0. If any of them
# are not yet done, a `TimeoutError` will be triggered, which we can
# collect like all other errors.
if not result_objects:
logger.info("All MV grids stopped before the timeout.")
else:
logger.warning("Some MV grid simulations timed out.")
pool.terminate()
end = datetime.now()
delta = end - start
logger.info("Execution finished after {:.2f} hours".format(delta.seconds / 3600))
done = []
for grid_id, result in result_objects.items():
done.append(grid_id)
try:
successes[grid_id] = result.get(timeout=0)
logger.info("MV grid {} calculated successfully.".format(grid_id))
except Exception as e:
logger.warning(
"MV grid {} failed due to {e!r}: '{e}'.".format(grid_id, e=e)
)
errors[grid_id] = e
for grid_id in done:
del result_objects[grid_id]
if errors:
logger.info("MV grid calculation error details:")
for grid_id, error in errors.items():
logger.info(" {}".format(grid_id))
strings = TracebackException.from_exception(error).format()
lines = [line for string in strings for line in string.split("\n")]
for line in lines:
logger.info(" " + line)
pool.close()
pool.join()
return results