Source code for

# -*- coding: utf-8 -*-
# Copyright 2016-2018 Europa-Universität Flensburg,
# Flensburg University of Applied Sciences,
# Centre for Sustainable Energy Systems
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

# File description
"""This module collects useful functions for economic calculation of eGo
which can mainly distinguished in operational and investment costs.

import io
import logging
import os
import pkgutil

logger = logging.getLogger("ego")

if not "READTHEDOCS" in os.environ:
    import numpy as np
    import pandas as pd

    from import geolocation_buses

    from import get_time_steps

__copyright__ = (
    "Flensburg University of Applied Sciences, Europa-Universität"
    "Flensburg, Centre for Sustainable Energy Systems"
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = "wolfbunke"

# calculate annuity per time step or periode
[docs]def annuity_per_period(capex, n, wacc, t, p): """Calculate per given period Parameters ---------- capex : float Capital expenditure (NPV of investment) n : int Number of years that the investment is used (economic lifetime) wacc : float Weighted average cost of capital t : int Timesteps in hours p : float interest rate """ # ToDo change formular to hourly annuity costs return capex * (wacc * (1 + wacc) ** n) / ((1 + wacc) ** n - 1)
[docs]def edisgo_convert_capital_costs(overnight_cost, t, p, json_file): """Get scenario and calculation specific annuity cost by given capital costs and lifetime. Parameters ---------- json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file _start_snapshot : int Start point of calculation from ``scenario_setting.json`` file _end_snapshot : int End point of calculation from ``scenario_setting.json`` file _p : numeric interest rate of investment _t : int lifetime of investment Returns ------- annuity_cost : numeric Scenario and calculation specific annuity cost by given capital costs and lifetime Examples -------- .. math:: PVA = (1 / p) - (1 / (p*(1 + p)^t)) """ # Based on eTraGo calculation in # # Calculate present value of an annuity (PVA) PVA = (1 / p) - (1 / (p * (1 + p) ** t)) year = 8760 # get period of calculation period = json_file["eTraGo"]["end_snapshot"] - json_file["eTraGo"]["start_snapshot"] # calculation of capital_cost annuity_cost = overnight_cost / (PVA * (year / (period + 1))) return annuity_cost
[docs]def etrago_convert_overnight_cost(annuity_cost, json_file, t=40, p=0.05): """Get annuity cost of simulation and calculation total ``overnight_costs`` by given capital costs and lifetime. Parameters ---------- json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file _start_snapshot : int Start point of calculation from ``scenario_setting.json`` file _end_snapshot : int End point of calculation from ``scenario_setting.json`` file _p : numeric interest rate of investment _T : int lifetime of investment Returns ------- overnight_cost : numeric Scenario and calculation total ``overnight_costs`` by given annuity capital costs and lifetime. Examples -------- .. math:: PVA = (1 / p) - (1 / (p*(1 + p)^t)) K_{ON} = K_a*PVA*((t/(period+1)) """ # Based on eTraGo calculation in # # Calculate present value of an annuity (PVA) PVA = (1 / p) - (1 / (p * (1 + p) ** t)) year = 8760 # get period of calculation period = json_file["eTraGo"]["end_snapshot"] - json_file["eTraGo"]["start_snapshot"] # calculation of overnight_cost overnight_cost = annuity_cost * (PVA * (year / (period + 1))) return overnight_cost
[docs]def etrago_operating_costs(network): """Function to get all operating costs of eTraGo. Parameters ---------- network_etrago: :class:`` eTraGo network object compiled by :meth:`etrago.appl.etrago` Returns ------- operating_costs : :pandas:`pandas.Dataframe<dataframe>` DataFrame with aggregate operational costs per component and voltage level in [EUR] per calculated time steps. Example ------- .. code-block:: python >>> from import eGo >>> ego = eGo(jsonpath='scenario_setting.json') >>> ego.etrago.operating_costs +-------------+-------------------+------------------+ | component |operation_costs | voltage_level | +=============+===================+==================+ |biomass | 27.0 | ehv | +-------------+-------------------+------------------+ |line losses | 0.0 | ehv | +-------------+-------------------+------------------+ |wind_onshore | 0.0 | ehv | +-------------+-------------------+------------------+ """ etg = network # get v_nom _bus = pd.DataFrame(etg.buses["v_nom"]) = "name" _bus.reset_index(level=0, inplace=True) # Add voltage level idx = etg.generators.index etg.generators = pd.merge(etg.generators, _bus, left_on="bus", right_on="name") etg.generators.index = idx etg.generators["voltage_level"] = "unknown" # add ehv ix_ehv = etg.generators[etg.generators["v_nom"] >= 380].index etg.generators.set_value(ix_ehv, "voltage_level", "ehv") # add hv ix_hv = etg.generators[ (etg.generators["v_nom"] <= 220) & (etg.generators["v_nom"] >= 110) ].index etg.generators.set_value(ix_hv, "voltage_level", "hv") # get voltage_level index ix_by_ehv = etg.generators[etg.generators.voltage_level == "ehv"].index ix_by_hv = etg.generators[etg.generators.voltage_level == "hv"].index ix_slack = etg.generators[etg.generators.control != "Slack"].index ix_by_ehv = ix_slack.join( ix_by_ehv, how="left", level=None, return_indexers=False, sort=False ) ix_by_hv = ix_slack.join( ix_by_hv, how="right", level=None, return_indexers=False, sort=False ) # groupby v_nom ehv operating_costs_ehv = ( etg.generators_t.p[ix_by_ehv] * etg.generators.marginal_cost[ix_by_ehv] ) operating_costs_ehv = ( operating_costs_ehv.groupby(etg.generators.carrier, axis=1).sum().sum() ) operating_costs = pd.DataFrame(operating_costs_ehv) operating_costs.columns = ["operation_costs"] operating_costs["voltage_level"] = "ehv" # groupby v_nom ehv operating_costs_hv = ( etg.generators_t.p[ix_by_hv] * etg.generators.marginal_cost[ix_by_hv] ) operating_costs_hv = ( operating_costs_hv.groupby(etg.generators.carrier, axis=1).sum().sum() ) opt_costs_hv = pd.DataFrame(operating_costs_hv) opt_costs_hv.columns = ["operation_costs"] opt_costs_hv["voltage_level"] = "hv" # add df operating_costs = operating_costs.append(opt_costs_hv) tpc_ehv = pd.DataFrame( operating_costs_ehv.sum(), columns=["operation_costs"], index=["total_power_costs"], ) tpc_ehv["voltage_level"] = "ehv" operating_costs = operating_costs.append(tpc_ehv) tpc_hv = pd.DataFrame( operating_costs_hv.sum(), columns=["operation_costs"], index=["total_power_costs"], ) tpc_hv["voltage_level"] = "hv" operating_costs = operating_costs.append(tpc_hv) # add Grid and Transform Costs try: etg.lines["voltage_level"] = "unknown" ix_ehv = etg.lines[etg.lines["v_nom"] >= 380].index etg.lines.set_value(ix_ehv, "voltage_level", "ehv") ix_hv = etg.lines[ (etg.lines["v_nom"] <= 220) & (etg.lines["v_nom"] >= 110) ].index etg.lines.set_value(ix_hv, "voltage_level", "hv") losses_total = sum(etg.lines.losses) + sum(etg.transformers.losses) losses_costs = losses_total * np.average(etg.buses_t.marginal_price) # add Transform and Grid losses # etg.lines[['losses','voltage_level']].groupby('voltage_level', # axis=0).sum().reset_index() except AttributeError: "No Transform and Line losses are calcualted! \n" "Use eTraGo pf_post_lopf method" ) losses_total = 0 losses_costs = 0 # total grid losses costs tgc = pd.DataFrame( losses_costs, columns=["operation_costs"], index=["total_grid_losses"] ) tgc["voltage_level"] = "ehv/hv" operating_costs = operating_costs.append(tgc) # power_price = power_price.T.iloc[0] return operating_costs
[docs]def etrago_grid_investment(network, json_file, session): """Function to get grid expantion costs from eTraGo Parameters ---------- network_etrago: :class:`` eTraGo network object compiled by :meth:`etrago.appl.etrago` json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file Returns ------- grid_investment_costs : :pandas:`pandas.Dataframe<dataframe>` Dataframe with ``voltage_level``, ``number_of_expansion`` and ``capital_cost`` per calculated time steps Example ------- .. code-block:: python >>> from import eGo >>> ego = eGo(jsonpath='scenario_setting.json') >>> ego.etrago.grid_investment_costs +---------------+---------------+-------------------+--------------+ |differentiation| voltage_level |number_of_expansion| capital_cost| +===============+===============+===================+==============+ | cross-border | ehv | 27.0 | 31514.1305 | +---------------+---------------+-------------------+--------------+ | domestic | hv | 0.0 | 0.0 | +---------------+---------------+-------------------+--------------+ """ # check settings for extendable if "network" not in json_file["eTraGo"]["extendable"]: "The optimizition was not using parameter" " 'extendable': network \n" "No grid expantion costs from etrago" ) if "network" in json_file["eTraGo"]["extendable"]: network = geolocation_buses(network, session) # differentiation by country_code network.lines["differentiation"] = "none" network.lines["bus0_c"] = network.lines["bus1_c"] = for idx, val in network.lines.iterrows(): check = val["bus0_c"] + val["bus1_c"] if "DE" in check: network.lines["differentiation"][idx] = "cross-border" if "DEDE" in check: network.lines["differentiation"][idx] = "domestic" if "DE" not in check: network.lines["differentiation"][idx] = "foreign" lines = network.lines[ [ "v_nom", "capital_cost", "s_nom", "s_nom_min", "s_nom_opt", "differentiation", ] ].reset_index() lines["s_nom_expansion"] = lines.s_nom_opt.subtract(lines.s_nom, axis="index") lines["capital_cost"] = lines.s_nom_expansion.multiply( lines.capital_cost, axis="index" ) lines["number_of_expansion"] = lines.s_nom_expansion > 0.0 lines["time_step"] = get_time_steps(json_file) # add v_level lines["voltage_level"] = "unknown" ix_ehv = lines[lines["v_nom"] >= 380].index lines.set_value(ix_ehv, "voltage_level", "ehv") ix_hv = lines[(lines["v_nom"] <= 220) & (lines["v_nom"] >= 110)].index lines.set_value(ix_hv, "voltage_level", "hv") # based on eTraGo Function: # # Definition trafo = pd.DataFrame() # get costs of transfomers if json_file["eTraGo"]["network_clustering_kmeans"] == False: network.transformers["differentiation"] = "none" trafos = network.transformers[ [ "v_nom0", "v_nom1", "capital_cost", "s_nom_extendable", "s_nom", "s_nom_opt", ] ] = "" = "" trafos.reset_index() trafos["s_nom_extendable"] = trafos.s_nom_opt.subtract( trafos.s_nom, axis="index" ) trafos["capital_cost"] = trafos.s_nom_extendable.multiply( trafos.capital_cost, axis="index" ) trafos["number_of_expansion"] = trafos.s_nom_extendable > 0.0 trafos["time_step"] = get_time_steps(json_file) # add v_level trafos["voltage_level"] = "unknown" # TODO check ix_ehv = trafos[trafos["v_nom0"] >= 380].index trafos.set_value(ix_ehv, "voltage_level", "ehv") ix_hv = trafos[(trafos["v_nom0"] <= 220) & (trafos["v_nom0"] >= 110)].index trafos.set_value(ix_hv, "voltage_level", "hv") # aggregate trafo trafo = ( trafos[["voltage_level", "capital_cost", "differentiation"]] .groupby(["differentiation", "voltage_level"]) .sum() .reset_index() ) # aggregate lines line = ( lines[["voltage_level", "capital_cost", "differentiation"]] .groupby(["differentiation", "voltage_level"]) .sum() .reset_index() ) # merge trafos and line frames = [line, trafo] grid_investment_costs = pd.concat(frames) return grid_investment_costs # ToDo: add .agg({'number_of_expansion':lambda x: x.count(), # 's_nom_expansion': np.sum, # 'grid_costs': np.sum}) <- time_step pass
[docs]def edisgo_grid_investment(edisgo, json_file): """ Function aggregates all costs, based on all calculated eDisGo grids and their weightings Parameters ---------- edisgo : :class:`` Contains multiple eDisGo networks Returns ------- None or :pandas:`pandas.DataFrame<dataframe>` Dataframe containing annuity costs per voltage level """ t = 40 p = 0.05"For all components T={} and p={} is used".format(t, p)) costs = pd.DataFrame(columns=["voltage_level", "annuity_costs", "overnight_costs"]) # Loop through all calculated eDisGo grids for key, value in if not hasattr(value, "network"): logger.warning("No results available for grid {}".format(key)) continue # eDisGo results (overnight costs) for this grid costs_single = costs_single.rename(columns={"total_costs": "overnight_costs"}, inplace=True) # continue if this grid was not reinforced if costs_single["overnight_costs"].sum() == 0.0:"No expansion costs for grid {}".format(key)) continue # Overnight cost translated in annuity costs costs_single["capital_cost"] = edisgo_convert_capital_costs( costs_single["overnight_costs"], t=t, p=p, json_file=json_file ) # Weighting (retrieves the singe (absolute) weighting for this grid) choice = edisgo.grid_choice weighting = choice.loc[choice["the_selected_network_id"] == key][ "no_of_points_per_cluster" ].values[0] costs_single[["capital_cost", "overnight_costs"]] = ( costs_single[["capital_cost", "overnight_costs"]] * weighting ) # Append costs of this grid costs = costs.append( costs_single[["voltage_level", "capital_cost", "overnight_costs"]], ignore_index=True, ) if len(costs) == 0:"No expansion costs in any MV grid") return None else: aggr_costs = costs.groupby(["voltage_level"]).sum().reset_index() # In eDisGo all costs are in kEuro (eGo only takes Euro) aggr_costs[["capital_cost", "overnight_costs"]] = ( aggr_costs[["capital_cost", "overnight_costs"]] * 1000 ) successfull_grids = edisgo.successfull_grids if successfull_grids < 1: logger.warning( "Only {} % of the grids were calculated.\n".format( "{:,.2f}".format(successfull_grids * 100) ) + "Costs are extrapolated..." ) aggr_costs[["capital_cost", "overnight_costs"]] = ( aggr_costs[["capital_cost", "overnight_costs"]] / successfull_grids ) return aggr_costs
[docs]def get_generator_investment(network, scn_name): """Get investment costs per carrier/ generator.""" etg = network try: data = pkgutil.get_data("ego", "data/investment_costs.csv") invest = pd.read_csv( io.BytesIO(data), encoding="utf8", sep=",", index_col="carriers" ) except FileNotFoundError: path = os.getcwd() filename = "investment_costs.csv" invest = pd.DataFrame.from_csv(path + "/data/" + filename) if scn_name in ["SH Status Quo", "Status Quo"]: invest_scn = "Status Quo" if scn_name in ["SH NEP 2035", "NEP 2035"]: invest_scn = "NEP 2035" if scn_name in ["SH eGo 100", "eGo 100"]: invest_scn = "eGo 100" gen_invest = pd.concat( [invest[invest_scn], etg.generators.groupby("carrier")["p_nom"].sum()], axis=1, join="inner", ) gen_invest = pd.concat( [invest[invest_scn], etg.generators.groupby("carrier")["p_nom"].sum()], axis=1, join="inner", ) gen_invest["carrier_costs"] = ( gen_invest[invest_scn] * gen_invest["p_nom"] * 1000 ) # in MW return gen_invest