# -*- coding: utf-8 -*-
# Copyright 2016-2018 Europa-Universität Flensburg,
# Flensburg University of Applied Sciences,
# Centre for Sustainable Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# File description
"""This file contains the eGo main class as well as input & output functions
of eGo in order to build the eGo application container.
"""
import logging
import os
import pandas as pd
if "READTHEDOCS" not in os.environ:
import re
from importlib import import_module
import pypsa
from egoio.db_tables.model_draft import EgoGridPfHvSource as Source
from egoio.db_tables.model_draft import EgoGridPfHvTempResolution as TempResolution
from egoio.tools import db
from etrago import Etrago
from etrago.appl import run_etrago
from etrago.tools.io import load_config_file
from sqlalchemy import and_
from sqlalchemy.orm import sessionmaker
from ego.tools.economics import etrago_convert_overnight_cost
from ego.tools.edisgo_integration import EDisGoNetworks
from ego.tools.plots import (
igeoplot,
plot_edisgo_cluster,
plot_grid_storage_investment,
plot_line_expansion,
plot_storage_expansion,
plot_storage_use,
power_price_plot,
)
from ego.tools.utilities import get_scenario_setting
logger = logging.getLogger("ego")
__copyright__ = "Europa-Universität Flensburg, " "Centre for Sustainable Energy Systems"
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = "wolf_bunke,maltesc"
class egoBasic(object):
    """The eGo basic class selects and creates, based on your
    ``scenario_setting.json`` file, your defined eTraGo and
    eDisGo results container. It also holds the session for the
    database connection.

    Parameters
    ----------
    jsonpath : :obj:`json`
        Path to ``scenario_setting.json`` file.

    Returns
    -------
    json_file : :obj:`dict`
        Dictionary of the ``scenario_setting.json`` file
    session : :sqlalchemy:`sqlalchemy.orm.session.Session<orm/session_basics.html>`
        SQLAlchemy session to the OEDB
    """

    def __init__(self, *args, **kwargs):
        # NOTE(review): ``self.jsonpath`` must already be set by the
        # subclass (see ``eGo.__init__``) before this initializer runs.
        logger.info("Using scenario setting: {}".format(self.jsonpath))

        self.json_file = None
        self.session = None
        self.scn_name = None

        # Parse the scenario settings from the given json path.
        self.json_file = get_scenario_setting(jsonpath=self.jsonpath)

        # Database connection from json_file
        try:
            conn = db.connection(section=self.json_file["eTraGo"]["db"])
            Session = sessionmaker(bind=conn)
            self.session = Session()
            logger.info("Connected to Database")
        except Exception:
            # Deliberately continue without a session (best effort); any
            # later DB access will fail with its own, clearer error.
            logger.error("Failed connection to Database", exc_info=True)

        # get scn_name
        self.scn_name = self.json_file["eTraGo"]["scn_name"]
