Source code for alignparse.consensus

"""
=========
consensus
=========

Get consensus of sequences / mutations. Useful for grouping sequences
within a barcode.

"""


import collections
import io  # noqa: F401
import itertools
import math
import re
import textwrap  # noqa: F401

import numpy

import pandas as pd

import scipy.optimize
import scipy.special


Mutations = collections.namedtuple(
    "Mutations", ["substitutions", "deletions", "insertions"]
)
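"""namedtuple: Substitutions, deletions, and insertions, each a list of str."""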


_MUT_REGEX = {
    "substitution": re.compile(r"[ACGTN](?P<start>\-?\d+)[ACGTN\-]"),
    "deletion": re.compile(r"del(?P<start>\-?\d+)to(?P<end>\-?\d+)"),
    "insertion": re.compile(r"ins(?P<start>\-?\d+)(?:len\d+|[ACGTN]+)"),
}
"""dict: Mutation regular expression matches."""


def process_mut_str(s):
    """Process a string of mutations.

    Parameters
    ----------
    s : str
        Space-delimited mutations. Substitutions in form 'A6T'. Deletions
        in form 'del5to7'. Insertions in form 'ins6len2' or 'ins6TA'.
        Negative site numbers are allowed.

    Returns
    -------
    namedtuple
        The tuple is named `Mutations` and has the following elements in
        order:

        - `substitutions`: list of substitutions in order of sequence.
        - `deletions`: list of deletions in order of sequence.
        - `insertions`: list of insertions in order of sequence.

    Example
    -------
    >>> s = 'A1T del5to7 G9C ins6len2 del12to12'
    >>> process_mut_str(s)  # doctest: +NORMALIZE_WHITESPACE
    Mutations(substitutions=['A1T', 'G9C'],
              deletions=['del5to7', 'del12to12'],
              insertions=['ins6len2'])

    >>> s = 'A11T del5to7 G-9C ins-6GA del12to15 ins13AAT'
    >>> process_mut_str(s)  # doctest: +NORMALIZE_WHITESPACE
    Mutations(substitutions=['G-9C', 'A11T'],
              deletions=['del5to7', 'del12to15'],
              insertions=['ins-6GA', 'ins13AAT'])

    """
    mut_lists = collections.defaultdict(list)

    for mut_str in s.split():
        for mut_type, regex in _MUT_REGEX.items():
            match = regex.fullmatch(mut_str)
            if match:
                start = int(match.group("start"))
                mut_lists[mut_type].append((start, mut_str))
                break
        else:
            raise ValueError(f"cannot match mutation {mut_str}\n(in {s})")

    if any(len(mlist) != len(set(mlist)) for mlist in mut_lists.values()):
        raise ValueError(f"duplicate mutation in: {s}")

    return Mutations(
        substitutions=[m for _, m in sorted(mut_lists["substitution"])],
        deletions=[m for _, m in sorted(mut_lists["deletion"])],
        insertions=[m for _, m in sorted(mut_lists["insertion"])],
    )
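
# Illustrative usage sketch (not part of the module's API): because the
# return value is a namedtuple, per-type tallies can be computed generically
# from its `_fields`, e.g.
#
#   muts = process_mut_str("A1T del5to7 ins6len2")
#   {f: len(getattr(muts, f)) for f in muts._fields}
#   # {'substitutions': 1, 'deletions': 1, 'insertions': 1}
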
def add_mut_info_cols(
    df,
    *,
    mutation_col="mutations",
    sub_str_col=None,
    del_str_col=None,
    ins_str_col=None,
    indel_str_col=None,
    n_sub_col=None,
    n_del_col=None,
    n_ins_col=None,
    n_indel_col=None,
    overwrite_cols=False,
):
    """Expand information about mutations in a data frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame with each row giving information about a variant.
    mutation_col : str
        Name of column in `df` with str containing space-delimited list of
        mutations in format parseable by :func:`process_mut_str`.
    sub_str_col : str or None
        Name of added column with string giving substitutions.
    del_str_col : str or None
        Name of added column with string giving deletions.
    ins_str_col : str or None
        Name of added column with string giving insertions.
    indel_str_col : str or None
        Name of added column with string giving deletions and insertions.
    n_sub_col : str or None
        Name of added column with number of substitutions.
    n_del_col : str or None
        Name of added column with number of deletions.
    n_ins_col : str or None
        Name of added column with number of insertions.
    n_indel_col : str or None
        Name of added column with number of deletions and insertions.
    overwrite_cols : bool
        If a column to be created is already in `df`, overwrite it?

    Returns
    -------
    pandas.DataFrame
        A **copy** of `df` with the indicated columns added.

    Example
    -------
    Data frame for which we get mutation info:

    >>> df = pd.DataFrame({
    ...     'name': ['seq1', 'seq2', 'seq3'],
    ...     'mutations': ['A6G del7to8 T3A', '', 'T5A ins5AAT del8to9'],
    ...     })

    Get info on substitutions:

    >>> add_mut_info_cols(df, sub_str_col='subs', n_sub_col='nsubs')
       name            mutations     subs  nsubs
    0  seq1      A6G del7to8 T3A  T3A A6G      2
    1  seq2                                    0
    2  seq3  T5A ins5AAT del8to9      T5A      1

    Get info on substitutions and indels:

    >>> pd.set_option('display.max_columns', 10)
    >>> add_mut_info_cols(df, sub_str_col='subs', n_sub_col='nsubs',
    ...                   indel_str_col='indels', n_indel_col='nindels')
       name            mutations     subs           indels  nsubs  nindels
    0  seq1      A6G del7to8 T3A  T3A A6G          del7to8      2        1
    1  seq2                                                     0        0
    2  seq3  T5A ins5AAT del8to9      T5A  del8to9 ins5AAT      1        2

    Just get counts for all types of mutations:

    >>> add_mut_info_cols(df, n_sub_col='nsubs', n_del_col='ndels',
    ...                   n_ins_col='nins', n_indel_col='nindels')
       name            mutations  nsubs  ndels  nins  nindels
    0  seq1      A6G del7to8 T3A      2      1     0        1
    1  seq2                           0      0     0        0
    2  seq3  T5A ins5AAT del8to9      1      1     1        2

    """
    df = df.reset_index(drop=True)

    if mutation_col not in df.columns:
        raise ValueError(f"`df` lacks `mutation_col` {mutation_col}")

    new_cols = collections.OrderedDict(
        [
            (key, val)
            for key, val in [
                ("substitutions_str", sub_str_col),
                ("deletions_str", del_str_col),
                ("insertions_str", ins_str_col),
                ("indel_str", indel_str_col),
                ("substitutions_n", n_sub_col),
                ("deletions_n", n_del_col),
                ("insertions_n", n_ins_col),
                ("indel_n", n_indel_col),
            ]
            if val is not None
        ]
    )

    if mutation_col in set(new_cols.values()):
        raise ValueError("`mutation_col` also name of col to add")

    already_has = set(new_cols.values()).intersection(set(df.columns))
    if already_has and not overwrite_cols:
        raise ValueError(f"`df` already has these columns: {already_has}")

    def _mut_info(s):
        m = process_mut_str(s)
        returnlist = []
        for col in new_cols.keys():
            mut_type, col_type = col.split("_")
            if mut_type == "indel":
                mlist = m.deletions + m.insertions
            else:
                mlist = getattr(m, mut_type)
            if col_type == "str":
                returnlist.append(" ".join(mlist))
            else:
                assert col_type == "n"
                returnlist.append(len(mlist))
        return returnlist

    mut_info_df = pd.DataFrame(
        [_mut_info(m) for m in df[mutation_col].values], columns=new_cols.values()
    ).reindex(index=df.index)

    return df.drop(columns=new_cols.values(), errors="ignore").join(mut_info_df)
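
# Illustrative sketch (assumes the `df` from the docstring example above):
# because target columns are dropped before the join, the function can be
# re-run on its own output when `overwrite_cols=True`, e.g.
#
#   df_info = add_mut_info_cols(df, n_sub_col="nsubs")
#   df_info = add_mut_info_cols(df_info, n_sub_col="nsubs",
#                               overwrite_cols=True)  # same result
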
class _LnL_error_rate:
    """Log likelihood of sequences per group as function of error rate.

    Note
    ----
    Used as utility function for computing the empirical accuracy by
    :func:`empirical_accuracy`. See the docs for that function to
    understand the notation / nomenclature used here.

    Parameters
    ----------
    df : pandas.DataFrame
        A different row for each group of sequences.
    n_col : str
        Column in `df` with number of sequences per group.
    u_col : str
        Column in `df` with number of unique sequences per group.
    count_col : str
        Column in `df` with number of groups with this value of
        :math:`n` and :math:`u`.

    Example
    -------
    >>> df = pd.DataFrame(
    ...     [[4, 3, 1],
    ...      [4, 2, 2],
    ...      [4, 1, 1],
    ...      ],
    ...     columns=['n', 'u', 'count'],
    ...     )
    >>> lnl = _LnL_error_rate(df, n_col='n', u_col='u', count_col='count')
    >>> round(lnl.maxlik_eps(), 3)
    0.25

    """

    def __init__(self, df, *, n_col, u_col, count_col):
        """See main class docstring."""
        self._df = df.assign(
            n=lambda x: x[n_col],
            u=lambda x: x[u_col],
            count=lambda x: x[count_col],
            binom=lambda x: scipy.special.binom(x["n"], x["u"] - 1),
            delta_un=lambda x: (x["n"] == x["u"]).astype(int),
        )

    def lnlik(self, eps):
        """Log likelihood for error rate `eps`."""
        return sum(
            self._df["count"]
            * numpy.log(
                self._df["binom"]
                * (1 - eps) ** (self._df["n"] - self._df["u"] + 1)
                * eps ** (self._df["u"] - 1)
                + self._df["delta_un"] * eps ** self._df["n"]
            )
        )

    def neg_lnlik(self, eps):
        """Negative log likelihood for error rate `eps`."""
        return -self.lnlik(eps)

    def maxlik_eps(self):
        """Maximum likelihood value of error rate `eps`."""
        res = scipy.optimize.minimize_scalar(
            self.neg_lnlik, bounds=(1e-8, 1 - 1e-8), method="bounded"
        )
        if not res.success:
            raise RuntimeError(f"optimization failed:\n{res}")
        return res.x
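
# Worked check of the class doctest above (hand derivation, for orientation):
# no group there has u == n, so the Kronecker-delta term vanishes and
#
#   ln L(eps) = const
#               + (1*2 + 2*1 + 1*0) * ln(eps)      # sum of count * (u - 1)
#               + (1*2 + 2*3 + 1*4) * ln(1 - eps)  # sum of count * (n - u + 1)
#             = const + 4 * ln(eps) + 12 * ln(1 - eps),
#
# which is maximized at eps = 4 / (4 + 12) = 0.25, matching `maxlik_eps()`.
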
def empirical_accuracy(
    df,
    *,
    group_cols="barcode",
    upstream_group_cols="library",
    mutation_col="mutations",
    accuracy_col="accuracy",
    sort_mutations=True,
):
    r"""Accuracy from number of identical sequences in a group (i.e., barcode).

    Note
    ----
    The accuracy :math:`1 - \epsilon` is calculated as follows:

    Let :math:`\epsilon` be the probability a sequence has an error.
    Consider a group of :math:`n` sequences that should be identical in
    the absence of errors (i.e., they all have the same barcode), and
    assume errors only make sequences **different** (the chance of two
    sequences having the same error is very low). We would like to
    calculate the probability of observing :math:`u` unique sequences in
    the group given :math:`\epsilon` and :math:`n`.

    First, consider the case where there are two sequences in the group
    (:math:`n = 2`). The probability of having :math:`u = 1` unique
    sequences in the group is the probability that neither has an error:

    .. math::

       \Pr\left(u=1 | n=2, \epsilon\right) = \left(1 - \epsilon\right)^2.

    The probability of having :math:`u = 2` unique sequences is the
    probability that either one or both have errors:

    .. math::

       \Pr\left(u=2 | n=2, \epsilon\right) =
       2\left(1 - \epsilon\right)\epsilon + \epsilon^2.

    Generalizing to arbitrary :math:`n`, we have:

    .. math::

       \Pr\left(u | n, \epsilon\right) =
       \binom{n}{u-1}
       \left(1 - \epsilon\right)^{n - u + 1} \epsilon^{u - 1} +
       \delta_{un} \epsilon^n

    where :math:`\binom{n}{u-1}` is the binomial coefficient and
    :math:`\delta_{un}` is the Kronecker delta.

    Let there be :math:`G` groups of sequences, with the size and number
    of unique sequences in group :math:`g` being :math:`n_g` and
    :math:`u_g` (where :math:`g = 1, \ldots, G`). The overall likelihood
    :math:`L` of the observed numbers of unique sequences given the group
    sizes and error rate is:

    .. math::

       L &= \Pr\left(\left\{u_g\right\} | \left\{n_g\right\},
            \epsilon\right) \\
         &= \prod_g \Pr\left(u_g | n_g, \epsilon\right).

    To find the maximum likelihood estimate of the error rate, we simply
    use numerical optimization to find the value of :math:`\epsilon` that
    maximizes :math:`L`. In practice, we actually maximize
    :math:`\ln\left(L\right)`.

    Parameters
    ----------
    df : pandas.DataFrame
        Each row gives data for a different sequence.
    group_cols : str or list
        Column(s) in `df` indicating how we group sequences before
        computing the fraction within each group that is identical.
    upstream_group_cols : str or None or list
        Column(s) in `df` that we use to group **before** doing the
        grouping for computing fraction identical. So a different average
        fraction will be returned for each of these upstream groups.
    mutation_col : str
        Column in `df` that we compare when determining if sequences are
        identical. Typically a list of mutations or the sequences
        themselves.
    accuracy_col : str
        Name of column in returned data frame that gives empirical
        accuracy.
    sort_mutations : bool
        Useful if you have strings of space-delimited mutations not
        guaranteed to be consistently ordered. If `True`, sort such
        space-delimited mutations.

    Returns
    -------
    pandas.DataFrame
        Has all columns in `upstream_group_cols` plus the new column with
        name given by `accuracy_col`.

    Example
    -------
    >>> df = pd.DataFrame({
    ...     'barcode': ['AT', 'AT', 'TG', 'TA', 'TA', 'TA'],
    ...     'mutations': ['A1G', 'A1G', '', 'T2A C6G', 'T5C', 'C6G T2A']})
    >>> with pd.option_context('display.precision', 4):
    ...     empirical_accuracy(df, upstream_group_cols=None)
       accuracy
    0    0.7692

    If we do the same without sorting the mutations, we get lower values
    as 'T2A C6G' and 'C6G T2A' are then considered as different:

    >>> with pd.option_context('display.precision', 4):
    ...     empirical_accuracy(df, upstream_group_cols=None,
    ...                        sort_mutations=False)
       accuracy
    0    0.4766

    Now another example with two libraries and non-standard column names.
    Note that we only get results for the two libraries with barcodes
    found multiple times:

    >>> df2 = pd.DataFrame({
    ...     'bc': ['AT', 'AT', 'TG', 'TA', 'TA', 'TA', 'TA'],
    ...     'var': ['v1', 'v1', 'v2', 'v3', 'v4', 'v3', 'v3'],
    ...     'lib': ['s1', 's1', 's2', 's3', 's3', 's3', 's4']})
    >>> with pd.option_context('display.precision', 4):
    ...     empirical_accuracy(df2, upstream_group_cols='lib', group_cols='bc',
    ...                        mutation_col='var', sort_mutations=False)
      lib  accuracy
    0  s1    1.0000
    1  s3    0.6667

    """
    reserved_cols = ["_n", "_u", "_ngroups", "_dummy"]
    for col in reserved_cols:
        if col in df.columns:
            raise ValueError(f"`df` cannot have column named {col}")
    if accuracy_col in df.columns:
        raise ValueError(f"`df` has column `accuracy_col` {accuracy_col}")

    if isinstance(group_cols, str):
        group_cols = [group_cols]

    if (upstream_group_cols is None) or upstream_group_cols == []:
        drop_upstream_col = True
        upstream_group_cols = "_dummy"
        df = df.assign(**{upstream_group_cols: "dummy"})
    else:
        drop_upstream_col = False
    if isinstance(upstream_group_cols, str):
        upstream_group_cols = [upstream_group_cols]

    cols = group_cols + upstream_group_cols
    if len(set(cols)) != len(cols):
        raise ValueError("duplicate in `group_cols` / `upstream_group_cols`")
    if mutation_col in cols:
        raise ValueError(f"`mutation_col` {mutation_col} also grouping column")
    for col in cols + [mutation_col]:
        if col not in df.columns:
            raise ValueError(f"no column {col} in `df`: {list(df.columns)}")

    result = (
        df
        # number of sequences in each group
        .assign(_n=lambda x: x.groupby(cols)[mutation_col].transform("count"))
        # only retain groups with >1 sequence
        .query("_n > 1")
        # sort mutation strings if `sort_mutations`
        .assign(
            **{
                mutation_col: (
                    lambda x: x[mutation_col].map(
                        lambda s: " ".join(sorted(s.split()))
                    )
                    if sort_mutations
                    else x[mutation_col]
                )
            }
        )
        # number of unique sequences in each group
        .assign(_u=lambda x: (x.groupby(cols)[mutation_col].transform("nunique")))
        # number of rows (sequences) with each (n, u) combination; note that
        # this weights each group by its size
        .groupby(upstream_group_cols + ["_n", "_u"])
        .size()
        .rename("_ngroups")
        .reset_index()
        # get error rate for each upstream group
        .groupby(upstream_group_cols)
        .apply(
            lambda x: 1
            - _LnL_error_rate(
                x, n_col="_n", u_col="_u", count_col="_ngroups"
            ).maxlik_eps()
        )
        .rename(accuracy_col)
        .reset_index()
    )

    if drop_upstream_col:
        assert len(upstream_group_cols) == 1
        result = result.drop(upstream_group_cols, axis="columns")

    return result
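
# Worked check of the first docstring example above (hand derivation): after
# the singleton 'TG' barcode is dropped, the row-level summary is (n=2, u=1)
# for the two 'AT' rows and (n=3, u=2) for the three 'TA' rows, giving
#
#   ln L(eps) = const + 3 * ln(eps) + 10 * ln(1 - eps),
#
# which is maximized at eps = 3/13, i.e. accuracy 10/13 ~= 0.7692 as shown.
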
def simple_mutconsensus(
    df,
    *,
    group_cols=("library", "barcode"),
    mutation_col="mutations",
    max_sub_diffs=1,
    max_indel_diffs=2,
    max_minor_sub_frac=0.1,
    max_minor_indel_frac=0.25,
    max_minor_greater_or_equal=False,
    min_support=1,
    support_col="variant_call_support",
):
    """Get simple consensus of mutations within a group (i.e., barcode).

    Parameters
    ----------
    df : pandas.DataFrame
        Each row gives data on mutations for a different sequence.
    group_cols : list, tuple, or str
        Group all sequences that share values in these column(s) of `df`.
    mutation_col : str
        Column in `df` with mutations in form that can be processed by
        :func:`process_mut_str`.
    max_sub_diffs : int or None
        Drop groups where any variant differs from all other variants by
        more than this many substitution (point mutation) differences.
        Set to ``None`` if no limit.
    max_indel_diffs : int or None
        Drop groups where any variant differs from all other variants by
        more than this many indel differences. Set to ``None`` if no
        limit.
    max_minor_sub_frac : float
        Drop groups with a minor (non-consensus) substitution in > the
        **ceiling** of this fraction times the number of sequences in the
        group.
    max_minor_indel_frac : float
        Drop groups with a minor (non-consensus) indel in > the
        **ceiling** of this fraction times the number of sequences in the
        group.
    max_minor_greater_or_equal : bool
        For ``max_minor_sub_frac`` and ``max_minor_indel_frac``, use >=
        rather than >. This may help if you want to be conservative in
        avoiding bad consensuses. But a mutation shared by every sequence
        in the group (zero differences) never causes rejection.
    min_support : int
        Require at least this many supporting sequences to call the
        consensus.
    support_col : str
        Name of column in returned `consensus` data frame with number of
        sequences supporting the consensus call.

    Note
    ----
    The rationale behind this consensus calling scheme is that we want to
    build a consensus except in the following two cases, each of which
    indicates a likely problem **beyond** simple rare sequencing errors
    (such as strand exchange or barcode collisions):

     1. A sequence within the group differs by too much from others in
        the group (given by `max_sub_diffs` and `max_indel_diffs`).

     2. There are "minor" (non-consensus) mutations present at too high
        frequency (given by `max_minor_indel_frac` and
        `max_minor_sub_frac`).

    Returns
    -------
    tuple
        The returned tuple `(consensus, dropped)` consists of two pandas
        data frames:

        - `consensus`: Gives consensus for all groups for which this can
          be constructed. Columns are `group_cols`, a column with the
          name given by `mutation_col` that gives the consensus
          mutations, and a column with the name given by `support_col`
          that gives the number of variants used to generate the
          consensus.

        - `dropped`: Gives each dropped group, the reason it was dropped,
          and the number of variants (sequences) in that group.

    Example
    -------
    Create a dummy CSV file with our variants and mutations:

    >>> f = io.StringIO()
    >>> _ = f.write(textwrap.dedent('''
    ...     library,barcode,mutations
    ...     lib_1,AG,A2C del5to7
    ...     lib_1,AG,A2C
    ...     lib_1,TA,G3A ins4len3
    ...     lib_1,TA,G3A ins4len3
    ...     lib_1,TA,G3A ins4len3
    ...     lib_2,TA,C5A T-6C
    ...     lib_2,TA,ins5len1 T-6C
    ...     lib_2,TA,T-6C
    ...     lib_2,TA,A4G T-6C
    ...     lib_2,TG,T6A
    ...     lib_2,TG,A2G
    ...     lib_2,GG,del1to4
    ...     lib_2,GG,A1C
    ...     lib_2,AA,
    ...     lib_2,AA,
    ...     lib_2,AA,T6C
    ...     lib_2,AA,T6C
    ...     lib_3,AA,ins1len1 del1to2 T6G
    ...     lib_3,AA,del5to7 T6G
    ...     '''))
    >>> _ = f.seek(0)

    Read into data frame. Note use of `na_filter=False` so empty strings
    are read as such rather than as `na` values:

    >>> df = pd.read_csv(f, na_filter=False)

    Get the consensus mutations, and the dropped libraries / barcodes:

    >>> consensus, dropped = simple_mutconsensus(df)
    >>> consensus
      library barcode     mutations  variant_call_support
    0   lib_1      AG           A2C                     2
    1   lib_1      TA  G3A ins4len3                     3
    2   lib_2      GG                                   2
    3   lib_2      TA          T-6C                     4
    >>> dropped
      library barcode              drop_reason  nseqs
    0   lib_2      AA  minor subs too frequent      4
    1   lib_2      TG      subs diff too large      2
    2   lib_3      AA    indels diff too large      2

    Get the consensus just for library 1:

    >>> lib1_consensus, _ = simple_mutconsensus(df.query('library == "lib_1"'),
    ...                                         group_cols='barcode')
    >>> lib1_consensus
      barcode     mutations  variant_call_support
    0      AG           A2C                     2
    1      TA  G3A ins4len3                     3

    Set ``max_sub_diffs`` to ``None``:

    >>> consensus, dropped = simple_mutconsensus(df, max_sub_diffs=None)
    >>> consensus
      library barcode     mutations  variant_call_support
    0   lib_1      AG           A2C                     2
    1   lib_1      TA  G3A ins4len3                     3
    2   lib_2      GG                                   2
    3   lib_2      TA          T-6C                     4
    4   lib_2      TG                                   2
    >>> dropped
      library barcode              drop_reason  nseqs
    0   lib_2      AA  minor subs too frequent      4
    1   lib_3      AA    indels diff too large      2

    Set ``max_minor_greater_or_equal`` to ``True``:

    >>> consensus, dropped = simple_mutconsensus(
    ...     df, max_minor_greater_or_equal=True)
    >>> consensus
      library barcode     mutations  variant_call_support
    0   lib_1      TA  G3A ins4len3                     3
    >>> dropped
      library barcode                drop_reason  nseqs
    0   lib_1      AG  minor indels too frequent      2
    1   lib_2      AA    minor subs too frequent      4
    2   lib_2      GG    minor subs too frequent      2
    3   lib_2      TA    minor subs too frequent      4
    4   lib_2      TG        subs diff too large      2
    5   lib_3      AA      indels diff too large      2

    Use ``min_support``:

    >>> consensus, dropped = simple_mutconsensus(df, min_support=3)
    >>> consensus
      library barcode     mutations  variant_call_support
    0   lib_1      TA  G3A ins4len3                     3
    1   lib_2      TA          T-6C                     4
    >>> dropped
      library barcode              drop_reason  nseqs
    0   lib_1      AG        too few sequences      2
    1   lib_2      AA  minor subs too frequent      4
    2   lib_2      GG        too few sequences      2
    3   lib_2      TG        too few sequences      2
    4   lib_3      AA        too few sequences      2

    """
    if isinstance(group_cols, str):
        group_cols = [group_cols]
    else:
        group_cols = list(group_cols)
    if set(group_cols) > set(df.columns):
        raise ValueError(f"`df` lacks `group_cols`: {group_cols}")
    if mutation_col in group_cols:
        raise ValueError(f"`mutation_col` {mutation_col} in `group_cols`")
    if mutation_col not in df.columns:
        raise ValueError(f"`df` lacks `mutation_col`: {mutation_col}")

    df = df[group_cols + [mutation_col]]
    if df.isnull().values.any():
        raise ValueError(
            "`df` contains `na` (null) entries. It is possible "
            "that you read a CSV without using `na_filter=False` "
            "so that empty mutation strings were read as `na` "
            "rather than empty strings."
        )

    dropped = []
    consensus = []

    for g, g_df in df.groupby(group_cols, observed=True)[mutation_col]:
        if len(group_cols) == 1:
            g = [g]

        nseqs = len(g_df)
        half_nseqs = 0.5 * nseqs
        assert nseqs > 0

        if nseqs < min_support:
            dropped.append((*g, "too few sequences", nseqs))
            continue

        mutations = [process_mut_str(s) for s in g_df.values]

        drop_reason = None
        g_consensus = []

        mutlists = {
            "subs": [m.substitutions for m in mutations],
            "indels": [m.deletions + m.insertions for m in mutations],
        }

        for mtype, maxd, max_frac in [
            ("subs", max_sub_diffs, max_minor_sub_frac),
            ("indels", max_indel_diffs, max_minor_indel_frac),
        ]:
            if maxd is not None:
                # is max_sub_diffs or max_indel_diffs satisfied?
                ndiffs_by_seq = collections.defaultdict(list)
                for (i1, m1set), (i2, m2set) in itertools.combinations(
                    enumerate(mutlists[mtype]), 2
                ):
                    ndiffs = len(numpy.setxor1d(m1set, m2set, assume_unique=True))
                    ndiffs_by_seq[i1].append(ndiffs)
                    ndiffs_by_seq[i2].append(ndiffs)
                if any(
                    min(ndifflist) > maxd for ndifflist in ndiffs_by_seq.values()
                ):
                    drop_reason = f"{mtype} diff too large"
                    break

            # see if max_minor_sub_frac / max_minor_indel_frac is satisfied
            max_muts = math.ceil(max_frac * nseqs)
            nseqs_minus_max_muts = nseqs - max_muts
            counts = collections.Counter(
                itertools.chain.from_iterable(mutlists[mtype])
            )
            if max_minor_greater_or_equal:
                if any(
                    (max_muts <= count <= nseqs_minus_max_muts) and (count != nseqs)
                    for count in counts.values()
                ):
                    drop_reason = f"minor {mtype} too frequent"
                    break
            elif any(
                max_muts < count < nseqs_minus_max_muts for count in counts.values()
            ):
                drop_reason = f"minor {mtype} too frequent"
                break

            # consensus for this mutation type
            g_consensus += [m for m, c in counts.items() if c > half_nseqs]

        if drop_reason is not None:
            dropped.append((*g, drop_reason, nseqs))
        else:
            consensus.append((*g, " ".join(g_consensus), nseqs))

    consensus = pd.DataFrame(
        consensus, columns=group_cols + [mutation_col, support_col]
    )

    dropped = pd.DataFrame(dropped, columns=group_cols + ["drop_reason", "nseqs"])

    return consensus, dropped
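
# Illustrative downstream sketch (not part of the module): the mutation
# strings in the returned `consensus` frame are themselves parseable, so
# per-consensus counts can be added with `add_mut_info_cols`, e.g.
#
#   consensus, dropped = simple_mutconsensus(df)
#   consensus = add_mut_info_cols(consensus, n_sub_col="n_subs",
#                                 n_indel_col="n_indels")
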
if __name__ == "__main__": import doctest doctest.testmod()