
flixopt.transform_accessor

Transform accessor for FlowSystem.

This module provides the TransformAccessor class that enables transformations on FlowSystem like clustering, selection, and resampling.

Classes

TransformAccessor

TransformAccessor(flow_system: FlowSystem)

Accessor for transformation methods on FlowSystem.

This class provides transformations that create new FlowSystem instances with modified structure or data, accessible via flow_system.transform.

Examples:

Time series aggregation (8 typical days):

>>> reduced_fs = flow_system.transform.cluster(n_clusters=8, cluster_duration='1D')
>>> reduced_fs.optimize(solver)
>>> expanded_fs = reduced_fs.transform.expand()

Future MGA:

>>> mga_fs = flow_system.transform.mga(alternatives=5)
>>> mga_fs.optimize(solver)

Initialize the accessor with a reference to the FlowSystem.

Parameters:

  • flow_system (FlowSystem, required): The FlowSystem to transform.

Functions

sel
sel(time: str | slice | list[str] | Timestamp | DatetimeIndex | None = None, period: int | slice | list[int] | Index | None = None, scenario: str | slice | list[str] | Index | None = None) -> FlowSystem

Select a subset of the FlowSystem by label.

Creates a new FlowSystem with data selected along the specified dimensions. The returned FlowSystem has no solution (it must be re-optimized).

Parameters:

  • time (str | slice | list[str] | Timestamp | DatetimeIndex | None, default None): Time selection (e.g., slice('2023-01-01', '2023-12-31'), '2023-06-15')
  • period (int | slice | list[int] | Index | None, default None): Period selection (e.g., slice(2023, 2024), or a list of periods)
  • scenario (str | slice | list[str] | Index | None, default None): Scenario selection (e.g., 'scenario1', or a list of scenarios)

Returns:

  FlowSystem: New FlowSystem with selected data (no solution).

Examples:

>>> # Select specific time range
>>> fs_jan = flow_system.transform.sel(time=slice('2023-01-01', '2023-01-31'))
>>> fs_jan.optimize(solver)
>>> # Select single scenario
>>> fs_base = flow_system.transform.sel(scenario='Base Case')
isel
isel(time: int | slice | list[int] | None = None, period: int | slice | list[int] | None = None, scenario: int | slice | list[int] | None = None) -> FlowSystem

Select a subset of the FlowSystem by integer indices.

Creates a new FlowSystem with data selected along the specified dimensions. The returned FlowSystem has no solution (it must be re-optimized).

Parameters:

  • time (int | slice | list[int] | None, default None): Time selection by integer index (e.g., slice(0, 100), 50, or [0, 5, 10])
  • period (int | slice | list[int] | None, default None): Period selection by integer index
  • scenario (int | slice | list[int] | None, default None): Scenario selection by integer index

Returns:

  FlowSystem: New FlowSystem with selected data (no solution).

Examples:

>>> # Select first 24 timesteps
>>> fs_day1 = flow_system.transform.isel(time=slice(0, 24))
>>> fs_day1.optimize(solver)
>>> # Select first scenario
>>> fs_first = flow_system.transform.isel(scenario=0)
resample
resample(time: str, method: Literal['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'] = 'mean', hours_of_last_timestep: int | float | None = None, hours_of_previous_timesteps: int | float | ndarray | None = None, fill_gaps: Literal['ffill', 'bfill', 'interpolate'] | None = None, **kwargs: Any) -> FlowSystem

Create a resampled FlowSystem by resampling data along the time dimension.

Creates a new FlowSystem with resampled time series data. The returned FlowSystem has no solution (it must be re-optimized).

Parameters:

  • time (str, required): Resampling frequency (e.g., '3h', '2D', '1M')
  • method (Literal['mean', 'sum', 'max', 'min', 'first', 'last', 'std', 'var', 'median', 'count'], default 'mean'): Resampling method. Recommended: 'mean', 'first', 'last', 'max', 'min'
  • hours_of_last_timestep (int | float | None, default None): Duration of the last timestep after resampling. If None, computed from the last time interval.
  • hours_of_previous_timesteps (int | float | ndarray | None, default None): Duration of previous timesteps after resampling. If None, computed from the first time interval. Can be a scalar or an array.
  • fill_gaps (Literal['ffill', 'bfill', 'interpolate'] | None, default None): Strategy for filling gaps (NaN values) that arise when resampling irregular timesteps to regular intervals. Options: 'ffill' (forward fill), 'bfill' (backward fill), 'interpolate' (linear interpolation). If None (default), an error is raised when gaps are detected.
  • **kwargs (Any): Additional arguments passed to xarray.resample()

Returns:

  FlowSystem: New resampled FlowSystem (no solution).

Raises:

  • ValueError: If resampling creates gaps and fill_gaps is not specified.

Examples:

>>> # Resample to 4-hour intervals
>>> fs_4h = flow_system.transform.resample(time='4h', method='mean')
>>> fs_4h.optimize(solver)
>>> # Resample to daily with max values
>>> fs_daily = flow_system.transform.resample(time='1D', method='max')
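The fill_gaps strategies can be illustrated with a plain forward fill in pure Python (an illustrative sketch only; flixopt performs the actual gap handling on xarray data, and the function name below is hypothetical):

```python
def ffill(values: "list[float | None]") -> "list[float | None]":
    """Forward-fill None gaps with the most recent non-gap value."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v  # remember the latest observed value
        out.append(last)
    return out

# Resampling irregular timesteps to a regular grid can leave gaps (None):
print(ffill([1.0, None, None, 4.0, None]))  # [1.0, 1.0, 1.0, 4.0, 4.0]
```

'bfill' works symmetrically from the right, and 'interpolate' fills each gap linearly between its neighbors.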
fix_sizes
fix_sizes(sizes: Dataset | dict[str, float] | None = None, decimal_rounding: int | None = 5) -> FlowSystem

Create a new FlowSystem with investment sizes fixed to specified values.

This is useful for two-stage optimization workflows:

  1. Solve a sizing problem (possibly resampled for speed)
  2. Fix sizes and solve dispatch at full resolution

The returned FlowSystem has InvestParameters with fixed_size set, making those sizes mandatory rather than decision variables.

Parameters:

  • sizes (Dataset | dict[str, float] | None, default None): The sizes to fix. Can be:
      - None: uses sizes from this FlowSystem's solution (must be solved)
      - xr.Dataset: a Dataset with size variables (e.g., from statistics.sizes)
      - dict: a mapping of component names to sizes (e.g., {'Boiler(Q_fu)': 100})
  • decimal_rounding (int | None, default 5): Number of decimal places to round sizes to. Rounding helps avoid numerical infeasibility. Set to None to disable.

Returns:

  FlowSystem: New FlowSystem with fixed sizes (no solution).

Raises:

  • ValueError: If no sizes are provided and the FlowSystem has no solution.
  • KeyError: If a specified size doesn't match any InvestParameters.

Examples:

Two-stage optimization:

>>> # Stage 1: Size with resampled data
>>> fs_sizing = flow_system.transform.resample('2h')
>>> fs_sizing.optimize(solver)
>>>
>>> # Stage 2: Fix sizes and optimize at full resolution
>>> fs_dispatch = flow_system.transform.fix_sizes(fs_sizing.stats.sizes)
>>> fs_dispatch.optimize(solver)

Using a dict:

>>> fs_fixed = flow_system.transform.fix_sizes(
...     {
...         'Boiler(Q_fu)': 100,
...         'Storage': 500,
...     }
... )
>>> fs_fixed.optimize(solver)
clustering_data
clustering_data(period: Any | None = None, scenario: Any | None = None) -> xr.Dataset

Get the time-varying data that would be used for clustering.

This method extracts only the data arrays that vary over time, which is the data that clustering algorithms use to identify typical periods. Constant arrays (same value for all timesteps) are excluded since they don't contribute to pattern identification.

Use this to inspect or pre-process the data before clustering, or to understand which variables influence the clustering result.
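The "time-varying only" filter described above can be sketched generically: keep only series whose values actually change along the time axis. This is an illustration of the concept in plain Python, not flixopt's internal implementation, and the function name is hypothetical:

```python
def time_varying_only(series_by_name: dict) -> dict:
    """Keep only series that vary over time.

    Constant series carry no information for pattern identification,
    so clustering inputs exclude them (mirroring what clustering_data()
    is documented to do on xarray Datasets).
    """
    return {
        name: values
        for name, values in series_by_name.items()
        if len(set(values)) > 1  # more than one distinct value -> time-varying
    }

data = {
    'HeatDemand|profile': [0.2, 0.9, 0.5, 0.1],  # varies -> kept
    'Boiler|eta': [0.9, 0.9, 0.9, 0.9],          # constant -> excluded
}
print(sorted(time_varying_only(data)))  # ['HeatDemand|profile']
```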

Parameters:

  • period (Any | None, default None): Optional period label to select. If None and the FlowSystem has multiple periods, returns data for all periods.
  • scenario (Any | None, default None): Optional scenario label to select. If None and the FlowSystem has multiple scenarios, returns data for all scenarios.

Returns:

  Dataset: xr.Dataset containing only time-varying data arrays. The dataset includes arrays like demand profiles, price profiles, and other time series that vary over the time dimension.

Examples:

Inspect clustering input data:

>>> data = flow_system.transform.clustering_data()
>>> print(f'Variables used for clustering: {list(data.data_vars)}')
>>> data['HeatDemand(Q)|fixed_relative_profile'].plot()

Get data for a specific period/scenario:

>>> data_2024 = flow_system.transform.clustering_data(period=2024)
>>> data_high = flow_system.transform.clustering_data(scenario='high')

Convert to DataFrame for external tools:

>>> df = flow_system.transform.clustering_data().to_dataframe()
cluster
cluster(n_clusters: int, cluster_duration: str | float, data_vars: list[str] | None = None, cluster: ClusterConfig | None = None, extremes: ExtremeConfig | None = None, segments: SegmentConfig | None = None, preserve_column_means: bool = True, rescale_exclude_columns: list[str] | None = None, round_decimals: int | None = None, numerical_tolerance: float = 1e-13, **tsam_kwargs: Any) -> FlowSystem

Create a FlowSystem with reduced timesteps using typical clusters.

This method creates a new FlowSystem optimized for sizing studies by reducing the number of timesteps to only the typical (representative) clusters identified through time series aggregation using the tsam package.

The method:

  1. Performs time series clustering using tsam (hierarchical by default)
  2. Extracts only the typical clusters (not all original timesteps)
  3. Applies timestep weighting for accurate cost representation
  4. Handles storage states between clusters based on each Storage's cluster_mode

Use this for initial sizing optimization, then use fix_sizes() to re-optimize at full resolution for accurate dispatch results.

To reuse an existing clustering on different data, use apply_clustering() instead.
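The timestep weighting in step 3 can be illustrated independently of tsam: each typical cluster is weighted by the number of original periods it represents, so weighted sums over the reduced timesteps approximate totals over the full horizon. The assignment and cost values below are invented for illustration:

```python
from collections import Counter

# Hypothetical cluster assignment: 8 original days mapped to 2 typical days.
assignment = [0, 0, 1, 0, 1, 1, 0, 1]  # assignment[d] = typical day for day d

# Weight of each typical day = how many original days it represents.
weights = Counter(assignment)  # Counter({0: 4, 1: 4})

# Cost per typical day from the reduced optimization (illustrative values).
cost_per_typical_day = {0: 120.0, 1: 95.0}

# The weighted total approximates the cost over all 8 original days.
total_cost = sum(weights[c] * cost_per_typical_day[c] for c in weights)
print(total_cost)  # 4*120 + 4*95 = 860.0
```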

Parameters:

  • n_clusters (int, required): Number of clusters (typical periods) to extract (e.g., 8 typical days).
  • cluster_duration (str | float, required): Duration of each cluster. Can be a pandas-style string ('1D', '24h', '6h') or a numeric value in hours.
  • data_vars (list[str] | None, default None): Optional list of variable names to use for clustering. If specified, only these variables determine the cluster assignments, but the clustering is then applied to ALL time-varying data in the FlowSystem. Use transform.clustering_data() to see available variables. Example: data_vars=['HeatDemand(Q)|fixed_relative_profile'] to cluster based only on heat demand patterns.
  • cluster (ClusterConfig | None, default None): Optional tsam ClusterConfig object specifying the clustering algorithm, representation method, and weights. If None, uses default settings (hierarchical clustering with medoid representation) and automatically calculated weights based on data variance.
  • extremes (ExtremeConfig | None, default None): Optional tsam ExtremeConfig object specifying how to handle extreme periods (peaks). Use this to ensure peak demand days are captured. Example: ExtremeConfig(method='new_cluster', max_value=['demand']).
  • segments (SegmentConfig | None, default None): Optional tsam SegmentConfig object specifying intra-period segmentation. Segments divide each cluster period into variable-duration sub-segments. Example: SegmentConfig(n_segments=4).
  • preserve_column_means (bool, default True): Rescale typical periods so each column's weighted mean matches the original data's mean. Ensures total energy/load is preserved when weights represent occurrence counts.
  • rescale_exclude_columns (list[str] | None, default None): Column names to exclude from rescaling when preserve_column_means=True. Useful for binary/indicator columns (0/1 values) that should not be rescaled.
  • round_decimals (int | None, default None): Round output values to this many decimal places. If None, no rounding is applied.
  • numerical_tolerance (float, default 1e-13): Tolerance for numerical precision issues. Controls when warnings are raised for aggregated values exceeding original time series bounds.
  • **tsam_kwargs (Any): Additional keyword arguments passed to tsam.aggregate() for forward compatibility. See the tsam documentation for all options.

Returns:

  FlowSystem: A new FlowSystem with reduced timesteps (only the typical clusters). The FlowSystem has metadata stored in clustering for expansion.

Raises:

  • ValueError: If timestep sizes are inconsistent.
  • ValueError: If cluster_duration is not a multiple of the timestep size.

Examples:

Basic clustering with peak preservation:

>>> from tsam import ExtremeConfig
>>> fs_clustered = flow_system.transform.cluster(
...     n_clusters=8,
...     cluster_duration='1D',
...     extremes=ExtremeConfig(
...         method='new_cluster',
...         max_value=['HeatDemand(Q_th)|fixed_relative_profile'],
...     ),
... )
>>> fs_clustered.optimize(solver)

Clustering based on specific variables only:

>>> # See available variables for clustering
>>> print(flow_system.transform.clustering_data().data_vars)
>>>
>>> # Cluster based only on demand profile
>>> fs_clustered = flow_system.transform.cluster(
...     n_clusters=8,
...     cluster_duration='1D',
...     data_vars=['HeatDemand(Q)|fixed_relative_profile'],
... )
Note
  • This is best suited for initial sizing, not final dispatch optimization
  • Use extremes to ensure peak demand clusters are captured
  • A 5-10% safety margin on sizes is recommended for the dispatch stage
  • For seasonal storage (e.g., hydrogen, thermal storage), set Storage.cluster_mode='intercluster' or 'intercluster_cyclic'
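The preserve_column_means rescaling can be sketched in plain Python: scale each column of the typical periods so that its occurrence-weighted mean matches the original data's mean. This is a conceptual sketch with a hypothetical function name, not flixopt's or tsam's implementation:

```python
def rescale_to_mean(typical: list, weights: list, original_mean: float) -> list:
    """Scale typical-period values so their weighted mean equals original_mean."""
    weighted_mean = sum(v * w for v, w in zip(typical, weights)) / sum(weights)
    factor = original_mean / weighted_mean
    return [v * factor for v in typical]

# Two typical days, each representing 4 original days; the original mean was 10.0.
rescaled = rescale_to_mean([12.0, 6.0], [4, 4], original_mean=10.0)

# After rescaling, the weighted mean of the typical days matches the original.
weighted_mean = sum(v * w for v, w in zip(rescaled, [4, 4])) / 8
print(round(weighted_mean, 9))  # 10.0
```

Binary columns listed in rescale_exclude_columns would skip this scaling, since multiplying 0/1 indicators by a factor destroys their meaning.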
apply_clustering
apply_clustering(clustering: Clustering) -> FlowSystem

Apply an existing clustering to this FlowSystem.

This method applies a previously computed clustering (from another FlowSystem) to the current FlowSystem's data. The clustering structure (cluster assignments, number of clusters, etc.) is preserved while the time series data is aggregated according to the existing cluster assignments.

Use this to:

  • Compare different scenarios with identical cluster assignments
  • Apply a reference clustering to new data

Parameters:

  • clustering (Clustering, required): A Clustering object from a previously clustered FlowSystem. Obtain it via fs.clustering on a clustered FlowSystem.

Returns:

  FlowSystem: A new FlowSystem with reduced timesteps (only the typical clusters). The FlowSystem has metadata stored in clustering for expansion.

Raises:

  • ValueError: If the clustering dimensions don't match this FlowSystem's periods/scenarios.

Examples:

Apply clustering from one FlowSystem to another:

>>> fs_reference = fs_base.transform.cluster(n_clusters=8, cluster_duration='1D')
>>> fs_other = fs_high.transform.apply_clustering(fs_reference.clustering)
expand
expand() -> FlowSystem

Expand a clustered FlowSystem back to full original timesteps.

After solving a FlowSystem created with cluster(), this method disaggregates the FlowSystem by:

  1. Expanding all time series data from typical clusters to full timesteps
  2. Expanding the solution by mapping each typical cluster back to all original clusters it represents

For FlowSystems with periods and/or scenarios, each (period, scenario) combination is expanded using its own cluster assignment.

This enables using all existing solution accessors (statistics, plot, etc.) with full time resolution, where both the data and solution are consistently expanded from the typical clusters.
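The disaggregation in steps 1-2 amounts to tiling each typical cluster's values back over every original period assigned to it. A minimal sketch with invented data and a hypothetical function name (flixopt additionally expands the solution arrays the same way):

```python
def expand_clusters(typical_profiles: dict, assignment: list) -> list:
    """Map typical-cluster values back onto the full original timeline."""
    full = []
    for cluster_id in assignment:
        full.extend(typical_profiles[cluster_id])  # repeat the typical profile
    return full

# 2 typical days (3 timesteps each) representing 4 original days.
typical = {0: [1.0, 2.0, 3.0], 1: [9.0, 8.0, 7.0]}
assignment = [0, 1, 1, 0]  # which typical day represents each original day

print(expand_clusters(typical, assignment))
# [1.0, 2.0, 3.0, 9.0, 8.0, 7.0, 9.0, 8.0, 7.0, 1.0, 2.0, 3.0]
```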

Returns:

  FlowSystem: A new FlowSystem with full timesteps and expanded solution.

Raises:

  • ValueError: If the FlowSystem was not created with cluster().
  • ValueError: If the FlowSystem has no solution.

Examples:

Two-stage optimization with expansion:

>>> # Stage 1: Size with reduced timesteps
>>> fs_reduced = flow_system.transform.cluster(
...     n_clusters=8,
...     cluster_duration='1D',
... )
>>> fs_reduced.optimize(solver)
>>>
>>> # Expand to full resolution FlowSystem
>>> fs_expanded = fs_reduced.transform.expand()
>>>
>>> # Use all existing accessors with full timesteps
>>> fs_expanded.stats.flow_rates  # Full 8760 timesteps
>>> fs_expanded.stats.plot.balance('HeatBus')  # Full resolution plots
>>> fs_expanded.stats.plot.heatmap('Boiler(Q_th)|flow_rate')
Note

The expanded FlowSystem repeats the typical cluster values for every original cluster assigned to the same typical cluster. Both input data and solution are expanded consistently, so they match. This is an approximation: the actual dispatch at full resolution would differ due to intra-cluster variations in the time series data.

For accurate dispatch results, use fix_sizes() to fix the sizes from the reduced optimization and re-optimize at full resolution.

Segmented Systems Variable Handling:

For systems clustered with SegmentConfig, special handling is applied to time-varying solution variables. Variables without a time dimension are unaffected by segment expansion. This includes:

  • Investment: {component}|size, {component}|exists
  • Storage boundaries: {storage}|SOC_boundary
  • Aggregated totals: {flow}|total_flow_hours, {flow}|active_hours
  • Effect totals: {effect}, {effect}(temporal), {effect}(periodic)

Time-varying variables are categorized and handled as follows:

  1. State variables - interpolated within segments:

     • {storage}|charge_state: Linear interpolation between segment boundary values to show the charge trajectory during charge/discharge.

  2. Segment totals - divided by segment duration:

     These variables represent values summed over the segment. Division converts them back to hourly rates for correct plotting and analysis.

     • {effect}(temporal)|per_timestep: Per-timestep effect contributions
     • {flow}->{effect}(temporal): Flow contributions (includes both effects_per_flow_hour and effects_per_startup)
     • {component}->{effect}(temporal): Component-level contributions
     • {source}(temporal)->{target}(temporal): Effect-to-effect shares

  3. Rate/average variables - expanded as-is:

     These variables represent average values within the segment. tsam already provides properly averaged values, so no correction is needed.

     • {flow}|flow_rate: Average flow rate during the segment
     • {storage}|netto_discharge: Net discharge rate (discharge - charge)

  4. Binary status variables - constant within the segment:

     These variables cannot be meaningfully interpolated. The status indicates the dominant state during the segment.

     • {flow}|status: On/off status (0 or 1), repeated for all timesteps

  5. Binary event variables (segmented systems only) - placed at the first timestep of the segment:

     For segmented systems, these variables indicate that an event occurred somewhere during the segment. When expanded, the event is placed at the first timestep of each segment, with zeros elsewhere. This preserves the total count of events while providing a reasonable temporal placement. For non-segmented systems, the timing within the cluster is preserved by normal expansion (no special handling needed).

     • {flow}|startup: Startup event
     • {flow}|shutdown: Shutdown event
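The two most common segment-expansion rules above, dividing segment totals by duration and interpolating storage charge states, can be sketched in plain Python (hypothetical helper names; flixopt applies these rules to its solution arrays):

```python
def expand_segment_total(total: float, duration_h: int) -> list:
    """A value summed over a segment becomes a constant hourly rate."""
    rate = total / duration_h
    return [rate] * duration_h

def interpolate_charge_state(start: float, end: float, duration_h: int) -> list:
    """Linear interpolation between segment boundary charge states."""
    step = (end - start) / duration_h
    return [start + step * i for i in range(duration_h + 1)]

# A cost of 12 accrued over a 4-hour segment becomes 3 per hour:
print(expand_segment_total(12.0, 4))          # [3.0, 3.0, 3.0, 3.0]

# A storage charging from 0 to 8 over 4 hours yields a linear trajectory:
print(interpolate_charge_state(0.0, 8.0, 4))  # [0.0, 2.0, 4.0, 6.0, 8.0]
```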