Source code for ego.tools.economics

# -*- coding: utf-8 -*-
# Copyright 2016-2018 Europa-Universität Flensburg,
# Flensburg University of Applied Sciences,
# Centre for Sustainable Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# File description
"""This module collects useful functions for economic calculation of eGo
which can mainly distinguished in operational and investment costs.
"""

import io
import os
import logging
logger = logging.getLogger('ego')

if not 'READTHEDOCS' in os.environ:
    import pandas as pd
    import numpy as np
    from ego.tools.utilities import get_time_steps

__copyright__ = "Flensburg University of Applied Sciences, Europa-Universität"\
    "Flensburg, Centre for Sustainable Energy Systems"
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = "wolfbunke"


# calculate annuity per time step or periode
[docs]def annuity_per_period(capex, n, wacc, t, p): """ Calculate per given period Parameters ---------- capex : float Capital expenditure (NPV of investment) n : int Number of years that the investment is used (economic lifetime) wacc : float Weighted average cost of capital t : int Timesteps in hours p : float interest rate """ # ToDo change formular to hourly annuity costs return capex * (wacc * (1 + wacc) ** n) / ((1 + wacc) ** n - 1)
[docs]def edisgo_convert_capital_costs(overnight_cost, t, p, json_file): """ Get scenario and calculation specific annuity cost by given capital costs and lifetime. Parameters ---------- json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file _start_snapshot : int Start point of calculation from ``scenario_setting.json`` file _end_snapshot : int End point of calculation from ``scenario_setting.json`` file _p : numeric interest rate of investment _t : int lifetime of investment Returns ------- annuity_cost : numeric Scenario and calculation specific annuity cost by given capital costs and lifetime Examples -------- .. math:: PVA = (1 / p) - (1 / (p*(1 + p)^t)) """ # Based on eTraGo calculation in # https://github.com/openego/eTraGo/blob/dev/etrago/tools/utilities.py#L651 # Calculate present value of an annuity (PVA) PVA = (1 / p) - (1 / (p*(1 + p) ** t)) year = 8760 # get period of calculation period = (json_file['eTraGo']['start_snapshot'] - json_file['eTraGo']['start_snapshot']) # calculation of capital_cost annuity_cost = (overnight_cost / (PVA * (year/(period+1)))) return annuity_cost
[docs]def etrago_convert_overnight_cost(annuity_cost, json_file, t=40, p=0.05): """ Get annuity cost of simulation and calculation total ``overnight_costs`` by given capital costs and lifetime. Parameters ---------- json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file _start_snapshot : int Start point of calculation from ``scenario_setting.json`` file _end_snapshot : int End point of calculation from ``scenario_setting.json`` file _p : numeric interest rate of investment _T : int lifetime of investment Returns ------- overnight_cost : numeric Scenario and calculation total ``overnight_costs`` by given annuity capital costs and lifetime. Examples -------- .. math:: PVA = (1 / p) - (1 / (p*(1 + p)^t)) K_{ON} = K_a*PVA*((t/(period+1)) """ # Based on eTraGo calculation in # https://github.com/openego/eTraGo/blob/dev/etrago/tools/utilities.py#L651 # Calculate present value of an annuity (PVA) PVA = (1 / p) - (1 / (p*(1 + p) ** t)) year = 8760 # get period of calculation period = (json_file['eTraGo']['start_snapshot'] - json_file['eTraGo']['start_snapshot']) # calculation of overnight_cost overnight_cost = annuity_cost*(PVA * (year/(period+1))) return overnight_cost
[docs]def etrago_operating_costs(network): """ Function to get all operating costs of eTraGo. Parameters ---------- network_etrago: :class:`etrago.tools.io.NetworkScenario` eTraGo network object compiled by :meth:`etrago.appl.etrago` Returns ------- operating_costs : :pandas:`pandas.Dataframe<dataframe>` DataFrame with aggregate operational costs per component and voltage level in [EUR] per calculated time steps. Example ------- .. code-block:: python >>> from ego.tools.io import eGo >>> ego = eGo(jsonpath='scenario_setting.json') >>> ego.etrago.operating_costs +-------------+-------------------+------------------+ | component |operation_costs | voltage_level | +=============+===================+==================+ |biomass | 27.0 | ehv | +-------------+-------------------+------------------+ |line losses | 0.0 | ehv | +-------------+-------------------+------------------+ |wind_onshore | 0.0 | ehv | +-------------+-------------------+------------------+ """ etg = network # get v_nom _bus = pd.DataFrame(etg.buses['v_nom']) _bus.index.name = "name" _bus.reset_index(level=0, inplace=True) # Add voltage level idx = etg.generators.index etg.generators = pd.merge(etg.generators, _bus, left_on='bus', right_on='name') etg.generators.index = idx etg.generators['voltage_level'] = 'unknown' # add ehv ix_ehv = etg.generators[etg.generators['v_nom'] >= 380].index etg.generators.set_value(ix_ehv, 'voltage_level', 'ehv') # add hv ix_hv = etg.generators[(etg.generators['v_nom'] <= 220) & (etg.generators['v_nom'] >= 110)].index etg.generators.set_value(ix_hv, 'voltage_level', 'hv') # get voltage_level index ix_by_ehv = etg.generators[etg.generators.voltage_level == 'ehv'].index ix_by_hv = etg.generators[etg.generators.voltage_level == 'hv'].index ix_slack = etg.generators[etg.generators.control != 'Slack'].index ix_by_ehv = ix_slack.join(ix_by_ehv, how='left', level=None, return_indexers=False, sort=False) ix_by_hv = ix_slack.join(ix_by_hv, how='right', level=None, return_indexers=False, sort=False) # groupby v_nom ehv operating_costs_ehv = (etg.generators_t.p[ix_by_ehv] * etg.generators. marginal_cost[ix_by_ehv]) operating_costs_ehv = operating_costs_ehv.groupby( etg.generators.carrier, axis=1).sum().sum() operating_costs = pd.DataFrame(operating_costs_ehv) operating_costs.columns = ['operation_costs'] operating_costs['voltage_level'] = 'ehv' # groupby v_nom ehv operating_costs_hv = (etg.generators_t.p[ix_by_hv] * etg.generators. marginal_cost[ix_by_hv]) operating_costs_hv = operating_costs_hv.groupby( etg.generators.carrier, axis=1).sum().sum() opt_costs_hv = pd.DataFrame(operating_costs_hv) opt_costs_hv.columns = ['operation_costs'] opt_costs_hv['voltage_level'] = 'hv' # add df operating_costs = operating_costs.append(opt_costs_hv) tpc_ehv = pd.DataFrame(operating_costs_ehv.sum(), columns=['operation_costs'], index=['total_power_costs']) tpc_ehv['voltage_level'] = 'ehv' operating_costs = operating_costs.append(tpc_ehv) tpc_hv = pd.DataFrame(operating_costs_hv.sum(), columns=['operation_costs'], index=['total_power_costs']) tpc_hv['voltage_level'] = 'hv' operating_costs = operating_costs.append(tpc_hv) # add Grid and Transform Costs try: etg.lines['voltage_level'] = 'unknown' ix_ehv = etg.lines[etg.lines['v_nom'] >= 380].index etg.lines.set_value(ix_ehv, 'voltage_level', 'ehv') ix_hv = etg.lines[(etg.lines['v_nom'] <= 220) & (etg.lines['v_nom'] >= 110)].index etg.lines.set_value(ix_hv, 'voltage_level', 'hv') losses_total = sum(etg.lines.losses) + sum(etg.transformers.losses) losses_costs = losses_total * np.average(etg.buses_t.marginal_price) # add Transform and Grid losses # etg.lines[['losses','voltage_level']].groupby('voltage_level', # axis=0).sum().reset_index() except AttributeError: logger.info("No Transform and Line losses are calcualted! \n" "Use eTraGo pf_post_lopf method") losses_total = 0 losses_costs = 0 # total grid losses costs tgc = pd.DataFrame(losses_costs, columns=['operation_costs'], index=['total_grid_losses']) tgc['voltage_level'] = 'ehv/hv' operating_costs = operating_costs.append(tgc) #power_price = power_price.T.iloc[0] return operating_costs
[docs]def etrago_grid_investment(network, json_file): """ Function to get grid expantion costs from eTraGo Parameters ---------- network_etrago: :class:`etrago.tools.io.NetworkScenario` eTraGo network object compiled by :meth:`etrago.appl.etrago` json_file : :obj:dict Dictionary of the ``scenario_setting.json`` file Returns ------- grid_investment_costs : :pandas:`pandas.Dataframe<dataframe>` Dataframe with ``voltage_level``, ``number_of_expansion`` and ``capital_cost`` per calculated time steps Example ------- .. code-block:: python >>> from ego.tools.io import eGo >>> ego = eGo(jsonpath='scenario_setting.json') >>> ego.etrago.grid_investment_costs +--------------+-------------------+--------------+ | voltage_level|number_of_expansion| capital_cost| +==============+===================+==============+ | ehv | 27.0 | 31514.1305 | +--------------+-------------------+--------------+ | hv | 0.0 | 0.0 | +--------------+-------------------+--------------+ """ # check settings for extendable if 'network' not in json_file['eTraGo']['extendable']: logger.info("The optimizition was not using parameter" " 'extendable': network \n" "No grid expantion costs from etrago") if 'network' in json_file['eTraGo']['extendable']: lines = network.lines[['v_nom', 'capital_cost', 's_nom', 's_nom_min', 's_nom_opt']].reset_index() lines['s_nom_expansion'] = lines.s_nom_opt.subtract( lines.s_nom, axis='index') lines['capital_cost'] = lines.s_nom_expansion.multiply( lines.capital_cost, axis='index') lines['number_of_expansion'] = lines.s_nom_expansion > 0.0 lines['time_step'] = get_time_steps(json_file) # add v_level lines['voltage_level'] = 'unknown' ix_ehv = lines[lines['v_nom'] >= 380].index lines.set_value(ix_ehv, 'voltage_level', 'ehv') ix_hv = lines[(lines['v_nom'] <= 220) & (lines['v_nom'] >= 110)].index lines.set_value(ix_hv, 'voltage_level', 'hv') # based on eTraGo Function: # https://github.com/openego/eTraGo/blob/dev/etrago/tools/utilities.py#L651 # Definition https://pypsa.org/doc/components.html#line trafo = pd.DataFrame() # get costs of transfomers if json_file['eTraGo']['network_clustering_kmeans'] == False: trafos = network.transformers[['v_nom0', 'v_nom1', 'capital_cost', 's_nom_extendable', 's_nom', 's_nom_opt']] trafos.columns.name = "" trafos.index.name = "" trafos.reset_index() trafos['s_nom_extendable'] = trafos.s_nom_opt.subtract( trafos.s_nom, axis='index') trafos['capital_cost'] = trafos.s_nom_extendable.multiply( trafos.capital_cost, axis='index') trafos['number_of_expansion'] = trafos.s_nom_extendable > 0.0 trafos['time_step'] = get_time_steps(json_file) # add v_level trafos['voltage_level'] = 'unknown' # TODO check ix_ehv = trafos[trafos['v_nom0'] >= 380].index trafos.set_value(ix_ehv, 'voltage_level', 'ehv') ix_hv = trafos[(trafos['v_nom0'] <= 220) & (trafos['v_nom0'] >= 110)].index trafos.set_value(ix_hv, 'voltage_level', 'hv') # aggregate trafo trafo = trafos[['voltage_level', 'capital_cost']].groupby('voltage_level' ).sum().reset_index() # aggregate lines line = lines[['voltage_level', 'capital_cost']].groupby('voltage_level' ).sum().reset_index() # merge trafos and line frames = [line, trafo] grid_investment_costs = pd.concat(frames) return grid_investment_costs # ToDo: add .agg({'number_of_expansion':lambda x: x.count(), # 's_nom_expansion': np.sum, # 'grid_costs': np.sum}) <- time_step pass
[docs]def edisgo_grid_investment(edisgo, json_file): """ Function aggregates all costs, based on all calculated eDisGo grids and their weightings Parameters ---------- edisgo : :class:`ego.tools.edisgo_integration.EDisGoNetworks` Contains multiple eDisGo networks Returns ------- None or :pandas:`pandas.DataFrame<dataframe>` Dataframe containing annuity costs per voltage level """ t = 40 p = 0.05 logger.info('For all components T={} and p={} is used'.format(t, p)) costs = pd.DataFrame( columns=['voltage_level', 'annuity_costs', 'overnight_costs']) # Loop through all calculated eDisGo grids for key, value in edisgo.network.items(): if not hasattr(value, 'network'): logger.warning('No results available for grid {}'.format(key)) continue # eDisGo results (overnight costs) for this grid costs_single = value.network.results.grid_expansion_costs costs_single.rename( columns={'total_costs': 'overnight_costs'}, inplace=True) # continue if this grid was not reinforced if (costs_single['overnight_costs'].sum() == 0.): logger.info('No expansion costs for grid {}'.format(key)) continue # Overnight cost translated in annuity costs costs_single['capital_cost'] = edisgo_convert_capital_costs( costs_single['overnight_costs'], t=t, p=p, json_file=json_file) # Weighting (retrieves the singe (absolute) weighting for this grid) choice = edisgo.grid_choice weighting = choice.loc[ choice['the_selected_network_id'] == key ][ 'no_of_points_per_cluster' ].values[0] costs_single[['capital_cost', 'overnight_costs']] = ( costs_single[['capital_cost', 'overnight_costs']] * weighting) # Append costs of this grid costs = costs.append( costs_single[[ 'voltage_level', 'capital_cost', 'overnight_costs']], ignore_index=True) if len(costs) == 0: logger.info('No expansion costs in any MV grid') return None else: aggr_costs = costs.groupby( ['voltage_level']).sum().reset_index() # In eDisGo all costs are in kEuro (eGo only takes Euro) aggr_costs[['capital_cost', 'overnight_costs']] = ( aggr_costs[['capital_cost', 'overnight_costs']] * 1000) successfull_grids = edisgo.successfull_grids if successfull_grids < 1: logger.warning( 'Only {} % of the grids were calculated.\n'.format( "{0:,.2f}".format(successfull_grids * 100) ) + 'Costs are extrapolated...') aggr_costs[['capital_cost', 'overnight_costs']] = ( aggr_costs[['capital_cost', 'overnight_costs']] / successfull_grids) return aggr_costs
[docs]def get_generator_investment(network, scn_name): """ Get investment costs per carrier/ generator. """ # TODO - change values in csv # - add values to database # work around later db table -> check capital_cost as cost input?!? etg = network try: dirname = os.path.dirname(__file__) filename = 'investment_costs.csv' path = os.path.join(dirname, filename) invest = pd.DataFrame.from_csv(path + '~/data/'+filename) except FileNotFoundError: path = os.getcwd() filename = 'investment_costs.csv' invest = pd.DataFrame.from_csv(path + '/data/'+filename) if scn_name in ['SH Status Quo', 'Status Quo']: invest_scn = 'Status Quo' if scn_name in ['SH NEP 2035', 'NEP 2035']: invest_scn = 'NEP 2035' if scn_name in ['SH eGo 100', 'eGo 100']: invest_scn = 'eGo 100' gen_invest = pd.concat([invest[invest_scn], etg.generators.groupby('carrier')['p_nom'].sum()], axis=1, join='inner') gen_invest = pd.concat([invest[invest_scn], etg.generators.groupby( 'carrier') ['p_nom'].sum()], axis=1, join='inner') gen_invest['carrier_costs'] = gen_invest[invest_scn] * \ gen_invest['p_nom'] * 1000 # in MW return gen_invest