autokoopman.core package#

Submodules#

autokoopman.core.estimator module#

Model has:

  • a set of hyperparameters

class autokoopman.core.estimator.GradientEstimator(normalize=True, feature_range=(-1, 1), weights=None)#

Bases: TrajectoryEstimator

Estimator of discrete time dynamical systems

Requires that the data be uniform time

Parameters:
  • normalize – apply MinMax normalization to the fit data

  • feature_range – range for MinMax scaler

fit(X: TrajectoriesData) None#
abstract fit_gradient(X: ndarray, Y: ndarray, U: ndarray | None = None, weights: ndarray | None = None) None#

an alternative fit method that uses a trajectories data structure

class autokoopman.core.estimator.NextStepEstimator(normalize=True, feature_range=(-1, 1), weights=None)#

Bases: TrajectoryEstimator

Estimator of discrete time dynamical systems

Requires that the data be uniform time

Parameters:
  • normalize – apply MinMax normalization to the fit data

  • feature_range – range for MinMax scaler

fit(X: TrajectoriesData) None#
abstract fit_next_step(X: ndarray, Y: ndarray, U: ndarray | None = None, weights: ndarray | None = None) None#

an alternative fit method that uses a trajectories data structure

class autokoopman.core.estimator.TrajectoryEstimator(normalize=True, feature_range=(-1, 1))#

Bases: ABC

Trajectory based estimator base class

Parameters:
  • normalize – apply MinMax normalization to the fit data

  • feature_range – range for MinMax scaler

abstract fit(X: TrajectoriesData) None#
abstract property model: System#

autokoopman.core.format module#

Utilties to format strings

class autokoopman.core.format.hide_prints#

Bases: object

context manager to suppress python print output

this was created to hide “ugly stuff” (e.g., GPyOpt prints)

autokoopman.core.observables module#

autokoopman.core.system module#

class autokoopman.core.system.ContinuousSystem#

Bases: System

Continuous Time System

In this case, a CT system is a system whose evolution function is defined by a gradient.

abstract gradient(time: float, state: ndarray, sinput: ndarray | None) ndarray#
solve_ivp(initial_state: ndarray, tspan: Tuple[float, float] | None = None, teval: ndarray | None = None, inputs: ndarray | None = None, sampling_period: float = 0.1) Trajectory | UniformTimeTrajectory#
Solve the initial value (IV) problem for Continuous Time Systems

Given a system with a state space, specify how the system evolves with time given the initial conditions of the problem. The solution of a particular IV returns a trajectory over time teval if specified, or uniformly sampled over a time span given a sampling period.

A differential equation of the form \(\dot X_t = \operatorname{grad}(t, X_t)\).

Parameters:
  • initial_state – IV state of system

  • tspan – (if no teval is set) timespan to evolve system from iv (starting at tspan[0])

  • teval – (optional) values of time sample the IVP trajectory

  • sampling_period – (if no teval is set) sampling period of solution

Returns:

TimeTrajectory if teval is set, or UniformTimeTrajectory if not

solve_ivps(initial_states: ndarray, tspan: Tuple[float, float] | None = None, teval: ndarray | None = None, inputs: ndarray | None = None, sampling_period: float = 0.1) UniformTimeTrajectoriesData | TrajectoriesData#
class autokoopman.core.system.DiscreteSystem#

Bases: System

Discrete Time System

In this case, a CT system is a system whose evolution function is defined by a next step function. For IVP, the discrete time can be related to continuous time via a sampling period. This trajectory can be interpolated to evaluate time points nonuniformly.

TODO: should this have a sampling period instance member?

solve_ivp(initial_state: ndarray, tspan: Tuple[float, float] | None = None, teval: ndarray | None = None, inputs: ndarray | None = None, sampling_period: float = 0.1) Trajectory | UniformTimeTrajectory#
Solve the initial value (IV) problem for Discrete Time Systems

Given a system with a state space, specify how the system evolves with time given the initial conditions of the problem. The solution of a particular IV returns a trajectory over time teval if specified, or uniformly sampled over a time span given a sampling period.

A difference equation of the form \(X_{t+1} = \operatorname{step}(t, X_t)\).

Parameters:
  • initial_state – IV state of system

  • tspan – (if no teval is set) timespan to evolve system from iv (starting at tspan[0])

  • teval – (optional) values of time sample the IVP trajectory

  • sampling_period – (if no teval is set) sampling period of solution

Returns:

