Source code for mud.base

import pickle
from typing import Callable, List, Optional, Union

import numpy as np
from matplotlib import pyplot as plt  # type: ignore
from numpy.typing import ArrayLike
from scipy.stats import distributions as dist  # type: ignore
from scipy.stats import gaussian_kde as gkde  # type: ignore
from scipy.stats import rv_continuous  # type: ignore
from scipy.stats.contingency import margins  # type: ignore

from mud.plot import plot_dist, plot_vert_line
from mud.preprocessing import pca, svd
from mud.util import add_noise, fit_domain, make_2d_unit_mesh, null_space, set_shape

try:
    import xarray as xr  # type: ignore

    xr_avial = True
except ModuleNotFoundError:
    xr_avail = False
    pass


[docs]class DensityProblem(object): """ Sets up Data-Consistent Inverse Problem for parameter identification Data-Consistent inversion is a way to infer most likely model parameters using observed data and predicted data from the model. Attributes ---------- X : ArrayLike Array containing parameter samples from an initial distribution. Rows represent each sample while columns represent parameter values. If 1 dimensional input is passed, assumed that it represents repeated samples of a 1-dimensional parameter. y : ArrayLike Array containing push-forward values of parameters samples through the forward model. These samples will form the `predicted distribution`. domain : ArrayLike Array containing ranges of each parameter value in the parameter space. Note that the number of rows must equal the number of parameters, and the number of columns must always be two, for min/max range. weights : ArrayLike, optional Weights to apply to each parameter sample. Either a 1D array of the same length as number of samples or a 2D array if more than one set of weights is to be incorporated. If so the weights will be multiplied and normalized row-wise, so the number of columns must match the number of samples. Examples ------------- Generate test 1-D parameter estimation problem. Model to produce predicted data is the identity map and observed signal comes from true value plus some random gaussian nose. See :meth:`mud.examples.identity_uniform_1D_density_prob` for more details >>> from mud.examples.simple import identity_1D_density_prob as I1D First we set up a well-posed problem. Note the domain we are looking over contains our true value. We take 1000 samples, use 50 observations, assuming a true value of 0.5 populated with gaussian noise :math:`\\mathcal{N}(0,0.5)`. Or initial uniform distribution is taken from a :math:`[0,1]` range. >>> D = I1D(1000, 50, 0.5, 0.05, domain=[0,1]) Estimate mud_point -> Note since WME map used, observed implied to be the standard normal distribution and does not have to be set explicitly from observed data set. >>> np.round(D.mud_point()[0],1) 0.5 Expectation value of r, ratio of observed and predicted distribution, should be near 1 if predictability assumption is satisfied. >>> np.round(D.expected_ratio(),0) 1.0 Set up ill-posed problem -> Searching out of range of true value >>> D = I1D(1000, 50, 0.5, 0.05, domain=[0.6,1]) Mud point will be close as we can get within the range we are searching for >>> np.round(D.mud_point()[0],1) 0.6 Expectation of r is close to zero since predictability assumption violated. >>> np.round(D.expected_ratio(),1) 0.0 """ def __init__( self, X: ArrayLike, y: ArrayLike, domain: Optional[ArrayLike] = None, weights: Optional[ArrayLike] = None, normalize: bool = False, pad_domain: float = 0.1, ): self.X = set_shape(np.array(X), (1, -1)) self.y = set_shape(np.array(y), (-1, 1)) # These will be updated in set_ and fit() functions self._r = None # Ratio of observed to predicted self._up = None # Updated values self._in = None # Initial values self._pr = None # Predicted values self._ob = None # Observed values self._in_dist = None # Initial distribution self._pr_dist = None # Predicted distribution self._ob_dist = None # Observed distribution if domain is not None: # Assert domain passed in is consistent with data array self.domain = set_shape(np.array(domain), (1, -1)) assert ( self.domain.shape[0] == self.n_params ), f"Size mismatch: domain: {self.domain.shape}, params: {self.X.shape}" else: self.domain = fit_domain(self.X) # Initialize weights self.set_weights(weights, normalize=normalize) @property def n_params(self): return self.X.shape[1] @property def n_features(self): return self.y.shape[1] @property def n_samples(self): return self.y.shape[0]
[docs] def set_weights(self, weights: Optional[ArrayLike] = None, normalize: bool = False): """Set Sample Weights Sets the weights to use for each sample. Note weights can be one or two dimensional. If weights are two dimensional the weights are combined by multiplying them row wise and normalizing, to give one weight per sample. This combining of weights allows incorporating multiple sets of weights from different sources of prior belief. Parameters ---------- weights : np.ndarray, List Numpy array or list of same length as the `n_samples` or if two dimensional, number of columns should match `n_samples` normalize : bool, default=False Whether to normalize the weights vector. Returns ------- Warnings -------- Resetting weights will delete the predicted and updated distribution values in the class, requiring a re-run of adequate `set_` methods and/or `fit()` to reproduce with new weights. """ if weights is None: w = np.ones(self.X.shape[0]) else: w = np.array(weights) # Reshape to 2D w = w.reshape(1, -1) if w.ndim == 1 else w # assert appropriate size assert self.n_samples == w.shape[1], f"`weights` must size {self.n_samples}" # Multiply weights column wise for stacked weights w = np.prod(w, axis=0) if normalize: w = np.divide(w, np.linalg.norm(w)) self._weights = w self._pr = None self._up = None self._pr_dist = None
[docs] def set_observed(self, distribution: rv_continuous = dist.norm()): """Set distribution for the observed data. The observed distribution is determined from assumptions on the collected data. In the case of using a weighted mean error map on sequential data from a single output, the distribution is stationary with respect to the number data points collected and will always be the standard normal d distribution $N(0,1)$. Parameters ---------- distribution : scipy.stats.rv_continuous, default=scipy.stats.norm() scipy.stats continuous distribution like object representing the likelihood of observed data. Defaults to a standard normal distribution N(0,1). """ self._ob_dist = distribution self._ob = distribution.pdf(self.y).prod(axis=1)
[docs] def set_initial(self, distribution: Optional[rv_continuous] = None): """ Set initial probability distribution of model parameter values :math:`\\pi_{in}(\\lambda)`. Parameters ---------- distribution : scipy.stats.rv_continuous, optional scipy.stats continuous distribution object from where initial parameter samples were drawn from. If none provided, then a uniform distribution over domain of the density problem is assumed. If no domain is specified for density, then a standard normal distribution :math:`N(0,1)` is assumed. Warnings -------- Setting initial distribution resets the predicted and updated distributions, so make sure to set the initial first. """ if distribution is None: # assume standard normal by default if self.domain is not None: # assume uniform if domain specified mn = np.min(self.domain, axis=1) mx = np.max(self.domain, axis=1) distribution = dist.uniform(loc=mn, scale=mx - mn) else: distribution = dist.norm() self._in = distribution.pdf(self.X).prod(axis=1) self._in_dist = distribution self._up = None self._pr = None self._pr_dist = None
[docs] def set_predicted( self, distribution: Optional[rv_continuous] = None, bw_method: Optional[Union[str, Callable, np.generic]] = None, weights: Optional[ArrayLike] = None, **kwargs, ): """ Set Predicted Distribution The predicted distribution over the observable space is equal to the push-forward of the initial through the model :math:`\\pi_{pr}(Q(\\lambda)`. If no distribution is passed, :class:`scipy.stats.gaussian_kde` is used over the predicted values :attr:`y` to estimate the predicted distribution. Parameters ---------- distribution : :class:`scipy.stats.rv_continuous`, optional If specified, used as the predicted distribution instead of the default of using gaussian kernel density estimation on observed values y. This should be a frozen distribution if using `scipy`, and otherwise be a class containing a `pdf()` method return the probability density value for an array of values. bw_method : str, scalar, or `Callable`, optional Method to use to calculate estimator bandwidth. Only used if distribution is not specified, See documentation for :class:`scipy.stats.gaussian_kde` for more information. weights : ArrayLike, optional Weights to use on predicted samples. Note that if specified, :meth:`set_weights` will be run first to calculate new weights. Otherwise, whatever was previously set as the weights is used. Note this defaults to a weights vector of all 1s for every sample in the case that no weights were passed on upon initialization. **kwargs: dict, optional If specified, any extra keyword arguments will be passed along to the passed ``distribution.pdf()`` function for computing values of predicted samples. Note: `distribution` should be a frozen distribution if using `scipy`. Warnings -------- If passing a `distribution` argument, make sure that the initial distribution has been set first, either by having run :meth:`set_initial` or :meth:`fit` first. """ if weights is not None: self.set_weights(weights) if distribution is None: # Reweight kde of predicted by weights if present distribution = gkde(self.y.T, bw_method=bw_method, weights=self._weights) pred_pdf_values = distribution.pdf(self.y.T).T else: pred_pdf_values = distribution.pdf(self.y, **kwargs) self._pr_dist = distribution self._pr = pred_pdf_values.ravel() self._up = None
[docs] def fit(self, **kwargs): """ Update Initial Distribution Constructs the updated distribution by fitting observed data to predicted data with: .. math:: \\pi_{up}(\\lambda) = \\pi_{in}(\\lambda) \\frac{\\pi_{ob}(Q(\\lambda))}{\\pi_{pred}(Q(\\lambda))} :label: data_consistent_solution Note that if initial, predicted, and observed distributions have not been set before running this method, they will be run with default values. To set specific predicted, observed, or initial distributions use the ``set_`` methods. Parameters ----------- **kwargs : dict, optional If specified, optional arguments are passed to the :meth:`set_predicted` call in the case that the predicted distribution has not been set yet. Returns ----------- """ if self._in is None: self.set_initial() if self._pr is None: self.set_predicted(**kwargs) if self._ob is None: self.set_observed() # Store ratio of observed/predicted # e.g. to comptue E(r) and to pass on to future iterations self._r = np.divide(self._ob, self._pr) # Multiply by initial to get updated pdf up_pdf = np.multiply(self._in * self._weights, self._r) self._up = up_pdf
[docs] def mud_point(self): """Maximal Updated Density (MUD) Point Returns the Maximal Updated Density or MUD point as the parameter sample from the initial distribution with the highest update density value: .. math:: \\lambda^{MUD} := \\mathrm{argmax} \\pi_{up}(\\lambda) :label: mud Note if the updated distribution has not been computed yet, this function will call :meth:`fit` to compute it. Parameters ---------- Returns ------- mud_point : np.ndarray Maximal Updated Density (MUD) point. """ if self._up is None: self.fit() m = np.argmax(self._up) return self.X[m, :]
[docs] def estimate(self): """Estimate Returns the best estimate for most likely parameter values for the given model data using the data-consistent framework. Parameters ---------- Returns ------- mud_point : np.ndarray Maximal Updated Density (MUD) point. """ return self.mud_point()
[docs] def expected_ratio(self): """Expectation Value of R Returns the expectation value of the R, the ratio of the observed to the predicted density values. .. math:: R = \\frac{\\pi_{ob}(\\lambda)} {\\pi_{pred}(\\lambda)} :label: r_ratio If the predictability assumption for the data-consistent framework is satisfied, then :math:`E[R]\\approx 1`. Parameters ---------- Returns ------- expected_ratio : float Value of the E(r). Should be close to 1.0. """ if self._up is None: self.fit() return np.average(self._r, weights=self._weights)
# TODO: update documentation
[docs] def plot_param_space( self, param_idx: int = 0, true_val: Optional[ArrayLike] = None, ax: Optional[plt.Axes] = None, x_range: Optional[Union[list, np.ndarray]] = None, ylim: Optional[float] = None, pad_ratio: float = 0.05, aff: int = 100, in_opts={"color": "b", "linestyle": "-", "label": r"$\pi_\mathrm{init}$"}, up_opts={"color": "k", "linestyle": "-.", "label": r"$\pi_\mathrm{update}$"}, win_opts=None, mud_opts={"color": "g", "label": r"$\lambda^\mathrm{MUD}$"}, true_opts={"color": "r", "label": r"$\lambda^{\dagger}$"}, ): """ Plot probability distributions over parameter space Initial distribution is plotted using the distribution function passed to :meth:`set_initial`. The updated distribution is plotted using a weighted gaussian kernel density estimate (gkde) on the initial samples, using the product of the update ratio :eq:`r_ratio` value times the initial weights as weights for the gkde. The weighted initial is built using a weighted gkde on the initial samples, but only using the initial weights. Parameters ---------- param_idx : int, default=0 Index of parameter value to plot. ax : :class:`matplotlib.axes.Axes`, optional Axes to plot distributions on. If non specified, a figure will be initialized to plot on. x_range : list or np.ndarray, optional Range over parameter value to plot over. aff : int, default=100 Number of points to plot within x_range, evenly spaced. in_opts : dict, optional Plotting option for initial distribution line. Defaults to ``{'color':'b', 'linestyle':'--','linewidth':4, 'label':'Initial'}``. To suppress plotting, pass in ``None`` explicitly. up_opts : dict, optional Plotting option for updated distribution line. Defaults to ``{'color':'k', 'linestyle':'-.','linewidth':4, 'label':'Updated'}``. To suppress plotting, pass in ``None`` explicitly. win_opts : dict, optional Plotting option for weighted initial distribution line. Defaults to ``{'color':'g', 'linestyle':'--','linewidth':4, 'label':'Weighted Initial'}``. To suppress plotting, pass in ``None`` explicitly. Returns ------- """ # Default options for plotting figures io = {"color": "b", "linestyle": "--", "label": r"$\pi_\\mathrm{init}$"} uo = {"color": "k", "linestyle": "-.", "label": r"$\pi_\\mathrm{update}$"} wo = {"color": "b", "linestyle": ":", "label": r"$\\tilde{\pi}_\\mathrm{init}$"} mo = {"color": "g", "label": r"$\lambda^\mathrm{MUD}$"} to = {"color": "r", "linestyle": "-.", "label": r"$\lambda^{\dagger}$"} # Create plot if one isn't passed in _, ax = plt.subplots(1, 1) if ax is None else (None, ax) # Default x_range to full domain of all parameters if x_range is None: x_range = fit_domain(min_max_bounds=self.domain, pad_ratio=pad_ratio) # Plot distributions for all not set to None assert self._in_dist is not None if in_opts is not None: io.update(in_opts) plot_dist( self._in_dist, x_range, idx=param_idx, ax=ax, source="pdf", aff=aff, **io, ) if up_opts is not None: uo.update(up_opts) if self._r is not None: up_kde = gkde(self.X.T, weights=self._r * self._weights) else: up_kde = gkde(self.X.T, weights=None) plot_dist( up_kde, x_range, idx=param_idx, ax=ax, source="kde", aff=aff, **uo ) if win_opts is not None: wo.update(win_opts) w_kde = gkde(self.X.T, weights=self._weights) plot_dist(w_kde, x_range, idx=param_idx, ax=ax, source="kde", aff=aff, **wo) if mud_opts is not None: mo.update(mud_opts) mud_pt = self.estimate() plot_vert_line(ax, mud_pt[param_idx], ylim=ylim, **mo) if true_val is not None and true_opts: true_val = np.array(true_val) to.update(true_opts) plot_vert_line(ax, true_val[param_idx], ylim=ylim, **to) ax.set_xlabel(rf"$\lambda_{param_idx+1}$") return ax
[docs] def plot_obs_space( self, obs_idx: int = 0, ax: Optional[plt.Axes] = None, y_range: Optional[ArrayLike] = None, aff=100, ob_opts={"color": "r", "linestyle": "-", "label": r"$\pi_\mathrm{obs}$"}, pr_opts={"color": "b", "linestyle": "-", "label": r"$\pi_\mathrm{pred}$"}, pf_opts={"color": "k", "linestyle": "-.", "label": r"$\pi_\mathrm{pf-pr}$"}, ): """ Plot probability distributions over parameter space Observed distribution is plotted using the distribution function passed to :meth:`set_observed` (or default). The predicted distribution is plotted using the stored predicted distribution function set in :meth:`set_predicted`. The push-forward of the updated distribution is computed as a gkde on the predicted samples :attr:`y` as well, but using the product of the update ratio :eq:`r_ratio` and the initial weights as weights. Parameters ---------- obs_idx: int, default=0 Index of observable value to plot. ax : :class:`matplotlib.axes.Axes`, optional Axes to plot distributions on. If non specified, a figure will be initialized to plot on. y_range : list or np.ndarray, optional Range over parameter value to plot over. aff : int, default=100 Number of points to plot within x_range, evenly spaced. ob_opts : dict, optional Plotting option for observed distribution line. Defaults to ``{'color':'r', 'linestyle':'-','linewidth':4, 'label':'Observed'}``. To suppress plotting, pass in ``None``. pr_opts : dict, optional Plotting option for predicted distribution line. Defaults to ``{'color':'b', 'linestyle':'--','linewidth':4, 'label':'PF of Initial'}``. To suppress plotting, pass in ``None``. pf_opts : dict, optional Plotting option for push-forward of updated distribution line. Defaults to ``{'color':'k', 'linestyle':'-.','linewidth':4, 'label':'PF of Updated'}``. To suppress plotting, pass in ``None``. Returns ------- """ # observed, predicted, and push-forward opts respectively oo = {"color": "r", "linestyle": "-", "label": r"$\pi_\mathrm{obs}$"} po = {"color": "b", "linestyle": "-", "label": r"$\pi_\mathrm{pred}$"} fo = {"color": "k", "linestyle": "-.", "label": r"$\pi_\mathrm{pf-pr}$"} # Create plot if one isn't passed in _, ax = plt.subplots(1, 1) if ax is None else (None, ax) # Default range is (-1,1) over given observable index # TODO: Infer range from predicted y vals y_range = fit_domain(self.y) if y_range is None else np.array(y_range) if y_range.shape[0] != self.n_features: raise ValueError("Invalid domain dimension") assert self._ob_dist is not None, "Observed dist empty" if ob_opts: oo.update(ob_opts) plot_dist( self._ob_dist, y_range, idx=obs_idx, ax=ax, source="pdf", aff=aff, **oo ) if pr_opts: po.update(pr_opts) source = "pdf" if isinstance(self._pr_dist, type(dist.uniform())) else "kde" plot_dist( self._pr_dist, y_range, idx=obs_idx, ax=ax, source=source, aff=aff, **po ) if pf_opts: fo.update(pf_opts) # Compute PF of updated if self._r is not None: pf_kde = gkde(self.y.T, weights=self._weights * self._r) else: pf_kde = gkde(self.y.T, weights=None) plot_dist(pf_kde, y_range, idx=obs_idx, ax=ax, source="kde", aff=aff, **fo) return ax
[docs] def plot_qoi( self, idx_x: int = 0, idx_y: int = 1, ax: Optional[plt.Axes] = None, **kwargs ): """ Plot 2D plot over two indices of y space. Parameters ---------- idx_x: int, default=0 Index of observable value (column of y) to use as x value in plot. idx_y: int, default=1 Index of observable value (column of y) to use as y value in plot. ax : :class:`matplotlib.axes.Axes`, optional Axes to plot distributions on. If non specified, a figure will be initialized to plot on. kwargs : dict, optional Additional keyword arguments will be passed to `matplotlib.pyplot.scatter`. Returns ------- """ if ax is None: fig = plt.figure(figsize=(5, 5)) ax = fig.add_subplot(1, 1, 1) ax.scatter(self.y[:, idx_x], self.y[:, idx_y], **kwargs) return ax
[docs] def plot_params_2d( self, x_1: int = 0, x_2: int = 1, y: int = 0, contours: bool = False, colorbar: bool = True, ax: Optional[plt.Axes] = None, label=True, **kwargs, ): """ 2D plot over two indices of param space, optionally contoured by a y value. Parameters ---------- x_1 : int, default=0 Index of param value (column of X) to use as x value in plot. x_2 : int, default=1 Index of param value (column of X) to use as y value in plot. y: int, optional Index of observable (column of y) to use as z value in contour plot. ax : :class:`matplotlib.axes.Axes`, optional Axes to plot distributions on. If non specified, a figure will be initialized to plot on. kwargs : dict, optional Additional keyword arguments will be passed to `matplotlib.pyplot.scatter`. Returns ------- """ if self.n_params < 2: raise AttributeError("2D plot requires at least 2 dim param space.") if x_1 >= self.n_params or x_2 >= self.n_params: raise ValueError("indices {x_1+1},{x_2+1} > dim {self.n_params}") c = None if y >= self.n_features: raise ValueError("index {y+1} > dim {self.n_params}") c = self.y[:, y] if ax is None: fig = plt.figure(figsize=(5, 5)) ax = fig.add_subplot(1, 1, 1) sp = ax.scatter(self.X[:, 0], self.X[:, 1], c=c) if contours: ax.tricontour(self.X[:, 0], self.X[:, 1], c, levels=20) ax.set_title(f"$q_{y+1}$") if colorbar and c is not None: fig = plt.gcf() fig.colorbar(sp) ax.set_xlabel(rf"$\lambda_{x_1+1}$") ax.set_ylabel(rf"$\lambda_{x_2+1}$") return ax
[docs]class BayesProblem(object): """ Sets up Bayesian Inverse Problem for parameter identification Parameters ---------- X : ndarray 2D array containing parameter samples from an initial distribution. Rows represent each sample while columns represent parameter values. y : ndarray array containing push-forward values of parameters samples through the forward model. These samples will form the data-likelihood distribution. domain : array_like, optional 2D Array containing ranges of each parameter value in the parameter space. Note that the number of rows must equal the number of parameters, and the number of columns must always be two, for min/max range. Examples -------- >>> from mud.base import BayesProblem >>> import numpy as np >>> from scipy.stats import distributions as ds >>> X = np.random.rand(100,1) >>> num_obs = 50 >>> Y = np.repeat(X, num_obs, 1) >>> y = np.ones(num_obs)*0.5 + np.random.randn(num_obs)*0.05 >>> B = BayesProblem(X, Y, np.array([[0,1]])) >>> B.set_likelihood(ds.norm(loc=y, scale=0.05)) >>> np.round(B.map_point()[0],1) 0.5 """ def __init__( self, X: Union[np.ndarray, List], y: Union[np.ndarray, List], domain: Optional[Union[np.ndarray, List]] = None, ): # Set and validate inputs. Note we reshape inputs as necessary def set_shape(x, y): return x.reshape(y) if x.ndim < 2 else x self.X = set_shape(np.array(X), (1, -1)) self.y = set_shape(np.array(y), (1, -1)) if domain is not None: # Assert domain passed in is consistent with data array self.domain = set_shape(np.array(domain), (1, -1)) assert self.domain.shape[0] == self.n_params else: self.domain = fit_domain(self.X) # Initialize ps, predicted, and likelihood values/distributions self._ps = None self._pr = None self._ll = None self._ll_dist = None self._pr_dist = None @property def n_params(self): return self.X.shape[1] @property def n_features(self): return self.y.shape[1] @property def n_samples(self): return self.y.shape[0]
[docs] def set_likelihood(self, distribution, log=False): self._ll_dist = distribution if log: self._log = True self._ll = distribution.logpdf(self.y).sum(axis=1) # equivalent evaluation (demonstrating the expected symmetry) # std, mean = distribution.std(), distribution.mean() # self._ll = dist.norm(self.y, std).logpdf(mean).sum(axis=1) else: self._log = False self._ll = distribution.pdf(self.y).prod(axis=1) # equivalent # self._ll = dist.norm(self.y).pdf(distribution.mean())/distribution.std() # self._ll = self._ll.prod(axis=1) self._ps = None
[docs] def set_prior(self, distribution=None): if distribution is None: # assume standard normal by default if self.domain is not None: # assume uniform if domain specified mn = np.min(self.domain, axis=1) mx = np.max(self.domain, axis=1) distribution = dist.uniform(loc=mn, scale=mx - mn) else: distribution = dist.norm() self._pr_dist = distribution self._pr = self._pr_dist.pdf(self.X).prod(axis=1) self._ps = None
[docs] def fit(self): if self._pr is None: self.set_prior() if self._ll is None: self.set_likelihood() if self._log: ps_pdf = np.add(np.log(self._pr), self._ll) else: ps_pdf = np.multiply(self._pr, self._ll) assert ps_pdf.shape[0] == self.X.shape[0] if np.sum(ps_pdf) == 0: raise ValueError("Posterior numerically unstable.") self._ps = ps_pdf
[docs] def map_point(self): if self._ps is None: self.fit() m = np.argmax(self._ps) return self.X[m, :]
[docs] def estimate(self): return self.map_point()
# TODO: Make changes similar to DensityProblem class
[docs] def plot_param_space( self, param_idx=0, ax=None, x_range=None, aff=1000, pr_opts={"color": "b", "linestyle": "--", "linewidth": 4, "label": "Prior"}, ps_opts={"color": "g", "linestyle": ":", "linewidth": 4, "label": "Posterior"}, map_opts={"color": "g", "label": r"$\lambda^\mathrm{MAP}$"}, true_opts={"color": "r", "linestyle": "-.", "label": r"$\lambda^{\dagger}$"}, true_val=None, ): """ Plot probability distributions over parameter space """ # Default options for plotting figures pro = {"color": "b", "linestyle": "--", "linewidth": 4, "label": "Prior"} pso = {"color": "g", "linestyle": ":", "linewidth": 4, "label": "Posterior"} mo = {"color": "g", "label": r"$\lambda^\mathrm{MAP}$"} to = {"color": "r", "linestyle": "-.", "label": r"$\lambda^{\dagger}$"} # Create plot if one isn't passed in _, ax = plt.subplots(1, 1) if ax is None else (None, ax) # Default x_range to full domain of all parameters x_range = x_range if x_range is not None else self.domain # Plot distributions for all not set to None if pr_opts is not None: pro.update(pr_opts) plot_dist( self._pr_dist, x_range, idx=param_idx, ax=ax, source="pdf", aff=aff, **pro, ) if ps_opts is not None: pso.update(ps_opts) self.estimate() ps_kde = gkde(self.X.T, weights=self._ps) plot_dist( ps_kde, x_range, idx=param_idx, ax=ax, source="kde", aff=aff, **pso ) if map_opts is not None: mo.update(map_opts) map_pt = self.estimate() plot_vert_line(ax, map_pt[param_idx], **mo) if true_val is not None and true_opts: to.update(true_opts) plot_vert_line(ax, true_val[param_idx], **to) ax.set_xlabel(rf"$\lambda_{param_idx+1}$") return ax
[docs] def plot_obs_space( self, obs_idx=0, ax=None, y_range=None, aff=1000, ll_opts={ "color": "r", "linestyle": "-", "linewidth": 4, "label": "Data-Likelihood", }, pf_opts={ "color": "g", "linestyle": ":", "linewidth": 4, "label": "PF of Posterior", }, ): """ Plot probability distributions defined over observable space. """ lo = { "color": "r", "linestyle": "-", "linewidth": 4, "label": "Data-Likelihood", } po = { "color": "g", "linestyle": ":", "linewidth": 4, "label": "PF of Posterior", } # Create plot if one isn't passed in _, ax = plt.subplots(1, 1) if ax is None else (None, ax) # Default range is (-1,1) over each observable variable if y_range is None: y_range = np.repeat([[-1, 1]], self.y.shape[1], axis=0) # Build grid of points over range to compute marginals XXX = np.meshgrid(*[np.linspace(i, j, aff)[:-1] for i, j in y_range]) grid_points = np.vstack([x.ravel() for x in XXX]) y_plot = np.linspace(y_range[obs_idx, 0], y_range[obs_idx, 1], aff)[: aff - 1] if ll_opts is not None: lo.update(ll_opts) if self._ll is None: raise ValueError("Likelihood not set. Run fit()") # Compute observed distribution using stored pdf ll_plot = margins( np.reshape(self._ll_dist.pdf(grid_points).T.prod(axis=1), XXX[0].shape) )[obs_idx].reshape(-1) # Plot pf of initial ax.plot(y_plot, ll_plot, **lo) if pf_opts is not None: po.update(pf_opts) # Compute PF of posterior pf_kde = gkde(self.y.T, weights=self._ps) pf_p = margins(np.reshape(pf_kde(grid_points).T, XXX[0].shape))[ obs_idx ].reshape(-1) # Plut pf of updated ax.plot(y_plot, pf_p, **po) return ax
[docs]class LinearGaussianProblem(object): """Sets up inverse problems with Linear/Affine Maps Class provides solutions using MAP, MUD, and least squares solutions to the linear (or affine) problem from `p` parameters to `d` observables. .. math :: M(\\mathbf{x}) = A\\mathbf{x} + \\mathbf{b}, A \\in \\mathbb{R}^{d\\times p}, \\mathbf{x}, \\in \\mathbb{R}^{p}, \\mathbf{b}, \\in \\mathbb{R}^{d}, :label: linear_map Attributes ---------- A : ArrayLike 2D Array defining linear transformation from model parameter space to model output space. y : ArrayLike 1D Array containing observed values of Q(\\lambda) Array containing push-forward values of parameters samples through the forward model. These samples will form the `predicted distribution`. domain : ArrayLike Array containing ranges of each parameter value in the parameter space. Note that the number of rows must equal the number of parameters, and the number of columns must always be two, for min/max range. weights : ArrayLike, optional Weights to apply to each parameter sample. Either a 1D array of the same length as number of samples or a 2D array if more than one set of weights is to be incorporated. If so the weights will be multiplied and normalized row-wise, so the number of columns must match the number of samples. Examples ------------- Problem set-up: .. math :: A = \\begin{bmatrix} 1 & 1 \\end{bmatrix}, b = 0, y = 1 \\lambda_0 = \\begin{bmatrix} 0.25 & 0.25 \\end{bmatrix}^T, \\Sigma_{init} = \\begin{bmatrix} 1 & -0.25 \\\\ -0.25 & 0.5 \\end{bmatrix}, \\Sigma_{obs} = \\begin{bmatrix} 0.25 \\end{bmatrix} >>> from mud.base import LinearGaussianProblem as LGP >>> lg1 = LGP(A=np.array([[1, 1]]), ... b=np.array([[0]]), ... y=np.array([[1]]), ... mean_i=np.array([[0.25, 0.25]]).T, ... cov_i=np.array([[1, -0.25], [-0.25, 0.5]]), ... cov_o=np.array([[1]])) >>> lg1.solve('mud') array([[0.625], [0.375]]) """ def __init__( self, A=np.array([1, 1]).reshape(-1, 1), b=None, y=None, mean_i=None, cov_i=None, cov_o=None, alpha=1.0, ): # Make sure A is 2D array A = np.array(A) self.A = A if A.ndim == 2 else A.reshape(1, -1) ns, di = self.A.shape # Initialize to defaults - Reshape everything into 2D arrays. self.b = np.zeros((ns, 1)) if b is None else np.array(b).reshape(-1, 1) self.y = np.zeros((ns, 1)) if y is None else np.array(y).reshape(-1, 1) self.mean_i = ( np.zeros((di, 1)) if mean_i is None else np.array(mean_i).reshape(-1, 1) ) self.cov_i = np.eye(di) if cov_i is None else np.array(cov_i) self.cov_o = np.eye(ns) if cov_o is None else np.array(cov_o) # How much to scale regularization terms self.alpha = alpha if alpha is not None else 1.0 # Check appropriate dimensions of inputs n_data, n_targets = self.y.shape if ns != n_data: raise ValueError( "Number of samples in X and y does not correspond:" " %d != %d" % (ns, n_data) ) # Initialize to no solution self.sol = None @property def n_params(self): return self.A.shape[1] @property def n_features(self): return self.y.shape[1] @property def n_samples(self): return self.y.shape[0]
[docs] def compute_functionals(self, X, terms="all"): """ For a given input and observed data, compute functionals or individual terms in functionals that are minimized to solve the linear gaussian problem. """ # Compute observed mean mean_o = self.y - self.b # Define inner-producted induced by vector norm def ip(X, mat): return np.sum(X * (np.linalg.inv(mat) @ X), axis=0) # First compute data mismatch norm data_term = ip((self.A @ X.T + self.b) - mean_o.T, self.cov_o) if terms == "data": return data_term # Tikhonov Regularization Term reg_term = self.alpha * ip((X - self.mean_i.T).T, self.cov_i) if terms == "reg": return reg_term # Data-Consistent Term - "un-regularization" in data-informed directions dc_term = self.alpha * ip( self.A @ (X - self.mean_i.T).T, self.A @ self.cov_i @ self.A.T ) if terms == "dc_term": return dc_term # Modified Regularization Term reg_m_terms = reg_term - dc_term if terms == "reg_m": return reg_m_terms bayes_fun = data_term + reg_term if terms == "bayes": return bayes_fun dc_fun = bayes_fun - dc_term if terms == "dc": return dc_fun return (data_term, reg_term, dc_term, bayes_fun, dc_fun)
[docs] def solve(self, method="mud", output_dim=None): """ Explicitly solve linear problem using given method. """ # Reduce output dimension if desired od = self.A.shape[0] if output_dim is None else output_dim _A = self.A[:od, :] _b = self.b[:od, :] _y = self.y[:od, :] _cov_o = self.cov_o[:od, :od] # Compute observed mean mean_o = _y - _b # Compute residual z = mean_o - _A @ self.mean_i # Weight initial covariance to use according to alpha parameter a_cov_i = self.alpha * self.cov_i # Solve according to given method, or solve all methods if method == "mud" or method == "all": inv_pred_cov = np.linalg.pinv(_A @ a_cov_i @ _A.T) update = a_cov_i @ _A.T @ inv_pred_cov self.mud = self.mean_i + update @ z if method == "mud_alt" or method == "all": up_cov = self.updated_cov(A=_A, init_cov=a_cov_i, data_cov=_cov_o) update = up_cov @ _A.T @ np.linalg.inv(_cov_o) self.mud_alt = self.mean_i + update @ z self.up_cov = up_cov if method == "map" or method == "all": co_inv = np.linalg.inv(_cov_o) cov_p = np.linalg.inv(_A.T @ co_inv @ _A + np.linalg.inv(a_cov_i)) update = cov_p @ _A.T @ co_inv self.map = self.mean_i + update @ z self.cov_p = cov_p if method == "ls" or method == "all": # Compute ls solution from pinv method self.ls = np.linalg.pinv(_A) @ mean_o # Return solution or all solutions if method == "all": return (self.mud, self.map, self.ls) # return (self.mud, self.mud_alt, self.map, self.ls) else: return self.__getattribute__(method)
[docs] def updated_cov(self, A=None, init_cov=None, data_cov=None): """ We start with the posterior covariance from ridge regression Our matrix R = init_cov^(-1) - X.T @ pred_cov^(-1) @ X replaces the init_cov from the posterior covariance equation. Simplifying, this is given as the following, which is not used due to issues of numerical stability (a lot of inverse operations). up_cov = (X.T @ np.linalg.inv(data_cov) @ X + R )^(-1) up_cov = np.linalg.inv(\ X.T@(np.linalg.inv(data_cov) - inv_pred_cov)@X + \ np.linalg.inv(init_cov) ) We return the updated covariance using a form of it derived which applies Hua's identity in order to use Woodbury's identity. Check using alternate for updated_covariance. >>> from mud.base import LinearGaussianProblem as LGP >>> lg2 = LGP(A=np.eye(2)) >>> lg2.updated_cov() array([[1., 0.], [0., 1.]]) >>> lg3 = LGP(A=np.eye(2)*2) >>> lg3.updated_cov() array([[0.25, 0. ], [0. , 0.25]]) >>> lg3 = LGP(A=np.eye(3)[:, :2]*2) >>> lg3.updated_cov() array([[0.25, 0. ], [0. , 0.25]]) >>> lg3 = LGP(A=np.eye(3)[:, :2]*2, cov_i=np.eye(2)) >>> lg3.updated_cov() array([[0.25, 0. ], [0. , 0.25]]) """ X = A if A is not None else self.A if init_cov is None: init_cov = self.cov_i else: assert X.shape[1] == init_cov.shape[1] if data_cov is None: data_cov = self.cov_o else: assert X.shape[0] == data_cov.shape[1] pred_cov = X @ init_cov @ X.T inv_pred_cov = np.linalg.pinv(pred_cov) # pinv b/c inv unstable for rank-deficient A # Form derived via Hua's identity + Woodbury K = init_cov @ X.T @ inv_pred_cov up_cov = init_cov - K @ (pred_cov - data_cov) @ K.T return up_cov
[docs] def plot_sol( self, point="mud", ax=None, label=None, note_loc=None, pt_opts={"color": "k", "s": 100, "marker": "o"}, ln_opts={"color": "xkcd:blue", "marker": "d", "lw": 1, "zorder": 10}, annotate_opts={"fontsize": 14, "backgroundcolor": "w"}, ): """ Plot solution points """ if ax is None: _, ax = plt.subplots(1, 1) # Get solution point or initial poitn to plot. pt = self.mean_i if point == "initial" else self.solve(method=point) pt_opts["label"] = point # Plot point ax.scatter(pt[0], pt[1], **pt_opts) # Plot line connecting initial value and solution if ln_opts is not None and point != "initial": ax.plot( [self.mean_i.ravel()[0], pt.ravel()[0]], [self.mean_i.ravel()[1], pt.ravel()[1]], **ln_opts, ) if label is not None: # Annotate point with a label if desired nc = note_loc nc = (pt[0] - 0.02, pt[1] + 0.02) if nc is None else nc ax.annotate(label, nc, **annotate_opts)
[docs] def plot_contours( self, ref=None, subset=None, ax=None, annotate=False, note_loc=None, w=1, label="{i}", plot_opts={"color": "k", "ls": ":", "lw": 1, "fs": 20}, annotate_opts={"fontsize": 20}, ): """ Plot Linear Map Solution Contours """ # Initialize a plot if one hasn't been already if ax is None: _, ax = plt.subplots(1, 1) # All rows of A are default subset of contours to plot subset = np.arange(self.A.shape[0]) if subset is None else subset # Ref is the reference point to plot each contour line through. ref = ref if ref is not None else self.solve(method="ls") # Build null-space (contour lines) for each subset row of A A = self.A[np.array(subset), :] numQoI = A.shape[0] AA = np.hstack([null_space(A[i, :].reshape(1, -1)) for i in range(numQoI)]).T # Plot each contour line going through ref point for i, contour in enumerate(subset): xloc = [ref[0] - w * AA[i, 0], ref[1] + w * AA[i, 0]] yloc = [ref[0] - w * AA[i, 1], ref[1] + w * AA[i, 1]] ax.plot(xloc, yloc, **plot_opts) # If annotate is set, then label line with given annotations if annotate: nl = (xloc[0], yloc[0]) if note_loc is None else note_loc ax.annotate(label.format(i=contour + 1), nl, **annotate_opts)
[docs] def plot_fun_contours(self, mesh=None, terms="dc", ax=None, N=250, r=1, **kwargs): """ Plot contour map of functionals being minimized over input space """ if ax is None: _, ax = plt.subplots(1, 1) # Get mesh if one hasn't been passed if mesh is None: _, _, mesh = make_2d_unit_mesh(N, r) # Compute functional terms desired over range term = self.compute_functionals(mesh, terms=terms) # Plot contours _ = ax.contour( mesh[:, 0].reshape(N, N), mesh[:, 1].reshape(N, N), term.reshape(N, N), **kwargs, )
[docs]class LinearWMEProblem(LinearGaussianProblem): """Linear Inverse Problems using the Weighted Mean Error Map""" def __init__( self, operators, data, sigma, y=None, mean_i=None, cov_i=None, cov_o=None, alpha=1.0, ): if isinstance(sigma, (float, int)): sigma = [sigma] * len(data) results = [ self._transform_linear_map(o, d, s) for o, d, s in zip(operators, data, sigma) ] operators = [r[0] for r in results] datas = [r[1] for r in results] A, B = np.vstack(operators), np.vstack(datas) super().__init__( A=A, b=B, y=y, mean_i=mean_i, cov_i=cov_i, cov_o=cov_o, alpha=alpha ) def _transform_linear_map(self, operator, data, std): """ Takes a linear map `operator` of size (len(data), dim_input) or (1, dim_input) for repeated observations, along with a vector `data` representing observations. It is assumed that `data` is formed with `M@truth + sigma` where `sigma ~ N(0, std)` This then transforms it to the MWE form expected by the DCI framework. It returns a matrix `A` of shape (1, dim_input) and np.float `b` and transforms it to the MWE form expected by the DCI framework. >>> from mud.base import LinearWMEProblem as LWP >>> operators = [np.ones((10, 2))] >>> x = np.array([0.5, 0.5]).reshape(-1, 1) >>> sigma = 1 >>> data = [X @ x for X in operators] >>> lin_wme_prob = LWP(operators, data, sigma) >>> lin_wme_prob.y array([[0.]]) >>> lin_wme_prob = LWP(operators, data, [sigma]*10) >>> lin_wme_prob.y array([[0.]]) >>> operators = [np.array([[1, 1]])] >>> lin_wme_prob = LWP(operators, data, sigma) >>> lin_wme_prob.y array([[0.]]) """ if isinstance(data, np.ndarray): data = data.ravel() num_observations = len(data) if operator.shape[0] > 1: # if not repeated observations assert ( operator.shape[0] == num_observations ), f"Operator shape mismatch, op={operator.shape}, obs={num_observations}" if isinstance(std, (float, int)): std = np.array([std] * num_observations) if isinstance(std, (list, tuple)): std = np.array(std) assert len(std) == num_observations, "Standard deviation shape mismatch" assert 0 not in np.round(std, 14), "Std must be > 1E-14" D = np.diag(1.0 / (std * np.sqrt(num_observations))) A = np.sum(D @ operator, axis=0) else: if isinstance(std, (list, tuple, np.ndarray)): raise ValueError("For repeated measurements, pass a float for std") assert std > 1e-14, "Std must be > 1E-14" A = np.sqrt(num_observations) / std * operator b = -1.0 / np.sqrt(num_observations) * np.sum(np.divide(data, std)) return A, b
[docs]class IterativeLinearProblem(LinearGaussianProblem): def __init__( self, A, b, y=None, mu_i=None, cov=None, data_cov=None, idx_order=None ): # Make sure A is 2D array self.A = A if A.ndim == 2 else A.reshape(1, -1) # Initialize to defaults - Reshape everything into 2D arrays. n_samples, dim_input = self.A.shape self.data_cov = np.eye(n_samples) if data_cov is None else data_cov self.cov = np.eye(dim_input) if cov is None else cov self.mu_i = np.zeros((dim_input, 1)) if mu_i is None else mu_i.reshape(-1, 1) self.b = np.zeros((n_samples, 1)) if b is None else b.reshape(-1, 1) self.y = np.zeros(n_samples) if y is None else y.reshape(-1, 1) self.idx_order = range(self.A.shape[0]) if idx_order is None else idx_order # Verify arguments? # Initialize chain to initial mean self.epochs = [] self.solution_chains = [] self.errors = []
[docs] def solve(self, num_epochs=1, method="mud"): """ Iterative Solutions Performs num_epochs iterations of estimates """ m_init = ( self.mu_i if len(self.solution_chains) == 0 else self.solution_chains[-1][-1] ) solutions = [m_init] for _ in range(0, num_epochs): epoch = [] solutions = [solutions[-1]] for i in self.idx_order: # Add next sub-problem to chain epoch.append( LinearGaussianProblem( self.A[i, :], self.b[i], self.y[i], mean=solutions[-1], cov=self.cov, data_cov=self.data_cov, ) ) # Solve next mud problem solutions.append(epoch[-1].solve(method=method)) self.epochs.append(epoch) self.solution_chains.append(solutions) return self.solution_chains[-1][-1]
[docs] def get_errors(self, ref_param): """ Get errors with respect to a reference parameter """ solutions = np.concatenate([x[1:] for x in self.solution_chains]) if len(solutions) != len(self.errors): self.errors = [np.linalg.norm(s - ref_param) for s in solutions] return self.errors
[docs] def plot_chain(self, ref_param, ax=None, color="k", s=100, **kwargs): """ Plot chain of solutions and contours """ if ax is None: _, ax = plt.subplots(1, 1) for e, chain in enumerate(self.solution_chains): num_steps = len(chain) current_point = chain[0] ax.scatter(current_point[0], current_point[1], c="b", s=s) for i in range(0, num_steps): next_point = chain[i] points = np.hstack([current_point, next_point]) ax.plot(points[0, :], points[1, :], c=color) current_point = next_point ax.scatter(current_point[0], current_point[1], c="g", s=s) ax.scatter(ref_param[0], ref_param[1], c="r", s=s) self.plot_contours( ref_param, ax=ax, subset=self.idx_order, color=color, s=s, **kwargs )
[docs] def plot_chain_error( self, ref_param, ax=None, alpha=1.0, color="k", label=None, s=100, fontsize=12 ): """ Plot error over iterations """ _ = self.get_errors(ref_param) if ax is None: _, ax = plt.subplots(1, 1) ax.set_yscale("log") ax.plot(self.errors, color=color, alpha=alpha, label=label) ax.set_ylabel("$||\\lambda - \\lambda^\\dagger||$", fontsize=fontsize) ax.set_xlabel("Iteration step", fontsize=fontsize)
[docs]class SpatioTemporalProblem(object): """ Class for parameter estimation problems related to spatio-temporal problems. equation models of real world systems. Uses a QoI map of weighted residuals between simulated data and measurements to do inversion Attributes ---------- TODO: Finish Methods ------- TODO: Finish """ def __init__(self, df=None): self._domain = None self._lam = None self._data = None self._measurements = None self._true_lam = None self._true_vals = None self._sample_dist = None self.sensors = None self.times = None self.qoi = None self.pca = None self.std_dev = None if df is not None: self.load(df) @property def n_samples(self) -> int: if self.lam is None: raise AttributeError("lambda not yet set.") return self.lam.shape[0] @property def n_qoi(self) -> int: if self.qoi is None: raise AttributeError("qoi not yet set.") return self.qoi.shape[1] @property def n_sensors(self) -> int: if self.sensors is None: raise AttributeError("sensors not yet set.") return self.sensors.shape[0] @property def n_ts(self) -> int: if self.times is None: raise AttributeError("times not yet set.") return self.times.shape[0] @property def n_params(self) -> int: return self.domain.shape[0] @property def lam(self): return self._lam @lam.setter def lam(self, lam): lam = np.array(lam) lam = lam.reshape(-1, 1) if lam.ndim == 1 else lam if self.domain is not None: if lam.shape[1] != self.n_params: raise ValueError("Parameter dimensions do not match domain specified.") else: # TODO: Determine domain from min max in parameters self.domain = np.vstack([lam.min(axis=0), lam.max(axis=0)]).T if self.sample_dist is None: # Assume uniform distribution by default self.sample_dist = "u" self._lam = lam @property def lam_ref(self): return self._lam_ref @lam_ref.setter def lam_ref(self, lam_ref): if self.domain is None: raise AttributeError("domain not yet set.") lam_ref = np.reshape(lam_ref, (-1)) for idx, lam in enumerate(lam_ref): if (lam < self.domain[idx][0]) or (lam > self.domain[idx][1]): raise ValueError( f"lam_ref at idx {idx} must be inside {self.domain[idx]}." ) self._lam_ref = lam_ref @property def domain(self): return self._domain @domain.setter def domain(self, domain): domain = np.reshape(domain, (-1, 2)) if self.lam is not None: if self.domain.shape[0] != self.lam.shape[1]: raise ValueError("Domain and parameter array dimension mismatch.") min_max = np.vstack([self.lam.min(axis=0), self.lam.max(axis=0)]).T if not all( [all(domain[:, 0] <= min_max[:, 0]), all(domain[:, 1] >= min_max[:, 1])] ): raise ValueError("Parameter values exist outside of specified domain") self._domain = domain @property def data(self): return self._data @data.setter def data(self, data): dim = data.shape ndim = data.ndim if ndim == 1: data = np.reshape(data, (-1, 1)) if ndim == 3: # Expected to be in (# samples x # sensors # # timesteps) data = np.reshape(data, (dim[0], -1)) dim = data.shape ndim = data.ndim if self.sensors is None and self.times is None: self.sensors = np.array([0]) self.times = np.arange(0, dim[1]) if self.sensors is not None and self.times is None: if self.sensors.shape[0] != dim[1]: raise ValueError( "Dimensions of simulated data does not match number of sensors" ) self.times = np.array([0]) if self.sensors is None and self.times is not None: if self.times.shape[0] != dim[1]: raise ValueError( "Dimensions of simulated data does not match number of timesteps" ) self.sensors = np.array([0]) if self.sensors is not None and self.times is not None: # Assume data is already flattened, check dimensions match if self.times.shape[0] * self.sensors.shape[0] != dim[1]: raise ValueError( "Dimensions of simulated data != (timesteps x sensors)" ) # Flatten data_data into 2d array self._data = data @property def measurements(self): return self._measurements @measurements.setter def measurements(self, measurements): measurements = np.reshape(measurements, (self.n_sensors * self.n_ts, 1)) self._measurements = measurements @property def true_vals(self): return self._true_vals @true_vals.setter def true_vals(self, true_vals): true_vals = np.reshape(true_vals, (self.n_sensors * self.n_ts, 1)) self._true_vals = true_vals @property def sample_dist(self): return self._sample_dist @sample_dist.setter def sample_dist(self, dist): if dist not in ["u", "n"]: raise ValueError( "distribution could not be inferred. Must be from ('u', 'n')" ) self._sample_dist = dist
[docs] def get_closest_to_measurements( self, samples_mask=None, times_mask=None, sensors_mask=None, ): """ Get closest simulated data point to measured data in $l^2$-norm. """ lam, times, sensors, sub_data, sub_meas = self.sample_data( samples_mask=samples_mask, times_mask=times_mask, sensors_mask=sensors_mask, ) closest_idx = np.argmin(np.linalg.norm(sub_data - sub_meas.T, axis=1)) closest_lam = lam[closest_idx, :].ravel() return closest_lam
[docs] def get_closest_to_true_vals( self, ): """ Get closest simulated data point to noiseless true values in $l^2$-norm. Note for now no sub-sampling implemented here. """ if self.true_vals is None: raise AttributeError("True values is not set") closest_idx = np.argmin(np.linalg.norm(self.data - self.true_vals.T, axis=1)) closest_lam = self.lam[closest_idx, :].ravel() return closest_lam
[docs] def measurements_from_reference(self, ref=None, std_dev=None, seed=None): """ Add noise to a reference solution. """ if ref is not None: self._true_vals = ref if std_dev is not None: self.std_dev = std_dev if self.true_vals is None or self.std_dev is None: raise AttributeError( "Must set reference solution and std_dev first or pass as arguments." ) self.measurements = np.reshape( add_noise(self.true_vals.ravel(), self.std_dev, seed=seed), self.true_vals.shape, )
[docs] def load( self, df, lam="lam", data="data", **kwargs, ): """ Load data from a file on disk for a PDE parameter estimation problem. Parameters ---------- fname : str Name of file on disk. If ends in '.nc' then assumed to be netcdf file and the xarray library is used to load it. Otherwise the data is assumed to be pickled data. Returns ------- ds : dict, Dictionary containing data from file for PDE problem class """ if isinstance(df, str): try: if df.endswith("nc") and xr_avail: ds = xr.load_dataset(df) else: with open(df, "rb") as fp: ds = pickle.load(fp) except FileNotFoundError: raise FileNotFoundError(f"Couldn't find data file {df}") else: ds = df def get_set_val(f, v): if f in ds.keys(): self.__setattr__(f, ds[v]) elif v is not None and not isinstance(v, str): self.__setattr__(f, v) field_names = { "sample_dist": "sample_dist", "domain": "domain", "sensors": "sensors", "times": "times", "lam_ref": "lam_ref", "std_dev": "std_dev", "true_vals": "true_vals", "measurements": "measurements", } field_names.update(kwargs) for f, v in field_names.items(): get_set_val(f, v) get_set_val("lam", lam) get_set_val("data", data) return ds
[docs] def validate( self, check_meas=True, check_true=False, ): """Validates if class has been set-up appropriately for inversion""" req_attrs = ["domain", "lam", "data"] if check_meas: req_attrs.append("measurements") if check_true: req_attrs.append("true_lam") req_attrs.append("true_vals") missing = [x for x in req_attrs if self.__getattribute__(x) is None] if len(missing) > 0: raise ValueError(f"Missing attributes {missing}")
[docs] def sample_data( self, samples_mask=None, times_mask=None, sensors_mask=None, ): if self.data is None: raise AttributeError("data not set yet.") sub_data = np.reshape(self.data, (self.n_samples, self.n_sensors, self.n_ts)) if self.measurements is not None: sub_meas = np.reshape(self.measurements, (self.n_sensors, self.n_ts)) else: sub_meas = None sub_times = self.times sub_sensors = self.sensors sub_lam = self.lam if samples_mask is not None: sub_lam = self.lam[samples_mask, :] sub_data = sub_data[samples_mask, :, :] if times_mask is not None: sub_times = np.reshape(self.times[times_mask], (-1, 1)) sub_data = sub_data[:, :, times_mask] if sub_meas is not None: sub_meas = sub_meas[:, times_mask] if sensors_mask is not None: sub_sensors = np.reshape(self.sensors[sensors_mask], (-1, 2)) sub_data = sub_data[:, sensors_mask, :] if sub_meas is not None: sub_meas = sub_meas[sensors_mask, :] sub_data = np.reshape(sub_data, (-1, sub_times.shape[0] * sub_sensors.shape[0])) if sub_meas is not None: sub_meas = np.reshape(sub_meas, (len(sub_times) * len(sub_sensors))) return sub_lam, sub_times, sub_sensors, sub_data, sub_meas
[docs] def sensor_contour_plot( self, idx=0, c_vals=None, ax=None, mask=None, fill=True, colorbar=True, **kwargs ): """ Plot locations of sensors in space """ if ax is None: fig = plt.figure(figsize=(5, 5)) ax = fig.add_subplot(1, 1, 1) sensors = self.sensors[mask, :] if mask is not None else self.sensors contours = c_vals if c_vals is not None else self.data[idx, :].ravel() if fill: tc = ax.tricontourf(sensors[:, 0], sensors[:, 1], contours, **kwargs) else: tc = ax.tricontour(sensors[:, 0], sensors[:, 1], contours, **kwargs) if colorbar: fig = plt.gcf() fig.colorbar(tc) return ax
[docs] def sensor_scatter_plot(self, ax=None, mask=None, colorbar=None, **kwargs): """ Plot locations of sensors in space """ if ax is None: fig = plt.figure(figsize=(5, 5)) ax = fig.add_subplot(1, 1, 1) sensors = self.sensors[mask, :] if mask is not None else self.sensors sp = plt.scatter(sensors[:, 0], sensors[:, 1], **kwargs) if colorbar and "c" in kwargs.keys(): fig = plt.gcf() fig.colorbar(sp) return ax
[docs] def plot_ts( self, ax=None, samples=None, times=None, sensor_idx=0, max_plot=100, meas_kwargs={}, samples_kwargs={}, alpha=0.1, ): """ Plot time series data """ if ax is None: fig = plt.figure(figsize=(12, 5)) ax = fig.add_subplot(1, 1, 1) lam, times, _, sub_data, sub_meas = self.sample_data( samples_mask=samples, times_mask=times, sensors_mask=sensor_idx ) num_samples = sub_data.shape[0] max_plot = num_samples if max_plot > num_samples else max_plot # Plot measured time series def_kwargs = { "color": "k", "marker": "^", "label": "$\\zeta_{obs}$", "zorder": 50, "s": 2, } def_kwargs.update(meas_kwargs) _ = plt.scatter(times, sub_meas, **def_kwargs) # Plot simulated data time series def_sample_kwargs = {"color": "r", "linestyle": "-", "zorder": 1, "alpha": 0.1} def_sample_kwargs.update(samples_kwargs) for i, idx in enumerate(np.random.choice(num_samples, max_plot)): if i != (max_plot - 1): _ = ax.plot(times, sub_data[i, :], **def_sample_kwargs) else: _ = ax.plot( times, sub_data[i, :], "r-", alpha=alpha, label=f"Sensor {sensor_idx}", ) return ax
[docs] def mud_problem( self, method="pca", data_weights=None, sample_weights=None, num_components=2, samples_mask=None, times_mask=None, sensors_mask=None, ): """Build QoI Map Using Data and Measurements""" # TODO: Finish sample data implementation lam, times, sensors, sub_data, sub_meas = self.sample_data( samples_mask=samples_mask, times_mask=times_mask, sensors_mask=sensors_mask, ) residuals = (sub_meas - sub_data) / self.std_dev sub_n_samples = sub_data.shape[0] if data_weights is not None: data_weights = np.reshape(data_weights, (-1, 1)) if data_weights.shape[0] != self.n_sensors * self.n_ts: raise ValueError( "Data weights vector and dimension of data space does not match" ) data_weights = data_weights / np.linalg.norm(data_weights) residuals = data_weights * residuals if method == "wme": qoi = np.sum(residuals, axis=1) / np.sqrt(sub_n_samples) elif method == "pca": # Learn qoi to use using PCA pca_res, X_train = pca(residuals, n_components=num_components) self.pca = { "X_train": X_train, "vecs": pca_res.components_, "var": pca_res.explained_variance_, } # Compute WME qoi = residuals @ pca_res.components_.T elif method == "svd": # Learn qoi to use using SVD u, s, v = svd(residuals) self.svd = {"u": u, "singular_values": s, "singular_vectors": v} qoi = residuals @ (v[0:num_components, :]).T else: ValueError(f"Unrecognized QoI Map type {method}") # qoi = qoi.reshape(sub_n_samples, -1) d = DensityProblem(lam, qoi, self.domain, weights=sample_weights) return d