Source code for TFilterPy.utils.optimisation_utils

import numpy as np
import dask.array as da

from TFilterPy.base_estimator import BaseEstimator


[docs] class ParameterEstimator(BaseEstimator): r""" A class for estimating Kalman Filter parameters, such as process noise covariance (Q) and observation noise covariance (R), using different estimation methods. Parameter estimation is crucial for optimal performance of Kalman Filters, particularly in dynamic systems where noise characteristics are unknown or change over time. Estimation strategies include: - Residual Analysis - Maximum Likelihood Estimation (MLE) - Cross-Validation - Adaptive Filtering References: - Welch, G., & Bishop, G. (1995). An Introduction to the Kalman Filter. - Haykin, S. (2001). Kalman Filtering and Neural Networks. """ def __init__(self, estimation_strategy: str = "residual_analysis"): r""" Initialize the ParameterEstimator with the desired estimation strategy. Parameters ---------- estimation_strategy : str, optional The strategy to use for estimating parameters. Options include: - "residual_analysis": Estimate based on residuals after running the filter. - "mle": Maximum Likelihood Estimation (iterative approach). - "cross_validation": Perform k-fold cross-validation for best Q and R. - "adaptive_filtering": Dynamically update Q and R based on measurement residuals. Raises ------ ValueError If an invalid estimation strategy is specified. """ super().__init__(name="KalmanFilter") valid_strategies = { "residual_analysis", "mle", "cross_validation", "adaptive_filtering", } if estimation_strategy not in valid_strategies: raise ValueError( f"Invalid strategy: {estimation_strategy}. Must be one of {valid_strategies}." ) self.estimation_strategy = estimation_strategy
[docs] def estimate_with_residual_analysis(self, measurements: da.Array) -> tuple: r""" Estimate process (Q) and observation (R) noise covariances using residual analysis. Parameters ---------- measurements : da.Array, shape (n_timesteps, n_features) Observed measurements over time. Returns ------- Q : da.Array, shape (n_features, n_features) Estimated process noise covariance matrix. R : da.Array, shape (n_features, n_features) Estimated observation noise covariance matrix. Notes ----- - Residual analysis involves analyzing the discrepancies between measurements and predictions. - Q is derived from the covariance of the residuals, while R is scaled based on residual variance. """ state_estimates, residuals = self.run_filter( measurements ) # Call `run_filter` from the child class Q = da.cov(residuals, rowvar=False) R = da.eye(self.R.shape[0], chunks=self.R.chunks[0][0]) * da.var(residuals) return Q, R
[docs] def estimate_with_mle( self, measurements: da.Array, max_iterations: int = 5 ) -> tuple: r""" Estimate Q and R using Maximum Likelihood Estimation (MLE). Parameters ---------- measurements : da.Array, shape (n_timesteps, n_features) Observed measurements over time. max_iterations : int, optional Number of iterations to refine the estimates. Default is 5. Returns ------- Q : da.Array, shape (n_features, n_features) Estimated process noise covariance matrix. R : da.Array, shape (n_features, n_features) Estimated observation noise covariance matrix. Notes ----- - MLE iteratively adjusts Q and R by maximizing the likelihood of the observed measurements. - The process is repeated for `max_iterations` to converge towards optimal estimates. """ Q, R = self.Q, self.R for _ in range(max_iterations): state_estimates, residuals = self.run_filter(measurements) Q_new = da.eye(residuals.shape[1], chunks=residuals.chunks[1][0]) * da.var( residuals, axis=0 ) R_new = da.eye(self.R.shape[0], chunks=self.R.chunks[0][0]) * da.var( residuals ) Q, R = Q_new.persist(), R_new.persist() return Q, R
[docs] def estimate_with_cross_validation( self, measurements: da.Array, k_folds: int = 5 ) -> tuple: r""" Estimate Q and R using k-fold cross-validation. Parameters ---------- measurements : da.Array, shape (n_timesteps, n_features) Observed measurements over time. k_folds : int, optional Number of folds to split the data for cross-validation. Default is 5. Returns ------- best_Q : da.Array, shape (n_features, n_features) Best estimated process noise covariance matrix. best_R : da.Array, shape (n_features, n_features) Best estimated observation noise covariance matrix. Notes ----- - The data is split into k-folds. The model is trained on k-1 folds and validated on the remaining fold. - The fold with the lowest validation error determines the best Q and R estimates. """ n_samples = measurements.shape[0] fold_size = n_samples // k_folds fold_scores, fold_Qs, fold_Rs = [], [], [] for fold in range(k_folds): val_start = fold * fold_size val_end = val_start + fold_size train_data = da.concatenate( [measurements[:val_start], measurements[val_end:]] ) val_data = measurements[val_start:val_end] state_estimates, residuals = self.run_filter(train_data) val_score = da.mean((val_data - state_estimates[:fold_size]) ** 2) fold_scores.append(val_score) Q = da.cov(residuals.T) R = da.eye(self.R.shape[0], chunks=self.R.chunks[0][0]) * da.var(residuals) fold_Qs.append(Q) fold_Rs.append(R) fold_scores, fold_Qs, fold_Rs = da.compute(fold_scores, fold_Qs, fold_Rs) best_fold = np.argmin(fold_scores) return fold_Qs[best_fold], fold_Rs[best_fold]
[docs] def estimate_with_adaptive_filtering(self, measurements: da.Array) -> tuple: r""" Estimate Q and R adaptively based on innovations over time. Parameters ---------- measurements : da.Array, shape (n_timesteps, n_features) Observed measurements over time. Returns ------- Q : da.Array, shape (n_features, n_features) Estimated process noise covariance matrix. R : da.Array, shape (n_features, n_features) Estimated observation noise covariance matrix. Notes ----- - Innovations (differences between predictions and observations) are used to update Q and R adaptively. - A smoothing factor (alpha) determines the influence of recent measurements. """ Q, R = self.Q, self.R alpha = 0.01 innovations = measurements - da.dot(self.H, self.x) outer_products = da.einsum("ij,ik->ijk", innovations, innovations) Q_new = da.mean(outer_products, axis=0).rechunk("auto") R_new = da.var(innovations, axis=0).rechunk("auto") Q = (1 - alpha) * Q + alpha * Q_new R = (1 - alpha) * R + alpha * R_new return Q, R
[docs] def estimate_parameters(self, measurements: da.Array): r""" Estimate the parameters Q and R using the specified estimation strategy. """ return getattr(self, f"estimate_with_{self.estimation_strategy}")(measurements)