TimeTrajectory if teval is set, or UniformTimeTrajectory if not

abstract step(time: float, state: ndarray, sinput: ndarray | None) ndarray#
class autokoopman.core.system.GradientContinuousSystem(gradient_func: Callable[[float, ndarray, ndarray | None], ndarray], names)#

Bases: ContinuousSystem

gradient(time: float, state: ndarray, sinput: ndarray | None) ndarray#
property names#
class autokoopman.core.system.KoopmanContinuousSystem#

Bases: LinearContinuousSystem

class autokoopman.core.system.KoopmanGradientContinuousSystem(A, B, obs, names, dim=None, scaler=None)#

Bases: KoopmanSystem, GradientContinuousSystem

class autokoopman.core.system.KoopmanStepDiscreteSystem(A, B, obs, names, dim=None, scaler=None)#

Bases: KoopmanSystem, StepDiscreteSystem

class autokoopman.core.system.KoopmanSystem(A, B, obs, names, dim=None, scaler=None)#

Bases: object

mixin class for Koopman systems

property A#
property B#
property koopman_operator#
property obs_func#
class autokoopman.core.system.LinearContinuousSystem#

Bases: ContinuousSystem

class autokoopman.core.system.StepDiscreteSystem(step_func: Callable[[float, ndarray, ndarray | None], ndarray], names)#

Bases: DiscreteSystem

property names#
step(time: float, state: ndarray, sinput: ndarray | None) ndarray#
class autokoopman.core.system.SymbolicContinuousSystem(variables: Sequence[Symbol], gradient_exprs: Sequence[Expr], input_variables: Sequence[Symbol] | None = None, time_var=None)#

Bases: ContinuousSystem

gradient(time: float, state: ndarray, sinput: ndarray | None) ndarray#
property names: Sequence[str]#
class autokoopman.core.system.System#

Bases: ABC

property dimension: int#
abstract property names: Sequence[str]#
abstract solve_ivp(initial_state: ndarray, tspan: Tuple[float, float] | None = None, teval: ndarray | None = None, inputs: ndarray | None = None, sampling_period: float = 0.1) Trajectory | UniformTimeTrajectory#
Solve the initial value (IV) problem

Given a system with a state space, specify how the system evolves with time given the initial conditions of the problem. The solution of a particular IV returns a trajectory over time teval if specified, or uniformly sampled over a time span given a sampling period.

Parameters:
  • initial_state – IV state of system

  • tspan – (if no teval is set) timespan to evolve system from iv (starting at tspan[0])

  • teval – (optional) values of time sample the IVP trajectory

  • sampling_period – (if no teval is set) sampling period of solution

Returns:

TimeTrajectory if teval is set, or UniformTimeTrajectory if not

solve_ivps(initial_states: ndarray, tspan: Tuple[float, float] | None = None, teval: ndarray | None = None, inputs: ndarray | None = None, sampling_period: float = 0.1) UniformTimeTrajectoriesData | TrajectoriesData#

autokoopman.core.trajectory module#

class autokoopman.core.trajectory.TrajectoriesData(trajs: Dict[Hashable, UniformTimeTrajectory | Trajectory])#

Bases: object

A Dataset of Trajectories (Multiple Time Series Vectors)

This is a data access object (DAO) for datasets used to learn dynamical systems from observed trajectories. The object supports

  • state / trajectory identifiers trajectories can be accessed and manipulated by name

  • iteration the trajectories can be iterated through

  • serialization trajectory datasets can be read and saved from CSV files and Pandas DataFrames

  • interpolation trajectories can be resampled in time (e.g. convert to uniform time for certain technique)

abs()#
static equal_lists(lists: Sequence[Sequence])#
List Equality

In a sequence of lists, determine if all list are equal to one another.

classmethod from_csv(fname: str, threshold=None, traj_id='id', time_id='time')#
Create TrajectoriesData DAO from CSV File

To be a valid CSV the file must have a “time” field to record the sample time and an “id” field to distinguish the different trajectories.

Parameters:
  • fname – CSV filename (path included).

  • threshold – trajectories threshold value.

  • traj_id – column name for trajectory identifier.

  • time_id – column name for time identifier.

Returns:

TrajectoriesData

Examples
import autokoopman.core.trajectory as atraj

trajs = atraj.TrajectoriesData.from_csv("./path/my_csv.csv")
classmethod from_pandas(data_df: DataFrame, threshold=None, traj_id='id', time_id='time') TrajectoriesData#
Create TrajectoriesData DAO from Pandas DataFrame

