"""
Contains two locus spectrum object
"""
import logging
logging.basicConfig()
logger = logging.getLogger("TLSpectrum_mod")
import os
import copy
import numpy as np
from . import Numerics, Integration, Util
import scipy.special
[docs]class TLSpectrum(np.ma.masked_array):
"""
Represents a two locus frequency spectrum.
:param array data: The frequency spectrum data, which has shape
(n+1)-by-(n+1)-by-(n+1) where n is the sample size.
:param array mask: An optional array of the same size as data. 'True' entries
in this array are masked in the TLSpectrum.
:param bool mask_infeasible: If True, mask all bins for frequencies that cannot
occur, e.g. i + j > n. Defaults to True.
:param bool mask_fixed: If True, mask the fixed bins. Defaults to True.
:param bool data_folded: If True, it is assumed that the input data is folded
for the major and minor derived alleles
:param bool check_folding: If True and data_folded=True, the data and
mask will be checked to ensure they are consistent.
"""
def __new__(
subtype,
data,
mask=np.ma.nomask,
mask_infeasible=True,
mask_fixed=False,
data_folded=None,
check_folding=True,
dtype=float,
copy=True,
fill_value=np.nan,
keep_mask=True,
shrink=True,
):
data = np.asanyarray(data)
if mask is np.ma.nomask:
mask = np.ma.make_mask_none(data.shape)
subarr = np.ma.masked_array(
data,
mask=mask,
dtype=dtype,
copy=copy,
fill_value=fill_value,
keep_mask=True,
shrink=True,
)
subarr = subarr.view(subtype)
if hasattr(data, "folded"):
if data_folded is None or data_folded == data.folded:
subarr.folded = data.folded
elif data_folded != data.folded:
raise ValueError(
"Data does not have same folding status as "
"was called for in Spectrum constructor."
)
elif data_folded is not None:
subarr.folded = data_folded
else:
subarr.folded = False
if mask_infeasible:
subarr.mask_infeasible()
if mask_fixed:
subarr.mask_fixed()
return subarr
def __array_finalize__(self, obj):
if obj is None:
return
np.ma.masked_array.__array_finalize__(self, obj)
self.folded = getattr(obj, "folded", "unspecified")
def __array_wrap__(self, obj, context=None):
result = obj.view(type(self))
result = np.ma.masked_array.__array_wrap__(self, obj, context=context)
result.folded = self.folded
return result
def _update_from(self, obj):
np.ma.masked_array._update_from(self, obj)
if hasattr(obj, "folded"):
self.folded = obj.folded
# masked_array has priority 15.
__array_priority__ = 20
def __repr__(self):
return "TLSpectrum(%s, folded=%s)" % (str(self), str(self.folded))
[docs] def mask_infeasible(self):
"""
Mask any infeasible entries.
"""
ns = len(self) - 1
# mask entries with i+j+k > ns
for ii in range(len(self)):
for jj in range(len(self)):
for kk in range(len(self)):
if ii + jj + kk > ns:
self.mask[ii, jj, kk] = True
[docs] def mask_fixed(self):
"""
Mask all infeasible entries, as well as any where both sites are not
segregating.
"""
ns = len(self) - 1
# mask fixed entries
self.mask[0, 0, 0] = True
self.mask[0, 0, -1] = True
self.mask[0, -1, 0] = True
self.mask[-1, 0, 0] = True
# mask entries with i+j+k > ns
for ii in range(len(self)):
for jj in range(len(self)):
for kk in range(len(self)):
if ii + jj + kk > ns:
self.mask[ii, jj, kk] = True
# mask fA = 0 and fB = 0
for ii in range(len(self)):
self.mask[ii, ns - ii, 0] = True
self.mask[ii, 0, ns - ii] = True
self.mask[0, :, 0] = True
self.mask[0, 0, :] = True
[docs] def unfold(self):
"""
Remove folding from the spectrum.
"""
if not self.folded:
raise ValueError("Input Spectrum is not folded.")
data = self.data
unfolded = TLSpectrum(data, mask_infeasible=True)
return unfolded
def _get_sample_size(self):
return np.asarray(self.shape)[0] - 1
sample_size = property(_get_sample_size)
[docs] def left(self):
"""
The marginal allele frequency spectrum at the left locus.
"""
n = len(self) - 1
fl = np.zeros(n + 1)
for ii in range(n + 1):
for jj in range(n + 1 - ii):
for kk in range(n + 1 - ii - jj):
fl[ii + jj] += self[ii, jj, kk]
return fl
[docs] def right(self):
"""
The marginal AFS at the right locus.
"""
n = len(self) - 1
fr = np.zeros(n + 1)
for ii in range(n + 1):
for jj in range(n + 1 - ii):
for kk in range(n + 1 - ii - jj):
fr[ii + kk] += self[ii, jj, kk]
return fr
[docs] def ancestral_misid(self, p):
"""
Return a new SFS with a given ancestral misidentification, p.
:param p: The rate of ancestral state misidentification.
"""
if p == 0:
return (1 - p) * self
elif p > 1 or p < 0:
raise ValueError("probability of misid must be between 0 and 1")
F_new = (1 - p) ** 2 * self
for ii in range(self.sample_size + 1):
for jj in range(self.sample_size + 1 - ii):
for kk in range(self.sample_size + 1 - ii - jj):
ll = self.sample_size - ii - jj - kk
# left misid, right correct
F_new.data[kk, ll, ii] += p * (1 - p) * self.data[ii, jj, kk]
# right misid, left correct
F_new.data[jj, ii, ll] += (1 - p) * p * self.data[ii, jj, kk]
# both misid
F_new.data[ll, kk, jj] += p ** 2 * self.data[ii, jj, kk]
return F_new
# statistics, called from Util
[docs] def S(self, nA=None, nB=None):
"""
Return the sum of probabilities over all variable two-locus entries
in the spectrum.
"""
if nA is not None and (nA <= 0 or nA >= self.sample_size):
raise ValueError(f"nA must be between 1 and {self.sample_size - 1}")
if nB is not None and (nB <= 0 or nB >= self.sample_size):
raise ValueError(f"nB must be between 1 and {self.sample_size - 1}")
return Util.compute_S(self, nA, nB)
[docs] def D(self, proj=True, nA=None, nB=None):
"""
Return the expectation of :math:`D` from the spectrum.
:param proj: If True, use the unbiased estimator from downsampling. If False,
use naive maximum likelihood estimates for frequency.
:param nA: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the left locus.
:param nB: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the right locus.
"""
if nA is not None and (nA <= 0 or nA >= self.sample_size):
raise ValueError(f"nA must be between 1 and {self.sample_size - 1}")
if nB is not None and (nB <= 0 or nB >= self.sample_size):
raise ValueError(f"nB must be between 1 and {self.sample_size - 1}")
return Util._compute_D(self, proj, nA, nB)
[docs] def D2(self, proj=True, nA=None, nB=None):
"""
Return the expectation of :math:`D^2` from the spectrum.
:param proj: If True, use the unbiased estimator from downsampling. If False,
use naive maximum likelihood estimates for frequency.
:param nA: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the left locus.
:param nB: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the right locus.
"""
if nA is not None and (nA <= 0 or nA >= self.sample_size):
raise ValueError(f"nA must be between 1 and {self.sample_size - 1}")
if nB is not None and (nB <= 0 or nB >= self.sample_size):
raise ValueError(f"nB must be between 1 and {self.sample_size - 1}")
return Util._compute_D2(self, proj, nA, nB)
[docs] def Dz(self, proj=True, nA=None, nB=None):
"""
Compute the expectation of :math:`Dz = D(1-2p)(1-2q)` from the spectrum.
:param proj: If True, use the unbiased estimator from downsampling. If False,
use naive maximum likelihood estimates for frequency.
:param nA: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the left locus.
:param nB: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the right locus.
"""
if nA is not None and (nA <= 0 or nA >= self.sample_size):
raise ValueError(f"nA must be between 1 and {self.sample_size - 1}")
if nB is not None and (nB <= 0 or nB >= self.sample_size):
raise ValueError(f"nB must be between 1 and {self.sample_size - 1}")
return Util._compute_Dz(self, proj, nA, nB)
[docs] def pi2(self, proj=True, nA=None, nB=None):
"""
Return the expectation of :math:`\\pi_2 = p(1-p)q(1-q)` from the spectrum.
:param proj: If True, use the unbiased estimator from downsampling. If False,
use naive maximum likelihood estimates for frequency.
:param nA: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the left locus.
:param nB: If None, the average is computed over all frequencies. If given,
condition on the given allele count for the right locus.
"""
if nA is not None and (nA <= 0 or nA >= self.sample_size):
raise ValueError(f"nA must be between 1 and {self.sample_size - 1}")
if nB is not None and (nB <= 0 or nB >= self.sample_size):
raise ValueError(f"nB must be between 1 and {self.sample_size - 1}")
return Util._compute_pi2(self, proj, nA, nB)
# Make from_file a static method, so we can use it without an instance.
[docs] @staticmethod
def from_file(fid, mask_infeasible=True, return_comments=False):
"""
Read frequency spectrum from file.
:param str fid: String with file name to read from or an open file object.
:param bool mask_infeasible: If True, mask the infeasible entries in the
two locus spectrum.
:param bool return_comments: If true, the return value is (fs, comments), where
comments is a list of strings containing the comments from the file.
"""
newfile = False
# Try to read from fid. If we can't, assume it's something that we can
# use to open a file.
if not hasattr(fid, "read"):
newfile = True
fid = open(fid, "r")
line = fid.readline()
# Strip out the comments
comments = []
while line.startswith("#"):
comments.append(line[1:].strip())
line = fid.readline()
# Read the shape of the data
shape, folded = line.split()
shape = [int(shape) + 1, int(shape) + 1, int(shape) + 1]
data = np.fromstring(fid.readline().strip(), count=np.product(shape), sep=" ")
# fromfile returns a 1-d array. Reshape it to the proper form.
data = data.reshape(*shape)
maskline = fid.readline().strip()
mask = np.fromstring(maskline, count=np.product(shape), sep=" ")
mask = mask.reshape(*shape)
if folded == "folded":
folded = True
else:
folded = False
# If we opened a new file, clean it up.
if newfile:
fid.close()
fs = TLSpectrum(data, mask, mask_infeasible, data_folded=folded)
if not return_comments:
return fs
else:
return fs, comments
[docs] def to_file(self, fid, precision=16, comment_lines=[], foldmaskinfo=True):
"""
Write frequency spectrum to file.
:param str fid: String with file name to write to or an open file object.
:param int precision: Precision with which to write out entries of the SFS.
(They are formated via %.<p>g, where <p> is the precision.)
:param list comment_lines: List of strings to be used as comment lines in
the header of the output file.
:param bool foldmaskinfo: If False, folding and mask and population label
information will not be saved.
"""
# Open the file object.
newfile = False
if not hasattr(fid, "write"):
newfile = True
fid = open(fid, "w")
# Write comments
for line in comment_lines:
fid.write("# ")
fid.write(line.strip())
fid.write(os.linesep)
# Write out the shape of the fs
fid.write("{0} ".format(self.sample_size))
if foldmaskinfo:
if not self.folded:
fid.write("unfolded ")
else:
fid.write("folded ")
fid.write(os.linesep)
# Write the data to the file
self.data.tofile(fid, " ", "%%.%ig" % precision)
fid.write(os.linesep)
if foldmaskinfo:
# Write the mask to the file
np.asarray(self.mask, int).tofile(fid, " ")
fid.write(os.linesep)
# Close file
if newfile:
fid.close()
[docs] def fold(self):
"""
Fold the two-locus spectrum by minor allele frequencies.
"""
if self.folded:
raise ValueError("Input Spectrum is already folded.")
ns = self.shape[0] - 1
folded = 0 * self
for ii in range(ns + 1):
for jj in range(ns + 1):
for kk in range(ns + 1):
if self.mask[ii, jj, kk]:
continue
p = ii + jj
q = ii + kk
if p > ns / 2 and q > ns / 2:
# Switch A/a and B/b, so AB becomes ab, Ab becomes aB, etc
folded[ns - ii - jj - kk, kk, jj] += self.data[ii, jj, kk]
folded.mask[ii, jj, kk] = True
elif p > ns / 2:
# Switch A/a, so AB -> aB, Ab -> ab, aB -> AB, and ab -> Ab
folded[kk, ns - ii - jj - kk, ii] += self.data[ii, jj, kk]
folded.mask[ii, jj, kk] = True
elif q > ns / 2:
# Switch B/b, so AB -> Ab, Ab -> AB, aB -> ab, and ab -> aB
folded[jj, ii, ns - ii - jj - kk] += self.data[ii, jj, kk]
folded.mask[ii, jj, kk] = True
else:
folded[ii, jj, kk] += self.data[ii, jj, kk]
folded.folded = True
return folded
[docs] def project(self, ns, finite_genome=False, cache=True):
"""
Project to smaller sample size.
param int ns: Sample size for new spectrum.
param bool finite_genome: If we also track proportions in fixed bins.
"""
if finite_genome:
raise ValueError("Projection with finite genome is not currently supported")
data = Numerics.project(self, ns, cache=cache)
output = TLSpectrum(data, mask_infeasible=True)
return output
# Ensures that when arithmetic is done with TLSpectrum objects,
# attributes are preserved. For details, see similar code in
# moments.Spectrum_mod
for method in [
"__add__",
"__radd__",
"__sub__",
"__rsub__",
"__mul__",
"__rmul__",
"__div__",
"__rdiv__",
"__truediv__",
"__rtruediv__",
"__floordiv__",
"__rfloordiv__",
"__rpow__",
"__pow__",
]:
exec(
"""
def %(method)s(self, other):
self._check_other_folding(other)
if isinstance(other, np.ma.masked_array):
newdata = self.data.%(method)s (other.data)
newmask = np.ma.mask_or(self.mask, other.mask)
else:
newdata = self.data.%(method)s (other)
newmask = self.mask
outfs = self.__class__.__new__(self.__class__, newdata, newmask,
mask_infeasible=False,
data_folded=self.folded)
return outfs
"""
% {"method": method}
)
# Methods that modify the Spectrum in-place.
for method in [
"__iadd__",
"__isub__",
"__imul__",
"__idiv__",
"__itruediv__",
"__ifloordiv__",
"__ipow__",
]:
exec(
"""
def %(method)s(self, other):
self._check_other_folding(other)
if isinstance(other, np.ma.masked_array):
self.data.%(method)s (other.data)
self.mask = np.ma.mask_or(self.mask, other.mask)
else:
self.data.%(method)s (other)
return self
"""
% {"method": method}
)
def _check_other_folding(self, other):
"""
Ensure other Spectrum has same .folded status
"""
if isinstance(other, self.__class__) and (other.folded != self.folded):
raise ValueError(
"Cannot operate with a folded Spectrum and an " "unfolded one."
)
[docs] def integrate(
self,
nu,
tf,
dt=0.01,
rho=None,
gamma=None,
sel_params=None,
sel_params_general=None,
theta=1.0,
finite_genome=False,
u=None,
v=None,
alternate_fg=None,
clustered_mutations=False,
):
"""
Simulate the two-locus haplotype frequency spectrum forward in time.
This integration scheme takes advantage of scipy's sparse methods.
When using the reversible mutation model (with `finite_genome` = True),
we are limited to selection at only one locus (the left locus), and
selection is additive. When using the default ISM, additive selection
is allowed at both loci, and we use `sel_params`, which specifies [sAB,
sA, and sB] in that order. Note that while this selection model is
additive within loci, it allows for epistasis between loci if sAB != sA
+ sB.
:param nu: Population effective size as positive value or callable function.
:param float tf: The integration time in genetics units.
:param float dt_fac: The time step for integration.
:param float rho: The population-size scaled recombination rate 4*Ne*r.
:param float gamma: The population-size scaled selection coefficient 2*Ne*s.
:param list sel_params: A list of selection parameters. See docstrings in
Numerics. Selection parameters will be deprecated when we clean up the
numerics and integration.
:param list sel_params_general: To be filled. ## TODO!!
:param float theta: Population size scale mutation parameter.
:param bool finite_genome: Defaults to False, in which case we use the
infinite sites model. Otherwise, we use a reversible mutation model, and
must specify ``u`` and ``v``.
:param float u: The mutation rate at the left locus in the finite genome model.
:param float v: The mutation rate at the right locus in the finite genome
model.
:param bool alternate_fg: If True, use the alternative finite genome model.
This parameter will be deprecated when we clean up the numerics and
integration.
"""
if gamma == None:
gamma = 0.0
if rho == None:
rho = 0.0
print("Warning: rho was not specified. Simulating with rho = 0")
self.data[:] = Integration.integrate(
self.data,
nu,
tf,
rho=rho,
dt=dt,
theta=theta,
gamma=gamma,
sel_params=sel_params,
sel_params_general=sel_params_general,
finite_genome=finite_genome,
u=u,
v=v,
alternate_fg=alternate_fg,
clustered_mutations=clustered_mutations,
)
# Allow TLSpectrum objects to be pickled.
# See http://effbot.org/librarybook/copy-reg.htm
try:
import copy_reg
except:
import copyreg
def TLSpectrum_pickler(fs):
# Collect all the info necessary to save the state of a TLSpectrum
return TLSpectrum_unpickler, (fs.data, fs.mask, fs.folded)
def TLSpectrum_unpickler(data, mask, folded):
# Use that info to recreate the TLSpectrum
return TLSpectrum(data, mask, mask_infeasible=False, data_folded=folded)
try:
copy_reg.pickle(TLSpectrum, TLSpectrum_pickler, TLSpectrum_unpickler)
except:
copyreg.pickle(TLSpectrum, TLSpectrum_pickler, TLSpectrum_unpickler)