"""
Parameter uncertainties are computed using Godambe information, described in
Coffman et al, MBE (2016). doi: https://doi.org/10.1093/molbev/msv255
If you use moments.LD.Godambe to compute parameter uncertainties, please cite
that paper. This was first developed by Alec Coffman for computing uncertainties
from inferences performed with dadi, modified here to handle LD decay curves.
"""
import numpy as np, numpy
from moments.LD import Inference
from moments.LD.LDstats_mod import LDstats
import copy
# from moments.Misc import delayed_flush
from .. import Misc
import sys, os
def _hessian_elem(func, f0, p0, ii, jj, eps, args=(), one_sided=None):
"""
Calculate element [ii][jj] of the Hessian matrix, a matrix
of partial second derivatives w.r.t. to parameters ii and jj
:param func: Model function
:param f0: Evaluation of func at p0
:param p0: Parameters for func
:param eps: List of absolute step sizes to use for each parameter when taking
finite differences.
:param args: Additional arguments to func
:param one_sided: Optionally, pass in a sequence of length p0 that determines
whether a one-sided derivative will be used for each parameter.
"""
# Note that we need to specify dtype=float, to avoid this being an integer
# array which will silently fail when adding fractional eps.
if one_sided is None:
one_sided = [False] * len(p0)
pwork = numpy.array(p0, copy=True, dtype=float)
if ii == jj:
if pwork[ii] != 0 and not one_sided[ii]:
pwork[ii] = p0[ii] + eps[ii]
fp = func(pwork, *args)
pwork[ii] = p0[ii] - eps[ii]
fm = func(pwork, *args)
element = (fp - 2 * f0 + fm) / eps[ii] ** 2
else:
pwork[ii] = p0[ii] + 2 * eps[ii]
fpp = func(pwork, *args)
pwork[ii] = p0[ii] + eps[ii]
fp = func(pwork, *args)
element = (fpp - 2 * fp + f0) / eps[ii] ** 2
else:
if (
pwork[ii] != 0
and pwork[jj] != 0
and not one_sided[ii]
and not one_sided[jj]
):
# f(xi + hi, xj + h)
pwork[ii] = p0[ii] + eps[ii]
pwork[jj] = p0[jj] + eps[jj]
fpp = func(pwork, *args)
# f(xi + hi, xj - hj)
pwork[ii] = p0[ii] + eps[ii]
pwork[jj] = p0[jj] - eps[jj]
fpm = func(pwork, *args)
# f(xi - hi, xj + hj)
pwork[ii] = p0[ii] - eps[ii]
pwork[jj] = p0[jj] + eps[jj]
fmp = func(pwork, *args)
# f(xi - hi, xj - hj)
pwork[ii] = p0[ii] - eps[ii]
pwork[jj] = p0[jj] - eps[jj]
fmm = func(pwork, *args)
element = (fpp - fpm - fmp + fmm) / (4 * eps[ii] * eps[jj])
else:
# f(xi + hi, xj + h)
pwork[ii] = p0[ii] + eps[ii]
pwork[jj] = p0[jj] + eps[jj]
fpp = func(pwork, *args)
# f(xi + hi, xj)
pwork[ii] = p0[ii] + eps[ii]
pwork[jj] = p0[jj]
fpm = func(pwork, *args)
# f(xi, xj + hj)
pwork[ii] = p0[ii]
pwork[jj] = p0[jj] + eps[jj]
fmp = func(pwork, *args)
element = (fpp - fpm - fmp + f0) / (eps[ii] * eps[jj])
return element
def _get_hess(func, p0, eps, args=()):
"""
Calculate Hessian matrix of partial second derivatives.
Hij = dfunc/(dp_i dp_j)
:param func: Model function
:param p0: Parameter values to take derivative around
:param eps: Fractional stepsize to use when taking finite-difference derivatives
Note that if eps*param is < 1e-12, then the step size for that parameter
will simply be eps, to avoid numerical issues with small parameter
perturbations.
:param args: Additional arguments to func
"""
# Calculate step sizes for finite-differences.
eps_in = eps
eps = numpy.empty([len(p0)])
one_sided = [False] * len(p0)
for i, pval in enumerate(p0):
if pval != 0:
# Account for floating point arithmetic issues
if pval * eps_in < 1e-12:
eps[i] = eps_in
one_sided[i] = True
else:
eps[i] = eps_in * pval
else:
# Account for parameters equal to zero
eps[i] = eps_in
f0 = func(p0, *args)
hess = numpy.empty((len(p0), len(p0)))
for ii in range(len(p0)):
for jj in range(ii, len(p0)):
hess[ii][jj] = _hessian_elem(
func, f0, p0, ii, jj, eps, args=args, one_sided=one_sided
)
hess[jj][ii] = hess[ii][jj]
return hess
def _get_grad(func, p0, eps, args=()):
"""
Calculate gradient vector
:param func: Model function
:param p0: Parameters for func
:param eps: Fractional stepsize to use when taking finite-difference derivatives
Note that if eps*param is < 1e-12, then the step size for that parameter
will simply be eps, to avoid numerical issues with small parameter
perturbations.
:param args: Additional arguments to func
"""
# Calculate step sizes for finite-differences.
eps_in = eps
eps = numpy.empty([len(p0)])
one_sided = [False] * len(p0)
for i, pval in enumerate(p0):
if pval != 0:
# Account for floating point arithmetic issues
if pval * eps_in < 1e-12:
eps[i] = eps_in
one_sided[i] = True
else:
eps[i] = eps_in * pval
else:
# Account for parameters equal to zero
eps[i] = eps_in
grad = numpy.empty([len(p0), 1])
for ii in range(len(p0)):
pwork = numpy.array(p0, copy=True, dtype=float)
if p0[ii] != 0 and not one_sided[ii]:
pwork[ii] = p0[ii] + eps[ii]
fp = func(pwork, *args)
pwork[ii] = p0[ii] - eps[ii]
fm = func(pwork, *args)
grad[ii] = (fp - fm) / (2 * eps[ii])
else:
# Do one-sided finite-difference
pwork[ii] = p0[ii] + eps[ii]
fp = func(pwork, *args)
pwork[ii] = p0[ii]
fm = func(pwork, *args)
grad[ii] = (fp - fm) / eps[ii]
return grad
_ld_cache = {}
def _get_godambe(
func_ex, all_boot, p0, ms, vcs, eps, statistics, log=False, just_hess=False
):
"""
Godambe information and Hessian matrices
:param func_ex: Model function
:param all_boot: List of bootstrap frequency spectra
:param p0: Best-fit parameters for func_ex.
:param ms: Original data of statistics means.
:param vcs: Original data of statistics variance covariance matrices.
:param eps: Fractional stepsize to use when taking finite-difference derivatives
Note that if eps*param is < 1e-12, then the step size for that parameter
will simply be eps, to avoid numerical issues with small parameter
perturbations.
:param statistics:
:param log: If True, calculate derivatives in terms of log-parameters
:param just_hess: If True, only evaluate and return the Hessian matrix
"""
# Cache evaluations of the LDstats inside our hessian/J
# evaluation function
def func(params, m, v):
key = tuple(params)
if key not in _ld_cache:
_ld_cache[key] = func_ex(params, statistics)
y = _ld_cache[key]
ll = Inference.ll_over_bins(y, m, v)
return ll
def log_func(logparams, m, v):
return func(numpy.exp(logparams), m, v)
# First calculate the observed hessian.
if not log:
hess = -_get_hess(func, p0, eps, args=[ms, vcs])
else:
hess = -_get_hess(log_func, numpy.log(p0), eps, args=[ms, vcs])
if just_hess:
return hess
# Now the expectation of J over the bootstrap data
J = numpy.zeros((len(p0), len(p0)))
# cU is a column vector
cU = numpy.zeros((len(p0), 1))
for bs_ms in all_boot:
# boot = LDstats(boot)
if not log:
grad_temp = _get_grad(func, p0, eps, args=[bs_ms, vcs])
else:
grad_temp = _get_grad(log_func, numpy.log(p0), eps, args=[bs_ms, vcs])
J_temp = numpy.outer(grad_temp, grad_temp)
J = J + J_temp
cU = cU + grad_temp
J = J / len(all_boot)
cU = cU / len(all_boot)
# G = H*J^-1*H
J_inv = numpy.linalg.inv(J)
godambe = numpy.dot(numpy.dot(hess, J_inv), hess)
return godambe, hess, J, cU
def _remove_normalized_data(statistics, normalization, means, varcovs, all_boot):
# get indexes to remove
pi2_idx_to_del = statistics[0].index("pi2_{0}_{0}_{0}_{0}".format(normalization))
H_idx_to_del = statistics[1].index("H_{0}_{0}".format(normalization))
# remove from means
for ii in range(len(means) - 1):
means[ii] = np.delete(means[ii], pi2_idx_to_del)
means[-1] = np.delete(means[-1], H_idx_to_del)
# remove from varcovs
for ii in range(len(varcovs) - 1):
varcovs[ii] = np.delete(varcovs[ii], pi2_idx_to_del, axis=0)
varcovs[ii] = np.delete(varcovs[ii], pi2_idx_to_del, axis=1)
varcovs[-1] = np.delete(varcovs[-1], H_idx_to_del, axis=0)
varcovs[-1] = np.delete(varcovs[-1], H_idx_to_del, axis=1)
# remove from all_boot
if len(all_boot) > 0:
for jj, boot in enumerate(all_boot):
for ii in range(len(boot) - 1):
all_boot[jj][ii] = np.delete(boot[ii], pi2_idx_to_del)
all_boot[jj][-1] = np.delete(boot[-1], H_idx_to_del)
else:
all_boot = []
# remove from statistics lists
statistics[0].pop(pi2_idx_to_del)
statistics[1].pop(H_idx_to_del)
return statistics, means, varcovs, all_boot
_func_calls = 0
_output_stream = sys.stdout
def _expected_number_of_calls(params):
n = len(params)
return 4 * n * (n - 1) // 2 + 2 * n + 1
[docs]def GIM_uncert(
model_func,
all_boot,
p0,
ms,
vcs,
log=False,
eps=0.01,
return_GIM=False,
r_edges=None,
normalization=0,
pass_Ne=False,
statistics=None,
verbose=0,
):
"""
Parameter uncertainties from Godambe Information Matrix (GIM). If you use this
method, please cite
`Coffman et al., MBE (2016) <https://doi.org/10.1093/molbev/msv255>`_.
Returns standard deviations of parameter values.
:param model_func: Model function
:param all_boot: List of bootstrap LD stat means [m0, m1, m2, ...]
:param p0: Best-fit parameters for model_func, with inferred Ne in last entry of
parameter list.
:param ms: See below..
:param vcs: Original means and covariances of statistics from data. If statistics
are not give, we remove the normalizing statistics. Otherwise, these need
to be pared down so that the normalizing statistics are removed.
:param eps: Fractional stepsize to use when taking finite-difference derivatives.
Note that if eps*param is < 1e-12, then the step size for that parameter
will simply be eps, to avoid numerical issues with small parameter
perturbations.
:param log: If True, assume log-normal distribution of parameters. Returned values
are then the standard deviations of the *logs* of the parameter values,
which can be interpreted as relative parameter uncertainties.
:param return_GIM: If true, also return the full GIM.
:param r_edges: The bin edges for LD statistics.
:param normalization: The index of the population that we normalized by.
:param pass_Ne: If True, Ne is a parameter in the model function, and by convention
is the last entry in the parameters list. If False, Ne is only used to scale
recombination rates.
:param statistics: Statistics that we have included given as a list of lists:
[ld_stats, h_stats]. If statistics is not given, we assume all statistics
are included except for the normalizing statistic in each bin
:param verbose: If an integer greater than 0, prints updates
of the number of function calls and tested parameters
at intervals given by that spacing.
:type verbose: int, optional
"""
means = copy.deepcopy(ms)
varcovs = copy.deepcopy(vcs)
all_boots = copy.deepcopy(all_boot)
rs = np.array(r_edges)
if verbose > 0:
_output_stream.write(
"Expected number of function calls: "
+ str(_expected_number_of_calls(p0))
+ os.linesep
)
Misc.delayed_flush(delay=0.5)
if statistics is None:
# get statistics
if pass_Ne:
y = model_func(p0)
else:
y = model_func(p0[:-1])
statistics = y.names()
statistics, means, varcovs, all_boots = _remove_normalized_data(
statistics, normalization, means, varcovs, all_boots
)
def pass_func(params, statistics):
global _func_calls
_func_calls += 1
rho = 4 * params[-1] * rs
if pass_Ne:
y = Inference.bin_stats(model_func, params, rho=rho)
else:
y = Inference.bin_stats(model_func, params[:-1], rho=rho)
y = Inference.sigmaD2(y, normalization=normalization)
y = Inference.remove_nonpresent_statistics(y, statistics)
if (verbose > 0) and (_func_calls % verbose == 0):
param_str = "array([%s])" % (", ".join(["%- 12g" % v for v in params]))
_output_stream.write("%-8i, %s%s" % (_func_calls, param_str, os.linesep))
Misc.delayed_flush(delay=0.5)
return y
GIM, H, J, cU = _get_godambe(
pass_func, all_boots, p0, means, varcovs, eps, statistics, log=log
)
uncerts = numpy.sqrt(numpy.diag(numpy.linalg.inv(GIM)))
if not return_GIM:
return uncerts
else:
return uncerts, GIM
[docs]def FIM_uncert(
model_func,
p0,
ms,
vcs,
log=False,
eps=0.01,
r_edges=None,
normalization=0,
pass_Ne=False,
statistics=None,
verbose=0,
):
"""
Parameter uncertainties from Fisher Information Matrix. This approach typically
underestimates the size of the true confidence intervals, as it does not take
into account linkage between loci that causes data to be non-independent.
Returns standard deviations of parameter values.
:param model_func: Model function
:param p0: Best-fit parameters for model_func, with inferred Ne in last entry of
parameter list.
:param ms: See below..
:param vcs: Original means and covariances of statistics from data. If statistics
are not give, we remove the normalizing statistics. Otherwise, these need
to be pared down so that the normalizing statistics are removed.
:param eps: Fractional stepsize to use when taking finite-difference derivatives.
Note that if eps*param is < 1e-12, then the step size for that parameter
will simply be eps, to avoid numerical issues with small parameter
perturbations.
:param log: If True, assume log-normal distribution of parameters. Returned values
are then the standard deviations of the *logs* of the parameter values,
which can be interpreted as relative parameter uncertainties.
:param return_GIM: If true, also return the full GIM.
:param r_edges: The bin edges for LD statistics.
:param normalization: The index of the population that we normalized by.
:param pass_Ne: If True, Ne is a parameter in the model function, and by convention
is the last entry in the parameters list. If False, Ne is only used to scale
recombination rates.
:param statistics: Statistics that we have included given as a list of lists:
[ld_stats, h_stats]. If statistics is not given, we assume all statistics
are included except for the normalizing statistic in each bin
:param verbose: If an integer greater than 0, prints updates
of the number of function calls and tested parameters
at intervals given by that spacing.
:type verbose: int, optional
"""
means = copy.deepcopy(ms)
varcovs = copy.deepcopy(vcs)
rs = np.array(r_edges)
if verbose > 0:
_output_stream.write(
"Expected number of function calls: "
+ str(_expected_number_of_calls(p0))
+ os.linesep
)
Misc.delayed_flush(delay=0.5)
if statistics is None:
# get statistics
if pass_Ne:
y = model_func(p0)
else:
y = model_func(p0[:-1])
statistics = y.names()
statistics, means, varcovs, _ = _remove_normalized_data(
statistics, normalization, means, varcovs, []
)
def pass_func(params, statistics):
global _func_calls
_func_calls += 1
rho = 4 * params[-1] * rs
if pass_Ne:
y = Inference.bin_stats(model_func, params, rho=rho)
else:
y = Inference.bin_stats(model_func, params[:-1], rho=rho)
y = Inference.sigmaD2(y, normalization=normalization)
y = Inference.remove_nonpresent_statistics(y, statistics)
if (verbose > 0) and (_func_calls % verbose == 0):
param_str = "array([%s])" % (", ".join(["%- 12g" % v for v in params]))
_output_stream.write("%-8i, %s%s" % (_func_calls, param_str, os.linesep))
Misc.delayed_flush(delay=0.5)
return y
H = _get_godambe(
pass_func, 0, p0, means, varcovs, eps, statistics, log=log, just_hess=True
)
uncerts = numpy.sqrt(numpy.diag(numpy.linalg.inv(H)))
return uncerts