# Source code for moments.LD.Util

```import numpy as np

"""
Heterozygosity and LD stat names
"""

[docs]def het_names(num_pops):
"""
Returns the heterozygosity statistic representation names.

:param int num_pops: Number of populations.
"""
Hs = []
for ii in range(num_pops):
for jj in range(ii, num_pops):
Hs.append("H_{0}_{1}".format(ii, jj))
return Hs

[docs]def ld_names(num_pops):
"""
Returns the LD statistic representation names.

:param int num_pops: Number of populations.
"""
Ys = []
for ii in range(num_pops):
for jj in range(ii, num_pops):
Ys.append("DD_{0}_{1}".format(ii, jj))
for ii in range(num_pops):
for jj in range(num_pops):
for kk in range(jj, num_pops):
Ys.append("Dz_{0}_{1}_{2}".format(ii, jj, kk))
for ii in range(num_pops):
for jj in range(ii, num_pops):
for kk in range(ii, num_pops):
for ll in range(kk, num_pops):
if kk == ii == ll and jj != ii:
continue
if ii == kk and ll < jj:
continue
Ys.append("pi2_{0}_{1}_{2}_{3}".format(ii, jj, kk, ll))
return Ys

[docs]def moment_names(num_pops):
"""
Returns a tuple of length two with LD and heterozygosity moment names.

:param int num_pops: Number of populations
"""
hn = het_names(num_pops)
yn = ld_names(num_pops)
return (yn, hn)

##
## We need to map moments for split and rearrangement functions
##
mom_map = {}

[docs]def map_moment(mom):
"""
There are repeated moments with equal expectations, so we collapse them into
the same moment.

:param str mom: The moment to map to its "canonical" name.
"""
try:
return mom_map[mom]
except KeyError:
if mom.split("_") == "DD":
pops = sorted([int(p) for p in mom.split("_")[1:]])
mom_out = "DD_" + "_".join([str(p) for p in pops])
mom_map[mom] = mom_out
elif mom.split("_") == "Dz":
popD = mom.split("_")
popsz = sorted([int(p) for p in mom.split("_")[2:]])
mom_out = "Dz_" + popD + "_" + "_".join([str(p) for p in popsz])
mom_map[mom] = mom_out
elif mom.split("_") == "pi2":
popsp = sorted([int(p) for p in mom.split("_")[1:3]])
popsq = sorted([int(p) for p in mom.split("_")[3:]])
## pi2_2_2_1_1 -> pi2_1_1_2_2
## pi2_1_2_1_1 -> pi2_1_1_1_2,
## pi2_2_2_1_3 -> pi2_1_3_2_2
if popsp > popsq:  # switch them
mom_out = (
"pi2_"
+ "_".join([str(p) for p in popsq])
+ "_"
+ "_".join([str(p) for p in popsp])
)
elif popsp == popsq and popsp > popsq:  # switch them
mom_out = (
"pi2_"
+ "_".join([str(p) for p in popsq])
+ "_"
+ "_".join([str(p) for p in popsp])
)
else:
mom_out = (
"pi2_"
+ "_".join([str(p) for p in popsp])
+ "_"
+ "_".join([str(p) for p in popsq])
)
mom_map[mom] = mom_out
elif mom.split("_") == "H":
pops = sorted([int(p) for p in mom.split("_")[1:]])
mom_out = "H_" + "_".join([str(p) for p in pops])
mom_map[mom] = mom_out
else:
mom_out = mom
mom_map[mom] = mom_out
return mom_map[mom]

[docs]def perturb_params(params, fold=1, lower_bound=None, upper_bound=None):
"""
Generate a perturbed set of parameters. Each element of params is randomly
perturbed by the given factor of 2 up or down.

:param list params: A list of input parameters.
:param float fold: Number of factors of 2 to perturb by.
:param list lower_bound: If not None, the resulting parameter set is adjusted
to have all value greater than lower_bound. Must have equal length to
``params``.
:param list upper_bound: If not None, the resulting parameter set is adjusted
to have all value less than upper_bound. Must have equal length to
``params``.
"""
pnew = params * 2 ** (fold * (2 * np.random.random(len(params)) - 1))
if lower_bound is not None:
for ii, bound in enumerate(lower_bound):
if bound is None:
lower_bound[ii] = -np.inf
pnew = np.maximum(pnew, 1.01 * np.asarray(lower_bound))
if upper_bound is not None:
for ii, bound in enumerate(upper_bound):
if bound is None:
upper_bound[ii] = np.inf
pnew = np.minimum(pnew, 0.99 * np.asarray(upper_bound))
return pnew

[docs]def rescale_params(params, types, Ne=None, gens=1, uncerts=None, time_offset=0):
"""
Rescale parameters to physical units, so that times are in generations or years,
sizes in effective instead of relative sizes, and migration probabilities in
per-generation units.

For generation times of events to be correctly rescaled, times in the
parameters list must be specified so that earlier epochs are earlier in the
list, because we return rescaled cumulative times. All time parameters must
refer to consecutive epochs. Epochs need not start at contemporary time, and
we can specify the time offset using `time_offset`.

If uncertainties are not given (`uncerts = None`), the return value is an
array of rescaled parameters. If uncertainties are given, the return value
has length two: the first entry is an array of rescaled parameters, and the
second entry is an array of rescaled uncertainties.

:param list params: List of parameters.
:param list types: List of parameter types. Times are given by "T", sizes by "nu",
effective size by "Ne", migration rates by "m", and fractions by "x" or "f".
:param float Ne: The effective population size, typically as the last entry in
``params``.
:param float gens: The generation time.
:param list uncerts: List of uncertainties, same length as ``params``.
:param time_offset: The amount of time added to each rescaled time point. This
lets us have consecutive epochs that stop short of time 0 (final sampling
time).
:type time_offset: int or float
"""
if Ne is not None and "Ne" in types:
raise ValueError(
"Ne must be passed as a keywork argument or "
"specified in types, but not both."
)
elif Ne is None and "Ne" not in types:
raise ValueError("Ne must be given or must exist in the types list")

if types.count("Ne") > 1:
raise ValueError("Ne can only be defined once in the parameters list")

if Ne is None:
Ne = params[types.index("Ne")]

if len(params) != len(types):
raise ValueError("types and params must have same length")

if uncerts is not None:
assert len(params) == len(uncerts)

# rescale the params
# go backwards to add times in reverse
elapsed_t = 0
rescaled_params = np.array([0.0 for _ in params])
for ii, p in reversed(list(enumerate(params))):
if types[ii] == "T":
elapsed_t += p * 2 * Ne * gens
rescaled_params[ii] = elapsed_t + time_offset
elif types[ii] == "m":
rescaled_params[ii] = p / 2 / Ne
elif types[ii] == "nu":
rescaled_params[ii] = p * Ne
elif types[ii] in ["x", "f"]:
rescaled_params[ii] = p
elif types[ii] == "Ne":
rescaled_params[ii] = Ne
else:
raise ValueError("Unrecognized parameter type", types[ii])

if uncerts is None:
return rescaled_params
else:
## if uncerts are given
# rescale the uncerts
rescaled_uncerts = np.array([0.0 for _ in params])
for ii, p in enumerate(uncerts):
if types[ii] == "T":
rescaled_uncerts[ii] = p * 2 * Ne * gens
elif types[ii] == "m":
rescaled_uncerts[ii] = p / 2 / Ne
elif types[ii] == "nu":
rescaled_uncerts[ii] = p * Ne
else:
rescaled_uncerts[ii] = p
return rescaled_params, rescaled_uncerts
```