Skip to content

flixopt.aggregation

This module contains the Aggregation functionality for the flixopt framework. Through this, aggregating TimeSeriesData is possible.

Attributes

Classes

Aggregation

Aggregation(original_data: DataFrame, hours_per_time_step: Scalar, hours_per_period: Scalar, nr_of_periods: int = 8, weights: dict[str, float] | None = None, time_series_for_high_peaks: list[str] | None = None, time_series_for_low_peaks: list[str] | None = None)

aggregation organizing class

Parameters:

Name Type Description Default
original_data DataFrame

The original data to aggregate

required
hours_per_time_step Scalar

The duration of each timestep in hours.

required
hours_per_period Scalar

The duration of each period in hours.

required
nr_of_periods int

The number of typical periods to use in the aggregation.

8
weights dict[str, float] | None

The weights for aggregation. If None, all time series are equally weighted.

None
time_series_for_high_peaks list[str] | None

List of time series to use for explicitly selecting periods with high values.

None
time_series_for_low_peaks list[str] | None

List of time series to use for explicitly selecting periods with low values.

None

Functions

cluster
cluster() -> None

Durchführung der Zeitreihenaggregation

get_cluster_indices
get_cluster_indices() -> dict[str, list[np.ndarray]]

Generates a dictionary that maps each cluster to a list of index vectors representing the time steps assigned to that cluster for each period.

Returns:

Name Type Description
dict dict[str, list[ndarray]]

{cluster_0: [index_vector_3, index_vector_7, ...], cluster_1: [index_vector_1], ...}

get_equation_indices
get_equation_indices(skip_first_index_of_period: bool = True) -> tuple[np.ndarray, np.ndarray]

Generates pairs of indices for the equations by comparing index vectors of the same cluster. If skip_first_index_of_period is True, the first index of each period is skipped.

Parameters:

Name Type Description Default
skip_first_index_of_period bool

Whether to include or skip the first index of each period.

True

Returns:

Type Description
tuple[ndarray, ndarray]

tuple[np.ndarray, np.ndarray]: Two arrays of indices.

AggregationParameters

AggregationParameters(hours_per_period: float, nr_of_periods: int, fix_storage_flows: bool, aggregate_data_and_fix_non_binary_vars: bool, percentage_of_period_freedom: float = 0, penalty_of_period_freedom: float = 0, time_series_for_high_peaks: list[TimeSeriesData] | None = None, time_series_for_low_peaks: list[TimeSeriesData] | None = None)

Initializes aggregation parameters for time series data

Parameters:

Name Type Description Default
hours_per_period float

Duration of each period in hours.

required
nr_of_periods int

Number of typical periods to use in the aggregation.

required
fix_storage_flows bool

Whether to aggregate storage flows (load/unload); if other flows are fixed, fixing storage flows is usually not required.

required
aggregate_data_and_fix_non_binary_vars bool

Whether to aggregate all time series data, which allows to fix all time series variables (like flow_rate), or only fix binary variables. If False non time_series data is changed!! If True, the mathematical Problem is simplified even further.

required
percentage_of_period_freedom float

Specifies the maximum percentage (0–100) of binary values within each period that can deviate as "free variables", chosen by the solver (default is 0). This allows binary variables to be 'partly equated' between aggregated periods.

0
penalty_of_period_freedom float

The penalty associated with each "free variable"; defaults to 0. Added to Penalty

0
time_series_for_high_peaks list[TimeSeriesData] | None

List of TimeSeriesData to use for explicitly selecting periods with high values.

None
time_series_for_low_peaks list[TimeSeriesData] | None

List of TimeSeriesData to use for explicitly selecting periods with low values.

None

Functions

AggregationModel

AggregationModel(model: FlowSystemModel, aggregation_parameters: AggregationParameters, flow_system: FlowSystem, aggregation_data: Aggregation, components_to_clusterize: list[Component] | None)

Bases: Submodel

The AggregationModel holds equations and variables related to the Aggregation of a FlowSystem. It creates Equations that equates indices of variables, and introduces penalties related to binary variables, that escape the equation to their related binaries in other periods

Modeling-Element for "index-equating"-equations

Attributes

all_submodels property
all_submodels: list[Submodel]

Get all submodels including nested ones recursively.

variables_direct property
variables_direct: Variables

Variables of the model, excluding those of sub-models

constraints_direct property
constraints_direct: Constraints

Constraints of the model, excluding those of sub-models

constraints property
constraints: Constraints

All constraints of the model, including those of all sub-models

variables property
variables: Variables

All variables of the model, including those of all sub-models

Functions

add_submodels
add_submodels(submodel: Submodel, short_name: str = None) -> Submodel

Register a sub-model with the model

add_variables
add_variables(short_name: str = None, **kwargs) -> linopy.Variable

Create and register a variable in one step

add_constraints
add_constraints(expression, short_name: str = None, **kwargs) -> linopy.Constraint

Create and register a constraint in one step

register_variable
register_variable(variable: Variable, short_name: str = None) -> linopy.Variable

Register a variable with the model

register_constraint
register_constraint(constraint: Constraint, short_name: str = None) -> linopy.Constraint

Register a constraint with the model

get
get(name: str, default=None)

Get variable by short name, returning default if not found