class eTraGoResults(egoBasic):
    """The ``eTraGoResults`` class creates and contains all results
    of eTraGo and its network container for eGo.

    Returns
    -------
    network_etrago: :class:`etrago.tools.io.NetworkScenario`
        eTraGo network object compiled by :func:`etrago.appl.etrago`
    etrago: :pandas:`pandas.Dataframe<dataframe>`
        DataFrame which collects several eTraGo results
    """

    def __init__(self, *args, **kwargs):
        # Fixed: the original also passed ``self`` explicitly, handing the
        # parent a duplicate instance argument via *args.
        super(eTraGoResults, self).__init__(*args, **kwargs)
        self.etrago = None

        logger.info("eTraGo section started")

        if self.json_file["eGo"]["result_id"] is not None:
            # Delete arguments from scenario_setting
            logger.info("Remove given eTraGo settings from scenario_setting")
            try:
                self.json_file["eGo"]["eTraGo"] = False
                # Keep the db section around: the loop below wipes every
                # eTraGo key (the original re-assigned the already-wiped
                # value, losing the db setting).
                db_section = self.json_file["eTraGo"]["db"]
                for key in self.json_file["eTraGo"].keys():
                    self.json_file["eTraGo"][key] = "removed by DB recover"
                # ToDo add scenario_setting for results
                self.json_file["eTraGo"]["db"] = db_section

                logger.info("Add eTraGo scenario_setting from oedb result")
                # To do ....
                _prefix = "EgoGridPfHvResult"
                schema = "model_draft"
                packagename = "egoio.db_tables"
                _pkg = import_module(packagename + "." + schema)

                # get metadata
                orm_meta = getattr(_pkg, _prefix + "Meta")
                self.jsonpath = recover_resultsettings(
                    self.session,
                    self.json_file,
                    orm_meta,
                    self.json_file["eGo"]["result_id"],
                )

                # add etrago_disaggregated_network from DB
                logger.info(
                    "Recovered eTraGo network uses kmeans: {}".format(
                        self.json_file["eTraGo"]["network_clustering_kmeans"]
                    )
                )
            except KeyError:
                pass

            logger.info("Create eTraGo network from oedb result")
            self._etrago_network = etrago_from_oedb(self.session, self.json_file)

            if self.json_file["eTraGo"]["disaggregation"] is not False:
                # No separate disaggregated network is stored in the DB;
                # reuse the recovered network.
                self._etrago_disaggregated_network = self._etrago_network
            else:
                logger.warning("No disaggregated network found in DB")
                self._etrago_disaggregated_network = None

        # create eTraGo NetworkScenario
        if self.json_file["eGo"]["eTraGo"] is True:
            if self.json_file["eGo"].get("csv_import_eTraGo") is not False:
                logger.info("Import eTraGo network from csv files")
                self.etrago = Etrago(
                    csv_folder_name=self.json_file["eGo"].get("csv_import_eTraGo")
                )
            else:
                logger.info("Create eTraGo network calcualted by eGo")
                # Fixed: keep the returned Etrago object — the original
                # discarded it, leaving ``self.etrago`` as None although
                # later code reads ``self.etrago.disaggregated_network``.
                self.etrago = run_etrago(args=self.json_file["eTraGo"], json_path=None)
class eDisGoResults(eTraGoResults):
    """The ``eDisGoResults`` class creates and contains all results
    of eDisGo and its network containers.
    """

    def __init__(self, *args, **kwargs):
        # Fixed: the original also passed ``self`` explicitly, handing the
        # parent a duplicate instance argument via *args.
        super(eDisGoResults, self).__init__(*args, **kwargs)

        if self.json_file["eGo"]["eDisGo"] is True:
            logger.info("Create eDisGo network")
            self._edisgo = EDisGoNetworks(
                json_file=self.json_file,
                etrago_network=self.etrago.disaggregated_network,
            )
        else:
            self._edisgo = None
            logger.info("No eDisGo network")

    @property
    def edisgo(self):
        """
        Contains basic information about eDisGo.

        Returns
        -------
        :class:`ego.tools.edisgo_integration.EDisGoNetworks` or None
            ``None`` when the eDisGo run was disabled in the settings.
        """
        return self._edisgo
