"""
=================
binarymap
=================
Defines :class:`BinaryMap` objects for handling binary representations
of variants and their functional scores.
"""
import collections
import re
import numpy
import pandas as pd # noqa: F401
import scipy.sparse
import dms_variants.constants
[docs]class BinaryMap:
r"""Binary representations of variants and their functional scores.
Note
----
These maps represent variants as arrays of 0 and 1 integers indicating
whether a particular variant has a substitution. The wildtype is all 0.
Such representations are useful for fitting estimates of the effect of
each substitution.
Unless you are using the `expand` option, the binary maps only cover
substitutions relative to wildtype that are present in at least one
of the variants used to create the map.
Parameters
----------
func_scores_df : pandas.DataFrame
Data frame of variants and their functional scores. Each row is
a different variant, defined by space-delimited list of substitutions.
Data frames of this type are returned by
:meth:`dms_variants.codonvarianttable.CodonVariantTable.func_scores`.
substitutions_col : str
Column in `func_scores_df` giving substitutions for each variant.
func_score_col : str or None
Column in `func_scores_df` giving functional score for each variant,
or `None` if no functional scores available.
func_score_var_col : str or None
Column in `func_scores_df` giving variance on functional score
estimate, or `None` if no variance available.
n_pre_col : str or None
Column in `func_scores_df` giving pre-selection counts for each
variant, or `None` if counts not available.
n_post_col : str or None
Column in `func_scores_df` giving post-selection counts for each
variant, or `None` if counts not available.
cols_optional : True
All of the `*_col` parameters are optional except `substitutions_col`.
If `cols_optional` is `True`, the absence of any of these columns
is taken the same as setting that column's parameter to zero: the
corresponding attribute is set to `None`.
alphabet : list or tuple
Allowed characters (e.g., amino acids or codons).
expand : bool
If `False` (the default) the encoding only covers substitutions
relative to wildtype that are observed in the set of variants. If
`True` then the encoding covers all allowed characters at each
site regardless of whether they are wildtype or observed. In this
latter case, each binary representation is of length (alphabet size)
:math:`\times` (sequence length), and sums to the sequence length.
wtseq : None or str
Only set this option if `expand` is `True`. In that case, it
should be the wildtype sequence.
Attributes
----------
binarylength : int
Length of the binary representation of each variant.
nvariants : int
Number of variants.
binary_variants : scipy.sparse.csr.csr_matrix of dtype int8
Sparse matrix of shape `nvariants` by `binarylength`. Row
`binary_variants[ivariant]` gives the binary representation of
variant `ivariant`, and `binary_variants[ivariant, i]` is 1
if the variant has the substitution :meth:`BinaryMap.i_to_sub`
and 0 otherwise. To convert to dense `numpy.ndarray`, use
`toarray` method of the sparse matrix.
substitution_variants : list
All variants as substitution strings as provided in `substitutions_col`
of `func_scores_df`.
func_scores : numpy.ndarray of floats
A 1D array of length `nvariants` giving score for each variant.
func_scores_var : numpy.ndarray of floats, or None
A 1D array of length `nvariants` giving variance on score for each
variant, or `None` if no variance estimates provided.
n_pre : numpy.dnarray of integers, or None
A 1D array of length `nvariants` giving pre-selection counts for each
variant, or `None` if counts not provided.
n_post : numpy.dnarray of integers, or None
A 1D array of length `nvariants` giving post-selection counts for each
variant, or `None` if counts not provided.
alphabet : tuple
Allowed characters (e.g., amino acids or codons).
substitutions_col : str
Value set when initializing object.
Example
-------
Create a binary map:
>>> func_scores_df = pd.DataFrame.from_records(
... [('', 0.0, 0.2),
... ('M1A', -0.2, 0.1),
... ('M1C K3A', -0.4, 0.3),
... ('', 0.01, 0.15),
... ('A2C K3A', -0.05, 0.1),
... ('A2*', -1.2, 0.4),
... ],
... columns=['aa_substitutions', 'func_score', 'func_score_var'])
>>> binmap = BinaryMap(func_scores_df)
The length of the binary representation equals the number of unique
substitutions:
>>> binmap.binarylength
5
>>> binmap.all_subs
['M1A', 'M1C', 'A2*', 'A2C', 'K3A']
Scores, score variances, binary and string representations:
>>> binmap.nvariants
6
>>> binmap.func_scores
array([ 0. , -0.2 , -0.4 , 0.01, -0.05, -1.2 ])
>>> binmap.func_scores_var
array([0.2 , 0.1 , 0.3 , 0.15, 0.1 , 0.4 ])
>>> type(binmap.binary_variants)
<class 'scipy.sparse.csr.csr_matrix'>
>>> binmap.binary_variants.toarray()
array([[0, 0, 0, 0, 0],
[1, 0, 0, 0, 0],
[0, 1, 0, 0, 1],
[0, 0, 0, 0, 0],
[0, 0, 0, 1, 1],
[0, 0, 1, 0, 0]], dtype=int8)
>>> binmap.substitution_variants
['', 'M1A', 'M1C K3A', '', 'A2C K3A', 'A2*']
>>> binmap.substitutions_col
'aa_substitutions'
Validate binary map interconverts binary representations and substitutions:
>>> for ivar in range(binmap.nvariants):
... binvar = binmap.binary_variants.toarray()[ivar]
... subs_from_df = func_scores_df.at[ivar, 'aa_substitutions']
... assert subs_from_df == binmap.binary_to_sub_str(binvar)
... assert all(binvar == binmap.sub_str_to_binary(subs_from_df))
Demonstrate :meth:`BinaryMap.sub_str_to_indices`:
>>> for sub in binmap.substitution_variants:
... print(binmap.sub_str_to_indices(sub))
[]
[0]
[1, 4]
[]
[3, 4]
[2]
Now do similar operation but using `expand` to include full alphabet
(although to keep size manageable, we use an alphabet smaller than
all amino acids):
>>> wtseq = 'MAKG'
>>> alphabet = ['A', 'C', 'G', 'K', 'M', '*']
>>> binmap_expand = BinaryMap(func_scores_df,
... alphabet=alphabet,
... expand=True,
... wtseq=wtseq)
>>> binmap_expand.binarylength == len(wtseq) * len(alphabet)
True
>>> binmap_expand.all_subs
... # doctest: +NORMALIZE_WHITESPACE
['M1A', 'M1C', 'M1G', 'M1K', 'M1*',
'A2C', 'A2G', 'A2K', 'A2M', 'A2*',
'K3A', 'K3C', 'K3G', 'K3M', 'K3*',
'G4A', 'G4C', 'G4K', 'G4M', 'G4*']
>>> binmap_expand.binary_variants.toarray()
... # doctest: +NORMALIZE_WHITESPACE
array([[0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0,
0, 0],
[1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0,
0, 0],
[0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0,
0, 0],
[0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0,
0, 0],
[0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0,
0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0,
0, 0]], dtype=int8)
>>> all(numpy.sum(binmap_expand.binary_variants.toarray(), axis=1) ==
... numpy.full(binmap_expand.nvariants, len(wtseq)))
True
>>> binmap_expand.substitution_variants
['', 'M1A', 'M1C K3A', '', 'A2C K3A', 'A2*']
>>> for ivar in range(binmap_expand.nvariants):
... binvar = binmap_expand.binary_variants.toarray()[ivar]
... subs_from_df = func_scores_df.at[ivar, 'aa_substitutions']
... assert subs_from_df == binmap_expand.binary_to_sub_str(binvar)
... assert all(binvar == binmap_expand.sub_str_to_binary(subs_from_df))
Note that `binamp` does not have `n_pre` and `n_post` attributes set:
>>> binmap.n_pre == binmap.n_post == None
True
We would not have been able to initialize `binmap` if we weren't using
the `cols_optional` flag:
>>> BinaryMap(func_scores_df, alphabet=alphabet, cols_optional=False)
Traceback (most recent call last):
...
ValueError: `func_scores_df` lacks column pre_count
Now assign values to `n_pre` and `n_post` attributes:
>>> func_scores_df_counts = (
... func_scores_df.assign(pre_count=[10, 20, 15, 5, 6, 8],
... post_count=[0, 3, 12, 11, 9, 8])
... )
>>> binmap_counts = BinaryMap(func_scores_df_counts, alphabet=alphabet)
>>> binmap_counts.n_pre
array([10, 20, 15, 5, 6, 8])
>>> binmap_counts.n_post
array([ 0, 3, 12, 11, 9, 8])
"""
def __eq__(self, other):
"""Test if equal to object `other`.
>>> df = pd.DataFrame({'aa_substitutions': ['', 'M1A'],
... 'func_score': [0.0, -1.2],
... 'func_score_var': [0.1, 0.15]})
>>> df2 = df.copy()
>>> df3 = df.assign(func_score=lambda x: x['func_score'] + 0.1)
>>> bmap1 = BinaryMap(df)
>>> bmap2 = BinaryMap(df2)
>>> bmap3 = BinaryMap(df3)
>>> bmap1 == bmap2
True
>>> bmap1 == bmap3
False
"""
# following here: https://stackoverflow.com/a/390640
if type(other) != type(self):
return False
elif self.__dict__.keys() != other.__dict__.keys():
return False
else:
for key, val in self.__dict__.items():
val2 = getattr(other, key)
if type(val) != type(val2):
return False
elif isinstance(val, numpy.ndarray):
if not numpy.array_equal(val, val2):
return False
elif isinstance(val, scipy.sparse.csr.csr_matrix):
if (val - val2).nnz:
return False
elif isinstance(val, (pd.DataFrame, pd.Series)):
if not val.equals(val2):
return False
else:
if val != val2:
return False
return True
def __init__(self,
func_scores_df,
*,
substitutions_col='aa_substitutions',
func_score_col='func_score',
func_score_var_col='func_score_var',
n_pre_col='pre_count',
n_post_col='post_count',
cols_optional=True,
alphabet=dms_variants.constants.AAS_WITHSTOP,
expand=False,
wtseq=None,
):
"""Initialize object; see main class docstring."""
self.nvariants = len(func_scores_df)
self.alphabet = tuple(alphabet)
for col, attr, dtype, lim_min, lim_max in [
(func_score_col, 'func_scores', float, None, None),
(func_score_var_col, 'func_scores_var', float, 0, None),
(n_pre_col, 'n_pre', int, 0, None),
(n_post_col, 'n_post', int, 0, None),
]:
if col not in func_scores_df.columns:
if cols_optional:
setattr(self, attr, None)
else:
raise ValueError(f"`func_scores_df` lacks column {col}")
else:
vals = func_scores_df[col].values.astype(dtype)
if not all(vals == func_scores_df[col].values):
raise ValueError(f"{col} not of type {dtype}")
assert vals.shape == (self.nvariants,)
if any(numpy.isnan(vals)):
raise ValueError(f"some entries in {col} are NaN")
if (lim_min is not None) and any(vals < lim_min):
raise ValueError(f"some entries in {col} < {lim_min}")
if (lim_max is not None) and any(vals > lim_max):
raise ValueError(f"some entries in {col} < {lim_min}")
setattr(self, attr, vals)
# get list of substitution strings for each variant
if substitutions_col not in func_scores_df.columns:
raise ValueError('`func_scores_df` lacks `substitutions_col` ' +
substitutions_col)
substitutions = func_scores_df[substitutions_col].tolist()
if not all(isinstance(s, str) for s in substitutions):
raise ValueError('values in `substitutions_col` not all str')
self.substitution_variants = substitutions
self.substitutions_col = substitutions_col
# regex that matches substitution
chars = []
for char in alphabet:
if char.isalpha():
chars.append(char)
elif char == '*':
chars.append(r'\*')
else:
raise ValueError(f"invalid alphabet character: {char}")
chars = '|'.join(chars)
self._sub_regex = (rf"(?P<wt>{chars})"
rf"(?P<site>\d+)"
rf"(?P<mut>{chars})")
# build mapping from substitution to binary map index
wts = {}
muts = collections.defaultdict(set)
for subs in substitutions:
for sub in subs.split():
m = re.fullmatch(self._sub_regex, sub)
if not m:
raise ValueError(f"could not match substitution: {sub}")
site = int(m.group('site'))
if site not in wts:
wts[site] = m.group('wt')
elif m.group('wt') != wts[site]:
raise ValueError(f"different wildtypes at {site}:\n"
f"{m.group('wt')} versus {wts[site]}")
if m.group('mut') == wts[site]:
raise ValueError(f"wildtype and mutant the same in {sub}")
muts[site].add(m.group('mut'))
self._i_to_sub = {}
self._wt_indices = {} # keyed by site, values wildtype indices
if expand:
if not isinstance(wtseq, str):
raise ValueError('`wtseq` must be str if `expand` is True')
if not set(wtseq).issubset(set(alphabet)):
raise ValueError('`wtseq` has characters not in alphabet')
if min(wts.keys()) < 1:
raise ValueError('if `expand`, site numbers must start at 1')
if max(wts.keys()) > len(wtseq):
raise ValueError('`wtseq` not long enough given site numbers')
for site, wt in wts.items():
if wtseq[site - 1] != wt:
raise ValueError('`wtseq` and `func_scores_df` differ on '
f"identity at site {site}")
i = 0
for site, wt in enumerate(wtseq, start=1):
assert (site not in wts) or (wts[site] == wt)
for char in self.alphabet:
self._i_to_sub[i] = f"{wt}{site}{char}"
if char == wt:
assert site not in self._wt_indices
self._wt_indices[site] = i
i += 1
else:
if wtseq is not None:
raise ValueError('`wtseq` should be None if `expand` is False')
i = 0
for site, wt in sorted(wts.items()):
for mut in sorted(muts[site]):
self._i_to_sub[i] = f"{wt}{site}{mut}"
i += 1
self.binarylength = len(self._i_to_sub)
self._sub_to_i = {sub: i for i, sub in self._i_to_sub.items()}
self._wt_index_set = set(self._wt_indices.values())
assert len(self._sub_to_i) == len(self._i_to_sub) == self.binarylength
# build binary_variants
row_ind = [] # row indices of elements that are one
col_ind = [] # column indices of elements that are one
for ivariant, subs in enumerate(substitutions):
for isub in self.sub_str_to_indices(subs):
row_ind.append(ivariant)
col_ind.append(isub)
self.binary_variants = scipy.sparse.csr_matrix(
(numpy.ones(len(row_ind), dtype='int8'),
(row_ind, col_ind)),
shape=(self.nvariants, self.binarylength),
dtype='int8')
[docs] def sub_str_to_binary(self, sub_str):
"""Convert space-delimited substitutions to binary representation.
Parameters
----------
sub_str : str
Space-delimited substitutions.
Returns
-------
numpy.ndarray of dtype `int8`
Binary representation.
"""
binrep = numpy.zeros(self.binarylength, dtype='int8')
binrep[self.sub_str_to_indices(sub_str)] = 1
return binrep
[docs] def sub_str_to_indices(self, sub_str):
"""Convert space-delimited substitutions to list of non-zero indices.
Parameters
-----------
sub_str : str
Space-delimited substitutions.
Returns
-------
list
Contains binary representation index for each mutation, so wildtype
is an empty list.
"""
sites = set()
indices = []
for sub in sub_str.split():
m = re.fullmatch(self._sub_regex, sub)
if not m:
raise ValueError(f"substitution {sub} in {sub_str} invalid "
f"for alphabet {self.alphabet}")
if m.group('site') in sites:
raise ValueError("multiple subs at same site in {sub_str}")
sites.add(m.group('site'))
indices.append(self.sub_to_i(sub))
sites = {int(site) for site in sites}
for site, i in self._wt_indices.items():
if site not in sites:
indices.append(i)
return sorted(indices)
[docs] def binary_to_sub_str(self, binary):
"""Convert binary representation to space-delimited substitutions.
Note
----
This method is the inverse of :meth:`BinaryMap.sub_str_to_binary`.
Parameters
----------
binary : numpy.ndarray
Binary representation.
Returns
-------
str
Space-delimited substitutions.
"""
if binary.shape != (self.binarylength,):
raise ValueError(f"`binary` not length {self.binarylength}:\n" +
str(binary))
if not set(binary).issubset({0, 1}):
raise ValueError(f"`binary` not all 0 or 1:\n{binary}")
subs = [s for s in map(self.i_to_sub, numpy.flatnonzero(binary)) if s]
sites = [re.fullmatch(self._sub_regex, sub) for sub in subs]
if len(sites) != len(set(sites)):
raise ValueError('`binary` specifies multiple substitutions '
f"at same site:\n{binary}\n{' '.join(subs)}")
return ' '.join(subs)
[docs] def i_to_sub(self, i):
"""Substitution corresponding to index in binary representation.
Parameters
----------
i : int
Index in binary representation, 0 <= `i` < `binarylength`.
Returns
-------
str
The substitution corresponding to that index.
"""
try:
if i in self._wt_index_set:
return ''
else:
return self._i_to_sub[i]
except KeyError:
if i < 0 or i >= self.binarylength:
raise ValueError(f"invalid i of {i}. Must be >= 0 and "
f"< {self.binarylength}")
else:
raise ValueError(f"unexpected error, i = {i} should be in map")
[docs] def sub_to_i(self, sub):
"""Index in binary representation corresponding to substitution.
Parameters
----------
sub : str
The substitution.
Returns
-------
int
Index in binary representation, will be >= 0 and < `binarylength`.
"""
try:
return self._sub_to_i[sub]
except KeyError:
raise ValueError(f"sub of {sub} is not in the binary map. The map "
'only contains substitutions in the variants.')
@property
def all_subs(self):
"""list: Substitutions in order encoded in binary map."""
if not hasattr(self, '_all_subs'):
self._all_subs = [self.i_to_sub(i) for i in
range(self.binarylength) if self.i_to_sub(i)]
return self._all_subs
if __name__ == '__main__':
import doctest
doctest.testmod()