# -*- coding: utf-8 -*-
"""
Python console script for `mud`, installed with
`pip install .` or `python setup.py install`
"""
import logging
import numpy as np
from scipy.stats import distributions as dists # type: ignore
from mud.base import (
BayesProblem,
DensityProblem,
IterativeLinearProblem,
LinearGaussianProblem,
SpatioTemporalProblem,
)
_logger = logging.getLogger(__name__)
[docs]def wme(predictions, data, sd=None):
"""
Calculates Weighted Mean Error (WME) functional.
Parameters
----------
predictions: numpy.ndarray of shape (n_samples, n_features)
Predicted values against which data is compared.
data: list or numpy.ndarray of shape (n_features, 1)
Collected (noisy) data
sd: float, optional
Standard deviation
Returns
-------
numpy.ndarray of shape (n_samples, 1)
"""
sd = np.std(data) if sd is None else sd
predictions = predictions.reshape(1, -1) if predictions.ndim == 1 else predictions
N = predictions.shape[1]
if N != len(data):
raise ValueError(f"Predictions and data dim mismatch {N} != {len(data)}")
return np.sum((1 / (sd * np.sqrt(N))) * (predictions - data), axis=1)
[docs]def updated_cov(X, init_cov=None, data_cov=None):
"""
We start with the posterior covariance from ridge regression
Our matrix R = init_cov^(-1) - X.T @ pred_cov^(-1) @ X
replaces the init_cov from the posterior covariance equation.
Simplifying, this is given as the following, which is not used
due to issues of numerical stability (a lot of inverse operations).
up_cov = (X.T @ np.linalg.inv(data_cov) @ X + R )^(-1)
up_cov = np.linalg.inv(\
X.T@(np.linalg.inv(data_cov) - inv_pred_cov)@X + \
np.linalg.inv(init_cov) )
We return the updated covariance using a form of it derived
which applies Hua's identity in order to use Woodbury's identity.
>>> updated_cov(np.eye(2))
array([[1., 0.],
[0., 1.]])
>>> updated_cov(np.eye(2)*2)
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, data_cov=np.eye(3))
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, init_cov=np.eye(2))
array([[0.25, 0. ],
[0. , 0.25]])
"""
if init_cov is None:
init_cov = np.eye(X.shape[1])
else:
assert X.shape[1] == init_cov.shape[1]
if data_cov is None:
data_cov = np.eye(X.shape[0])
else:
assert X.shape[0] == data_cov.shape[1]
pred_cov = X @ init_cov @ X.T
inv_pred_cov = np.linalg.pinv(pred_cov)
# pinv b/c inv unstable for rank-deficient A
# Form derived via Hua's identity + Woodbury
K = init_cov @ X.T @ inv_pred_cov
up_cov = init_cov - K @ (pred_cov - data_cov) @ K.T
return up_cov
[docs]def lin_prob(A, b, y=None, mean=None, cov=None, data_cov=None, alpha=None):
"""
Linear Gaussian Problem Solver Entrypoint
"""
lin_prob = LinearGaussianProblem(
A,
b=b,
y=y,
mean_i=mean,
cov_i=cov,
cov_o=data_cov,
alpha=alpha,
)
return lin_prob
[docs]def mud_sol(A, b, y=None, mean=None, cov=None, data_cov=None):
"""
For SWE problem, we are inverting N(0,1).
This is the default value for `data_cov`.
"""
lp = lin_prob(A, b, y=y, mean=mean, cov=cov, data_cov=data_cov)
mud_pt = lp.solve(method="mud")
mud_pt = mud_pt if np.array(y).ndim > 1 else mud_pt.ravel()
return mud_pt
[docs]def mud_sol_with_cov(A, b, y=None, mean=None, cov=None, data_cov=None):
"""
Doesn't use R directly, uses new equations.
This presents the equation as a rank-k update
to the error of the initial estimate.
"""
lp = lin_prob(A, b, y=y, mean=mean, cov=cov, data_cov=data_cov)
mud_pt = lp.solve(method="mud_alt")
mud_pt = mud_pt if np.array(y).ndim > 1 else mud_pt.ravel()
return mud_pt, lp.up_cov
[docs]def map_sol(A, b, y=None, mean=None, cov=None, data_cov=None, w=1):
"""MAP Linear Gaussian Problem Solve"""
lp = lin_prob(A, b, y=y, mean=mean, cov=cov, data_cov=data_cov, alpha=w)
map_pt = lp.solve(method="map")
map_pt = map_pt if np.array(y).ndim > 1 else map_pt.ravel()
return map_pt
[docs]def map_sol_with_cov(A, b, y=None, mean=None, cov=None, data_cov=None, w=1):
"""MAP Linear Gaussian Problem Solve"""
lp = lin_prob(A, b, y=y, mean=mean, cov=cov, data_cov=data_cov, alpha=w)
map_pt = lp.solve(method="map")
map_pt = map_pt if np.array(y).ndim > 1 else map_pt.ravel()
return map_pt, lp.cov_p
[docs]def iter_lin_solve(
A,
b,
y=None,
mean=None,
cov=None,
data_cov=None,
method="mud",
num_epochs=1,
idx_order=None,
):
"""
Iterative Linear Gaussian Problem Solver Entrypoint
"""
lin_prob = IterativeLinearProblem(
A,
b=b,
y=y,
mean_i=mean,
cov_i=cov,
cov_o=data_cov,
alpha=1.0,
idx_order=idx_order,
)
res = lin_prob.solve(num_epochs=num_epochs, method=method)
return res
[docs]def iterate(A, b, y, initial_mean, initial_cov, data_cov=None, num_epochs=1, idx=None):
chain = performEpoch(A, b, y, initial_mean, initial_cov, data_cov, idx)
for _ in range(1, num_epochs):
chain += performEpoch(A, b, y, chain[-1], initial_cov, data_cov, idx)
return chain
[docs]def data_prob(
lam,
qoi,
qoi_true=None,
measurements=None,
std_dev=None,
sample_dist="uniform",
domain=None,
lam_ref=None,
times=None,
sensors=None,
idxs=None,
method="wme",
init_dist=dists.uniform(loc=0, scale=1),
):
"""
Data-Constructed Map Solve
Wrapper around SpatioTemporalProblem class to create and solve a MUD problem
by first aggregating observed and siumalated data in a data-constructed
qoi map.
"""
data = {
"lam": lam,
"data": qoi,
"true_vals": qoi_true,
"measurements": measurements,
"std_dev": std_dev,
"sample_dist": sample_dist,
"domain": domain,
"lam_ref": lam_ref,
"times": times,
"sensors": sensors,
}
sp_prob = SpatioTemporalProblem()
sp_prob.load(data)
D = sp_prob.mud_problem(method=method)
D.set_initial(init_dist)
D.set_observed(dists.norm(0, 1)) # always N(0,1) for WME map
D.estimate()
return D
[docs]def map_problem(lam, qoi, qoi_true, domain, sd=0.05, num_obs=None, log=False):
"""
Wrapper around map problem, takes in raw qoi + synthetic data and
instantiates solver object
"""
lam = lam.reshape(-1, 1) if lam.ndim == 1 else lam
qoi = qoi.reshape(-1, 1) if qoi.ndim == 1 else qoi
dim_output = qoi.shape[1]
if num_obs is None:
num_obs = dim_output
elif num_obs < 1:
raise ValueError("num_obs must be >= 1")
elif num_obs > dim_output:
raise ValueError("num_obs must be <= dim(qoi)")
data = qoi_true[0:num_obs] + np.random.randn(num_obs) * sd
likelihood = dists.norm(loc=data, scale=sd)
b = BayesProblem(lam, qoi[:, 0:num_obs], domain)
b.set_likelihood(likelihood, log=log)
return b
[docs]def mud_problem(
lam, qoi, qoi_true, domain, sd=0.05, num_obs=None, split=None, weights=None
):
"""
Wrapper around mud problem, takes in raw qoi + synthetic data and
performs WME transformation, instantiates solver object.
"""
if lam.ndim == 1:
lam = lam.reshape(-1, 1)
if qoi.ndim == 1:
qoi = qoi.reshape(-1, 1)
dim_output = qoi.shape[1]
if num_obs is None:
num_obs = dim_output
elif num_obs < 1:
raise ValueError("num_obs must be >= 1")
elif num_obs > dim_output:
raise ValueError("num_obs must be <= dim(qoi)")
# TODO: handle empty sd -> take it from the data.
# TODO: swap for data + leave noise generation separate. no randomness in method.
noise = np.random.randn(num_obs) * sd
if split is None:
# this is our data processing step.
data = qoi_true[0:num_obs] + noise
q = wme(qoi[:, 0:num_obs], data, sd).reshape(-1, 1)
else: # vector-valued QoI map. TODO: assert dimensions <= input_dim
q = []
for qoi_indices in split:
_q = qoi_indices[qoi_indices < num_obs]
_qoi = qoi[:, _q]
_data = np.array(qoi_true)[_q] + noise[_q]
_newqoi = wme(_qoi, _data, sd)
q.append(_newqoi)
q = np.vstack(q).T
# this implements density-based solutions, mud point method
d = DensityProblem(lam, q, domain, weights=weights)
return d