Source code for moments.Triallele.TriSpectrum_mod

"""
Contains triallele spectrum object
"""
import logging

logging.basicConfig()
logger = logging.getLogger("TriSpectrum_mod")

import os
import numpy, numpy as np
import moments.Triallele.Numerics
import moments.Triallele.Integration


[docs]class TriSpectrum(numpy.ma.masked_array): """ Represents a triallelic frequency spectrum. The triallelic spectrum is represented as a square numpy masked array in which the (i, j)-th element stores the count or density of loci in which there are i copies of the first derived allele and j copies of the second derived allele. :param array data: The frequency spectrum data of size (n+1)-by-(n+1) where n is the sample size. :param array mask: An optional array of the same size as data. 'True' entries in this array are masked in the TriSpectrum. These represent missing data categories, or invalid entries in the array :param bool mask_infeasible: If True, mask all bins for frequencies that cannot occur, e.g. i + j > n. Defaults to True. :param bool mask_fixed: If True, mask the fixed bins. Defaults to True. :param bool data_folded_major: If True, it is assumed that the input data is folded for the major and minor derived alleles. :param bool data_folded_ancestral: If True, it is assumed that the input data is folded to account for uncertainty in the ancestral state. Note that if True, data_folded_major must also be True. :param bool check_folding_major: If True and data_folded_ancestral=True, the data and mask will be checked to ensure they are consistent. :param bool check_folding_ancestral: If True and data_folded_ancestral=True, the data and mask will be checked to ensure they are consistent. """ def __new__( subtype, data, mask=numpy.ma.nomask, finite_genome=False, mask_infeasible=True, mask_fixed=True, data_folded_major=None, check_folding_major=True, data_folded_ancestral=None, check_folding_ancestral=True, dtype=float, copy=True, fill_value=numpy.nan, keep_mask=True, shrink=True, ): if finite_genome == True: mask_fixed = False # print('If we simulate under the finite genome model, we do not mask fixed sites') data = numpy.asanyarray(data) if mask is numpy.ma.nomask: mask = numpy.ma.make_mask_none(data.shape) subarr = numpy.ma.masked_array( data, mask=mask, dtype=dtype, copy=copy, fill_value=fill_value, keep_mask=True, shrink=True, ) subarr = subarr.view(subtype) if hasattr(data, "folded_major"): if data_folded_major is None or data_folded_major == data.folded_major: subarr.folded_major = data.folded_major elif data_folded_major != data.folded_major: raise ValueError( "Data does not have same major/minor folding status as " "was called for in Spectrum constructor." ) elif data_folded_major is not None: subarr.folded_major = data_folded_major else: subarr.folded_major = False if hasattr(data, "folded_ancestral"): if ( data_folded_ancestral is None or data_folded_ancestral == data.folded_ancestral ): subarr.folded_ancestral = data.folded_ancestral elif data_folded_ancestral != data.folded_ancestral: raise ValueError( "Data does not have same ancestral folding status as " "was called for in Spectrum constructor." ) elif data_folded_ancestral is not None: subarr.folded_ancestral = data_folded_ancestral else: subarr.folded_ancestral = False if mask_infeasible: subarr.mask_infeasible() if mask_fixed: subarr.mask_fixed() return subarr def __array_finalize__(self, obj): if obj is None: return numpy.ma.masked_array.__array_finalize__(self, obj) self.folded_major = getattr(obj, "folded_major", "unspecified") self.folded_ancestral = getattr(obj, "folded_ancestral", "unspecified") def __array_wrap__(self, obj, context=None): result = obj.view(type(self)) result = numpy.ma.masked_array.__array_wrap__(self, obj, context=context) result.folded_major = self.folded_major result.folded_ancestral = self.folded_ancestral return result def _update_from(self, obj): numpy.ma.masked_array._update_from(self, obj) if hasattr(obj, "folded_major"): self.folded_major = obj.folded_major if hasattr(obj, "folded_ancestral"): self.folded_ancestral = obj.folded_ancestral # masked_array has priority 15. __array_priority__ = 20 def __repr__(self): return "TriSpectrum(%s, folded_major=%s, folded_ancestral=%s)" % ( str(self), str(self.folded_major), str(self.folded_ancestral), )
[docs] def mask_infeasible(self): """ Mask any infeasible entries. """ for ii in range(len(self)): self.mask[ii, len(self) - ii :] = True
[docs] def mask_fixed(self): """ Mask entries that are not triallelic. """ self.mask[0, 0] = True for ii in range(len(self)): self.mask[ii, len(self) - ii - 1] = True
[docs] def unfold(self): """ Completely unfold the spectrum. Returns a new TriSpectrum. """ if not self.folded_major: raise ValueError("Input Spectrum is not folded.") data = self.data unfolded = TriSpectrum(data, mask_infeasible=True) return unfolded
def _get_sample_size(self): return numpy.asarray(self.shape)[0] - 1 sample_size = property(_get_sample_size) def _get_sample_sizes(self): return [self._get_sample_size()] sample_sizes = property(_get_sample_sizes) # Make from_file a static method, so we can use it without an instance.
[docs] @staticmethod def from_file(fid, mask_infeasible=True, return_comments=False): """ Read frequency spectrum from file. See to_file method for details on the file format. :param str fid: String with file name to read from or an open file object. :param bool mask_infeasible: If True, mask the infeasible entries in the triallelic spectrum. :param bool return_comments: If true, the return value is (fs, comments), where comments is a list of strings containing the comments from the file. """ newfile = False # Try to read from fid. If we can't, assume it's something that we can # use to open a file. if not hasattr(fid, "read"): newfile = True fid = open(fid, "r") line = fid.readline() # Strip out the comments comments = [] while line.startswith("#"): comments.append(line[1:].strip()) line = fid.readline() # Read the shape of the data shape, folded_major, folded_ancestral = line.split() shape = [int(shape) + 1, int(shape) + 1] data = numpy.fromstring( fid.readline().strip(), count=numpy.product(shape), sep=" " ) # fromfile returns a 1-d array. Reshape it to the proper form. data = data.reshape(*shape) maskline = fid.readline().strip() mask = numpy.fromstring(maskline, count=numpy.product(shape), sep=" ") mask = mask.reshape(*shape) if folded_major == "folded_major": folded_major = True else: folded_major = False if folded_ancestral == "folded_ancestral": folded_ancestral = True else: folded_ancestral = False # If we opened a new file, clean it up. if newfile: fid.close() fs = TriSpectrum( data, mask, mask_infeasible, data_folded_ancestral=folded_ancestral, data_folded_major=folded_major, ) if not return_comments: return fs else: return fs, comments
[docs] def to_file(self, fid, precision=16, comment_lines=[], foldmaskinfo=True): """ Write frequency spectrum to file. The file format is: - # Any number of comment lines beginning with a '#' - A single line containing the sample size. On the *same line*, the string 'folded_major' or 'unfolded_major' denoting the folding status of the array. And on the *same line*, the string 'folded_ancestral' or 'unfolded_ancestral' denoting the folding status of the array. - A single line giving the array elements. The order of elements is e.g.: fs[0, 0] fs[0, 1] fs[0, 2] ... fs[1, 0] fs[1, 1] ... - A single line giving the elements of the mask in the same order as the data line. '1' indicates masked, '0' indicates unmasked. :param str fid: String with file name to write to or an open file object. :param int precision: Precision with which to write out entries of the SFS. (They are formated via %.<p>g, where <p> is the precision.) :param list comment_lines: List of strings to be used as comment lines in the header of the output file. :param bool foldmaskinfo: If False, folding and mask and population label information will not be saved. """ # Open the file object. newfile = False if not hasattr(fid, "write"): newfile = True fid = open(fid, "w") # Write comments for line in comment_lines: fid.write("# ") fid.write(line.strip()) fid.write(os.linesep) # Write out the shape of the fs fid.write("{0} ".format(self.sample_size)) if foldmaskinfo: if not self.folded_major: fid.write("unfolded_major ") else: fid.write("folded_major ") if not self.folded_ancestral: fid.write("unfolded_ancestral ") else: fid.write("folded_ancestral ") fid.write(os.linesep) # Write the data to the file self.data.tofile(fid, " ", "%%.%ig" % precision) fid.write(os.linesep) if foldmaskinfo: # Write the mask to the file numpy.asarray(self.mask, int).tofile(fid, " ") fid.write(os.linesep) # Close file if newfile: fid.close()
[docs] def fold_major(self): """ Fold the spectrum based on the major allele(s). """ if self.folded_major: raise ValueError("Input Spectrum is already folded.") folded = self + np.transpose(self) for ii in range(len(folded)): folded[ii, ii] = np.divide(folded[ii, ii], 2) folded.mask[0, :] = True folded.mask[:, 0] = True for ii in range(len(folded)): folded[ii, ii + 1 :] = 0 folded.mask[ii, ii + 1 :] = True folded.mask[ii, len(self) - 1 - ii :] = True folded.folded_major = True folded.folded_ancestral = self.folded_ancestral return folded
[docs] def log(self): """ Return the natural logarithm of the entries of the frequency spectrum. Only necessary because numpy.ma.log now fails to propagate extra attributes after numpy 1.10. """ logfs = numpy.ma.log(self) logfs.folded_major = self.folded_major logfs.folded_ancestral = self.folded_ancestral return logfs
[docs] def fold_ancestral(self): """ Fold the spectrum based on the ancestral state """ if self.folded_ancestral: raise ValueError("Input Spectrum is already folded.") folded = 0 * self ns = len(folded) - 1 for ii in range(ns): for jj in range(ns): kk = ns - ii - jj if self.mask[ii, jj] == True: continue elif ii <= kk and jj <= kk: if ii >= jj: folded[ii, jj] += self[ii, jj] else: folded[jj, ii] += self[ii, jj] elif ii > kk and jj <= kk: folded[kk, jj] += self[ii, jj] elif ii <= kk and jj > kk: folded[kk, ii] += self[ii, jj] else: # ii > kk and jj > kk if ii >= jj: folded[jj, kk] += self[ii, jj] else: folded[ii, kk] += self[ii, jj] # mask if not a valid entry for ancestrally folded spectrum for ii in range(ns): for jj in range(ns): kk = ns - ii - jj if not (kk >= ii >= jj): folded.mask[ii, jj] = True folded.folded_major = True folded.folded_ancestral = True return folded
[docs] def S(self): """ Number of sites in the unmasked spectrum. """ return self.sum()
[docs] def project(self, ns, finite_genome=False): """ Project to smaller sample size. :param int ns: Sample size for new spectrum. """ ### Take from numerics and put here - projection cache might stay in Numerics if finite_genome == False: # projection doesn't reach edges data = moments.Triallele.Numerics.project(self, ns) output = TriSpectrum(data, mask_infeasible=True) if self[-1, 0].mask: output.mask_fixed() return output else: # all entries are projected, total sum of spectrum is maintained # output = moments.Triallele.Numerics.project_fg(self, ns) # if self.mask_infeasible == True: # output.mask_infeasible() # return output raise ValueError( "finite_genome is not yet implemented" ) # still to implement
[docs] def pi(self): """ Estimated expected number of pairwise differences between two samples from the population at loci that are triallelic """ raise ValueError("Not yet implemented")
# Ensures that when arithmetic is done with TriSpectrum objects, # attributes are preserved. For details, see similar code in # moments.Spectrum_mod for method in [ "__add__", "__radd__", "__sub__", "__rsub__", "__mul__", "__rmul__", "__div__", "__rdiv__", "__truediv__", "__rtruediv__", "__floordiv__", "__rfloordiv__", "__rpow__", "__pow__", ]: exec( """ def %(method)s(self, other): self._check_other_folding(other) if isinstance(other, numpy.ma.masked_array): newdata = self.data.%(method)s (other.data) newmask = numpy.ma.mask_or(self.mask, other.mask) else: newdata = self.data.%(method)s (other) newmask = self.mask outfs = self.__class__.__new__(self.__class__, newdata, newmask, mask_infeasible=False, data_folded_major=self.folded_major, data_folded_ancestral=self.folded_ancestral) return outfs """ % {"method": method} ) # Methods that modify the Spectrum in-place. for method in [ "__iadd__", "__isub__", "__imul__", "__idiv__", "__itruediv__", "__ifloordiv__", "__ipow__", ]: exec( """ def %(method)s(self, other): self._check_other_folding(other) if isinstance(other, numpy.ma.masked_array): self.data.%(method)s (other.data) self.mask = numpy.ma.mask_or(self.mask, other.mask) else: self.data.%(method)s (other) return self """ % {"method": method} ) def _check_other_folding(self, other): """ Ensure other Spectrum has same .folded status """ if isinstance(other, self.__class__) and ( other.folded_major != self.folded_major or other.folded_ancestral != self.folded_ancestral ): raise ValueError( "Cannot operate with a folded Spectrum and an " "unfolded one." )
[docs] def integrate(self, nu, tf, dt=0.001, gammas=None, theta=1.0): """ Method to simulate the triallelic fs forward in time. This integration scheme takes advantage of scipy's sparse methods. :param nu: The population effective size as positive value or callable function. :param float tf: The integration time in genetics units. :param float dt_fac: time step for integration :param list gammas: Population size scaled selection coefficients [sAA, sA0, sBB, sB0, sAB]. Here, 0 represents that ancestral allele, so we can implement dominance by picking the relationship between, e.g., sAA, sA0, sAB, and sA0. :param float theta: Population size scale mutation parameter, assuming equal mutation rates to both derived alleles. """ if gammas == None: gammas = [0, 0, 0, 0, 0] self.data[:] = moments.Triallele.Integration.integrate_cn( self.data, nu, tf, dt=dt, gammas=gammas, theta=theta )
# Allow TriSpectrum objects to be pickled. # See http://effbot.org/librarybook/copy-reg.htm try: import copy_reg except: import copyreg def TriSpectrum_pickler(fs): # Collect all the info necessary to save the state of a TriSpectrum return ( TriSpectrum_unpickler, (fs.data, fs.mask, fs.folded_major, fs.folded_ancestral), ) def TriSpectrum_unpickler(data, mask, folded_major, folded_ancestral): # Use that info to recreate the TriSpectrum return TriSpectrum( data, mask, mask_infeasible=False, data_folded_major=folded_major, data_folded_ancestral=folded_ancestral, ) try: copy_reg.pickle(TriSpectrum, TriSpectrum_pickler, TriSpectrum_unpickler) except: copyreg.pickle(TriSpectrum, TriSpectrum_pickler, TriSpectrum_unpickler)