To be a valid DataFrame the object must have a “time” field to record the sample time and an “id” field to distinguish the different trajectories.

Parameters:
  • data_df – Pandas Dataframe to convert to trajectories.

  • threshold – trajectories threshold value.

  • traj_id – column name for trajectory identifier.

  • time_id – column name for time identifier.

Returns:

TrajectoriesData

Examples
import autokoopman.core.trajectory as atraj
import pandas as pd

# create example dataframe
t = np.linspace(0, 1.0, 10)
my_df = pd.DataFrame(
    columns=["time", "state", "id"],
    data=np.array([
        t,
        np.sin(t),
        np.ones(shape=t.shape)
    ]).T
)

# create TrajectoriesData from the DataFrame
trajs = atraj.TrajectoriesData.from_pandas(my_df)
get_trajectory(traj_id: Hashable) Trajectory#
property input_names: Sequence[str] | None#
interp_uniform_time(sampling_period) UniformTimeTrajectoriesData#
property n_trajs: int#
norm()#
property state_names: Sequence[str]#
time_id_hname = 'time'#
to_csv(fname: str)#

Convert to CSV File (serialization)

Examples
import autokoopman.benchmark.prde20 as prde

sys = prde.ProdDestr()
trajs = sys.solve_ivps(
    initial_states=[[9.98, 0.01, 0.01]],
    tspan=(0.0, 20.0),
    sampling_period=0.01
)
trajs.to_csv()
to_pandas() DataFrame#

Convert to Pandas DataFrame

Examples
import autokoopman.benchmark.prde20 as prde

sys = prde.ProdDestr()
trajs = sys.solve_ivps(
    initial_states=[[9.98, 0.01, 0.01]],
    tspan=(0.0, 20.0),
    sampling_period=0.01
)
trajs.to_pandas()
traj_id_hname = 'id'#
property traj_names: Set[Hashable]#
class autokoopman.core.trajectory.Trajectory(times: ndarray, states: ndarray, inputs: ndarray | None, state_names: Sequence[str] | None = None, input_names: Sequence[str] | None = None, threshold=None)#

Bases: object

a trajectory is a time series of vectors

abs() Trajectory#
property dimension: int#
property input_dimension: int | None#
property input_names: Sequence | None#
property inputs: ndarray | None#
interp1d(times) Trajectory#
interp_uniform_time(sampling_period) UniformTimeTrajectory#
property is_uniform_time: bool#

determine if a trajectory is uniform time

property names: Sequence#
norm(axis=1) Trajectory#
property size: int#
property states: ndarray#
property times: ndarray#
to_uniform_time_traj() UniformTimeTrajectory#
class autokoopman.core.trajectory.UniformTimeTrajectoriesData(trajs: Dict[Hashable, UniformTimeTrajectory | Trajectory])#

Bases: TrajectoriesData

a dataset of uniform time trajectories

property differentiate: Tuple[ndarray, ndarray, ndarray]#

Numerical Differentiation – Current State, Gradient State Estimate, Input

differentiate_weights(weights) Tuple[ndarray, ndarray, ndarray]#

Numerical Differentiation with Weights – Current State, Gradient State Estimate, Input, weight

classmethod from_pandas(data_df: DataFrame, threshold=None, time_id='time', traj_id='id')#
Create UniformTimeTrajectoriesData DAO from Pandas DataFrame

To be a valid DataFrame the object must have a “time” field to record the sample time and an “id” field to distinguish the different trajectories.

Parameters:
  • data_df – Pandas Dataframe to convert to trajectories.

  • threshold – trajectories threshold value.

  • traj_id – column name for trajectory identifier.

  • time_id – column name for time identifier.

Returns:

TrajectoriesData

Examples
import autokoopman.core.trajectory as atraj
import pandas as pd

# create example dataframe
t = np.linspace(0, 1.0, 10)
my_df = pd.DataFrame(
    columns=["time", "state", "id"],
    data=np.array([
        t,
        np.sin(t),
        np.ones(shape=t.shape)
    ]).T
)

# create UniformTimeTrajectoriesData from the DataFrame
trajs = atraj.UniformTimeTrajectoriesData.from_pandas(my_df)
n_step_matrices(nstep, weights=None) Tuple[ndarray, ndarray, ndarray | None]#
property next_step_matrices: Tuple[ndarray, ndarray, ndarray | None]#
Next Step Snapshot Matrices