class eGo(eDisGoResults):
    """Main eGo module which includes all results and main functionalities.

    Returns
    -------
    network_etrago: :class:`etrago.tools.io.NetworkScenario`
        eTraGo network object compiled by :meth:`etrago.appl.etrago`
    edisgo.network : :class:`ego.tools.edisgo_integration.EDisGoNetworks`
        Contains multiple eDisGo networks
    edisgo : :pandas:`pandas.Dataframe<dataframe>`
        aggregated results of eDisGo
    etrago : :pandas:`pandas.Dataframe<dataframe>`
        aggregated results of eTraGo
    """

    def __init__(self, jsonpath, *args, **kwargs):
        # jsonpath must be set before the parent initializers run, since
        # egoBasic.__init__ reads ``self.jsonpath``.
        self.jsonpath = jsonpath
        # Fixed: drop the explicit ``self`` that the original passed in
        # addition to the implicit instance argument.
        super(eGo, self).__init__(*args, **kwargs)

        # Initialise all result placeholders *before* the calculation.
        # The original set _storage_costs/_ehv_grid_costs/_mv_grid_costs to
        # None *after* _calculate_investment_cost(), clobbering its results.
        self._total_investment_costs = None
        self._total_operation_costs = None
        self._storage_costs = None
        self._ehv_grid_costs = None
        self._mv_grid_costs = None

        self._calculate_investment_cost()

    def _calculate_investment_cost(self, storage_mv_integration=True):
        """Get total investment costs of all voltage levels for storages
        and grid expansion.

        Parameters
        ----------
        storage_mv_integration : bool
            If ``True``, the share of extended storages that was integrated
            into MV grids is booked on a separate ``"mv"`` storage row.
        """
        self._total_inv_cost = pd.DataFrame(
            columns=["component", "voltage_level", "capital_cost"]
        )

        _grid_ehv = None
        if "network" in self.json_file["eTraGo"]["extendable"]:
            _grid_ehv = self.etrago.grid_investment_costs
            _grid_ehv["component"] = "grid"
            # DataFrame.append() was removed in pandas 2.0 — use pd.concat.
            self._total_inv_cost = pd.concat(
                [self._total_inv_cost, _grid_ehv], ignore_index=True
            )

        _storage = None
        if "storage" in self.json_file["eTraGo"]["extendable"]:
            _storage = self.etrago.storage_investment_costs
            _storage["component"] = "storage"
            self._total_inv_cost = pd.concat(
                [self._total_inv_cost, _storage], ignore_index=True
            )

        _grid_mv_lv = None
        if self.json_file["eGo"]["eDisGo"] is True:
            _grid_mv_lv = self.edisgo.grid_investment_costs
            if _grid_mv_lv is not None:
                _grid_mv_lv["component"] = "grid"
                _grid_mv_lv["differentiation"] = "domestic"
                self._total_inv_cost = pd.concat(
                    [self._total_inv_cost, _grid_mv_lv], ignore_index=True
                )

        # add overnight costs
        self._total_investment_costs = self._total_inv_cost
        self._total_investment_costs["overnight_costs"] = etrago_convert_overnight_cost(
            self._total_investment_costs["capital_cost"], self.json_file
        )

        # Include MV storages in the _total_investment_costs dataframe
        if storage_mv_integration is True and _grid_mv_lv is not None:
            self._integrate_mv_storage_investment()

        # sort by voltage level, highest ("ehv") first
        self._total_investment_costs["voltage_level"] = pd.Categorical(
            self._total_investment_costs["voltage_level"],
            ["ehv", "hv", "mv", "lv", "mv/lv"],
        )
        self._total_investment_costs = self._total_investment_costs.sort_values(
            "voltage_level"
        )

        self._storage_costs = _storage
        self._ehv_grid_costs = _grid_ehv
        self._mv_grid_costs = _grid_mv_lv

    def _integrate_mv_storage_investment(self):
        """
        Update the total investment costs dataframe and include the
        storages integrated in MV grids.
        """
        costs_df = self._total_investment_costs
        total_stor = self._calculate_all_extended_storages()
        mv_stor = self._calculate_mv_storage()

        # Guard against a ZeroDivisionError when no storage was extended
        # (the original divided unconditionally, outside its try block).
        if not total_stor:
            logger.info("No extended storages found - skipping MV distribution.")
            return

        integrated_share = mv_stor / total_stor
        try:
            if integrated_share > 0:
                # Locate the "ehv" storage row the share is moved out of.
                ehv_stor_idx = costs_df.index[
                    (costs_df["component"] == "storage")
                    & (costs_df["voltage_level"] == "ehv")
                ][0]
                int_capital_costs = (
                    costs_df.loc[ehv_stor_idx]["capital_cost"] * integrated_share
                )
                int_overnight_costs = (
                    costs_df.loc[ehv_stor_idx]["overnight_costs"] * integrated_share
                )
                # Subtract the integrated share from the ehv row ...
                costs_df.at[ehv_stor_idx, "capital_cost"] = (
                    costs_df.loc[ehv_stor_idx]["capital_cost"] - int_capital_costs
                )
                costs_df.at[ehv_stor_idx, "overnight_costs"] = (
                    costs_df.loc[ehv_stor_idx]["overnight_costs"] - int_overnight_costs
                )
                # ... and book it on a new "mv" storage row.
                new_storage_row = pd.DataFrame(
                    {
                        "component": ["storage"],
                        "voltage_level": ["mv"],
                        "differentiation": ["domestic"],
                        "capital_cost": [int_capital_costs],
                        "overnight_costs": [int_overnight_costs],
                    }
                )
                costs_df = pd.concat([costs_df, new_storage_row])
                self._total_investment_costs = costs_df
        except (IndexError, KeyError):
            # e.g. no "ehv" storage row present in the costs dataframe
            logger.info("Something went wrong with the MV storage distribution.")

    def _calculate_all_extended_storages(self):
        """
        Return the total extended storage p_nom_opt in MW.
        """
        storage_units = self._etrago_disaggregated_network.storage_units
        # Fixed: the original compared the Series with ``is True``, which is
        # always False — use the boolean column as a mask instead.
        stor_df = storage_units.loc[storage_units["p_nom_extendable"] == True]  # noqa: E712
        stor_df = stor_df[["bus", "p_nom_opt"]]
        all_extended_storages = stor_df["p_nom_opt"].sum()
        return all_extended_storages

    def _calculate_mv_storage(self):
        """
        Return the storage p_nom_opt in MW that is integrated in MV grids.
        """
        units = self._etrago_disaggregated_network.storage_units
        min_extended = 0.3  # MW threshold; ignore marginal extensions
        # Fixed ``is True`` Series comparison (see
        # _calculate_all_extended_storages).
        stor_df = units.loc[
            (units["p_nom_extendable"] == True)  # noqa: E712
            & (units["p_nom_opt"] > min_extended)
            & (units["max_hours"] <= 20.0)
        ]
        stor_df = stor_df[["bus", "p_nom_opt"]]

        integrated_storage = 0.0  # storage integrated in MV grids
        for idx, row in stor_df.iterrows():
            mv_grid_id = row["bus"]
            p_nom_opt = row["p_nom_opt"]
            if not mv_grid_id:
                continue
            logger.info(
                "Checking storage integration for MV grid {}".format(mv_grid_id)
            )
            grid_choice = self.edisgo.grid_choice
            # Clusters whose represented grids contain this grid id.
            cluster = grid_choice.loc[
                [
                    mv_grid_id in repr_grids
                    for repr_grids in grid_choice["represented_grids"]
                ]
            ]
            if len(cluster) == 0:
                continue
            representative_grid = cluster["the_selected_network_id"].values[0]
            if hasattr(self.edisgo.network[representative_grid], "network"):
                integration_df = self.edisgo.network[
                    representative_grid
                ].network.results.storages
                # nominal_power presumably in kW -> convert to MW
                integrated_power = integration_df["nominal_power"].sum() / 1000
            else:
                integrated_power = 0.0
            # Cannot integrate more than was extended at this bus.
            if integrated_power > p_nom_opt:
                integrated_power = p_nom_opt
            integrated_storage = integrated_storage + integrated_power
        return integrated_storage

    @property
    def total_investment_costs(self):
        """
        Contains all investment information about eGo.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
        """
        return self._total_investment_costs

    @property
    def total_operation_costs(self):
        """
        Contains all operation costs information about eGo.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
        """
        self._total_operation_costs = self.etrago.operating_costs
        # append eDisGo
        return self._total_operation_costs

    def plot_total_investment_costs(self, filename=None, display=False, **kwargs):
        """Plot total investment costs."""
        if filename is None:
            filename = "results/plot_total_investment_costs.pdf"
            display = True
        return plot_grid_storage_investment(
            self._total_investment_costs, filename=filename, display=display, **kwargs
        )

    def plot_power_price(self, filename=None, display=False):
        """Plot power prices per carrier of calculation."""
        if filename is None:
            filename = "results/plot_power_price.pdf"
            display = True
        return power_price_plot(self, filename=filename, display=display)

    def plot_storage_usage(self, filename=None, display=False):
        """Plot storage usage by charge and discharge."""
        if filename is None:
            filename = "results/plot_storage_usage.pdf"
            display = True
        return plot_storage_use(self, filename=filename, display=display)

    def plot_edisgo_cluster(self, filename=None, display=False, **kwargs):
        """Plot the clustering of selected Ding0 networks."""
        if filename is None:
            filename = "results/plot_edisgo_cluster.pdf"
            display = True
        return plot_edisgo_cluster(self, filename=filename, display=display, **kwargs)

    def plot_line_expansion(self, **kwargs):
        """Plot line expansion per line."""
        return plot_line_expansion(self, **kwargs)

    def plot_storage_expansion(self, **kwargs):
        """Plot storage expansion per bus."""
        return plot_storage_expansion(self, **kwargs)

    @property
    def iplot(self):
        """Get iplot of results as html."""
        return igeoplot(self)
