
# -*- coding: utf-8 -*-
# Copyright 2016-2018 Europa-Universität Flensburg,
# Flensburg University of Applied Sciences,
# Centre for Sustainable Energy Systems
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# File description
"""
This file is part of the eGo toolbox.
It contains the class definition for multiple eDisGo networks.
"""
__copyright__ = ("Flensburg University of Applied Sciences, "
                 "Europa-Universität Flensburg, "
                 "Centre for Sustainable Energy Systems")
__license__ = "GNU Affero General Public License Version 3 (AGPL-3.0)"
__author__ = "wolf_bunke, maltesc"

# Import
from traceback import TracebackException
import os
import pickle
import logging
import traceback
import pypsa
import csv
import dill
import pandas as pd
from time import localtime, sleep, strftime
from datetime import datetime, timedelta as td
import json
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
import multiprocess as mp2

if 'READTHEDOCS' not in os.environ:

    from egoio.db_tables import model_draft, grid
    from egoio.tools import db

    from edisgo.grid.network import Results, TimeSeriesControl
    from edisgo.grid import tools
    from edisgo.tools.plots import mv_grid_topology
    from edisgo.grid.network import EDisGo

    from ego.tools.specs import (
        get_etragospecs_direct
    )
    from ego.tools.mv_cluster import (
        analyze_attributes,
        cluster_mv_grids)
    from ego.tools.economics import (
        edisgo_grid_investment)


# Logging
logger = logging.getLogger(__name__)

pickle.DEFAULT_PROTOCOL = 4
dill.settings['protocol'] = 4


class EDisGoNetworks:
    """
    Performs multiple eDisGo runs and stores the resulting edisgo_grids

    Parameters
    ----------
    json_file : :obj:`dict`
        Dictionary of the ``scenario_setting.json`` file
    etrago_network : :class:`etrago.tools.io.NetworkScenario`
        eTraGo network object compiled by :meth:`etrago.appl.etrago`

    """

    def __init__(self, json_file, etrago_network):

        # General JSON inputs
        self._json_file = json_file
        self._set_scenario_settings()

        # Create a reduced eTraGo network
        self._etrago_network = _ETraGoData(etrago_network)
        del etrago_network

        # eDisGo-specific naming
        self._edisgo_scenario_translation()

        # Program information
        self._run_finished = False

        # eDisGo result grids
        self._edisgo_grids = {}

        if self._csv_import:
            self._load_edisgo_results()
            self._successfull_grids = self._successfull_grids()
            self._grid_investment_costs = edisgo_grid_investment(
                self,
                self._json_file)
        else:
            # Only clustering results
            if self._only_cluster:
                self._set_grid_choice()
                if self._results:
                    self._save_edisgo_results()
                self._grid_investment_costs = None
            else:
                # Execute functions
                self._set_grid_choice()
                self._init_status()
                self._run_edisgo_pool()
                if self._results:
                    self._save_edisgo_results()
                self._successfull_grids = self._successfull_grids()
                self._grid_investment_costs = edisgo_grid_investment(
                    self,
                    self._json_file)

    @property
    def network(self):
        """
        Container for the eDisGo grids, including all results

        Returns
        -------
        :obj:`dict` of :class:`edisgo.grid.network.EDisGo`
            Dictionary of eDisGo objects, keyed by MV grid ID

        """
        return self._edisgo_grids

    @property
    def grid_choice(self):
        """
        Container for the choice of MV grids, including their weighting

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            Dataframe containing the chosen grids and their weightings

        """
        return self._grid_choice

    @property
    def successfull_grids(self):
        """
        Relative number of successfully calculated MV grids
        (includes the cluster weighting)

        Returns
        -------
        float
            Share of successfully calculated grids

        """
        return self._successfull_grids

    @property
    def grid_investment_costs(self):
        """
        Grid investment costs

        Returns
        -------
        None or :pandas:`pandas.DataFrame<dataframe>`
            Dataframe containing annuity costs per voltage level

        """
        return self._grid_investment_costs
    def get_mv_grid_from_bus_id(self, bus_id):
        """
        Queries the MV grid ID for a given eTraGo bus

        Parameters
        ----------
        bus_id : int
            eTraGo bus ID

        Returns
        -------
        int
            MV grid (ding0) ID

        """
        conn = db.connection(section=self._db_section)
        session_factory = sessionmaker(bind=conn)
        Session = scoped_session(session_factory)
        session = Session()

        mv_grid_id = self._get_mv_grid_from_bus_id(session, bus_id)

        Session.remove()

        return mv_grid_id

    def get_bus_id_from_mv_grid(self, subst_id):
        """
        Queries the eTraGo bus ID for a given MV grid (ding0) ID

        Parameters
        ----------
        subst_id : int
            MV grid (ding0) ID

        Returns
        -------
        int
            eTraGo bus ID

        """
        conn = db.connection(section=self._db_section)
        session_factory = sessionmaker(bind=conn)
        Session = scoped_session(session_factory)
        session = Session()

        bus_id = self._get_bus_id_from_mv_grid(session, subst_id)

        Session.remove()

        return bus_id
    def plot_storage_integration(self, mv_grid_id, **kwargs):
        """
        Plots the positions of integrated storages in the MV grid.
        For more information see
        :func:`edisgo.tools.plots.mv_grid_topology`.
        """
        mv_grid_topology(
            self._edisgo_grids[mv_grid_id].network.pypsa,
            self._edisgo_grids[mv_grid_id].network.config,
            node_color=kwargs.get('storage_integration', None),
            filename=kwargs.get('filename', None),
            grid_district_geom=kwargs.get('grid_district_geom', True),
            background_map=kwargs.get('background_map', True),
            xlim=kwargs.get('xlim', None),
            ylim=kwargs.get('ylim', None),
            title=kwargs.get('title', ''))

    def plot_grid_expansion_costs(self, mv_grid_id, **kwargs):
        """
        Plots the costs per MV line.
        For more information see
        :func:`edisgo.tools.plots.mv_grid_topology`.
        """
        mv_grid_topology(
            self._edisgo_grids[mv_grid_id].network.pypsa,
            self._edisgo_grids[mv_grid_id].network.config,
            line_color='expansion_costs',
            grid_expansion_costs=(
                self._edisgo_grids[mv_grid_id].network.
                results.grid_expansion_costs.rename(
                    columns={"overnight_costs": "total_costs"})),
            filename=kwargs.get('filename', None),
            grid_district_geom=kwargs.get('grid_district_geom', True),
            background_map=kwargs.get('background_map', True),
            limits_cb_lines=kwargs.get('limits_cb_lines', None),
            xlim=kwargs.get('xlim', None),
            ylim=kwargs.get('ylim', None),
            lines_cmap=kwargs.get('lines_cmap', 'inferno_r'),
            title=kwargs.get('title', ''))

    def plot_line_loading(self, mv_grid_id, **kwargs):
        """
        Plots the relative line loading (ratio of the current from the
        power flow analysis to the allowed current) of MV lines.
        For more information see
        :func:`edisgo.tools.plots.mv_grid_topology`.
        """
        mv_grid_topology(
            self._edisgo_grids[mv_grid_id].network.pypsa,
            self._edisgo_grids[mv_grid_id].network.config,
            timestep=kwargs.get('timestep', None),
            line_color='loading',
            node_color=kwargs.get('node_color', None),
            line_load=self._edisgo_grids[
                mv_grid_id].network.results.s_res(),
            filename=kwargs.get('filename', None),
            arrows=kwargs.get('arrows', None),
            grid_district_geom=kwargs.get('grid_district_geom', True),
            background_map=kwargs.get('background_map', True),
            voltage=None,  # TODO: adapt to changed API
            limits_cb_lines=kwargs.get('limits_cb_lines', None),
            limits_cb_nodes=kwargs.get('limits_cb_nodes', None),
            xlim=kwargs.get('xlim', None),
            ylim=kwargs.get('ylim', None),
            lines_cmap=kwargs.get('lines_cmap', 'inferno_r'),
            title=kwargs.get('title', ''))

    def plot_mv_grid_topology(self, mv_grid_id, **kwargs):
        """
        Plots the plain MV grid topology.
        For more information see
        :func:`edisgo.tools.plots.mv_grid_topology`.
        """
        mv_grid_topology(
            self._edisgo_grids[mv_grid_id].network.pypsa,
            self._edisgo_grids[mv_grid_id].network.config,
            filename=kwargs.get('filename', None),
            grid_district_geom=kwargs.get('grid_district_geom', True),
            background_map=kwargs.get('background_map', True),
            xlim=kwargs.get('xlim', None),
            ylim=kwargs.get('ylim', None),
            title=kwargs.get('title', ''))
    def _init_status(self):
        """
        Creates a status file in which all eDisGo statuses are tracked
        """
        self._status_dir = 'status'
        if not os.path.exists(self._status_dir):
            os.makedirs(self._status_dir)

        self._status_file = 'eGo_' + strftime(
            "%Y-%m-%d_%H%M%S", localtime())

        status = self._grid_choice.copy()
        status = status.set_index('the_selected_network_id')
        status.index.names = ['MV grid']

        tot_reprs = self._grid_choice['no_of_points_per_cluster'].sum()

        status['cluster_perc'] = (
            status['no_of_points_per_cluster'] / tot_reprs)

        status['start_time'] = 'Not started yet'
        status['end_time'] = 'Not finished yet'

        status.drop(
            ['no_of_points_per_cluster', 'represented_grids'],
            axis=1,
            inplace=True)

        self._status_path = os.path.join(
            self._status_dir,
            self._status_file + '.csv')

        status.to_csv(self._status_path)

    def _status_update(self, mv_grid_id, time, message=None, show=True):
        """
        Updates eDisGo's status file
        """
        status = pd.read_csv(
            self._status_path,
            index_col=0)

        status['start_time'] = status['start_time'].astype(str)
        status['end_time'] = status['end_time'].astype(str)

        if message:
            now = message
        else:
            now = strftime("%Y-%m-%d_%H:%M", localtime())

        if time == 'start':
            status.at[mv_grid_id, 'start_time'] = now
        elif time == 'end':
            status.at[mv_grid_id, 'end_time'] = now

        if show:
            logger.info(
                "\n\neDisGo Status: \n\n"
                + status.to_string()
                + "\n\n")

        status.to_csv(self._status_path)

    def _update_edisgo_configs(self, edisgo_grid):
        """
        Overwrites some eDisGo configurations with eGo settings.
        """
        # Info and warning handling
        if not hasattr(self, '_suppress_log'):
            # Warnings and infos are only thrown in the first run
            self._suppress_log = False

        # Database section
        ego_db = self._db_section
        edisgo_db = edisgo_grid.network.config['db_connection']['section']

        if not ego_db == edisgo_db:
            if not self._suppress_log:
                logger.warning(
                    ("eDisGo database configuration (db: '{}') "
                     + "will be overwritten with the database configuration "
                     + "from eGo's scenario settings (db: '{}')").format(
                         edisgo_db, ego_db))
            edisgo_grid.network.config['db_connection']['section'] = ego_db

        # Versioned
        ego_gridversion = self._grid_version
        if ego_gridversion is None:
            ego_versioned = 'model_draft'
            if not self._suppress_log:
                logger.info("eGo's grid_version == None is "
                            + "evaluated as data source: model_draft")
        else:
            ego_versioned = 'versioned'
            if not self._suppress_log:
                logger.info(("eGo's grid_version == '{}' is "
                             + "evaluated as data source: versioned").format(
                                 ego_gridversion))

        edisgo_versioned = edisgo_grid.network.config[
            'data_source']['oedb_data_source']

        if not ego_versioned == edisgo_versioned:
            if not self._suppress_log:
                logger.warning(
                    ("eDisGo data source configuration ('{}') "
                     + "will be overwritten with the data source "
                     + "from eGo's scenario settings (data source: '{}')"
                     ).format(edisgo_versioned, ego_versioned))
            edisgo_grid.network.config[
                'data_source']['oedb_data_source'] = ego_versioned

        # Grid version
        ego_gridversion = self._grid_version
        edisgo_gridversion = edisgo_grid.network.config[
            'versioned']['version']

        if not ego_gridversion == edisgo_gridversion:
            if not self._suppress_log:
                logger.warning(
                    ("eDisGo version configuration (version: '{}') "
                     + "will be overwritten with the version configuration "
                     + "from eGo's scenario settings (version: '{}')"
                     ).format(edisgo_gridversion, ego_gridversion))
            edisgo_grid.network.config[
                'versioned']['version'] = ego_gridversion

        self._suppress_log = True

    def _set_scenario_settings(self):

        self._csv_import = self._json_file['eGo']['csv_import_eDisGo']

        # eTraGo args
        self._etrago_args = self._json_file['eTraGo']
        self._scn_name = self._etrago_args['scn_name']
        self._ext_storage = (
            'storage' in self._etrago_args['extendable'])
        if self._ext_storage:
            logger.info("eTraGo dataset uses extendable storages")

        self._pf_post_lopf = self._etrago_args['pf_post_lopf']

        # eDisGo args import
        if self._csv_import:
            with open(os.path.join(
                    self._csv_import,
                    'edisgo_args.json')) as f:
                edisgo_args = json.load(f)

            # This overwrites the original object...
            self._json_file['eDisGo'] = edisgo_args
            logger.info("All eDisGo settings are taken from the CSV folder "
                        + "(scenario settings are ignored)")

        # Imported or directly from the settings
        # eDisGo section of the settings
        self._edisgo_args = self._json_file['eDisGo']

        # Reading all eDisGo settings
        # TODO: Integrate into a for-loop
        self._db_section = self._edisgo_args['db']
        self._grid_version = self._edisgo_args['gridversion']
        self._timesteps_pfa = self._edisgo_args['timesteps_pfa']
        self._solver = self._edisgo_args['solver']
        self._curtailment_voltage_threshold = self._edisgo_args[
            'curtailment_voltage_threshold']
        self._ding0_files = self._edisgo_args['ding0_files']
        self._choice_mode = self._edisgo_args['choice_mode']
        self._parallelization = self._edisgo_args['parallelization']
        self._initial_reinforcement = self._edisgo_args[
            'initial_reinforcement']
        self._storage_distribution = self._edisgo_args[
            'storage_distribution']
        self._apply_curtailment = self._edisgo_args['apply_curtailment']
        self._cluster_attributes = self._edisgo_args['cluster_attributes']
        self._only_cluster = self._edisgo_args['only_cluster']
        self._max_workers = self._edisgo_args['max_workers']
        self._max_cos_phi_renewable = self._edisgo_args[
            'max_cos_phi_renewable']
        self._results = self._edisgo_args['results']
        self._max_calc_time = self._edisgo_args['max_calc_time']

        # Some basic checks
        if self._storage_distribution and not self._ext_storage:
            logger.warning("Storage distribution (MV grids) is active, "
                           + "but the eTraGo dataset has no extendable "
                           + "storages")
        if not self._initial_reinforcement:
            raise NotImplementedError(
                "Skipping the initial reinforcement is not yet implemented")
        if self._only_cluster:
            logger.warning(
                "\n\nThis eDisGo run only returns cluster results\n\n")

        # Versioning
        if self._grid_version is not None:
            self._versioned = True
        else:
            self._versioned = False

    def _edisgo_scenario_translation(self):

        # Scenario translation
        if self._scn_name == 'Status Quo':
            self._generator_scn = None
        elif self._scn_name == 'NEP 2035':
            self._generator_scn = 'nep2035'
        elif self._scn_name == 'eGo 100':
            self._generator_scn = 'ego100'

    def _successfull_grids(self):
        """
        Calculates the relative number of successfully calculated grids,
        including the cluster weightings
        """
        total, success, fail = 0, 0, 0
        for key, value in self._edisgo_grids.items():
            weight = self._grid_choice.loc[
                self._grid_choice['the_selected_network_id'] == key
            ]['no_of_points_per_cluster'].values[0]
            total += weight
            if hasattr(value, 'network'):
                success += weight
            else:
                fail += weight

        return success / total

    def _analyze_cluster_attributes(self):
        """
        Analyses the attributes wind and solar capacity and farthest node
        for the clustering. These are considered the "standard" attributes
        for the MV grid clustering.
        """
        analyze_attributes(self._ding0_files)

    def _cluster_mv_grids(self, no_grids):
        """
        Clusters the MV grids based on the attributes, for a given number
        of MV grids

        Parameters
        ----------
        no_grids : int
            Desired number of clusters (of MV grids)

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            Dataframe containing the clustered MV grids and their
            weightings

        """
        # TODO: This first dataframe contains the standard attributes...
        # ...Create an interface in order to use attributes more flexibly.
        # Make this function more generic.
        attributes_path = self._ding0_files + '/attributes.csv'

        if not os.path.isfile(attributes_path):
            logger.info('Attributes file is missing')
            logger.info('Attributes will be calculated')
            self._analyze_cluster_attributes()

        df = pd.read_csv(self._ding0_files + '/attributes.csv')
        df = df.set_index('id')
        df.drop(['Unnamed: 0'], inplace=True, axis=1)
        df.rename(
            columns={
                "Solar_cumulative_capacity": "solar_cap",
                "Wind_cumulative_capacity": "wind_cap",
                "The_Farthest_node": "farthest_node"},
            inplace=True)

        if 'extended_storage' in self._cluster_attributes:
            if self._ext_storage:
                storages = self._identify_extended_storages()
                if not (storages.max().values[0] == 0.):
                    df = pd.concat([df, storages], axis=1)
                    df.rename(
                        columns={"storage_p_nom": "extended_storage"},
                        inplace=True)
                else:
                    logger.warning('Extended storages are all 0. '
                                   'Therefore, extended storages '
                                   'are excluded from clustering')

        found_atts = [
            i for i in self._cluster_attributes if i in df.columns]
        missing_atts = [
            i for i in self._cluster_attributes if i not in df.columns]

        logger.info(
            'Available attributes are: {}'.format(df.columns.tolist()))
        logger.info(
            'Chosen/found attributes are: {}'.format(found_atts))

        if len(missing_atts) > 0:
            logger.warning(
                'Missing attributes: {}'.format(missing_atts))
            if 'extended_storage' in missing_atts:
                logger.info('Hint: the eTraGo dataset must contain '
                            'extendable storages in order to include '
                            'storage extension in the MV grid clustering.')

        return cluster_mv_grids(no_grids, cluster_base=df)

    def _identify_extended_storages(self):

        conn = db.connection(section=self._db_section)
        session_factory = sessionmaker(bind=conn)
        Session = scoped_session(session_factory)
        session = Session()

        all_mv_grids = self._check_available_mv_grids()

        storages = pd.DataFrame(
            index=all_mv_grids,
            columns=['storage_p_nom'])

        logger.info('Identifying extended storages')
        for mv_grid in all_mv_grids:
            bus_id = self._get_bus_id_from_mv_grid(session, mv_grid)

            min_extended = 0.3
            stor_p_nom = self._etrago_network.storage_units.loc[
                (self._etrago_network.storage_units['bus'] == str(bus_id))
                & (self._etrago_network.storage_units[
                    'p_nom_extendable'] == True)
                & (self._etrago_network.storage_units[
                    'p_nom_opt'] > min_extended)
                & (self._etrago_network.storage_units['max_hours'] <= 20.)
            ]['p_nom_opt']

            if len(stor_p_nom) == 1:
                stor_p_nom = stor_p_nom.values[0]
            elif len(stor_p_nom) == 0:
                stor_p_nom = 0.
            else:
                raise IndexError(
                    "More than one storage unit found for bus {}".format(
                        bus_id))

            storages.at[mv_grid, 'storage_p_nom'] = stor_p_nom

        Session.remove()

        return storages

    def _check_available_mv_grids(self):
        """
        Checks which MV grids are available in the given folder
        (from the settings)

        Returns
        -------
        :obj:`list`
            List of MV grid IDs
        """
        mv_grids = []
        for file in os.listdir(self._ding0_files):
            if file.endswith('.pkl'):
                mv_grids.append(
                    int(file.replace(
                        'ding0_grids__', ''
                    ).replace('.pkl', '')))

        return mv_grids

    def _set_grid_choice(self):
        """
        Sets the grid choice based on the settings file
        """
        choice_df = pd.DataFrame(
            columns=[
                'no_of_points_per_cluster',
                'the_selected_network_id',
                'represented_grids'])

        if self._choice_mode == 'cluster':
            no_grids = self._edisgo_args['no_grids']
            logger.info('Clustering to {} MV grids'.format(no_grids))

            cluster_df = self._cluster_mv_grids(no_grids)
            choice_df[
                'the_selected_network_id'
            ] = cluster_df['the_selected_network_id']
            choice_df[
                'no_of_points_per_cluster'
            ] = cluster_df['no_of_points_per_cluster']
            choice_df[
                'represented_grids'
            ] = cluster_df['represented_grids']

        elif self._choice_mode == 'manual':
            man_grids = self._edisgo_args['manual_grids']

            choice_df['the_selected_network_id'] = man_grids
            choice_df['no_of_points_per_cluster'] = 1
            choice_df['represented_grids'] = [
                [mv_grid_id]
                for mv_grid_id
                in choice_df['the_selected_network_id']]

            logger.info(
                'Calculating manually chosen MV grids {}'.format(man_grids))

        elif self._choice_mode == 'all':
            mv_grids = self._check_available_mv_grids()

            choice_df['the_selected_network_id'] = mv_grids
            choice_df['no_of_points_per_cluster'] = 1
            choice_df['represented_grids'] = [
                [mv_grid_id]
                for mv_grid_id
                in choice_df['the_selected_network_id']]

            no_grids = len(mv_grids)
            logger.info(
                'Calculating all available {} MV grids'.format(no_grids))

        choice_df = choice_df.sort_values(
            'no_of_points_per_cluster',
            ascending=False)

        self._grid_choice = choice_df

    def _run_edisgo_pool(self):
        """
        Runs eDisGo for the chosen grids
        """
        parallelization = self._parallelization

        if not os.path.exists(self._results):
            os.makedirs(self._results)

        if parallelization is True:
            logger.info('Running eDisGo in parallel')

            mv_grids = self._grid_choice['the_selected_network_id'].tolist()

            no_cpu = mp2.cpu_count()
            if no_cpu > self._max_workers:
                no_cpu = self._max_workers
                logger.info(
                    'Number of workers limited to {} by user'.format(
                        self._max_workers))

            self._edisgo_grids = set(mv_grids)
            self._edisgo_grids = parallelizer(
                mv_grids,
                lambda *xs: xs[1]._run_edisgo(xs[0]),
                (self,),
                self._max_calc_time,
                workers=no_cpu)

            for g in mv_grids:
                if g not in self._edisgo_grids:
                    self._edisgo_grids[g] = 'Timeout'

            self._csv_import = self._json_file['eDisGo']['results']
            self._save_edisgo_results()
            self._load_edisgo_results()

        else:
            logger.info('Running eDisGo sequentially')

            no_grids = len(self._grid_choice)
            count = 0
            for idx, row in self._grid_choice.iterrows():
                prog = '%.1f' % (count / no_grids * 100)
                logger.info('{} % calculated by eDisGo'.format(prog))

                mv_grid_id = int(row['the_selected_network_id'])
                logger.info('MV grid {}'.format(mv_grid_id))
                try:
                    edisgo_grid = self._run_edisgo(mv_grid_id)
                    self._edisgo_grids[mv_grid_id] = edisgo_grid
                except Exception as e:
                    self._edisgo_grids[mv_grid_id] = e
                    logger.exception(
                        'MV grid {} failed: \n'.format(mv_grid_id))

                count += 1

        self._run_finished = True

    def _run_edisgo(self, mv_grid_id):
        """
        Performs a single eDisGo run

        Parameters
        ----------
        mv_grid_id : int
            MV grid ID of the ding0 grid

        Returns
        -------
        :obj:`dict`
            Dictionary mapping the MV grid ID to the path where the
            results of this run were saved

        """
        self._status_update(mv_grid_id, 'start', show=False)

        storage_integration = self._storage_distribution
        apply_curtailment = self._apply_curtailment

        logger.info(
            'MV grid {}: Calculating interface values'.format(mv_grid_id))

        conn = db.connection(section=self._db_section)
        session_factory = sessionmaker(bind=conn)
        Session = scoped_session(session_factory)
        session = Session()

        # Query the bus ID for this MV grid
        bus_id = self._get_bus_id_from_mv_grid(session, mv_grid_id)

        # Calculate the interface values for this MV grid
        specs = get_etragospecs_direct(
            session,
            bus_id,
            self._etrago_network,
            self._scn_name,
            self._grid_version,
            self._pf_post_lopf,
            self._max_cos_phi_renewable)
        Session.remove()

        # Get the ding0 (MV) grid from the folder
        ding0_filepath = (
            self._ding0_files
            + '/ding0_grids__'
            + str(mv_grid_id)
            + '.pkl')

        if not os.path.isfile(ding0_filepath):
            msg = 'No MV grid file for MV grid {}'.format(mv_grid_id)
            logger.error(msg)
            raise Exception(msg)

        # Initialize eDisGo with this MV grid
        logger.info("MV grid {}: Initialize MV grid".format(mv_grid_id))

        edisgo_grid = EDisGo(
            ding0_grid=ding0_filepath,
            worst_case_analysis='worst-case')

        logger.info(("MV grid {}: Changing eDisGo's voltage configurations "
                     + "for the initial reinforcement").format(mv_grid_id))

        edisgo_grid.network.config[
            'grid_expansion_allowed_voltage_deviations'] = {
                'hv_mv_trafo_offset': 0.04,
                'hv_mv_trafo_control_deviation': 0.0,
                'mv_load_case_max_v_deviation': 0.055,
                'mv_feedin_case_max_v_deviation': 0.02,
                'lv_load_case_max_v_deviation': 0.065,
                'lv_feedin_case_max_v_deviation': 0.03,
                'mv_lv_station_load_case_max_v_deviation': 0.02,
                'mv_lv_station_feedin_case_max_v_deviation': 0.01}

        # Initial grid reinforcement
        logger.info(("MV grid {}: Initial MV grid reinforcement "
                     + "(worst-case analysis)").format(mv_grid_id))

        edisgo_grid.reinforce()

        # Get the costs for the initial reinforcement
        # TODO: Implement a separate cost function
        costs_grouped = \
            edisgo_grid.network.results.grid_expansion_costs.groupby(
                ['type']).sum()
        costs = pd.DataFrame(
            costs_grouped.values,
            columns=costs_grouped.columns,
            index=[[edisgo_grid.network.id] * len(costs_grouped),
                   costs_grouped.index]).reset_index()
        costs.rename(columns={'level_0': 'grid'}, inplace=True)

        costs_before = costs

        total_costs_before_EUR = costs_before['total_costs'].sum() * 1000
        logger.info(
            ("MV grid {}: Costs for the initial "
             + "reinforcement: EUR {}").format(
                 mv_grid_id,
                 "{0:,.2f}".format(total_costs_before_EUR)))

        logger.info((
            "MV grid {}: Resetting grid after initial reinforcement"
        ).format(mv_grid_id))
        edisgo_grid.network.results = Results(edisgo_grid.network)
        # Reload the (original) eDisGo configs
        edisgo_grid.network.config = None

        # The eTraGo case begins here
        logger.info("MV grid {}: eTraGo feed-in case".format(mv_grid_id))

        # Update the eDisGo settings (from the config files) with the
        # scenario settings
        logger.info("MV grid {}: Updating eDisGo configuration".format(
            mv_grid_id))

        # Update the configs with eGo's scenario settings
        self._update_edisgo_configs(edisgo_grid)

        # Generator import for the NEP 2035 and eGo 100 scenarios
        if self._generator_scn:
            logger.info(
                'Importing generators for scenario {}'.format(
                    self._scn_name))
            edisgo_grid.import_generators(
                generator_scenario=self._generator_scn)
        else:
            logger.info(
                'No generators imported for scenario {}'.format(
                    self._scn_name))
            edisgo_grid.network.pypsa = None

        # Time series from eTraGo
        logger.info('Updating the eDisGo timeseries with eTraGo values')
        if self._pf_post_lopf:
            logger.info('(Including reactive power)')
            edisgo_grid.network.timeseries = TimeSeriesControl(
                network=edisgo_grid.network,
                timeseries_generation_fluctuating=specs['ren_potential'],
                timeseries_generation_dispatchable=specs['conv_dispatch'],
                timeseries_generation_reactive_power=specs[
                    'reactive_power'],
                timeseries_load='demandlib',
                timeindex=specs['conv_dispatch'].index).timeseries
        else:
            logger.info('(Only active power)')
            edisgo_grid.network.timeseries = TimeSeriesControl(
                network=edisgo_grid.network,
                timeseries_generation_fluctuating=specs['ren_potential'],
                timeseries_generation_dispatchable=specs['conv_dispatch'],
                timeseries_load='demandlib',
                timeindex=specs['conv_dispatch'].index).timeseries

        # Curtailment
        if apply_curtailment:
            logger.info('Including curtailment')

            gens_df = tools.get_gen_info(edisgo_grid.network)
            solar_wind_capacities = gens_df.groupby(
                by=['type', 'weather_cell_id']
            )['nominal_capacity'].sum()

            curt_cols = [
                i for i in specs['ren_curtailment'].columns
                if i in solar_wind_capacities.index]

            if not curt_cols:
                raise ImportError(
                    "MV grid {}: Data doesn't match".format(mv_grid_id))

            curt_abs = pd.DataFrame(
                columns=pd.MultiIndex.from_tuples(curt_cols))

            for col in curt_abs:
                curt_abs[col] = (
                    specs['ren_curtailment'][col]
                    * solar_wind_capacities[col])

            edisgo_grid.curtail(
                curtailment_timeseries=curt_abs,
                methodology='voltage-based',
                solver=self._solver,
                voltage_threshold=self._curtailment_voltage_threshold)
        else:
            logger.info('No curtailment applied')

        # Storage integration
        costs_without_storage = None
        if storage_integration:
            if self._ext_storage:
                if specs['battery_p_series'] is not None:
                    logger.info('Integrating storages in the MV grid')
                    edisgo_grid.integrate_storage(
                        timeseries=specs['battery_p_series'],
                        position='distribute_storages_mv',
                        timeseries_reactive_power=specs[
                            'battery_q_series'])  # None if no pf_post_lopf
                    costs_without_storage = (
                        edisgo_grid.network.results.
                        storages_costs_reduction[
                            'grid_expansion_costs_initial'].values[0])
        else:
            logger.info('No storage integration')

        logger.info("MV grid {}: eDisGo grid analysis".format(mv_grid_id))

        edisgo_grid.reinforce(timesteps_pfa=self._timesteps_pfa)

        if costs_without_storage is not None:
            costs_with_storage = (
                edisgo_grid.network.results.grid_expansion_costs[
                    'total_costs'].sum())
            if costs_with_storage >= costs_without_storage:
                logger.warning(
                    "Storage did not benefit MV grid {}".format(
                        mv_grid_id))
                st = edisgo_grid.network.mv_grid.graph.nodes_by_attribute(
                    'storage')
                for storage in st:
                    tools.disconnect_storage(edisgo_grid.network, storage)

        self._status_update(mv_grid_id, 'end')

        path = os.path.join(self._results, str(mv_grid_id))
        edisgo_grid.network.results.save(path)

        return {edisgo_grid.network.id: path}

    def _save_edisgo_results(self):

        if not os.path.exists(self._results):
            os.makedirs(self._results)

        with open(
                os.path.join(self._results, 'edisgo_args.json'),
                'w') as fp:
            json.dump(self._edisgo_args, fp)

        self._grid_choice.to_csv(self._results + '/grid_choice.csv')

    def _load_edisgo_results(self):

        # Load the grid choice from CSV
        self._grid_choice = pd.read_csv(
            os.path.join(self._csv_import, 'grid_choice.csv'),
            index_col=0)
        self._grid_choice['represented_grids'] = self._grid_choice.apply(
            lambda x: eval(x['represented_grids']),
            axis=1)

        for idx, row in self._grid_choice.iterrows():
            mv_grid_id = int(row['the_selected_network_id'])
            try:
                # Grid expansion costs
                file_path = os.path.join(
                    self._csv_import,
                    str(mv_grid_id),
                    'grid_expansion_results',
                    'grid_expansion_costs.csv')
                grid_expansion_costs = pd.read_csv(
                    file_path,
                    index_col=0)

                # Power flow results
                pf_path = os.path.join(
                    self._csv_import,
                    str(mv_grid_id),
                    'powerflow_results',
                    'apparent_powers.csv')
                s_res = pd.read_csv(
                    pf_path,
                    index_col=0,
                    parse_dates=True)

                # Configs
                config_path = os.path.join(
                    self._csv_import,
                    str(mv_grid_id),
                    'configs.csv')
                edisgo_config = {}
                with open(config_path, 'r') as f:
                    reader = csv.reader(f)
                    for line in reader:
                        a = iter(line[1:])
                        edisgo_config[line[0]] = dict(zip(a, a))

                # PyPSA network
                pypsa_path = os.path.join(
                    self._csv_import,
                    str(mv_grid_id),
                    'pypsa_network')
                imported_pypsa = pypsa.Network()
                imported_pypsa.import_from_csv_folder(pypsa_path)

                # Storages
                storage_path = os.path.join(
                    self._csv_import,
                    str(mv_grid_id),
                    'storage_integration_results',
                    'storages.csv')
                if os.path.exists(storage_path):
                    storages = pd.read_csv(
                        storage_path,
                        index_col=0)
                else:
                    storages = pd.DataFrame(
                        columns=['nominal_power', 'voltage_level'])

                edisgo_grid = _EDisGoImported(
                    grid_expansion_costs,
                    s_res,
                    storages,
                    imported_pypsa,
                    edisgo_config)

                self._edisgo_grids[mv_grid_id] = edisgo_grid
                logger.info("Imported MV grid {}".format(mv_grid_id))

            except Exception:
                self._edisgo_grids[
                    mv_grid_id
                ] = "This grid failed to reimport"
                logger.warning(
                    "MV grid {} could not be loaded".format(mv_grid_id))

    def _get_mv_grid_from_bus_id(self, session, bus_id):
        """
        Queries the MV grid ID for a given eTraGo bus

        Parameters
        ----------
        bus_id : int
            eTraGo bus ID

        Returns
        -------
        int
            MV grid (ding0) ID

        """
        if self._versioned is True:
            ormclass_hvmv_subst = grid.__getattribute__(
                'EgoDpHvmvSubstation')
            subst_id = session.query(
                ormclass_hvmv_subst.subst_id
            ).filter(
                ormclass_hvmv_subst.otg_id == bus_id,
                ormclass_hvmv_subst.version == self._grid_version
            ).scalar()

        if self._versioned is False:
            ormclass_hvmv_subst = model_draft.__getattribute__(
                'EgoGridHvmvSubstation')
            subst_id = session.query(
                ormclass_hvmv_subst.subst_id
            ).filter(
                ormclass_hvmv_subst.otg_id == bus_id
            ).scalar()

        return subst_id

    def _get_bus_id_from_mv_grid(self, session, subst_id):
        """
        Queries the eTraGo bus ID for a given MV grid (ding0) ID

        Parameters
        ----------
        subst_id : int
            MV grid (ding0) ID

        Returns
        -------
        int
            eTraGo bus ID

        """
        if self._versioned is True:
            ormclass_hvmv_subst = grid.__getattribute__(
                'EgoDpHvmvSubstation')
            bus_id = session.query(
                ormclass_hvmv_subst.otg_id
            ).filter(
                ormclass_hvmv_subst.subst_id == subst_id,
                ormclass_hvmv_subst.version == self._grid_version
            ).scalar()

        if self._versioned is False:
            ormclass_hvmv_subst = model_draft.__getattribute__(
                'EgoGridHvmvSubstation')
            bus_id = session.query(
                ormclass_hvmv_subst.otg_id
            ).filter(
                ormclass_hvmv_subst.subst_id == subst_id
            ).scalar()

        return bus_id
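
# Usage sketch (illustrative, not executed): how EDisGoNetworks is
# typically driven. The settings path and the eTraGo object here are
# assumptions; in practice both are supplied by eGo's main workflow.
#
#     import json
#
#     with open('scenario_setting.json') as f:
#         json_file = json.load(f)
#
#     # etrago_network: a solved eTraGo network, e.g. the return value
#     # of etrago.appl.etrago(json_file['eTraGo'])
#     edisgo_networks = EDisGoNetworks(
#         json_file=json_file,
#         etrago_network=etrago_network)
#
#     edisgo_networks.grid_choice            # chosen MV grids + weights
#     edisgo_networks.network                # dict of eDisGo results
#     edisgo_networks.grid_investment_costs  # annuities per voltage level
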
class _ETraGoData:
    """
    Container for a minimal eTraGo network.

    This minimal network is required for the parallelization of eDisGo.
    """

    def __init__(self, etrago_network):

        self.snapshots = getattr(etrago_network, "snapshots")
        self.storage_units = getattr(etrago_network, "storage_units")
        self.storage_units_t = getattr(etrago_network, "storage_units_t")
        self.generators = getattr(etrago_network, "generators")
        self.generators_t = getattr(etrago_network, "generators_t")


class _EDisGoImported:
    """
    Reduced eDisGo class for reimported results.

    This class reduces the import to only those attributes that are
    used in eGo.
    """

    def __init__(
            self,
            grid_expansion_costs,
            s_res,
            storages,
            pypsa,
            edisgo_config):

        self.network = _NetworkImported(
            grid_expansion_costs,
            s_res,
            storages,
            pypsa,
            edisgo_config)


class _NetworkImported:
    """
    Reduced eDisGo network class, used for eGo's reimport
    """

    def __init__(
            self,
            grid_expansion_costs,
            s_res,
            storages,
            pypsa,
            edisgo_config):

        self.results = _ResultsImported(
            grid_expansion_costs,
            s_res,
            storages)
        self.pypsa = pypsa
        self.config = edisgo_config


class _ResultsImported:
    """
    Reduced eDisGo results class, used for eGo's reimport
    """

    def __init__(
            self,
            grid_expansion_costs,
            s_res,
            storages):

        self.grid_expansion_costs = grid_expansion_costs
        self.storages = storages
        self._s_res = s_res

    def s_res(self):
        return self._s_res
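
# The reduced classes above mirror just enough of the eDisGo interface
# (network.results.s_res(), network.pypsa, network.config) that the
# plotting and cost functions cannot tell a reimported grid from a live
# EDisGo object. A minimal sketch of this duck typing, with hypothetical
# (empty) dataframes standing in for real CSV imports:
#
#     costs = pd.DataFrame(columns=['total_costs'])
#     s_res = pd.DataFrame()
#     storages = pd.DataFrame(columns=['nominal_power', 'voltage_level'])
#     reimported = _EDisGoImported(
#         costs, s_res, storages, pypsa.Network(), {})
#     reimported.network.results.s_res()  # same call as on a live grid
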
def parallelizer(
        ding0_id_list,
        func,
        func_arguments,
        max_calc_time,
        workers=mp2.cpu_count(),
        worker_lifetime=1):
    """
    Uses the python multiprocessing toolbox for parallelization

    Several grids are analyzed in parallel, based on a custom function
    that defines the specific application of eDisGo.

    Parameters
    ----------
    ding0_id_list : list of int
        List of ding0 grid data IDs (also known as HV/MV substation IDs)
    func : any function
        Your custom function that shall be parallelized
    func_arguments : tuple
        Arguments to the custom function ``func``
    max_calc_time : numeric
        Maximum calculation time in hours, after which remaining jobs
        are terminated
    workers : int
        Number of parallel processes
    worker_lifetime : int
        Number of grids a worker analyzes sequentially before it is
        replaced by a fresh worker process (``maxtasksperchild``)

    Notes
    -----
    Please note the following requirements for the custom function which
    is executed in parallel:

    #. It must return a mapping keyed by the MV grid district ID, since
       the results are collected via ``dict.update`` (for example,
       ``EDisGoNetworks._run_edisgo`` returns ``{mv_grid_id: path}``).
    #. The first positional argument is the MV grid district ID (as int).
       It is prepended to the tuple of arguments ``func_arguments``.

    Returns
    -------
    containers : dict
        Dict of the results of ``func``, keyed by MV grid district ID
    """

    def collect_pool_results(result):
        """
        Stores the results from the parallelized calculation in a
        structured manner

        Parameters
        ----------
        result : dict
        """
        results.update(result)

    def error_callback(key):
        return lambda o: results.update({key: o})

    results = {}

    max_calc_time_seconds = max_calc_time * 3600

    def initializer():
        import pickle
        pickle.DEFAULT_PROTOCOL = 4
        import dill
        dill.settings['protocol'] = 4

    pool = mp2.Pool(
        workers,
        initializer=initializer,
        maxtasksperchild=worker_lifetime)

    result_objects = {}
    for ding0_id in ding0_id_list:
        edisgo_args = (ding0_id, *func_arguments)
        result_objects[ding0_id] = pool.apply_async(
            func=func,
            args=edisgo_args,
            callback=collect_pool_results,
            error_callback=error_callback(ding0_id))

    errors = {}
    successes = {}
    start = datetime.now()
    end = (start + td(hours=max_calc_time)).isoformat(' ')
    logger.info(
        "Jobs started. They will time out at {}."
        .format(end[:end.index('.')]))
    current = datetime.now()
    time_spent = 0
    while (result_objects
            and ((current - start).total_seconds()
                 <= max_calc_time_seconds)):
        done = []
        tick = ((current - start).total_seconds()
                * 100 / max_calc_time_seconds)
        if tick - time_spent >= 1 or tick > 100:
            hours_spent = (current - start).total_seconds() / 3600
            logger.info("{:.2f}% ({:.2f}/{}h) spent"
                        .format(tick, hours_spent, max_calc_time))
            logger.info("Jobs time out in {:.2f}h."
                        .format(max_calc_time - hours_spent))
            time_spent = tick
        for grid, result in result_objects.items():
            if result.ready():
                logger.info(
                    "MV grid {} ready. Trying to `get` the result."
                    .format(grid))
                done.append(grid)
                if not result.successful():
                    try:
                        # We already know that this was not successful,
                        # so the `get` is only here to re-raise the
                        # exception that occurred.
                        result.get()
                    except Exception as e:
                        logger.warning(
                            "MV grid {} failed due to {e!r}: '{e}'."
                            .format(grid, e=e))
                        errors[grid] = e
                else:
                    logger.info(
                        "MV grid {} calculated successfully."
                        .format(grid))
                    successes[grid] = result.get()
                logger.info(
                    "Done `get`ting the result for MV grid {}."
                    .format(grid))
        for grid in done:
            del result_objects[grid]
        sleep(1)
        current = datetime.now()

    # Now we know that we either reached the timeout, or that all
    # calculations are done. We just have to collect which of the two is
    # the case. This is done by `get`ting the results with a timeout of
    # 0. If any of them are not yet done, a `TimeoutError` will be
    # triggered, which we can collect like all other errors.
    if not result_objects:
        logger.info("All MV grids stopped before the timeout.")
    else:
        logger.warning("Some MV grid simulations timed out.")
        pool.terminate()
    end = datetime.now()
    delta = end - start
    logger.info("Execution finished after {:.2f} hours".format(
        delta.total_seconds() / 3600))
    done = []
    for grid, result in result_objects.items():
        done.append(grid)
        try:
            successes[grid] = result.get(timeout=0)
            logger.info("MV grid {} calculated successfully.".format(grid))
        except Exception as e:
            logger.warning(
                "MV grid {} failed due to {e!r}: '{e}'."
                .format(grid, e=e))
            errors[grid] = e
    for grid in done:
        del result_objects[grid]
    if errors:
        logger.info("MV grid calculation error details:")
        for grid, error in errors.items():
            logger.info("  {}".format(grid))
            strings = TracebackException.from_exception(error).format()
            lines = [
                line
                for string in strings
                for line in string.split("\n")]
            for line in lines:
                logger.info("    " + line)
    pool.close()
    pool.join()
    return results
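
# Usage sketch for `parallelizer` (illustrative): parallelizing a trivial
# function over three grid IDs. `dummy_run` is a hypothetical stand-in for
# EDisGoNetworks._run_edisgo; note that it takes the grid ID as its first
# argument and returns a dict keyed by that ID, as the Notes above
# require. `max_calc_time` is given in hours.
#
#     def dummy_run(mv_grid_id, label):
#         return {mv_grid_id: '{}-{}'.format(label, mv_grid_id)}
#
#     results = parallelizer(
#         [1, 2, 3],
#         dummy_run,
#         ('test',),
#         max_calc_time=0.1,
#         workers=2)
#     # results == {1: 'test-1', 2: 'test-2', 3: 'test-3'}
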