# Source code for mud.examples

```import numpy as np
from matplotlib import pyplot as plt  # type: ignore
from scipy.stats import distributions as ds  # type: ignore

from mud.base import DensityProblem, IterativeLinearProblem
from mud.funs import wme
from mud.util import std_from_equipment

[docs]def rotation_map(qnum=10, tol=0.1, b=None, ref_param=None, seed=None):
"""
Generate test data linear rotation map

"""
if seed is not None:
np.random.seed(seed)

vec = np.linspace(0, np.pi, qnum)
A = np.array([[np.sin(theta), np.cos(theta)] for theta in vec])
A = A.reshape(qnum, 2)
b = np.zeros((qnum, 1)) if b is None else b
ref_param = (
np.array([[0.5, 0.5]]).reshape(-1, 1) if ref_param is None else ref_param
)

# Compute observed value
y = A @ ref_param + b
initial_mean = np.random.randn(2).reshape(-1, 1)
initial_cov = np.eye(2) * std_from_equipment(tol)

return (A, b, y, initial_mean, initial_cov, ref_param)

[docs]def rotation_map_trials(
numQoI=10,
method="ordered",
num_trials=100,
model_eval_budget=100,
ax=None,
color="r",
label="Ordered QoI \$(10\\times 10D)\$",
seed=None,
):
"""
Run a set of trials for linear rotation map problems

"""

# Initialize plot if axis object is not passed in
if ax is None:
_, ax = plt.subplots(1, 1, figsize=(20, 10))

# Build Rotation Map. This will initialize seed of trial if specified
A, b, y, initial_mean, initial_cov, ref_param = rotation_map(qnum=numQoI, seed=seed)

# Calcluate number of epochs per trial using budget and number of QoI
num_epochs = model_eval_budget // numQoI

errors = []
for trial in range(num_trials):
# Get a new random initial mean to start from per trial on same problem
initial_mean = np.random.rand(2, 1)

# Initialize number of epochs and idx choices to use on this trial
epochs = num_epochs
choice = np.arange(numQoI)

# Modify epochs/choices based off of method
if method == "ordered":
# Ordered - GO through each row in order once per epoch, each trial
epochs = num_epochs
elif method == "shuffle":
# Shuffled - Shuffle rows on each trial, in order once per epoch
np.random.shuffle(choice)
epochs = num_epochs
elif method == "batch":
# Batch - Perform only one epoch, but iterate in random
#         order num_epochs times over each row of A
choice = list(np.arange(numQoI)) * num_epochs
np.random.shuffle(choice)
epochs = 1
elif method == "random":
# Randoms - Perform only one epoch, but do num_epochs*rows
#           random choices of rows of A, with replacement
choice = np.random.choice(np.arange(numQoI), size=num_epochs * numQoI)
epochs = 1

# Initialize Iterative Linear Problem and solve using number of epochs
prob = IterativeLinearProblem(
A, b=b, y=y, initial_mean=initial_mean, cov=initial_cov, idx_order=choice
)
_ = prob.solve(num_epochs=epochs)

# Plot errors with respect to reference parameter over each iteration
prob.plot_chain_error(ref_param, alpha=0.1, ax=ax, color=color, fontsize=36)

# Append to erros matrix to calculate mean error accross trials
errors.append(prob.get_errors(ref_param))

# Compute mean errors at each iteration across all trials
avg_errs = np.mean(np.array(errors), axis=0)

# Plot mean errors
ax.plot(avg_errs, color, lw=5, label=label)

[docs]def identity_uniform_1D_density_prob(
num_samples=2000,
num_obs=20,
y_true=0.5,
noise=0.05,
weights=None,
domain=[0, 1],
wme_map=True,
analytical_pred=True,
):
"""
1D Density Problem using WME on identity map with uniform initial

Sets up a Density Problem using a given domain (unit by default) and a
uniform initial distribution under an identity map and the Weighted
Mean Error map to . This function is used
as a set-up for tejjsts to the DensityProblem class.

`num_obs` observations
are collected from an initial distribution and used as the true signal,
with noise being added to each observation.
Sets up an inverse problem using the unit domain and uniform distribution
under an identity map. This is equivalent to studying a
\"steady state\" signal over time, or taking repeated measurements
of the same quantity to reduce variance in the uncertainty.
"""
init_dist = ds.uniform(loc=domain, scale=domain - domain)
X = init_dist.rvs(size=(num_samples, 1))

if wme_map:
y_pred = np.repeat(X, num_obs, 1)
# data is truth + noise
y_observed = y_true * np.ones(num_obs) + noise * np.random.randn(num_obs)
Y = wme(y_pred, y_observed, sd=noise)

# Build Density problem of M(X) = WME(X,y_observed) over domain
D = DensityProblem(X, Y, np.array([domain]), weights=weights)
D.set_initial(init_dist)

if analytical_pred:
# analytical construction of predicted domain under identity map.
y_domain = np.repeat(np.array([, ]), num_obs, 1)
mn, mx = wme(y_domain, y_observed, sd=noise)
loc, scale = mn, mx - mn
D.set_predicted(ds.uniform(loc=loc, scale=scale))
else:
# Build Density problem of M(X) = X over domain
D = DensityProblem(X, X, np.array([domain]), weights=weights)
D.set_initial(init_dist)

if analytical_pred:
D.set_predicted(init_dist)

return D
```