# write_results_to_db():
# NOTE(review): this uses the root logger via ``logging`` while the rest of
# the module uses the module-level ``logger`` — presumably unintended; confirm.
logging.info("Initialisation of eGo Results")
def results_to_excel(ego):
    """
    Write results of ``ego.total_investment_costs`` to an excel file.

    Parameters
    ----------
    ego : :class:`ego.io.eGo`
        eGo result container providing ``total_investment_costs``.
    """
    # ToDo add time of calculation to file name
    # NOTE: requires the optional xlsxwriter dependency (add to setup).
    # ExcelWriter.save() was removed in pandas 2.0; the context manager
    # closes the writer and writes the file on exit.
    with pd.ExcelWriter("open_ego_results.xlsx", engine="xlsxwriter") as writer:
        # write results of installed capacity by fuels
        ego.total_investment_costs.to_excel(
            writer, index=False, sheet_name="Total Calculation"
        )
# buses
def etrago_from_oedb(session, json_file):
    """Import eTraGo results from the database by the ``result_id`` number.

    Parameters
    ----------
    session : :sqlalchemy:`sqlalchemy.orm.session.Session<orm/session_basics.html>`
        SQLAlchemy session to the OEDB
    json_file : :obj:`dict`
        Dictionary of the ``scenario_setting.json`` file

    Returns
    -------
    network_etrago: :class:`etrago.tools.io.NetworkScenario`
        eTraGo network object compiled by :meth:`etrago.appl.etrago`
    """
    result_id = json_file["eGo"]["result_id"]

    # functions
    def map_ormclass(name):
        """Map a component name to its SQLAlchemy ORM class in ``_mapped``."""
        try:
            _mapped[name] = getattr(_pkg, _prefix + name)
        except AttributeError:
            logger.warning("Relation %s does not exist." % name)
        return _mapped

    def id_to_source(query):
        """Return a ``{source_id: name}`` mapping from a Source query."""
        # ormclass = map_ormclass(name)
        # query = session.query(ormclass).filter(ormclass.result_id == result_id)
        # TODO column naming in database
        return {k.source_id: k.name for k in query.all()}

    def dataframe_results(name, session, result_id, ormclass):
        """
        Get static component data as a pandas DataFrame by the result_id.

        Parameters
        ----------
        session : :sqlalchemy:`sqlalchemy.orm.session.Session<orm/session_basics.html>`
            SQLAlchemy session to the OEDB
        """
        query = session.query(ormclass).filter(ormclass.result_id == result_id)

        if name == "Transformer":
            # DB tables use the shorter "trafo" prefix
            name = "Trafo"

        df = pd.read_sql(query.statement, session.bind, index_col=name.lower() + "_id")

        if name == "Link":
            df["bus0"] = df.bus0.astype(int)
            df["bus1"] = df.bus1.astype(int)

        if "source" in df:
            # translate numeric source ids back to carrier names
            source_orm = Source
            source_query = session.query(source_orm)
            df.source = df.source.map(id_to_source(source_query))

        if str(ormclass)[:-2].endswith("T"):
            # time-varying ("...T") classes carry no static data.
            # Fixed: was ``pd.Dataframe()`` (AttributeError at runtime).
            df = pd.DataFrame()

        return df

    def series_results(name, column, session, result_id, ormclass):
        """
        Get time series data as a pandas DataFrame by the result_id.

        Parameters
        ----------
        session: : sqlalchemy: `sqlalchemy.orm.session.Session<orm/session_basics.html>`
            SQLAlchemy session to the OEDB
        """
        # TODO - check index of bus_t and soon is wrong!
        # TODO: pls make more robust
        # derive e.g. "bus_id" from "BusT" (first CamelCase word, lowered)
        id_column = re.findall(r"[A-Z][^A-Z]*", name)[0] + "_" + "id"
        id_column = id_column.lower()

        query = session.query(
            getattr(ormclass, id_column), getattr(ormclass, column).label(column)
        ).filter(and_(ormclass.result_id == result_id))

        df = pd.io.sql.read_sql(
            query.statement, session.bind, columns=[column], index_col=id_column
        )
        df.index = df.index.astype(str)

        # change of format to fit pypsa (components as columns, time as rows)
        df = df[column].apply(pd.Series).transpose()

        try:
            assert not df.empty
            df.index = timeindex
        except AssertionError:
            logger.warning("No data for {} in column {}.".format(name, column))

        return df

    # create config for results
    path = os.getcwd()
    # add meta_args with args of results
    config = load_config_file(path + "/tools/config.json")["results"]

    # map and Database settings of etrago_from_oedb()
    _prefix = "EgoGridPfHvResult"
    schema = "model_draft"
    packagename = "egoio.db_tables"
    _pkg = import_module(packagename + "." + schema)
    temp_ormclass = "TempResolution"
    carr_ormclass = "Source"
    _mapped = {}

    # get metadata
    orm_meta = getattr(_pkg, _prefix + "Meta")

    # check result_id
    result_id_in = (
        session.query(orm_meta.result_id).filter(orm_meta.result_id == result_id).all()
    )
    if result_id_in:
        logger.info("Choosen result_id %s found in DB", result_id)
    else:
        # Fixed: an actual failure was logged at info level.
        logger.error("Error: result_id not found in DB")

    # get meta data as args
    meta_args = recover_resultsettings(session, json_file, orm_meta, result_id)

    # get TempResolution
    # NOTE(review): query is not filtered by temp_id; .one() assumes the
    # table holds exactly one row — confirm against the DB schema.
    temp = TempResolution
    tr = session.query(
        temp.temp_id, temp.timesteps, temp.resolution, temp.start_time
    ).one()

    # Fixed: pd.DatetimeIndex(start=...) was removed from pandas;
    # pd.date_range builds the equivalent index.
    timeindex = pd.date_range(
        start=tr.start_time, periods=tr.timesteps, freq=tr.resolution
    )
    # restrict to the snapshot window from the recovered settings
    timeindex = timeindex[
        meta_args["eTraGo"]["start_snapshot"] - 1 : meta_args["eTraGo"]["end_snapshot"]
    ]

    # create df for PyPSA network
    network = pypsa.Network()
    network.set_snapshots(timeindex)
    timevarying_override = False

    if pypsa.__version__ == "0.11.0":
        # rename DB columns to the names pypsa 0.11 expects
        old_to_new_name = {
            "Generator": {
                "p_min_pu_fixed": "p_min_pu",
                "p_max_pu_fixed": "p_max_pu",
                "source": "carrier",
                "dispatch": "former_dispatch",
            },
            "Bus": {"current_type": "carrier"},
            "Transformer": {"trafo_id": "transformer_id"},
            "Storage": {
                "p_min_pu_fixed": "p_min_pu",
                "p_max_pu_fixed": "p_max_pu",
                "soc_cyclic": "cyclic_state_of_charge",
                "soc_initial": "state_of_charge_initial",
                "source": "carrier",
            },
        }
        timevarying_override = True
    else:
        old_to_new_name = {
            "Storage": {
                "soc_cyclic": "cyclic_state_of_charge",
                "soc_initial": "state_of_charge_initial",
            }
        }

    # get data into dataframes
    logger.info("Start building eTraGo results network")
    for comp, comp_t_dict in config.items():
        orm_dict = map_ormclass(comp)
        pypsa_comp_name = "StorageUnit" if comp == "Storage" else comp
        ormclass = orm_dict[comp]

        if not comp_t_dict:
            df = dataframe_results(comp, session, result_id, ormclass)
            if comp in old_to_new_name:
                tmp = old_to_new_name[comp]
                df.rename(columns=tmp, inplace=True)
            network.import_components_from_dataframe(df, pypsa_comp_name)

        if comp_t_dict:
            for name, columns in comp_t_dict.items():
                name = name[:-1]  # strip trailing "T" of the time class
                pypsa_comp_name = name
                if name == "Storage":
                    pypsa_comp_name = "StorageUnit"
                if name == "Transformer":
                    name = "Trafo"
                for col in columns:
                    df_series = series_results(name, col, session, result_id, ormclass)

                    # TODO: VMagPuSet?
                    if timevarying_override and comp == "Generator":
                        # NOTE(review): ``df`` here is the static frame of a
                        # *previous* loop iteration — confirm the config
                        # guarantees the Generator static frame was built
                        # before its time series are processed.
                        idx = df[df.former_dispatch == "flexible"].index
                        idx = [i for i in idx if i in df_series.columns]
                        df_series.drop(idx, axis=1, inplace=True)

                    try:
                        pypsa.io.import_series_from_dataframe(
                            network, df_series, pypsa_comp_name, col
                        )
                    except (ValueError, AttributeError):
                        logger.warning(
                            "Series %s of component %s could not be"
                            " imported" % (col, pypsa_comp_name)
                        )

    logger.info("Imported eTraGo results of id = %s ", result_id)
    return network
