# -*- coding: utf-8 -*-
"""
Python console script for `mud`, installed with
`pip install .` or `python setup.py install`
"""
import argparse
import sys
import logging
import numpy as np
from mud import __version__
from mud.base import DensityProblem, BayesProblem
from scipy.stats import distributions as dists
__author__ = "Mathematical Michael"
__copyright__ = "Mathematical Michael"
__license__ = "mit"
_logger = logging.getLogger(__name__)
[docs]def parse_args(args):
"""Parse command line parameters
Args:
args ([str]): command line parameters as list of strings
Returns:
:obj:`argparse.Namespace`: command line parameters namespace
"""
parser = argparse.ArgumentParser(
description="Demonstration of analytical MUD point"
)
parser.add_argument(
"--version", action="version", version="mud {ver}".format(ver=__version__)
)
parser.add_argument(dest="n", help="Number of QoI", type=int, metavar="INT")
parser.add_argument(
"-v",
"--verbose",
dest="loglevel",
help="set loglevel to INFO",
action="store_const",
const=logging.INFO,
)
parser.add_argument(
"-vv",
"--very-verbose",
dest="loglevel",
help="set loglevel to DEBUG",
action="store_const",
const=logging.DEBUG,
)
return parser.parse_args(args)
[docs]def setup_logging(loglevel):
"""Setup basic logging
Args:
loglevel (int): minimum loglevel for emitting messages
"""
logformat = "[%(asctime)s] %(levelname)s:%(name)s:%(message)s"
logging.basicConfig(
level=loglevel, stream=sys.stdout, format=logformat, datefmt="%Y-%m-%d %H:%M:%S"
)
[docs]def main(args):
"""Main entry point allowing external calls
Args:
args ([str]): command line parameter list
"""
args = parse_args(args)
setup_logging(args.loglevel)
_logger.debug("Starting crazy calculations...")
print("Using {} Quantities of Interest".format(args.n))
_logger.info("Script end.")
[docs]def run():
"""Entry point for console_scripts"""
main(sys.argv[1:])
############################################################
[docs]def wme(X, data, sd=None):
if sd is None:
sd = np.std(data)
if X.ndim == 1:
X = X.reshape(1, -1)
num_evals = X.shape[0]
assert X.shape[1] == len(data)
residuals = np.subtract(X, data)
weighted_residuals = np.divide(residuals, sd)
assert weighted_residuals.shape[0] == num_evals
weighted_sum = np.sum(weighted_residuals, axis=1)
return weighted_sum / np.sqrt(len(data))
[docs]def makeRi(A, initial_cov):
predicted_cov = A @ initial_cov @ A.T
if isinstance(predicted_cov, float):
ipc = 1.0 / predicted_cov * np.eye(1)
else:
ipc = np.linalg.inv(predicted_cov)
Ri = np.linalg.inv(initial_cov) - A.T @ ipc @ A
return Ri
[docs]def check_args(A, b, y, mean, cov, data_cov):
n_samples, dim_input = A.shape
if data_cov is None:
data_cov = np.eye(n_samples)
if cov is None:
cov = np.eye(dim_input)
if mean is None:
mean = np.zeros((dim_input, 1))
if b is None:
b = np.zeros((n_samples, 1))
if y is None:
y = np.zeros(n_samples)
ravel = False
if y.ndim == 1:
y = y.reshape(-1, 1)
ravel = True
if b.ndim == 1:
b = b.reshape(-1, 1)
if mean.ndim == 1:
mean = mean.reshape(-1, 1)
n_data, n_targets = y.shape
if n_samples != n_data:
raise ValueError(
"Number of samples in X and y does not correspond:"
" %d != %d" % (n_samples, n_data)
)
z = y - b - A @ mean
return ravel, z, mean, cov, data_cov
[docs]def mud_sol(A, b, y=None, mean=None, cov=None, data_cov=None, return_pred=False):
"""
For SWE problem, we are inverting N(0,1).
This is the default value for `data_cov`.
"""
ravel, z, mean, cov, _ = check_args(A, b, y, mean, cov, data_cov)
inv_pred_cov = np.linalg.pinv(A @ cov @ A.T)
update = cov @ A.T @ inv_pred_cov
mud_point = mean + update @ z
if ravel:
# When y was passed as a 1d-array, we flatten the coefficients.
mud_point = mud_point.ravel()
if return_pred:
return mud_point, update
else:
return mud_point
[docs]def updated_cov(X, init_cov=None, data_cov=None):
"""
We start with the posterior covariance from ridge regression
Our matrix R = init_cov^(-1) - X.T @ pred_cov^(-1) @ X
replaces the init_cov from the posterior covariance equation.
Simplifying, this is given as the following, which is not used
due to issues of numerical stability (a lot of inverse operations).
up_cov = (X.T @ np.linalg.inv(data_cov) @ X + R )^(-1)
up_cov = np.linalg.inv(\
X.T@(np.linalg.inv(data_cov) - inv_pred_cov)@X + \
np.linalg.inv(init_cov) )
We return the updated covariance using a form of it derived
which applies Hua's identity in order to use Woodbury's identity.
>>> updated_cov(np.eye(2))
array([[1., 0.],
[0., 1.]])
>>> updated_cov(np.eye(2)*2)
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, data_cov=np.eye(3))
array([[0.25, 0. ],
[0. , 0.25]])
>>> updated_cov(np.eye(3)[:, :2]*2, init_cov=np.eye(2))
array([[0.25, 0. ],
[0. , 0.25]])
"""
if init_cov is None:
init_cov = np.eye(X.shape[1])
else:
assert X.shape[1] == init_cov.shape[1]
if data_cov is None:
data_cov = np.eye(X.shape[0])
else:
assert X.shape[0] == data_cov.shape[1]
pred_cov = X @ init_cov @ X.T
inv_pred_cov = np.linalg.pinv(pred_cov)
# pinv b/c inv unstable for rank-deficient A
# Form derived via Hua's identity + Woodbury
K = init_cov @ X.T @ inv_pred_cov
up_cov = init_cov - K @ (pred_cov - data_cov) @ K.T
return up_cov
[docs]def mud_sol_alt(A, b, y=None, mean=None, cov=None, data_cov=None, return_pred=False):
"""
Doesn't use R directly, uses new equations.
This presents the equation as a rank-k update
to the error of the initial estimate.
"""
ravel, z, mean, cov, data_cov = check_args(A, b, y, mean, cov, data_cov)
up_cov = updated_cov(X=A, init_cov=cov, data_cov=data_cov)
update = up_cov @ A.T @ np.linalg.inv(data_cov)
mud_point = mean + update @ z
if ravel:
# When y was passed as a 1d-array, we flatten the coefficients.
mud_point = mud_point.ravel()
if return_pred:
return mud_point, update
else:
return mud_point
[docs]def map_sol(A, b, y=None, mean=None, cov=None, data_cov=None, w=1, return_pred=False):
ravel, z, mean, cov, data_cov = check_args(A, b, y, mean, cov, data_cov)
inv = np.linalg.inv
post_cov = inv(A.T @ inv(data_cov) @ A + w * inv(cov))
update = post_cov @ A.T @ inv(data_cov)
map_point = mean + update @ z
if ravel:
# When y was passed as a 1d-array, we flatten the coefficients.
map_point = map_point.ravel()
if return_pred:
return map_point, update
else:
return map_point
[docs]def iterate(A, b, y, initial_mean, initial_cov, data_cov=None, num_epochs=1, idx=None):
chain = performEpoch(A, b, y, initial_mean, initial_cov, data_cov, idx)
for _ in range(1, num_epochs):
chain += performEpoch(A, b, y, chain[-1], initial_cov, data_cov, idx)
return chain
[docs]def mud_problem(lam, qoi, qoi_true, domain, sd=0.05, num_obs=None, split=None):
"""
Wrapper around mud problem, takes in raw qoi + synthetic data and
performs WME transformation, instantiates solver object
"""
if lam.ndim == 1:
lam = lam.reshape(-1, 1)
if qoi.ndim == 1:
qoi = qoi.reshape(-1, 1)
dim_output = qoi.shape[1]
if num_obs is None:
num_obs = dim_output
elif num_obs < 1:
raise ValueError("num_obs must be >= 1")
elif num_obs > dim_output:
raise ValueError("num_obs must be <= dim(qoi)")
# TODO: handle empty sd -> take it from the data.
# TODO: swap for data + leave noise generation separate. no randomness in method.
noise = np.random.randn(num_obs) * sd
if split is None:
# this is our data processing step.
data = qoi_true[0:num_obs] + noise
q = wme(qoi[:, 0:num_obs], data, sd).reshape(-1, 1)
else: # vector-valued QoI map. TODO: assert dimensions <= input_dim
q = []
for qoi_indices in split:
_q = qoi_indices[qoi_indices < num_obs]
_qoi = qoi[:, _q]
_data = np.array(qoi_true)[_q] + noise[_q]
_newqoi = wme(_qoi, _data, sd)
q.append(_newqoi)
q = np.vstack(q).T
# this implements density-based solutions, mud point method
d = DensityProblem(lam, q, domain)
return d
[docs]def map_problem(lam, qoi, qoi_true, domain, sd=0.05, num_obs=None, log=False):
"""
Wrapper around map problem, takes in raw qoi + synthetic data and
instantiates solver object
"""
if lam.ndim == 1:
lam = lam.reshape(-1, 1)
if qoi.ndim == 1:
qoi = qoi.reshape(-1, 1)
dim_output = qoi.shape[1]
if num_obs is None:
num_obs = dim_output
elif num_obs < 1:
raise ValueError("num_obs must be >= 1")
elif num_obs > dim_output:
raise ValueError("num_obs must be <= dim(qoi)")
# this is our data processing step.
data = qoi_true[0:num_obs] + np.random.randn(num_obs) * sd
# likelihood = dists.norm(loc=qoi[:, :num_obs], scale=sd)
likelihood = dists.norm(loc=data, scale=sd)
# this implements bayesian likelihood solutions, map point method
b = BayesProblem(lam, qoi[:, 0:num_obs], domain)
b.set_likelihood(likelihood, log=log)
return b
if __name__ == "__main__":
run()