Return the two “snapshot matrices” \(\mathbf X, \mathbf X'\) of observations \(\{x_1, x_2, ..., x_n \}\),

\[\mathbf X = \begin{bmatrix} x_1 && x_2 && ... && x_{n-1} \end{bmatrix}, \quad \mathbf X = \begin{bmatrix} x_2 && x_3 && ... && x_{n} \end{bmatrix}\]

Note this is useful for DMD techniques where the relationship

\[\mathbf X' \approx A \mathbf X\]

is used. Importantly, note that the observations are column vectors, which is counter to convention commonly used in vector processing.

Parameters:

nstep – number of time steps in the future

Returns:

tuple of (\(\mathbf X, \mathbf X'\)).

References

Brunton, S. L., & Kutz, J. N. (2022). Data-driven science and engineering: Machine learning, dynamical systems, and control. Cambridge University Press. pp 236-7

next_step_matrices_weights(weights, nstep=1) Tuple[ndarray, ndarray, ndarray | None]#

Weighted Next Step Snapshot Matrices

property sampling_period: float#
Sampling Period

A sampling period allows the trajectories to relate to discrete and continuous time signals. Given a discrete time signal \(x_d\), it relates to an analog signal \(x_a\) by a sampling time \(T_s\),

\[x_a(n T_s) = x_d(n)\]
class autokoopman.core.trajectory.UniformTimeTrajectory(states: ndarray, inputs: ndarray | None, sampling_period: float, state_names: Sequence[str] | None = None, input_names: Sequence[str] | None = None, start_time=0.0, threshold=None)#

Bases: Trajectory

uniform time is a trajectory advanced by a sampling period

property is_uniform_time: bool#

determine if a trajectory is uniform time

property sampling_frequency: float#
property sampling_period: float#
Sampling Period

A sampling period allows the trajectories to relate to discrete and continuous time signals. Given a discrete time signal \(x_d\), it relates to an analog signal \(x_a\) by a sampling time \(T_s\),

\[x_a(n T_s) = x_d(n)\]

autokoopman.core.tuner module#

Model Tuning

class autokoopman.core.tuner.ContinuousParameter(name: str, domain_lower, domain_upper, distribution='uniform')#

Bases: Parameter

is_member(item) bool#
static loguniform(low=0.1, high=1, size=None)#
random()#
static uniform(low=0, high=1, size=None)#
class autokoopman.core.tuner.DiscreteParameter(name: str, domain_lower: int, domain_upper: int, step=1)#

Bases: FiniteParameter

class autokoopman.core.tuner.FiniteParameter(name: str, elements: Sequence)#

Bases: Parameter

is_member(item) bool#
random()#
class autokoopman.core.tuner.HyperparameterMap(parameter_space: ParameterSpace)#

Bases: object

define and associate a hyperparameter space with a moddel

abstract get_model(hyperparams: Sequence) TrajectoryEstimator#
class autokoopman.core.tuner.HyperparameterTuner(parameter_model: HyperparameterMap, training_data: TrajectoriesData, n_splits=None, display_progress: bool = True, verbose: bool = True)#

Bases: ABC

Tune a HyperparameterMap Object

The tuner implements a model learning pipeline that can take a mapping from hyperparameters to a model and training data and learn an optimized model. For example, given a SINDy estimator, hyperparameter tuner can implement gridsearch to find the best sparsity threshold and library type.

static generate_predictions(trained_model: TrajectoryEstimator, holdout_data: TrajectoriesData)#
abstract tune(nattempts=100, scoring_func: ~typing.Callable[[~autokoopman.core.trajectory.TrajectoriesData, ~autokoopman.core.trajectory.TrajectoriesData], float] = <function TrajectoryScoring.total_score>) TuneResults#
tune_sampling(nattempts, scoring_func: Callable[[TrajectoriesData, TrajectoriesData], float])#
class autokoopman.core.tuner.Parameter(name)#

Bases: object

abstract is_member(item) bool#
property name#
abstract random()#
class autokoopman.core.tuner.ParameterSpace(name: str, coords: Sequence[Parameter])#

Bases: Parameter

property dimension#
is_member(item) bool#
random()#
class autokoopman.core.tuner.TuneResults#

Bases: TypedDict

model: TrajectoryEstimator#
param: Sequence[Any]#
score: float#

autokoopman.core.visualizer module#

Module contents#