def recover_resultsettings(session, json_file, orm_meta, result_id):
    """Recover scenario_setting from database.

    Parameters
    ----------
    session : :sqlalchemy:`sqlalchemy.orm.session.Session<orm/session_basics.html>`
        SQLAlchemy session to the OEDB
    json_file : :obj:`dict`
        Dictionary of the ``scenario_setting.json`` file, updated in place
        and also returned.
    orm_meta : SQLAlchemy declarative class
        ORM class of the result meta table.
    result_id : int
        Id of the stored eTraGo result.

    Returns
    -------
    :obj:`dict`
        ``json_file`` with the eTraGo section overwritten by the stored
        settings of ``result_id``.
    """
    # check result_id (the original computed this but never used it)
    result_id_in = (
        session.query(orm_meta.result_id).filter(orm_meta.result_id == result_id).all()
    )
    if not result_id_in:
        logger.warning("result_id %s not found in DB", result_id)

    # get meta data as json_file
    meta = session.query(
        orm_meta.result_id,
        orm_meta.scn_name,
        orm_meta.calc_date,
        orm_meta.user_name,
        orm_meta.method,
        orm_meta.start_snapshot,
        orm_meta.end_snapshot,
        orm_meta.solver,
        orm_meta.settings,
    ).filter(orm_meta.result_id == result_id)

    meta_df = pd.read_sql(meta.statement, meta.session.bind, index_col="result_id")

    # update json_file with main data by result_id
    json_file["eTraGo"]["scn_name"] = meta_df.scn_name[result_id]
    json_file["eTraGo"]["method"] = meta_df.method[result_id]
    json_file["eTraGo"]["start_snapshot"] = meta_df.start_snapshot[result_id]
    json_file["eTraGo"]["end_snapshot"] = meta_df.end_snapshot[result_id]
    json_file["eTraGo"]["solver"] = meta_df.solver[result_id]

    # update json_file with specific data by result_id; keys missing from
    # the stored settings keep their current value
    meta_set = dict(meta_df.settings[result_id])
    for key in json_file["eTraGo"].keys():
        try:
            json_file["eTraGo"][key] = meta_set[key]
        except KeyError:
            pass

    return json_file
if __name__ == "__main__":
    # Module is meant to be imported (see ego.appl); no CLI entry point yet.
    pass