Source code for dms_variants.codonvarianttable

"""
=================
codonvarianttable
=================

Defines :class:`CodonVariantTable` objects for storing and
handling codon variants of a gene.

"""

import collections
import itertools
import os
import re
import tempfile

import Bio.SeqUtils.ProtParamData

import numpy

import pandas as pd

import plotnine as p9

import dms_variants.utils
from dms_variants.constants import (
    AAS_WITHSTOP,
    AAS_WITHSTOP_WITHGAP,
    AA_TO_CODONS,
    AA_TO_CODONS_WITHGAP,
    CBPALETTE,
    CODONS,
    CODONS_WITHGAP,
    CODON_TO_AA,
    CODON_TO_AA_WITHGAP,
    NTS,
    NTS_WITHGAP,
)


class CodonVariantTable:
    """Store and handle counts of barcoded codon variants of a gene.

    Parameters
    ----------
    barcode_variant_file : str
        CSV file giving barcodes and variants. Must have columns "library",
        "barcode", "substitutions" (nucleotide mutations in 1, ..., numbering
        in a format like "G301A A302T G856C"), and "variant_call_support"
        (sequences supporting barcode-variant call). Additional columns are
        removed unless they are specified in `extra_cols`.
    geneseq : str
        Sequence of wildtype protein-coding gene.
    substitutions_are_codon : bool
        If `True`, then "substitutions" column in `barcode_variant_file`
        gives substitutions as codon rather than nucleotide mutations (e.g.,
        "ATG1ATA GTA5CCC" for substitutions at codons 1 and 5).
    allowgaps : bool
        Allow in-frame codon-length gaps (``---``) as valid codon
        substitution.
    extra_cols : list
        Additional columns in `barcode_variant_file` to retain when creating
        `barcode_variant_df` and `variant_count_df` attributes.
    substitutions_col : str
        Name of substitutions column in `barcode_variant_file` (use if you
        want it to be something other than "substitutions").
    primary_target : str or None
        Use this option if you have additional targets beyond the main gene
        for which we are analyzing variants. This might be the case if you
        have spiked other genes into the library. If this option is set to
        something other than `None`, then there must be a column in
        `barcode_variant_file` named "target" and one of these targets must
        be equal to 'primary_target'. If there are other targets, they should
        **not** have any substitutions as we don't parse substitutions in
        non-primary targets. Instead, `substitutions_col` for secondary
        targets should be empty or just have the name of the secondary
        target.

    Attributes
    ----------
    geneseq : str
        Wild-type sequence passed at initialization.
    sites : list
        List of all codon sites in 1, 2, ... numbering.
    codons : collections.OrderedDict
        `codons[r]` is wildtype codon at site `r`, ordered by sites.
    aas : collections.OrderedDict
        `aas[r]` is wildtype amino acid at site `r`, ordered by sites.
    libraries : list
        List of libraries in `barcode_variant_file`.
    barcode_variant_df : pandas.DataFrame
        Info about codon mutations parsed from `barcode_variant_file`.
        For non-primary targets, the mutations column just hold the target
        name.
    variant_count_df : pandas.DataFrame or None
        Initially `None`, but after data added with
        :class:`CodonVariantTable.addSampleCounts`, holds counts of each
        variant for each sample. Differs from `barcode_variant_df` in that
        the former just holds barcode-variant definitions, whereas
        `variant_count_df` has counts for each sample.
    primary_target : str or None
        If multiple targets, name of the main target for which we are calling
        variants.

    """

    # regex matching a codon substitution like "ATG1GGG"; gaps never allowed
    _CODON_SUB_RE_NOGAP = re.compile(
        f"^(?P<wt>{'|'.join(CODONS)})" r"(?P<r>\d+)" f"(?P<mut>{'|'.join(CODONS)})"
    )

    # same, but the mutant codon may be a gap ("---"); the "-" characters in
    # the alternation must be escaped for use inside the regex
    _CODON_SUB_RE_WITHGAP = re.compile(
        (
            f"^(?P<wt>{'|'.join(CODONS)})"  # don't allow gaps in wildtype
            r"(?P<r>\d+)"
            f"(?P<mut>{'|'.join(CODONS_WITHGAP)})"
        ).replace("-", r"\-")
    )

    def _set_alphabets(self, allowgaps):
        """Set class variables representing alphabets."""
        # pick gapped or ungapped alphabets depending on `allowgaps`
        self._CODONS = CODONS_WITHGAP if allowgaps else CODONS
        self._NTS = NTS_WITHGAP if allowgaps else NTS
        self._AAS = AAS_WITHSTOP_WITHGAP if allowgaps else AAS_WITHSTOP
        self._AA_TO_CODONS = AA_TO_CODONS_WITHGAP if allowgaps else AA_TO_CODONS
        self._CODON_TO_AA = CODON_TO_AA_WITHGAP if allowgaps else CODON_TO_AA
        # don't allow gaps in wildtype for regexes
        self._CODON_SUB_RE = (
            self._CODON_SUB_RE_WITHGAP if allowgaps else self._CODON_SUB_RE_NOGAP
        )
        # amino-acid substitution regex: wildtype is always ungapped; "*" and
        # "-" are regex metacharacters so escape them in the alternations
        self._AA_SUB_RE = re.compile(
            (
                f"^(?P<wt>{'|'.join(AAS_WITHSTOP)})"
                r"(?P<r>\d+)"
                rf"(?P<mut>{'|'.join(self._AAS)})"
            )
            .replace("*", r"\*")
            .replace("-", r"\-")
        )
        # nucleotide substitution regex; mutant may be gapped if `allowgaps`
        self._NT_SUB_RE = re.compile(
            (
                f"^(?P<wt>{'|'.join(NTS)})"
                r"(?P<r>\d+)"
                rf"(?P<mut>{'|'.join(self._NTS)})"
            ).replace("-", r"\-")
        )

    def __eq__(self, other):
        """Test if equal to object `other`."""
        # following here: https://stackoverflow.com/a/390640
        if type(other) is not type(self):
            return False
        elif self.__dict__.keys() != other.__dict__.keys():
            return False
        else:
            for key, val in self.__dict__.items():
                val2 = getattr(other, key)
                # DataFrames need `.equals`; `==` on them is elementwise
                if isinstance(val, pd.DataFrame):
                    if not val.equals(val2):
                        return False
                else:
                    if val != val2:
                        return False
            return True
    @classmethod
    def from_variant_count_df(
        cls,
        *,
        variant_count_df_file,
        geneseq,
        drop_all_libs=True,
        allowgaps=False,
        primary_target=None,
        extra_cols=None,
    ):
        """:class:`CodonVariantTable` from CSV of `variant_count_df`.

        Note
        ----
        Use this method when you have written a CSV file of
        `variant_count_df` attribute of a :class:`CodonVariantTable`,
        and wish to re-initialize.

        Parameters
        ----------
        variant_count_df_file : str
            Name of CSV file containing the `variant_count_df`. Must have
            following columns: "barcode", "library", "variant_call_support",
            "codon_substitutions", "sample", and "count".
        geneseq : str
            Sequence of wildtype protein-coding gene.
        drop_all_libs : bool
            If there is a library named "all libraries", drop it as it
            probably added by :meth:`CodonVariantTable.addMergedLibraries`
            and duplicates information for the individual libraries.
        allowgaps : bool
            Meaning described in main :class:`CodonVariantTable` doc string.
        primary_target : None or str
            Meaning described in main :class:`CodonVariantTable` doc string.
        extra_cols : list
            Meaning described in main :class:`CodonVariantTable` doc string.

        Returns
        -------
        :class:`CodonVariantTable`

        """
        df = pd.read_csv(variant_count_df_file)

        req_cols = [
            "barcode",
            "library",
            "variant_call_support",
            "codon_substitutions",
            "sample",
            "count",
        ]
        # "target" column must be present iff `primary_target` was given
        if primary_target is not None:
            if "target" not in set(df.columns):
                raise ValueError('primary_target not None but no "target" col')
            req_cols.append("target")
        else:
            if "target" in set(df.columns):
                raise ValueError('primary_target is None but "target" col')
        if not set(req_cols).issubset(df.columns):
            raise ValueError(
                f"{variant_count_df_file} lacks required "
                f"columns {req_cols}. It has: {set(df.columns)}"
            )
        if extra_cols and not set(extra_cols).issubset(df.columns):
            raise ValueError(
                f"{variant_count_df_file} lacks `extra_cols` "
                f"columns {extra_cols}. Has: {set(df.columns)}"
            )
        else:
            # subset to just the columns we keep
            if extra_cols:
                df = df[req_cols + extra_cols]
            else:
                df = df[req_cols]

        if drop_all_libs:
            dropcol = "all libraries"
            if dropcol in df["library"].unique():
                df = df.query("library != @dropcol")

        # write the barcode-variant definitions (one row per barcode) to a
        # temporary CSV so we can re-use the normal constructor, then add
        # the per-sample counts back afterward
        with tempfile.NamedTemporaryFile(mode="w") as f:
            (
                df.drop(columns=["sample", "count"])
                .drop_duplicates()
                .to_csv(f, index=False)
            )
            f.flush()
            cvt = cls(
                barcode_variant_file=f.name,
                geneseq=geneseq,
                substitutions_are_codon=True,
                allowgaps=allowgaps,
                substitutions_col="codon_substitutions",
                primary_target=primary_target,
                extra_cols=extra_cols,
            )

        cvt.add_sample_counts_df(df[["library", "sample", "barcode", "count"]])

        return cvt
    def __init__(
        self,
        *,
        barcode_variant_file,
        geneseq,
        allowgaps=False,
        substitutions_are_codon=False,
        extra_cols=None,
        substitutions_col="substitutions",
        primary_target=None,
    ):
        """See main class doc string."""
        self._set_alphabets(allowgaps)

        # validate and store the wildtype gene sequence; note the sequence
        # itself must be ungapped even when `allowgaps` is True
        self.geneseq = geneseq.upper()
        if not re.fullmatch(f"[{''.join(NTS)}]+", self.geneseq):
            raise ValueError(f"invalid nucleotides in {self.geneseq}")
        if ((len(geneseq) % 3) != 0) or len(geneseq) == 0:
            raise ValueError(f"`geneseq` invalid length {len(self.geneseq)}")
        self.sites = list(range(1, len(self.geneseq) // 3 + 1))
        self.codons = collections.OrderedDict(
            [(r, self.geneseq[3 * (r - 1) : 3 * r]) for r in self.sites]
        )
        self.aas = collections.OrderedDict(
            [(r, CODON_TO_AA[codon]) for r, codon in self.codons.items()]
        )

        # read the barcode-variant definitions; missing substitutions become
        # empty strings (wildtype variants)
        df = (
            pd.read_csv(barcode_variant_file)
            .rename(columns={substitutions_col: "substitutions"})
            .assign(substitutions=lambda x: x["substitutions"].fillna(""))
        )
        required_cols = ["library", "barcode", "substitutions", "variant_call_support"]
        sort_cols = ["library", "barcode"]
        self.primary_target = primary_target
        if self.primary_target is not None:
            required_cols.insert(0, "target")
            sort_cols.insert(0, "target")
            if "target" not in set(df.columns):
                raise ValueError(
                    "cannot use `primary_target` as the variant "
                    'file lacks column named "target"'
                )
            if self.primary_target not in set(df["target"]):
                raise ValueError(f"{self.primary_target} not in 'target' col")
            df = df.assign(
                # if substitutions col is secondary target name, make ''
                substitutions=lambda x: x["substitutions"].where(
                    (
                        (x["target"] == self.primary_target)
                        | (x["substitutions"] != x["target"])
                    ),
                    "",
                )
            )
            # substitutions are only parsed for the primary target, so any
            # remaining non-empty substitutions on other targets are an error
            subs_non_primary = (
                df.query("target != @self.primary_target")
                .assign(
                    has_subs=lambda x: (
                        x["substitutions"].str.strip().str.len().astype(bool)
                    )
                )
                .query("has_subs == True")
            )
            if len(subs_non_primary):
                raise ValueError(
                    "non-primary targets have substitutions:\n"
                    + subs_non_primary.head().to_csv()
                )
        elif "target" in set(df.columns):
            raise ValueError(
                'variant file has column "target" but '
                "you did not specify `primary_target`"
            )
        if not set(df.columns).issuperset(set(required_cols)):
            raise ValueError(
                "`variantfile` does not have " f"required columns {required_cols}"
            )
        if extra_cols and not set(df.columns).issuperset(set(extra_cols)):
            raise ValueError(f"`variantfile` lacks `extra_cols` {extra_cols}")
        if extra_cols:
            df = df[required_cols + extra_cols]
        else:
            df = df[required_cols]

        # record libraries and the valid (unique) barcodes for each
        self.libraries = sorted(df.library.unique().tolist())
        self._valid_barcodes = {}
        for lib in self.libraries:
            barcodes = df.query("library == @lib").barcode
            if len(set(barcodes)) != len(barcodes):
                raise ValueError(f"duplicated barcodes for {lib}")
            self._valid_barcodes[lib] = set(barcodes)

        # no counts added yet
        self._samples = {lib: [] for lib in self.libraries}
        self.variant_count_df = None

        # substitutions given as codon muts just need sorting; nucleotide
        # muts must first be converted into codon mutations
        if substitutions_are_codon:
            codonSubsFunc = self._sortCodonMuts
        else:
            codonSubsFunc = self._ntToCodonMuts

        self.barcode_variant_df = (
            df
            # info about codon and amino-acid substitutions
            .assign(
                codon_substitutions=lambda x: (x["substitutions"].apply(codonSubsFunc)),
                aa_substitutions=lambda x: (
                    x.codon_substitutions.apply(self.codonToAAMuts, allowgaps=allowgaps)
                ),
                n_codon_substitutions=lambda x: (
                    x.codon_substitutions.str.split().apply(len)
                ),
                n_aa_substitutions=lambda x: (
                    x["aa_substitutions"].str.split().apply(len)
                ),
            )
            # we no longer need initial `substitutions` column
            .drop("substitutions", axis="columns")
            # sort to ensure consistent order
            .assign(
                library=lambda x: pd.Categorical(
                    x["library"],
                    self.libraries,
                    ordered=True,
                )
            )
            .sort_values(sort_cols)
            .reset_index(drop=True)
        )

        # invariants: "target" column present iff `primary_target` set, and
        # non-primary targets carry no codon substitutions
        assert (
            self.primary_target is None
            and "target" not in set(self.barcode_variant_df.columns)
        ) or (
            self.primary_target is not None
            and "target" in set(self.barcode_variant_df.columns)
        )
        assert self.primary_target is None or not (
            self.barcode_variant_df.query("target != @self.primary_target")[
                "n_codon_substitutions"
            ].any()
        )

        # check validity of codon substitutions given `geneseq`
        for codonmut in itertools.chain.from_iterable(
            self.barcode_variant_df.codon_substitutions.str.split()
        ):
            m = self._CODON_SUB_RE.fullmatch(codonmut)
            if m is None:
                raise ValueError(f"invalid mutation {codonmut}")
            wt = m.group("wt")
            r = int(m.group("r"))
            mut = m.group("mut")
            if r not in self.sites:
                raise ValueError(f"invalid site {r} in mutation {codonmut}")
            if self.codons[r] != wt:
                raise ValueError(
                    f"Wrong wildtype codon in {codonmut}. "
                    f"Expected wildtype of {self.codons[r]}."
                )
            if wt == mut:
                raise ValueError(f"invalid mutation {codonmut}")

        # define some colors for plotting
        self._mutation_type_colors = {
            "nonsynonymous": CBPALETTE[1],
            "synonymous": CBPALETTE[2],
            "stop": CBPALETTE[3],
        }
        if allowgaps:
            self._mutation_type_colors["deletion"] = CBPALETTE[4]

        # for "safety" make the substitutions column for non-primary targets
        # just the target name
        if self.primary_target is not None:
            targets = self.barcode_variant_df["target"]
            primary = targets == self.primary_target
            for col in ["aa_substitutions", "codon_substitutions"]:
                self.barcode_variant_df[col] = self.barcode_variant_df[col].where(
                    primary, targets
                )
[docs] @classmethod def add_frac_counts(self, variant_count_df): """Add fraction of counts from each variant in library/sample. Parameters ---------- variant_count_df : pandas.DataFrame Same format as :attr:`CodonVariantTable`. Returns ------- pandas.DataFrame A copy of `variant_count_df` with added column 'frac_counts' that gives fraction of all counts in that library / sample for that variant. Example -------- >>> variant_count_df = pd.DataFrame.from_records( ... [('lib1', 's1', 'AA', 1), ... ('lib1', 's1', 'AT', 3), ... ('lib1', 's2', 'GG', 0), ... ('lib1', 's2', 'GA', 10), ... ('lib2', 's1', 'CC', 5), ... ('lib2', 's1', 'GA', 5), ... ], ... columns=['library', 'sample', 'barcode', 'count'], ... ) >>> CodonVariantTable.add_frac_counts(variant_count_df) library sample barcode count frac_counts 0 lib1 s1 AA 1 0.25 1 lib1 s1 AT 3 0.75 2 lib1 s2 GG 0 0.00 3 lib1 s2 GA 10 1.00 4 lib2 s1 CC 5 0.50 5 lib2 s1 GA 5 0.50 """ if variant_count_df is None: return None return variant_count_df.assign( frac_counts=lambda x: ( x["count"] / x.groupby(["library", "sample"], observed=True, sort=False)[ "count" ].transform("sum") ) )
[docs] def samples(self, library): """List of all samples for `library`. Parameters ---------- library : str Valid `library` for the :class:`CodonVariantTable`. Returns ------- list All samples for which barcode counts have been added. """ try: return self._samples[library] except KeyError: raise ValueError(f"invalid `library` {library}")
    def addSampleCounts(self, library, sample, barcodecounts):
        """Add variant counts for a sample to `variant_count_df`.

        Note
        ----
        If you have many samples to add at once, it is faster to use
        :meth:`CodonVariantTable.add_sample_counts_df` rather than
        repeatedly calling this method.

        Parameters
        ----------
        library : str
            Valid `library` for the :class:`CodonVariantTable`.
        sample : str
            Sample name, must **not** already be in
            :class:`CodonVariantTable.samples` for `library`.
        barcodecounts : pandas.DataFrame
            Counts for each variant by barcode. Must have columns "barcode"
            and "count". The "barcode" column must have all barcodes in
            :class:`CodonVariantTable.valid_barcodes` for `library`. Such
            data frames are returned by
            :mod:`dms_variants.illuminabarcodeparser.IlluminaBarcodeParser`.

        """
        if library not in self.libraries:
            raise ValueError(f"invalid library {library}")

        if sample in self.samples(library):
            raise ValueError(f"`library` {library} already " f"has `sample` {sample}")

        req_cols = ["barcode", "count"]
        if not set(barcodecounts.columns).issuperset(set(req_cols)):
            raise ValueError(f"`barcodecounts` lacks columns {req_cols}")
        if len(barcodecounts) != len(set(barcodecounts.barcode.unique())):
            raise ValueError("`barcodecounts` has non-unique barcodes")
        # counts must cover *exactly* the library's barcodes (including
        # zero-count ones), so the merge below is one-to-one
        if set(barcodecounts.barcode.unique()) != self.valid_barcodes(library):
            raise ValueError(
                "barcodes in `barcodecounts` do not match "
                f"those expected for `library` {library}"
            )

        self._samples[library].append(sample)

        # attach the variant definitions to the counts for this sample
        df = (
            barcodecounts[req_cols]
            .assign(library=library, sample=sample)
            .merge(
                self.barcode_variant_df,
                how="inner",
                on=["library", "barcode"],
                sort=False,
                validate="one_to_one",
            )
        )

        if self.variant_count_df is None:
            self.variant_count_df = df
        else:
            self.variant_count_df = pd.concat(
                [self.variant_count_df, df], axis="index", ignore_index=True, sort=False
            )

        # samples in order added after ordering by library, getting
        # unique ones as here: https://stackoverflow.com/a/39835527
        unique_samples = list(
            collections.OrderedDict.fromkeys(
                itertools.chain.from_iterable(
                    [self.samples(lib) for lib in self.libraries]
                )
            )
        )

        # make library and sample categorical and sort
        sort_cols = ["library", "sample", "count"]
        order_cols = self.variant_count_df.columns.tolist()
        if self.primary_target is not None:
            sort_cols.insert(0, "target")
            assert "target" in order_cols
            order_cols.remove("target")
            order_cols.insert(0, "target")
        self.variant_count_df = (
            self.variant_count_df.assign(
                library=lambda x: pd.Categorical(
                    x["library"], self.libraries, ordered=True
                ),
                sample=lambda x: pd.Categorical(
                    x["sample"], unique_samples, ordered=True
                ),
            )
            # descending by count within each library / sample
            .sort_values(sort_cols, ascending=[True] * (len(sort_cols) - 1) + [False])
            .reset_index(drop=True)[order_cols]
        )
    def add_sample_counts_df(self, counts_df):
        """Add variant counts for several samples to `variant_count_df`.

        Parameters
        ----------
        counts_df : pandas.DataFrame
            Must have columns 'library', 'sample', 'barcode', and 'count'.
            The sample must **not** already be in
            `CodonVariantTable.samples` for that library. The barcode
            columns must have all barcodes for that library including
            zero-count ones.

        """
        req_cols = ["library", "sample", "barcode", "count"]
        if not (set(counts_df.columns) >= set(req_cols)):
            raise ValueError(f"`counts_df` lacks required columns {req_cols}")

        # register every new library / sample pair, rejecting duplicates
        for lib in counts_df["library"].unique():
            if lib not in self.libraries:
                raise ValueError(f"`counts_df` has unknown library {lib}")
            for s in counts_df.query("library == @lib")["sample"].unique():
                if s in self.samples(lib):
                    raise ValueError(
                        f"library {lib} already has counts for "
                        f"sample {s}, so you cannot add them"
                    )
                else:
                    self._samples[lib].append(s)

        # attach variant definitions to the counts (many samples share one
        # barcode-variant row, hence many_to_one)
        df = counts_df[req_cols].merge(
            self.barcode_variant_df,
            on=["library", "barcode"],
            sort=False,
            how="inner",
            validate="many_to_one",
        )

        if self.variant_count_df is None:
            self.variant_count_df = df
        else:
            # sanity check: no library / sample pair already present
            assert not (
                set(df.groupby(["library", "sample"]).groups).intersection(
                    set(self.variant_count_df.groupby(["library", "sample"]).groups)
                )
            )
            self.variant_count_df = pd.concat(
                [self.variant_count_df, df],
                axis="index",
                ignore_index=True,
                sort=False,
            )

        # samples in order added after ordering by library, getting
        # unique ones as here: https://stackoverflow.com/a/39835527
        unique_samples = list(
            collections.OrderedDict.fromkeys(
                itertools.chain.from_iterable(
                    [self.samples(lib) for lib in self.libraries]
                )
            )
        )

        # make library and sample categorical and sort
        sort_cols = ["library", "sample", "count"]
        order_cols = self.variant_count_df.columns.tolist()
        if self.primary_target is not None:
            sort_cols.insert(0, "target")
            assert "target" in order_cols
            order_cols.remove("target")
            order_cols.insert(0, "target")
        self.variant_count_df = (
            self.variant_count_df.assign(
                library=lambda x: pd.Categorical(
                    x["library"], self.libraries, ordered=True
                ),
                sample=lambda x: pd.Categorical(
                    x["sample"], unique_samples, ordered=True
                ),
            )
            # descending by count within each library / sample
            .sort_values(sort_cols, ascending=[True] * (len(sort_cols) - 1) + [False])
            .reset_index(drop=True)[order_cols]
        )
[docs] def valid_barcodes(self, library): """Set of valid barcodes for `library`. Parameters ---------- library : str Name of a valid library. Returns ------- set """ if library not in self.libraries: raise ValueError( f"invalid `library` {library}; " f"valid libraries are {self.libraries}" ) else: return self._valid_barcodes[library]
    def prob_escape(
        self,
        *,
        selections_df,
        neut_standard_target="neut_standard",
        by="barcode",
        min_neut_standard_frac=1e-3,
        min_neut_standard_count=1e3,
        ceil_n_aa_substitutions=4,
        drop_neut_standard_target=True,
        primary_target_only=False,
    ):
        r"""Compute probability of escape relative to a neutralization standard.

        Assumes one of the targets is the neutralization standard and is
        unaffected by antibody selection. Then computes escape probability
        of each variant as
        :math:`p_v = \left(n_v^{sel}/S^{sel}\right) /
        \left(n_v^0/S^0\right)` where :math:`n_v^{sel}` is the counts of
        variant :math:`v` in the selected condition, :math:`n_v^0` is the
        counts of variant :math:`v` in the no-antibody condition,
        :math:`S^{sel}` is the total counts of the neutralization standard
        in the selected condition, and :math:`S^0` is the total counts of
        the neutralization standard in the no-antibody condition.

        Parameters
        ----------
        selections_df : pandas.DataFrame
            How to pair samples. Should have columns 'library',
            'antibody_sample', and 'no-antibody_sample' for each row. The
            'library' and 'antibody_sample' columns must be unique for
            each row.
        neut_standard_target : str
            Name of target that is neutralization standard.
        by : {'barcode', 'aa_substitutions', 'codon_substitutions'}
            Compute for each barcode, set of amino-acid substitutions, or
            set of codon substitutions. In the last two cases, all barcodes
            with each set of substitutions are combined.
        min_neut_standard_frac : float
            Raise error if neutralization standard not at least this
            fraction of the total counts for each library / sample.
        min_neut_standard_count : int
            Raise error if not at least this many counts for neutralization
            standard for each library / sample.
        ceil_n_aa_substitutions : int
            When computing the returned `neutralization` data frame, group
            variants with >= this many amino-acid substitutions.
        drop_neut_standard_target : bool
            Drop the neutralization standard variant-level results from the
            returned data frames.
        primary_target_only : bool
            Drop everything except the primary target and neutralization
            standard target before beginning calculations.

        Returns
        -------
        (prob_escape, neut_standard_fracs, neutralization)
            This tuple consists of three pandas data frames:

            - `prob_escape`: gives the escape probabilities, and has the
              following columns:

              + 'library'
              + 'antibody_sample'
              + 'no-antibody_sample'
              + as many of 'target', 'aa_substitutions',
                'n_aa_substitutions', 'codon_substitutions', and
                'n_codon_substitutions' as makes sense to retain given
                value of `by`.
              + 'prob_escape': censored to be between 0 and 1
              + 'prob_escape_uncensored': not censored to be between 0 and 1
              + 'antibody_count': counts for variant in antibody condition
              + 'no-antibody_count': counts for variant no-antibody
                condition
              + 'antibody_neut_standard_count': counts for neut standard
              + 'no-antibody_neut_standard_count': counts for neut standard

            - `neut_standard_fracs` : a version of `selections_df` that
              also has columns `antibody_sample_frac`,
              `no-antibody_sample_frac`, `antibody_sample_count`,
              `no-antibody_sample_count` giving total counts and fraction
              of neutralization standard for each row in `selections_df`.

            - `neutralization` for each target and number of amino-acid
              (up to `ceil_n_aa_substitutions`), gives the overall
              probability escape in a column `prob_escape`.

        """
        # check validity of `selections_df`
        req_cols = {"library", "antibody_sample", "no-antibody_sample"}
        if not req_cols.issubset(selections_df.columns):
            raise ValueError(f"`selections_df` lacks columns {req_cols}")
        if len(selections_df) != len(
            selections_df.groupby(["library", "antibody_sample"])
        ):
            raise ValueError("library/antibody_sample not unique in selection_df rows")
        # every antibody / no-antibody sample must exist for its library
        invalid_samples = selections_df.assign(
            invalid=lambda x: x.apply(
                lambda row: (
                    (row["antibody_sample"] not in self.samples(row["library"]))
                    | (row["no-antibody_sample"] not in self.samples(row["library"]))
                ),
                axis=1,
            )
        )[["library", "antibody_sample", "no-antibody_sample", "invalid"]].query(
            "invalid"
        )
        if len(invalid_samples):
            raise ValueError(f"invalid samples in selections_df\n{invalid_samples}")
        valid_targets = self.barcode_variant_df["target"].unique()
        if neut_standard_target not in valid_targets:
            raise ValueError(f"{neut_standard_target=} not in targets {valid_targets}")

        # get neut_standard fracs for each library / sample
        fracs = self.n_variants_df(primary_target_only=False)
        if primary_target_only:
            fracs = fracs.query(
                "(target in [@self.primary_target, @neut_standard_target])"
            )
        fracs = (
            fracs.assign(
                n=lambda x: x.groupby(["library", "sample"], observed=False)[
                    "count"
                ].transform("sum"),
                frac=lambda x: x["count"] / x["n"],
            )
            .query("target == @neut_standard_target")
            .drop(columns=["target", "n"])
        )

        # merge neut_standard fracs into `selections_df`
        neut_standard_fracs = selections_df
        for stype in ["antibody", "no-antibody"]:
            col_renames = {col: f"{stype}_{col}" for col in ["count", "frac"]}
            neut_standard_fracs = (
                neut_standard_fracs.merge(
                    fracs,
                    left_on=["library", f"{stype}_sample"],
                    right_on=["library", "sample"],
                    validate="many_to_one",
                    how="left",
                )
                .drop(columns="sample")
                .rename(columns=col_renames)
            )
            # check we have a neut standard to compute fracs
            if neut_standard_fracs[list(col_renames.values())].isnull().any().any():
                raise ValueError(f"no {neut_standard_target=} for some sample")

        # check the neut standard fracs and counts sufficiently high
        for var, threshold in [
            ("count", min_neut_standard_count),
            ("frac", min_neut_standard_frac),
        ]:
            too_low = neut_standard_fracs[
                neut_standard_fracs[col_renames[var]] < threshold
            ]
            if len(too_low):
                raise ValueError(f"neut standard {var} too low:\n{too_low}")

        # get variant counts grouped by `by`
        count_df = self.variant_count_df
        if primary_target_only:
            count_df = count_df.query(
                "(target in [@self.primary_target, @neut_standard_target])"
            )
        group_cols = [
            "codon_substitutions",
            "n_codon_substitutions",
            "aa_substitutions",
            "n_aa_substitutions",
        ]
        if self.primary_target is not None:
            group_cols.insert(0, "target")
        if by in {"aa_substitutions", "codon_substitutions"}:
            # keep only columns at least as coarse as the grouping variable,
            # then sum counts over all barcodes sharing those substitutions
            group_cols = group_cols[group_cols.index(by) :]
            count_df = (
                count_df.groupby(
                    ["library", "sample", *group_cols], observed=True, sort=False
                )
                .aggregate({"count": "sum"})
                .reset_index()
            )
        elif by == "barcode":
            group_cols.append("barcode")
        else:
            raise ValueError(f"invalid `by` of {by}")

        # compute prob_escape: merge counts for antibody and no-antibody
        # samples onto the neut-standard totals, then take the ratio of
        # count-to-standard ratios
        prob_escape = (
            neut_standard_fracs[
                [
                    "library",
                    "antibody_sample",
                    "antibody_count",
                    "no-antibody_sample",
                    "no-antibody_count",
                ]
            ]
            .rename(
                columns={
                    "antibody_count": "antibody_neut_standard_count",
                    "no-antibody_count": "no-antibody_neut_standard_count",
                }
            )
            .merge(
                count_df,
                how="left",
                left_on=["library", "antibody_sample"],
                right_on=["library", "sample"],
                validate="one_to_many",
            )
            .drop(columns="sample")
            .rename(columns={"count": "antibody_count"})
            .merge(
                count_df,
                how="left",
                left_on=["library", "no-antibody_sample", *group_cols],
                right_on=["library", "sample", *group_cols],
            )
            .rename(columns={"count": "no-antibody_count"})
            .assign(
                prob_escape_uncensored=lambda x: (
                    (x["antibody_count"] / x["antibody_neut_standard_count"])
                    / (x["no-antibody_count"] / x["no-antibody_neut_standard_count"])
                ),
                prob_escape=lambda x: x["prob_escape_uncensored"].clip(upper=1),
            )[
                [
                    "library",
                    "antibody_sample",
                    "no-antibody_sample",
                    *group_cols,
                    "prob_escape",
                    "prob_escape_uncensored",
                    "antibody_count",
                    "no-antibody_count",
                    "antibody_neut_standard_count",
                    "no-antibody_neut_standard_count",
                ]
            ]
        )

        # compute neutralization data frame to return: aggregate counts by
        # target and (capped) number of amino-acid substitutions
        neutralization = (
            prob_escape.assign(
                n_aa_substitutions=lambda x: (
                    x["n_aa_substitutions"].clip(upper=ceil_n_aa_substitutions)
                )
            )
            .groupby(
                [
                    "library",
                    "antibody_sample",
                    "no-antibody_sample",
                    "target",
                    "n_aa_substitutions",
                    "antibody_neut_standard_count",
                    "no-antibody_neut_standard_count",
                ],
                as_index=False,
            )
            .aggregate({"antibody_count": "sum", "no-antibody_count": "sum"})
            .assign(
                prob_escape_uncensored=lambda x: (
                    (x["antibody_count"] / x["antibody_neut_standard_count"])
                    / (x["no-antibody_count"] / x["no-antibody_neut_standard_count"])
                ),
                prob_escape=lambda x: x["prob_escape_uncensored"].clip(upper=1),
            )
        )
        if drop_neut_standard_target:
            prob_escape = prob_escape.query("target != @neut_standard_target")
            # drop the "target" column entirely if only one target remains
            if prob_escape["target"].nunique() < 2:
                prob_escape = prob_escape.drop(columns="target").reset_index(drop=True)
            neutralization = neutralization.query("target != @neut_standard_target")
            if neutralization["target"].nunique() < 2:
                neutralization = neutralization.drop(columns="target").reset_index(
                    drop=True
                )

        return prob_escape, neut_standard_fracs, neutralization
[docs] def escape_scores( self, sample_df, score_type, *, pseudocount=0.5, by="barcode", logbase=2, floor_B=0.01, floor_E=0.01, ceil_B=1.0, ceil_E=1.0, ): r"""Compute a score that represents escape from binding. Note ---- Here we couch the explanation in terms of variant escape from antibody binding. Let :math:`v` be a variant, let :math:`B_v` be the fraction of this variant that is bound by antibody, and let :math:`E_v = 1 - B_v` be the fraction that escapes binding. A variant that completely escapes binding has :math:`B_v = 0` and :math:`E_v = 1`; a variant that is completely bound has :math:`B_v = 1` and :math:`E_v = 0`. We define the escape score :math:`s_v` in one of three ways depending on the value of `score_type` parameter: 1. As the escape fraction, so :math:`s_v = E_v`. 2. As the log of the escape fraction, so :math:`s_v = \log_b E_v` where :math:`b` is the logarithm base. 3. As minus the log of the binding fraction, so :math:`s_v = -\log_b B_v`. In all cases, larger values of :math:`s_v` indicate more escape. We calculate :math:`E_v` as follows. Let :math:`f_v^{\rm{pre}}` be the frequency of :math:`v` prior to selection for binding (so :math:`\sum_v f_v^{\rm{pre}} = 1`). Then the fraction of the library that is :math:`v` **after** selecting for unbound variants is .. math:: f_v^{\rm{post}} = \frac{f_v^{\rm{pre}} \times E_v} {\sum_{v'} f_{v'}^{\rm{pre}} \times E_{v'}}. Note that the denominator of the above equation, :math:`F = \sum_v f_v^{\rm{pre}} \times E_v`, represents the overall fraction of the library that escapes binding, **which we assume is directly measured experimentally**. We can easily solve for :math:`E_v` as .. math:: E_v = \frac{F \times f_v^{\rm{post}}}{f_v^{\rm{pre}}}, and can similarly obtain :math:`B_v` from :math:`B_v = 1 - E_v`. We calculate :math:`E_v` directly from the actual counts of the variants pre- and post-selection. 
Let :math:`n_v^{\rm{pre}}` and :math:`n_v^{\rm{post}}` be the counts of variant :math:`v` pre- and post-selection **after** adding a pseudocount of :math:`P \ge 0`, and let :math:`N^{\rm{pre}} = \sum_v n_v^{\rm{pre}}` and :math:`N^{\rm{post}} = \sum_v n_v^{\rm{post}}` be the total counts of all variants pre- and post-selection. Then: .. math:: E_v = F \times \frac{n_v^{\rm{post}} N^{\rm{pre}}} {n_v^{\rm{pre}} N^{\rm{post}}} A complication is that :math:`s_v` can be undefined for certain values of :math:`E_v` or :math:`B_v`. Specifically, when using the log escape fraction definition (:math:`s_v = \log_b E_v`) then :math:`s_v` is undefined if :math:`E_v = 0`, so we put a floor on :math:`E_v` by defining :math:`s_v = \log_b \max\left(E_v, E_{\rm{floor}}\right)`. When using the minus log binding fraction definition (:math:`s_v = -\log_b B_v`), then :math:`s_v` is undefined if :math:`B_v \le 0`, so we put a floor on :math:`B_v` by defining defining :math:`s_v = -\log_b \max\left(B_v, B_{\rm{floor}}\right)`. We similarly define ceilings on :math:`E_v` and :math:`B_v`, although these ceilings are optional. We also estimate the variance :math:`\sigma_{s_v}^2` on :math:`s_v` from the variances on the counts, which we assume are :math:`\sigma_{n_v^{\rm{pre}}}^2 = n_v^{\rm{pre}}` and :math:`\sigma_{n_v^{\rm{post}}}^2 = n_v^{\rm{post}}` from Poisson counting statistics. To do this, we propagate the errors: .. math:: \sigma_{s_v}^2 &=& \left(\frac{\partial s_v}{\partial n_v^{\rm{pre}}}\right)^2 \sigma_{n_v^{\rm{pre}}}^2 + \left(\frac{\partial s_v}{\partial n_v^{\rm{post}}}\right)^2 \sigma_{n_v^{\rm{post}}}^2 \\ &=& \left(\frac{\partial s_v}{\partial n_v^{\rm{pre}}}\right)^2 n_v^{\rm{pre}} + \left(\frac{\partial s_v}{\partial n_v^{\rm{post}}}\right)^2 n_v^{\rm{post}}. 
We calculate the derivatives of :math:`s_v` with respect to the counts numerically with a step size of one rather than analytically, since analytical calculations are confounded by the floors on :math:`E_v` or :math:`B_v`. Parameters ---------- sample_df : pandas.DataFrame Comparisons we use to compute the functional scores. Should have these columns: 'pre_sample' (pre-selection sample), 'post_sample' (post-selection sample), 'library', 'name' (name for output), 'frac_escape' (the overall fraction escaping :math:`F`). score_type : {'frac_escape', 'log_escape', 'minus_log_bind'} How to define escape score: :math:`E_v` if 'frac_escape'; :math:`\log_b E_v` if 'log_escape'; :math:`-\log_b B_v` if 'minus_log_bind'. pseudocount : float Pseudocount added to each count. by : {'barcode', 'aa_substitutions', 'codon_substitutions'} Compute effects for each barcode", set of amino-acid substitutions, or set of codon substitutions. In the last two cases, all barcodes with each set of substitutions are combined. logbase : float Base for logarithm when calculating functional score. floor_B : float Floor assigned to :math:`B_v`, :math:`B_{\rm{floor}}` if `score_type` is 'minus_log_bind'. floor_E : float Floor assigned to :math:`E_v`, :math:`E_{\rm{floor}}` if `score_type` is 'frac_escape' or 'log_escape'. ceil_B : float or None Ceiling assigned to :math:`B_v`, or `None` if no ceiling. ceil_E : float or None Ceiling assigned to :math:`E_v`, or `None` if no ceiling. 
Returns ------- pandas.DataFrame Has the following columns: - 'name': specified in `sample_df` - 'library': the library - 'pre_sample': specified in `sample_df` - 'post_sample': specified in `sample_df` - the grouping used to compute scores (the value of `by`) - 'score': :math:`s_v` - 'score_var': :math:`\sigma_{s_v}^2` - 'pre_count': :math:`n_v^{\rm{pre}}` (without pseudocount) - 'post_count': :math:`n_v^{\rm{post}}` (without pseudocount) - as many of 'aa_substitutions', 'n_aa_substitutions', 'codon_substitutions', and 'n_codon_substitutions' as makes sense to retain given value of `by`. """ req_cols = {"pre_sample", "post_sample", "library", "name", "frac_escape"} if not set(sample_df.columns).issuperset(req_cols): raise ValueError(f"`sample_df` lacks required columns: {req_cols}") if len(sample_df) != len(sample_df.groupby(["name", "library"])): raise ValueError("names / libraries in `sample_df` not unique") if (0 >= sample_df["frac_escape"]).any() or ( sample_df["frac_escape"] >= 1 ).any(): raise ValueError("in `sample_df`, `frac_escape` must be > 0, < 1") # get data frame with samples of interest df = [] already_added = set() for tup in sample_df.itertuples(): lib = tup.library for stype in ["pre_sample", "post_sample"]: sample = getattr(tup, stype) if (sample, lib) in already_added: continue already_added.add((sample, lib)) tup_df = self.variant_count_df.query( "(sample == @sample) and (library == @lib)" ) if len(tup_df) < 1: raise ValueError(f"no sample {sample} library {lib}") df.append(tup_df) df = pd.concat(df, ignore_index=True, sort=False) # sum counts in groups specified by `by` group_cols = [ "codon_substitutions", "n_codon_substitutions", "aa_substitutions", "n_aa_substitutions", ] if self.primary_target is not None: group_cols.append("target") if by in {"aa_substitutions", "codon_substitutions"}: group_cols = group_cols[group_cols.index(by) + 1 :] df = ( df.groupby( ["library", "sample", by, *group_cols], observed=True, sort=False ) 
.aggregate({"count": "sum"}) .reset_index() ) elif by != "barcode": raise ValueError(f"invalid `by` of {by}") # get data frame with pre- and post-selection samples / counts df_scores = [] for tup in sample_df.itertuples(): name_dfs = [] lib = tup.library # noqa: F841 for stype in ("pre_sample", "post_sample"): s_name = getattr(tup, stype) # noqa: F841 name_dfs.append( df.query("(sample == @s_name) and (library == @lib)") .rename( columns={ "count": stype.split("_")[0] + "_count", "sample": stype, } ) .assign(name=tup.name, frac_escape=tup.frac_escape) ) df_scores.append(pd.merge(*name_dfs, how="inner", validate="1:1")) df_scores = pd.concat(df_scores, ignore_index=True, sort=False) # check pseudocount if pseudocount < 0: raise ValueError(f"`pseudocount` is < 0: {pseudocount}") elif (pseudocount == 0) and any( (df_scores[c] <= 0).any() for c in ["pre_count", "post_count"] ): raise ValueError("some counts are zero, you must use " "`pseudocount` > 0") if floor_B <= 0: raise ValueError("`floor_B` must be > 0") if floor_E <= 0 and score_type != "frac_escape": raise ValueError("`floor_E` must be > 0") if (ceil_B is not None) and (ceil_B <= floor_B): raise ValueError("`ceil_B` must be > `floor_B`") if (ceil_E is not None) and (ceil_E <= floor_E): raise ValueError("`ceil_E` must be > `floor_E`") # compute escape scores def _compute_escape_scores(): _df_scores = df_scores.assign( n_v_pre=lambda x: x["pre_count"] + pseudocount, n_v_post=lambda x: x["post_count"] + pseudocount, N_pre=lambda x: ( x.groupby(["name", "library"], observed=True, sort=False)[ "n_v_pre" ].transform("sum") ), N_post=lambda x: ( x.groupby(["name", "library"], observed=True, sort=False)[ "n_v_post" ].transform("sum") ), E_v=lambda x: ( x["frac_escape"] * x["n_v_post"] * x["N_pre"] / (x["n_v_pre"] * x["N_post"]) ), B_v=lambda x: 1 - x["E_v"], # for computing derivatives, increment counts by one n_v_pre_d=lambda x: x["n_v_pre"] + 1, n_v_post_d=lambda x: x["n_v_post"] + 1, N_pre_d=lambda x: x["N_pre"] + 
1, N_post_d=lambda x: x["N_post"] + 1, E_v_dpre=lambda x: ( x["frac_escape"] * x["n_v_post"] * x["N_pre_d"] / (x["n_v_pre_d"] * x["N_post"]) ), E_v_dpost=lambda x: ( x["frac_escape"] * x["n_v_post_d"] * x["N_pre"] / (x["n_v_pre"] * x["N_post_d"]) ), B_v_dpre=lambda x: 1 - x["E_v_dpre"], B_v_dpost=lambda x: 1 - x["E_v_dpost"], ) if score_type == "minus_log_bind": floor = floor_B ceil = ceil_B cols = ["B_v", "B_v_dpre", "B_v_dpost"] def func(x): return -numpy.log(x) / numpy.log(logbase) elif score_type == "log_escape": floor = floor_E ceil = ceil_E cols = ["E_v", "E_v_dpre", "E_v_dpost"] def func(x): return numpy.log(x) / numpy.log(logbase) elif score_type == "frac_escape": floor = floor_E ceil = ceil_E cols = ["E_v", "E_v_dpre", "E_v_dpost"] def func(x): return x else: raise ValueError(f"invalid `score_type` {score_type}") for col in cols: _df_scores[col] = numpy.clip(_df_scores[col], floor, ceil) _df_scores["score"] = func(_df_scores[cols[0]]) _df_scores["score_dpre"] = func(_df_scores[cols[1]]) _df_scores["score_dpost"] = func(_df_scores[cols[2]]) _df_scores["score_var"] = ( _df_scores["score_dpre"] - _df_scores["score"] ) ** 2 * _df_scores["n_v_pre"] + ( _df_scores["score_dpost"] - _df_scores["score"] ) ** 2 * _df_scores[ "n_v_post" ] return _df_scores df_scores = _compute_escape_scores() assert df_scores["score"].notnull().all(), df_scores # get columns to keep col_order = [ "name", "library", "pre_sample", "post_sample", by, "score", "score_var", "pre_count", "post_count", *group_cols, ] if self.primary_target is not None: assert col_order.count("target") == 1 col_order.remove("target") col_order.insert(1, "target") else: assert "target" not in col_order return df_scores[col_order]
    def func_scores(
        self,
        preselection,
        *,
        pseudocount=0.5,
        by="barcode",
        libraries="all",
        syn_as_wt=False,
        logbase=2,
        permit_zero_wt=False,
        permit_self_comp=False,
    ):
        r"""Get data frame with functional scores for variants.

        Note
        ----
        The functional score is calculated from the change in counts
        for a variant pre- and post-selection using the formula in
        `Otwinoski et al (2018) <https://doi.org/10.1073/pnas.1804015115>`_.

        Specifically, let :math:`n^v_{pre}` and :math:`n^v_{post}` be
        the counts of variant :math:`v` pre- and post-selection, and
        let :math:`n^{wt}_{pre}` and :math:`n^{wt}_{post}` be the
        summed counts of **all** wildtype variants pre- and post-
        selection. Then the functional score of the variant is:

        .. math::

            f_v = \log_b\left(\frac{n^v_{post} / n^{wt}_{post}}
                  {n^v_{pre} / n^{wt}_{pre}}\right).

        The variance due to Poisson sampling statistics is:

        .. math::

            \sigma^2_v = \frac{1}{\left(\ln b\right)^2}
                         \left(\frac{1}{n^v_{post}} +
                         \frac{1}{n^{wt}_{post}} +
                         \frac{1}{n^v_{pre}} +
                         \frac{1}{n^{wt}_{pre}}\right)

        where :math:`b` is logarithm base (see `logbase` parameter).

        For both calculations, a pseudocount (see `pseudocount` parameter)
        is added to each count first. The wildtype counts are computed
        across all **fully wildtype** variants (see `syn_as_wt` for how
        this is defined).

        If there are multiple targets, the functional scores for all
        targets are relative to the wildtype of the primary target.

        Parameters
        ----------
        preselection : str or dict
            Pre-selection sample. If the same for all post-selection then
            provide the name as str. If it differs among post-selection
            samples, then provide a dict keyed by each post-selection
            sample with the pre-selection sample being the value.
        pseudocount : float
            Pseudocount added to each count.
        by : {'barcode', 'aa_substitutions', 'codon_substitutions'}
            Compute effects for each barcode, set of amino-acid
            substitutions, or set of codon substitutions. In the last
            two cases, all barcodes with each set of substitutions are
            combined. If you use "aa_substitutions" then it may be more
            sensible to set `syn_as_wt` to `True`.
        syn_as_wt : bool
            In formula for functional scores, consider variants with only
            synonymous mutations when determining wildtype counts? If
            `False`, only variants with **no** mutations of any type
            contribute.
        libraries : {'all', 'all_only', list}
            Perform calculation for all libraries including a merge (named
            "all libraries"), only for the merge of all libraries, or for
            the libraries in the list. If `by` is 'barcode', then the
            barcodes for the merge have the library name pre-pended.
        logbase : float
            Base for logarithm when calculating functional score.
        permit_zero_wt : bool
            If the wildtype counts are zero for any sample, raise an error
            or permit the calculation to proceed just using pseudocount?
        permit_self_comp : bool
            Permit comparisons for sample pre- and post-selection?

        Returns
        -------
        pandas.DataFrame
            Has the following columns:

            - "library": the library
            - "pre_sample": the pre-selection sample
            - "post_sample": the post-selection sample
            - value corresponding to grouping used to compute effects (`by`)
            - "func_score": the functional score
            - "func_score_var": variance on the functional score
            - "pre_count": pre-selection counts
            - "post_count": post-selection counts
            - "pre_count_wt": pre-selection counts for all wildtype
            - "post_count_wt": post-selection counts for all wildtype
            - "pseudocount": the pseudocount value
            - as many of "aa_substitutions", "n_aa_substitutions",
              "codon_substitutions", and "n_codon_substitutions" as can
              be retained given value of `by`.

        """
        ordered_samples = self.variant_count_df["sample"].unique()
        if isinstance(preselection, str):
            # make `preselection` into dict mapping each post-selection
            # sample to the single shared pre-selection sample
            preselection = {
                s: preselection
                for s in ordered_samples
                if s != preselection or permit_self_comp
            }
        elif not isinstance(preselection, dict):
            raise ValueError("`preselection` not str or dict")
        if not permit_self_comp:
            if any(pre == post for pre, post in preselection.items()):
                raise ValueError(
                    "`permit_self_comp` is False but there"
                    " are identical pre and post samples"
                )

        # all samples of interest (both keys and values of the dict)
        samples = set(preselection.keys()).union(set(preselection.values()))
        if not samples.issubset(set(ordered_samples)):
            extra_samples = samples - set(ordered_samples)
            raise ValueError(f"invalid samples: {extra_samples}")

        # get data frame with samples of interest
        if self.variant_count_df is None:
            raise ValueError("no sample variant counts have been added")
        df = self.variant_count_df.query("sample in @samples")

        if libraries == "all":
            df = self.addMergedLibraries(df)
        elif libraries == "all_only":
            df = self.addMergedLibraries(df).query('library == "all libraries"')
        else:
            if set(libraries) > set(self.libraries):
                raise ValueError(
                    f"invalid `libraries` of {libraries}. Must "
                    'be "all", "all_only", or a list containing '
                    f"some subset of {self.libraries}"
                )
            df = df.query("library in @libraries")

        # get wildtype counts for each sample and library; a variant is
        # "wildtype" when it has zero mutations in `wt_col`, and for
        # multi-target tables only the primary target contributes
        if syn_as_wt:
            wt_col = "n_aa_substitutions"
        else:
            wt_col = "n_codon_substitutions"
        wt_counts = (
            df.assign(
                count=lambda x: (
                    x["count"]
                    * (0 == x[wt_col]).astype("int")
                    * (
                        self.primary_target is None
                        or x["target"] == self.primary_target
                    )
                )
            )
            .groupby(["library", "sample"], sort=False, observed=True)
            .aggregate({"count": "sum"})
            .reset_index()
        )
        if (wt_counts["count"] <= 0).any() and not permit_zero_wt:
            raise ValueError(f"no wildtype counts:\n{wt_counts}")

        # sum counts in groups specified by `by`
        group_cols = [
            "codon_substitutions",
            "n_codon_substitutions",
            "aa_substitutions",
            "n_aa_substitutions",
        ]
        if self.primary_target is not None:
            group_cols.append("target")
        if by in {"aa_substitutions", "codon_substitutions"}:
            # only columns coarser than `by` remain meaningful after grouping
            group_cols = group_cols[group_cols.index(by) + 1 :]
            df = (
                df.groupby(
                    ["library", "sample", by, *group_cols], observed=True, sort=False
                )
                .aggregate({"count": "sum"})
                .reset_index()
            )
        elif by != "barcode":
            raise ValueError(f"invalid `by` of {by}")

        # get data frame with pre- and post-selection samples / counts
        df_func_scores = []
        for post_sample in ordered_samples:
            if post_sample not in preselection:
                continue
            pre_sample = preselection[post_sample]
            sample_dfs = []
            for stype, s in [("pre", pre_sample), ("post", post_sample)]:  # noqa: B007
                sample_dfs.append(
                    df.query("sample == @s")
                    .rename(columns={"count": f"{stype}_count"})
                    .merge(
                        wt_counts.rename(columns={"count": f"{stype}_count_wt"}),
                        how="inner",
                        validate="many_to_one",
                    )
                    .rename(columns={"sample": f"{stype}_sample"})
                )
            df_func_scores.append(
                pd.merge(sample_dfs[0], sample_dfs[1], how="inner", validate="1:1")
            )
        df_func_scores = pd.concat(df_func_scores, ignore_index=True, sort=False)

        # check pseudocount: zero counts give log(0) unless pseudocount > 0
        if pseudocount < 0:
            raise ValueError(f"`pseudocount` is < 0: {pseudocount}")
        elif (pseudocount == 0) and any(
            (df_func_scores[c] <= 0).any()
            for c in ["pre_count", "post_count", "pre_count_wt", "post_count_wt"]
        ):
            raise ValueError("some counts are zero, you must use " "`pseudocount` > 0")

        # calculate functional score and variance
        df_func_scores = df_func_scores.assign(
            pseudocount=pseudocount,
            func_score=lambda x: numpy.log(
                ((x.post_count + x.pseudocount) / (x.post_count_wt + x.pseudocount))
                / ((x.pre_count + x.pseudocount) / (x.pre_count_wt + x.pseudocount))
            )
            / numpy.log(logbase),
            func_score_var=lambda x: (
                1 / (x.post_count + x.pseudocount)
                + 1 / (x.post_count_wt + x.pseudocount)
                + 1 / (x.pre_count + x.pseudocount)
                + 1 / (x.pre_count_wt + x.pseudocount)
            )
            / (numpy.log(logbase) ** 2),
        )

        col_order = [
            "library",
            "pre_sample",
            "post_sample",
            by,
            "func_score",
            "func_score_var",
            "pre_count",
            "post_count",
            "pre_count_wt",
            "post_count_wt",
            "pseudocount",
            *group_cols,
        ]
        if self.primary_target is not None:
            # put "target" first when multiple targets are present
            assert col_order.count("target") == 1
            col_order.remove("target")
            col_order.insert(0, "target")
        else:
            assert "target" not in col_order

        return df_func_scores[col_order]
[docs] def n_variants_df( self, *, libraries="all", samples="all", min_support=1, variant_type="all", mut_type=None, sample_rename=None, primary_target_only=False, ): """Get number variants per library / sample (and target if specified). Parameters ---------- variant_type : {'single', 'all'} Include all variants or just those with <= 1 `mut_type` mutation. mut_type : {'aa', 'codon', None} If `variant_type` is 'single', indicate what type of single mutants we are filtering for. primary_target_only : bool Only return counts for the primary target. All other args Same as for :class:`CodonVariantTable.plotNumMutsHistogram`. Returns ------- pandas.DataFrame """ df, nlibraries, nsamples = self._getPlotData( libraries, samples, min_support, primary_target_only=primary_target_only, sample_rename=sample_rename, ) if variant_type == "single": if mut_type in {"aa", "codon"}: df = df.query(f"n_{mut_type}_substitutions <= 1") else: raise ValueError('`mut_type` must be "aa" or "single"') elif variant_type != "all": raise ValueError(f"invalid `variant_type` {variant_type}") group_cols = ["library", "sample"] if (self.primary_target is not None) and (not primary_target_only): group_cols.insert(0, "target") assert "target" in set(df.columns) else: assert "target" not in set(df.columns) return ( df.groupby(group_cols, observed=True) .aggregate({"count": "sum"}) .reset_index() )
[docs] def mutCounts( self, variant_type, mut_type, *, libraries="all", samples="all", min_support=1, sample_rename=None, ): """Get counts of each individual mutations (only in primary target). Parameters ---------- variant_type : {'single', 'all'} Include just single mutants, or all mutants? other_parameters Same as for :class:`CodonVariantTable.plotNumMutsHistogram`. Returns ------- pandas.DataFrame Tidy data frame with columns named "library", "sample", "mutation", "count", "mutation_type", and "site". If there are multiple targets, only returns counts for the primary target. """ df, nlibraries, nsamples = self._getPlotData( libraries, samples, min_support, primary_target_only=True ) assert "target" not in set(df.columns) samplelist = df["sample"].unique().tolist() librarylist = df["library"].unique().tolist() if mut_type == "codon": wts = self.codons chars = self._CODONS mutation_types = list(self._mutation_type_colors) elif mut_type == "aa": wts = self.aas chars = self._AAS mutation_types = [ mtype for mtype in self._mutation_type_colors if mtype != "synonymous" ] else: raise ValueError(f"invalid mut_type {mut_type}") # data frame listing all mutations with count 0 mut_list = [] for r, wt in wts.items(): for mut in chars: if mut != wt: mut_list.append(f"{wt}{r}{mut}") all_muts = pd.concat( [ pd.DataFrame( { "mutation": mut_list, "library": library, "sample": sample, "count": 0, } ) for library, sample in itertools.product(librarylist, samplelist) ] ) if variant_type == "single": df = df.query(f"n_{mut_type}_substitutions == 1") elif variant_type == "all": df = df.query(f"n_{mut_type}_substitutions >= 1") else: raise ValueError(f"invalid variant_type {variant_type}") def _classify_mutation(mut_str): if mut_type == "aa": m = self._AA_SUB_RE.fullmatch(mut_str) assert m is not None, f"cannot match aa mut: {mut_str}" wt_aa = m.group("wt") mut_aa = m.group("mut") else: m = self._CODON_SUB_RE.fullmatch(mut_str) assert m is not None, f"cannot match codon mut: 
{mut_str}" wt_aa = CODON_TO_AA[m.group("wt")] mut_aa = self._CODON_TO_AA[m.group("mut")] if wt_aa == mut_aa: return "synonymous" elif mut_aa == "*": return "stop" elif mut_aa == "-": return "deletion" else: return "nonsynonymous" def _get_site(mut_str): if mut_type == "aa": m = self._AA_SUB_RE.fullmatch(mut_str) else: m = self._CODON_SUB_RE.fullmatch(mut_str) site = int(m.group("r")) assert site in self.sites return site if sample_rename is None: sample_rename_dict = _dict_missing_is_key() else: if len(sample_rename) != len(set(sample_rename.values())): raise ValueError("duplicates in `sample_rename`") sample_rename_dict = _dict_missing_is_key(sample_rename) df = ( df.rename(columns={f"{mut_type}_substitutions": "mutation"})[ ["library", "sample", "mutation", "count"] ] .pipe(dms_variants.utils.tidy_split, column="mutation") .merge(all_muts, how="outer") .groupby(["library", "sample", "mutation"]) .aggregate({"count": "sum"}) .reset_index() .assign( library=lambda x: pd.Categorical( x["library"], librarylist, ordered=True ), sample=lambda x: pd.Categorical( x["sample"].map(sample_rename_dict), [sample_rename_dict[s] for s in samplelist], ordered=True, ), mutation_type=lambda x: pd.Categorical( x["mutation"].apply(_classify_mutation), mutation_types, ordered=True, ), site=lambda x: x["mutation"].apply(_get_site), ) .sort_values( ["library", "sample", "count", "mutation"], ascending=[True, True, False, True], ) .reset_index(drop=True) ) return df
    def plotMutHeatmap(
        self,
        variant_type,
        mut_type,
        *,
        count_or_frequency="frequency",
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        sample_rename=None,
        one_lib_facet=False,
    ):
        """Heatmap of mutation counts or frequencies (for primary target only).

        Parameters
        ----------
        count_or_frequency : {'count', 'frequency'}
            Plot mutation counts or frequencies?
        other_parameters
            Same as for :meth:`CodonVariantTable.plotCumulMutCoverage`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        df = self.mutCounts(
            variant_type,
            mut_type,
            samples=samples,
            libraries=libraries,
            min_support=min_support,
            sample_rename=sample_rename,
        )
        assert "target" not in set(df.columns)

        # number of variants per library / sample, used as denominator
        # when plotting frequencies
        n_variants = self.n_variants_df(
            libraries=libraries,
            samples=samples,
            min_support=min_support,
            variant_type=variant_type,
            mut_type=mut_type,
            sample_rename=sample_rename,
            primary_target_only=True,
        ).rename(columns={"count": "nseqs"})

        # order amino acids by Kyte-Doolittle hydrophobicity; any characters
        # not in that scale (e.g. stop, gap) are appended at the end
        aa_order = [
            tup[0]
            for tup in sorted(
                Bio.SeqUtils.ProtParamData.kd.items(), key=lambda tup: tup[1]
            )
        ]
        aa_order = aa_order + [aa for aa in self._AAS if aa not in aa_order]

        if mut_type == "codon":
            height_per = 5.5
            mut_desc = "codon"
            # order codons by the amino acid they encode
            order = list(
                itertools.chain.from_iterable(
                    [self._AA_TO_CODONS[aa] for aa in aa_order]
                )
            )
            pattern = self._CODON_SUB_RE.pattern
        elif mut_type == "aa":
            height_per = 1.7
            mut_desc = "amino acid"
            order = aa_order
            pattern = self._AA_SUB_RE.pattern
        else:
            raise ValueError(f"invalid `mut_type` {mut_type}")

        df = (
            df[["library", "sample", "mutation", "site", "count"]]
            .merge(n_variants, on=["library", "sample"])
            .assign(
                frequency=lambda x: x["count"] / x["nseqs"],
                # parse the mutant character out of the mutation string and
                # order it on the y-axis per `order`
                mut_char=lambda x: pd.Categorical(
                    x["mutation"].str.extract(pattern).mut,
                    order,
                    ordered=True,
                ).remove_unused_categories(),
            )
        )
        assert "target" not in set(df.columns)

        if count_or_frequency not in {"count", "frequency"}:
            raise ValueError(f"invalid count_or_frequency " f"{count_or_frequency}")

        nlibraries = len(df["library"].unique())
        nsamples = len(df["sample"].unique())

        # facet layout and figure size depend on orientation
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1.6 + 3.5 * nlibraries)
            height = heightscale * (0.8 + height_per * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1.6 + 3.5 * nsamples)
            height = heightscale * (0.8 + height_per * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        p = (
            p9.ggplot(df, p9.aes("site", "mut_char", fill=count_or_frequency))
            + p9.geom_tile()
            + p9.theme(
                figure_size=(width, height),
                legend_key=p9.element_blank(),
                axis_text_y=p9.element_text(size=6),
            )
            + p9.scale_x_continuous(
                name=f"{mut_desc} site",
                limits=(min(self.sites) - 1, max(self.sites) + 1),
                expand=(0, 0),
            )
            + p9.ylab(mut_desc)
        )

        if samples is None:
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False, limitsize=False)

        return p
    def plotMutFreqs(
        self,
        variant_type,
        mut_type,
        *,
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        sample_rename=None,
        one_lib_facet=False,
    ):
        """Mutation frequency along length of gene (primary target only).

        Parameters
        ----------
        All parameters
            Same as for :meth:`CodonVariantTable.plotCumulMutCoverage`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        df = self.mutCounts(
            variant_type,
            mut_type,
            samples=samples,
            libraries=libraries,
            min_support=min_support,
            sample_rename=sample_rename,
        )

        # number of variants per library / sample: denominator for frequency
        n_variants = self.n_variants_df(
            libraries=libraries,
            samples=samples,
            min_support=min_support,
            variant_type=variant_type,
            mut_type=mut_type,
            sample_rename=sample_rename,
            primary_target_only=True,
        ).rename(columns={"count": "nseqs"})

        assert "target" not in set(df.columns).union(set(n_variants.columns))

        # sum counts per site and mutation type, then convert to frequencies
        df = (
            df.groupby(["library", "sample", "mutation_type", "site"], observed=False)
            .aggregate({"count": "sum"})
            .reset_index()
            .merge(n_variants, on=["library", "sample"])
            .assign(freq=lambda x: x["count"] / x["nseqs"])
        )

        nlibraries = len(df["library"].unique())
        nsamples = len(df["sample"].unique())

        # facet layout and figure size depend on orientation
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1.6 + 1.8 * nlibraries)
            height = heightscale * (0.8 + 1 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1.6 + 1.8 * nsamples)
            height = heightscale * (0.8 + 1 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        if mut_type == "aa":
            mut_desc = "amino-acid"
        else:
            mut_desc = mut_type

        # wrap the axis label onto two lines when the plot is short
        if height < 3:
            ylabel = f"{mut_desc} mutation\nfrequency " f"({variant_type} mutants)"
        else:
            ylabel = f"{mut_desc} mutation frequency " f"({variant_type} mutants)"

        p = (
            p9.ggplot(df, p9.aes("site", "freq", color="mutation_type"))
            + p9.geom_step()
            + p9.scale_color_manual(
                [
                    self._mutation_type_colors[m]
                    for m in df.mutation_type.unique().sort_values().tolist()
                ],
                name="mutation type",
            )
            + p9.scale_x_continuous(
                name=f"{mut_desc} site", limits=(min(self.sites), max(self.sites))
            )
            + p9.ylab(ylabel)
            + p9.theme(
                figure_size=(width, height),
                legend_key=p9.element_blank(),
            )
        )

        if samples is None:
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
    def plotCountsPerVariant(
        self,
        *,
        ystat="frac_counts",
        logy=True,
        by_variant_class=False,
        classifyVariants_kwargs=None,
        variant_type="all",
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        mut_type="aa",
        sample_rename=None,
        one_lib_facet=False,
        primary_target_only=False,
    ):
        """Plot variant index versus counts (or frac counts).

        Parameters
        ----------
        ystat : {'frac_counts', 'count'}
            Is y-axis counts from variant, or fraction of counts in
            library / sample from variant?
        logy : bool
            Show the y-axis on a log scale. If so, all values of 0 are set
            to half the minimum observed value > 0, and dashed line is
            drawn to indicate that points below it are not observed.
        other_parameters
            Same as for :meth:`CodonVariantTable.plotCumulVariantCounts`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        if samples is None:
            raise ValueError("plot nonsensical with `samples` of `None`")

        df, nlibraries, nsamples = self._getPlotData(
            libraries,
            samples,
            min_support,
            primary_target_only,
            sample_rename=sample_rename,
        )

        if variant_type == "single":
            if mut_type == "aa":
                mutstr = "amino acid"
            elif mut_type == "codon":
                mutstr = mut_type
            else:
                raise ValueError(f"invalid `mut_type` {mut_type}")
            ylabel = f"single {mutstr} variants with >= this many counts"
            df = df.query(f"n_{mut_type}_substitutions <= 1")
        elif variant_type == "all":
            ylabel = "variants with >= this many counts"
        else:
            raise ValueError(f"invalid `variant_type` {variant_type}")

        # facet layout and figure size depend on orientation
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1 + 1.8 * nlibraries)
            height = heightscale * (0.6 + 1.5 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1 + 1.5 * nsamples)
            height = heightscale * (0.6 + 1.5 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        # note: this `ylabel` overwrites the one set from `variant_type` above
        if ystat == "frac_counts":
            df = self.add_frac_counts(df).drop(columns="count")
            ylabel = "fraction of counts"
        elif ystat == "count":
            ylabel = "number of counts"
        else:
            raise ValueError(f"invalid `ystat` of {ystat}")

        ivariant_group_cols = ["library", "sample"]
        if by_variant_class:
            # classify variants and color the steps by variant class
            ivariant_group_cols.append("variant class")
            if not classifyVariants_kwargs:
                kw_args = {}
            else:
                kw_args = dict(classifyVariants_kwargs.items())
            if "primary_target" not in kw_args:
                kw_args["primary_target"] = self.primary_target
            if "class_as_categorical" not in kw_args:
                kw_args["class_as_categorical"] = True
            df = self.classifyVariants(df, **kw_args).rename(
                columns={"variant_class": "variant class"}
            )
            aes = p9.aes("ivariant", ystat, color="variant class")
        else:
            aes = p9.aes("ivariant", ystat)

        # rank variants within each group from highest to lowest y-stat
        df = df.sort_values(ystat, ascending=False).assign(
            ivariant=lambda x: (x.groupby(ivariant_group_cols).cumcount() + 1)
        )

        if logy:
            # floor zeros at half the minimum positive value so they are
            # displayable on a log axis; dotted line marks the floor region
            min_gt_0 = df.query(f"{ystat} > 0")[ystat].min()
            min_y = min_gt_0 / 2
            df[ystat] = numpy.clip(df[ystat], min_y, None)
            yscale = p9.scale_y_log10(labels=dms_variants.utils.latex_sci_not)
            hline = p9.geom_hline(
                yintercept=(min_y + min_gt_0) / 2,
                linetype="dotted",
                color=CBPALETTE[0],
                size=1,
            )
        else:
            yscale = p9.scale_y_continuous(labels=dms_variants.utils.latex_sci_not)
            hline = None

        p = (
            p9.ggplot(df)
            + aes
            + p9.geom_step()
            + p9.xlab("variant number")
            + p9.ylab(ylabel)
            + yscale
            + p9.theme(
                figure_size=(width, height),
                axis_text_x=p9.element_text(angle=90),
            )
            + p9.facet_grid(facet_str)
            + p9.scale_color_manual(values=CBPALETTE[1:])
        )
        if hline is not None:
            p = p + hline

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
    def plotCumulVariantCounts(
        self,
        *,
        variant_type="all",
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        mut_type="aa",
        tot_variants_hline=True,
        sample_rename=None,
        one_lib_facet=False,
        primary_target_only=True,
    ):
        """Plot number of variants with at least each number of counts.

        Parameters
        ----------
        variant_type : {'single', 'all'}
            Include all variants or just those with <=1 `mut_type` mutation.
        tot_variants_hline : bool
            Include dotted horizontal line indicating total number of
            variants.
        primary_target_only : bool
            Only show counts for the primary target.
        other_parameters
            Same as for :meth:`CodonVariantTable.plotNumMutsHistogram`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        df, nlibraries, nsamples = self._getPlotData(
            libraries,
            samples,
            min_support,
            primary_target_only,
            sample_rename=sample_rename,
        )

        if variant_type == "single":
            if mut_type == "aa":
                mutstr = "amino acid"
            elif mut_type == "codon":
                mutstr = mut_type
            else:
                raise ValueError(f"invalid `mut_type` {mut_type}")
            ylabel = f"single {mutstr} variants with >= this many counts"
            df = df.query(f"n_{mut_type}_substitutions <= 1")
        elif variant_type == "all":
            ylabel = "variants with >= this many counts"
        else:
            raise ValueError(f"invalid `variant_type` {variant_type}")

        # facet layout and figure size depend on orientation
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1 + 1.5 * nlibraries)
            height = heightscale * (0.6 + 1.5 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1 + 1.5 * nsamples)
            height = heightscale * (0.6 + 1.5 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        # cumulative number of variants at each count level; count > 0
        # needed for the log-scaled x-axis
        df = dms_variants.utils.cumul_rows_by_count(
            df,
            n_col="nvariants",
            tot_col="total_variants",
            group_cols=["library", "sample"],
        ).query("count > 0")

        p = (
            p9.ggplot(df, p9.aes("count", "nvariants"))
            + p9.geom_step()
            + p9.xlab("number of counts")
            + p9.ylab(ylabel)
            + p9.scale_x_log10(labels=dms_variants.utils.latex_sci_not)
            + p9.scale_y_continuous(labels=dms_variants.utils.latex_sci_not)
            + p9.theme(figure_size=(width, height))
        )

        if samples is None:
            p = p + p9.ylab("number of variants")
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if tot_variants_hline:
            p = p + p9.geom_hline(
                p9.aes(yintercept="total_variants"),
                linetype="dashed",
                color=CBPALETTE[1],
            )

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
    def plotCumulMutCoverage(
        self,
        variant_type,
        mut_type,
        *,
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        max_count=None,
        sample_rename=None,
        one_lib_facet=False,
    ):
        """Frac mutations seen <= some number of times (primary target only).

        Parameters
        ----------
        variant_type : {'single', 'all'}
            Include all variants or just those with <=1 `mut_type` mutation.
        max_count : None or int
            Plot cumulative fraction plot out to this number of
            observations. If `None`, a reasonable value is automatically
            determined.
        other_parameters
            Same as for :class:`CodonVariantTable.plotNumMutsHistogram`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        df = self.mutCounts(
            variant_type,
            mut_type,
            samples=samples,
            libraries=libraries,
            min_support=min_support,
            sample_rename=sample_rename,
        )
        assert "target" not in set(df.columns)

        # add one to counts to plot fraction found < this many
        # as stat_ecdf by default does <=
        df = df.assign(count=lambda x: x["count"] + 1)

        if max_count is None:
            # heuristic x-axis limit: median across groups of the 60th
            # percentile of counts
            max_count = (
                df.groupby(["library", "sample"], observed=True)["count"]
                .quantile(0.6)
                .median()
            )

        nlibraries = len(df["library"].unique())
        nsamples = len(df["sample"].unique())

        # facet layout and figure size depend on orientation
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1.6 + 1.3 * nlibraries)
            height = heightscale * (1 + 1.2 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1.6 + 1.3 * nsamples)
            height = heightscale * (1 + 1.2 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        # wrap axis labels onto two lines when the plot is small
        if width > 4:
            xlabel = f"counts among {variant_type} mutants"
        else:
            xlabel = f"counts among\n{variant_type} mutants"

        mut_desc = {"aa": "amino-acid", "codon": "codon"}[mut_type]
        if height > 3:
            ylabel = f"frac {mut_desc} mutations found < this many times"
        else:
            ylabel = f"frac {mut_desc} mutations\nfound < this many times"

        p = (
            p9.ggplot(df, p9.aes("count", color="mutation_type"))
            + p9.stat_ecdf(geom="step", size=0.75)
            + p9.coord_cartesian(xlim=(0, max_count), ylim=(0, 1))
            + p9.scale_color_manual(
                [
                    self._mutation_type_colors[m]
                    for m in df.mutation_type.unique().sort_values().tolist()
                ],
                name="mutation type",
            )
            + p9.xlab(xlabel)
            + p9.ylab(ylabel)
            + p9.theme(
                figure_size=(width, height),
                legend_key=p9.element_blank(),
                axis_text_x=p9.element_text(angle=90),
            )
        )

        if samples is None:
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
[docs] def numCodonMutsByType( self, variant_type, *, libraries="all", samples="all", min_support=1, sample_rename=None, ): """Get average nonsynonymous, synonymous, stop mutations per variant. These statistics are only for the primary target. Parameters ---------- all_parameters Same as for :meth:`CodonVariantTable.plotNumCodonMutsByType`. Returns ------- pandas.DataFrame Data frame with average mutations of each type per variant. """ df, _, _ = self._getPlotData( libraries, samples, min_support, primary_target_only=True, sample_rename=sample_rename, ) assert "target" not in set(df.columns) if variant_type == "single": df = df.query("n_codon_substitutions <= 1") elif variant_type != "all": raise ValueError(f"invalid variant_type {variant_type}") codon_mut_types = list(self._mutation_type_colors) # mutations from stop to another amino-acid counted as nonsynonymous aa_re_nostop = "".join([a.replace("-", r"\-") for a in self._AAS if a != "*"]) aa_re_nodel = "".join([a.replace("*", r"\*") for a in self._AAS if a != "-"]) df = ( df.assign( synonymous=lambda x: (x.n_codon_substitutions - x.n_aa_substitutions), stop=lambda x: ( x.aa_substitutions.str.findall(rf"[{aa_re_nostop}]\d+\*").apply(len) ), deletion=lambda x: ( x.aa_substitutions.str.findall(rf"[{aa_re_nodel}]\d+\-").apply(len) ), nonsynonymous=lambda x: ( x.n_codon_substitutions - x.synonymous - x.stop ), ) .melt( id_vars=["library", "sample", "count"], value_vars=codon_mut_types, var_name="mutation_type", value_name="num_muts", ) .assign( mutation_type=lambda x: pd.Categorical( x["mutation_type"], codon_mut_types, ordered=True ), num_muts_count=lambda x: x.num_muts * x["count"], ) .groupby(["library", "sample", "mutation_type"], observed=True) .aggregate({"num_muts_count": "sum", "count": "sum"}) .reset_index() .assign(number=lambda x: x.num_muts_count / x["count"]) ) return df
    def plotNumCodonMutsByType(
        self,
        variant_type,
        *,
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        ylabel=None,
        sample_rename=None,
        one_lib_facet=False,
    ):
        """Plot average nonsynonymous, synonymous, stop mutations per variant.

        These statistics are only for the primary target.

        Parameters
        ----------
        variant_type : {'single', 'all'}
            Include all variants or just those with <=1 codon mutation.
        ylabel : None or str
            If not `None`, specify y-axis label (otherwise it is autoset).
        other_parameters
            Same as for :class:`CodonVariantTable.plotNumMutsHistogram`.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        # only the panel counts are needed here; the plotted data come
        # from numCodonMutsByType below
        _, nlibraries, nsamples = self._getPlotData(
            libraries,
            samples,
            min_support,
            primary_target_only=True,
            sample_rename=sample_rename,
        )

        df = self.numCodonMutsByType(
            variant_type=variant_type,
            libraries=libraries,
            samples=samples,
            min_support=min_support,
            sample_rename=sample_rename,
        )
        assert "target" not in set(df.columns)

        # figure size and facet layout scale with number of panels
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1 + 1.4 * nlibraries)
            height = heightscale * (1 + 1.3 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1 + 1.4 * nsamples)
            height = heightscale * (1 + 1.3 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        if ylabel is None:
            # wrap label onto two lines when the panel is small
            if height > 3:
                ylabel = f"mutations per variant ({variant_type} mutants)"
            else:
                ylabel = f"mutations per variant\n({variant_type} mutants)"

        p = (
            p9.ggplot(
                df,
                p9.aes("mutation_type", "number", fill="mutation_type", label="number"),
            )
            + p9.geom_bar(stat="identity")
            # print the numeric value above each bar
            + p9.geom_text(size=9, va="bottom", format_string="{0:.2f}")
            + p9.scale_y_continuous(name=ylabel, expand=(0.03, 0, 0.15, 0))
            + p9.scale_fill_manual(
                [
                    self._mutation_type_colors[m]
                    for m in df.mutation_type.unique().sort_values().tolist()
                ]
            )
            + p9.theme(
                figure_size=(width, height),
                axis_title_x=None,
                axis_text_x=p9.element_text(angle=90),
                legend_position="none",
            )
        )

        # with no samples there is a single pseudo-sample, so only facet
        # by library; otherwise facet by both via `facet_str`
        if samples is None:
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
[docs] def plotVariantSupportHistogram( self, *, libraries="all", plotfile=None, orientation="h", widthscale=1, heightscale=1, max_support=None, sample_rename=None, one_lib_facet=False, primary_target_only=False, ): """Plot histogram of variant call support for variants. Parameters ---------- max_support : int or None Group together all variants with >= this support. other_parameters Same as for :class:`CodonVariantTable.plotNumMutsHistogram`. primary_target_only : bool Only include variants that are of the primary target. Returns ------- plotnine.ggplot.ggplot """ df, nlibraries, nsamples = self._getPlotData( libraries, None, min_support=1, primary_target_only=primary_target_only, sample_rename=sample_rename, ) if orientation == "h": width = widthscale * (1 + 1.4 * nlibraries) height = 2.3 * heightscale nrow = 1 elif orientation == "v": width = 2.4 * widthscale height = heightscale * (1 + 1.3 * nlibraries) nrow = nlibraries else: raise ValueError(f"invalid `orientation` {orientation}") df = ( df.assign( variant_call_support=lambda x: numpy.clip( x["variant_call_support"], None, max_support ) ) .groupby(["library", "variant_call_support"], observed=True) .aggregate({"count": "sum"}) .reset_index() ) p = ( p9.ggplot(df, p9.aes("variant_call_support", "count")) + p9.geom_bar(stat="identity") + p9.scale_x_continuous( name="supporting sequences", breaks=dms_variants.utils.integer_breaks ) + p9.scale_y_continuous( name="number of variants", labels=dms_variants.utils.latex_sci_not ) + p9.theme(figure_size=(width, height)) ) if nlibraries > 1 or one_lib_facet: p = p + p9.facet_wrap("~ library", nrow=nrow) if plotfile: p.save(plotfile, height=height, width=width, verbose=False) return p
[docs] def avgCountsPerVariant( self, *, libraries="all", samples="all", min_support=1, sample_rename=None, by_target=True, ): """Get average counts per variant. Parameters ---------- libraries : {'all', 'all_only', list} Include all libraries including a merge, only a merge of all libraries, or a list of libraries. samples : {'all', list} Include all samples or just samples in list. min_support : int Only include variants with variant call support >= this. sample_rename : dict or None Rename samples by specifying original name as key and new name as value. by_target : bool If `True`, also group counts by target if multiple targets. Returns ------- pandas.DataFrame Average counts per variant for each library and sample, and possibly target. """ if samples is None: raise ValueError("`samples` cannot be `None`") df, nlibraries, nsamples = self._getPlotData( libraries, samples, min_support, primary_target_only=False, sample_rename=sample_rename, ) group_cols = ["library", "sample"] if self.primary_target is None: assert "target" not in set(df.columns) else: assert "target" in set(df.columns) if by_target: group_cols.insert(0, "target") return ( df.groupby(group_cols, observed=True) .aggregate({"count": "mean"}) .rename(columns={"count": "avg_counts_per_variant"}) .reset_index() )
[docs] def plotAvgCountsPerVariant( self, *, libraries="all", samples="all", plotfile=None, orientation="h", widthscale=1, heightscale=1, min_support=1, sample_rename=None, one_lib_facet=False, by_target=True, ): """Plot average counts per variant. Parameters ---------- libraries : {'all', 'all_only', list} Include all libraries including a merge, only a merge of all libraries, or a list of libraries. samples : {'all', list} Include all samples or just samples in list. by_target : bool If `True`, also group counts by target if multiple targets. other_parameters Same as for :class:`CodonVariantTable.plotNumMutsHistogram`. Returns ------- plotnine.ggplot.ggplot """ df = self.avgCountsPerVariant( libraries=libraries, samples=samples, min_support=min_support, sample_rename=sample_rename, by_target=by_target, ) nsamples = df["sample"].nunique() nlibraries = df["library"].nunique() if "target" in set(df.columns): assert (self.primary_target is not None) and by_target ntargets = df["library"].nunique() else: ntargets = 1 assert (self.primary_target is None) or not by_target if orientation == "h": nrow = ntargets width = widthscale * nlibraries * (0.9 + 0.2 * nsamples) height = 2.1 * heightscale * ntargets facet_grid_str = "target ~ library" elif orientation == "v": nrow = nlibraries width = widthscale * (0.9 + 0.2 * nsamples) * ntargets height = 2.1 * nlibraries facet_grid_str = "library ~ target" else: raise ValueError(f"invalid `orientation` {orientation}") p = ( p9.ggplot(df, p9.aes("sample", "avg_counts_per_variant")) + p9.geom_bar(stat="identity") + p9.xlab("") + p9.ylab("average counts per variant") + p9.theme( figure_size=(width, height), axis_text_x=p9.element_text(angle=90) ) ) if nlibraries > 1 or one_lib_facet: if ntargets > 1: p = p + p9.facet_grid(facet_grid_str) else: p = p + p9.facet_wrap("~ library", nrow=nrow) elif ntargets > 1: p = p + p9.facet_wrap("~ target", nrow=nrow) if plotfile: p.save(plotfile, height=height, width=width, verbose=False) return p
    def plotNumMutsHistogram(
        self,
        mut_type,
        *,
        libraries="all",
        samples="all",
        plotfile=None,
        orientation="h",
        widthscale=1,
        heightscale=1,
        min_support=1,
        max_muts=None,
        sample_rename=None,
        one_lib_facet=False,
    ):
        """Plot histogram of num mutations per variant (primary target only).

        Parameters
        ----------
        mut_type : {'codon' or 'aa'}
            Mutation type.
        libraries : {'all', 'all_only', list}
            Include all libraries including a merge, only a merge of
            all libraries, or a list of libraries.
        samples : {'all', None, list}
            Include all samples, a list of samples, or `None` to just
            count each barcoded variant once.
        plotfile : None or str
            Name of file to which to save plot.
        orientation : {'h', 'v'}
            Facet libraries horizontally or vertically?
        widthscale : float
            Expand width of plot by this factor.
        heightscale : float
            Expand height of plot by this factor.
        min_support : int
            Only include variants with variant call support >= this.
        max_muts : int or None
            Group together all variants with >= this many mutations.
        sample_rename : dict or None
            Rename samples by specifying original name as key and new name
            as value.
        one_lib_facet : bool
            Plot library facet title even if just one library.

        Returns
        -------
        plotnine.ggplot.ggplot

        """
        df, nlibraries, nsamples = self._getPlotData(
            libraries,
            samples,
            min_support,
            primary_target_only=True,
            sample_rename=sample_rename,
        )
        # primary_target_only=True drops the target column
        assert "target" not in set(df.columns)

        if mut_type == "aa":
            mut_col = "n_aa_substitutions"
            xlabel = "amino-acid mutations"
        elif mut_type == "codon":
            mut_col = "n_codon_substitutions"
            xlabel = "codon mutations"
        else:
            raise ValueError(f"invalid mut_type {mut_type}")

        # figure size and facet layout scale with number of panels
        if orientation == "h":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "sample ~ library"
            else:
                facet_str = "sample ~"
            width = widthscale * (1 + 1.5 * nlibraries)
            height = heightscale * (0.6 + 1.5 * nsamples)
        elif orientation == "v":
            if nlibraries > 1 or one_lib_facet:
                facet_str = "library ~ sample"
            else:
                facet_str = "~ sample"
            width = widthscale * (1 + 1.5 * nsamples)
            height = heightscale * (0.6 + 1.5 * nlibraries)
        else:
            raise ValueError(f"invalid `orientation` {orientation}")

        # clip mutation counts at `max_muts` so the tail is one bar
        # (numpy.clip with None upper bound leaves values unchanged)
        df[mut_col] = numpy.clip(df[mut_col], None, max_muts)

        df = (
            df.groupby(["library", "sample", mut_col], observed=True)
            .aggregate({"count": "sum"})
            .reset_index()
        )

        p = (
            p9.ggplot(df, p9.aes(mut_col, "count"))
            + p9.geom_bar(stat="identity")
            + p9.scale_x_continuous(
                name=xlabel, breaks=dms_variants.utils.integer_breaks
            )
            + p9.scale_y_continuous(labels=dms_variants.utils.latex_sci_not)
            + p9.theme(figure_size=(width, height))
        )

        # with no samples each variant is counted once, so re-label the
        # y-axis and only facet by library; otherwise facet via `facet_str`
        if samples is None:
            p = p + p9.ylab("number of variants")
            if nlibraries > 1 or one_lib_facet:
                p = p + p9.facet_wrap(
                    "~ library", nrow={"h": 1, "v": nlibraries}[orientation]
                )
        else:
            p = p + p9.facet_grid(facet_str)

        if plotfile:
            p.save(plotfile, height=height, width=width, verbose=False)

        return p
    def writeCodonCounts(self, single_or_all, *, outdir=None, include_all_libs=False):
        """Write codon counts files for all libraries and samples.

        Only writes the counts for the primary target.

        Note
        ----
        Useful if you want to analyze individual mutations using
        `dms_tools2 <https://jbloomlab.github.io/dms_tools2/>`_. File format:
        https://jbloomlab.github.io/dms_tools2/dms2_bcsubamp.html#counts-file

        Parameters
        ----------
        single_or_all : {'single', 'all'}
            If 'single', then counts just from single-codon mutants
            and wildtype, and count all wildtype codons and just
            mutated codons for single-codon mutants. If 'all', count
            all codons for all variants at all sites. Appropriate if
            enrichment of each mutation is supposed to represent its
            effect for 'single', and if enrichment of mutation is
            supposed to represent its average effect across genetic
            backgrounds in the library for 'all' provided mutations
            are Poisson distributed.
        outdir : None or str
            Name of directory to write counts, created if it does not exist.
            Use `None` to write to current directory.
        include_all_libs : bool
            Include data for a library (named "all-libraries") that has
            summmed data for all individual libraries if multiple libraries.

        Returns
        -------
        pandas.DataFrame
            Gives names of created files. Has columns "library", "sample",
            and "countfile". The "countfile" columns gives name of the
            created CSV file, ``<library>_<sample>_codoncounts.csv``.

        """

        def _parseCodonMut(mutstr):
            # split a codon substitution like "ATG1GTG" into
            # (wildtype codon, 1-based site, mutant codon)
            m = self._CODON_SUB_RE.fullmatch(mutstr)
            return (m.group("wt"), int(m.group("r")), m.group("mut"))

        if self.variant_count_df is None:
            raise ValueError("no samples with counts")

        if single_or_all not in {"single", "all"}:
            raise ValueError(f"invalid `single_or_all` {single_or_all}")

        if outdir is not None:
            os.makedirs(outdir, exist_ok=True)
        else:
            outdir = ""

        if include_all_libs:
            df = self.addMergedLibraries(self.variant_count_df, all_lib="all-libraries")
        else:
            df = self.variant_count_df

        # restrict to the primary target; secondary targets carry no
        # parsed substitutions
        if self.primary_target is not None:
            assert "target" in set(df.columns)
            df = df.query("target == @self.primary_target").drop(columns="target")
        else:
            assert "target" not in set(df.columns)

        countfiles = []
        liblist = []
        samplelist = []

        for lib, sample in itertools.product(
            df["library"].unique().tolist(), df["sample"].unique().tolist()
        ):
            i_df = df.query("library == @lib & sample == @sample")
            if len(i_df) == 0:
                continue  # no data for this library and sample

            countfile = os.path.join(outdir, f"{lib}_{sample}_codoncounts.csv")
            countfiles.append(countfile)
            liblist.append(lib)
            samplelist.append(sample)

            # per-codon count at each site, initialized to zero
            codoncounts = {codon: [0] * len(self.sites) for codon in self._CODONS}

            if single_or_all == "single":
                # wildtype counts at every site come only from unmutated
                # variants; mutant codons only from single-codon mutants
                n_wt = i_df.query("n_codon_substitutions == 0")["count"].sum()
                for isite, site in enumerate(self.sites):
                    codoncounts[self.codons[site]][isite] += n_wt
                for mut, count in i_df.query("n_codon_substitutions == 1")[
                    ["codon_substitutions", "count"]
                ].itertuples(index=False, name=None):
                    wtcodon, r, mutcodon = _parseCodonMut(mut)
                    codoncounts[mutcodon][r - 1] += count

            elif single_or_all == "all":
                # credit every variant as wildtype at every site, then
                # move counts from wildtype to mutant at mutated sites
                n_wt = i_df["count"].sum()
                for isite, site in enumerate(self.sites):
                    codoncounts[self.codons[site]][isite] += n_wt
                for muts, count in i_df.query("n_codon_substitutions > 0")[
                    ["codon_substitutions", "count"]
                ].itertuples(index=False, name=None):
                    for mut in muts.split():
                        wtcodon, r, mutcodon = _parseCodonMut(mut)
                        codoncounts[mutcodon][r - 1] += count
                        codoncounts[wtcodon][r - 1] -= count

            else:
                raise ValueError(f"invalid `single_or_all` {single_or_all}")

            # columns: site, wildtype codon, then one column per codon
            counts_df = pd.DataFrame(
                collections.OrderedDict(
                    [
                        ("site", self.sites),
                        ("wildtype", [self.codons[r] for r in self.sites]),
                    ]
                    + [(codon, codoncounts[codon]) for codon in self._CODONS]
                )
            )
            counts_df.to_csv(countfile, index=False)

        assert all(map(os.path.isfile, countfiles))

        return pd.DataFrame(
            {"library": liblist, "sample": samplelist, "countfile": countfiles}
        )
[docs] @staticmethod def classifyVariants( df, *, variant_class_col="variant_class", max_aa=2, syn_as_wt=False, primary_target=None, non_primary_target_class="secondary target", class_as_categorical=False, ): """Classifies codon variants in `df`. Parameters ---------- df : pandas.DataFrame Must have columns named 'aa_substitutions', 'n_aa_substitutions', and 'n_codon_substitutions'. Such a data frame can be obtained via `variant_count_df` or `barcode_variant_df` attributes of a :class:`CodonVariantTable` or via :meth:`CodonVariantTable.func_scores`. variant_class_col : str Name of column added to `df` that contains variant classification. Overwritten if already exists. max_aa : int When classifying, group all with >= this many amino-acid mutations. syn_as_wt : bool Do not have a category of 'synonymous' and instead classify synonymous variants as 'wildtype'. If using this option, `df` does not need column 'n_codon_substitutions'. primary_target : None or str If `df` has a column named 'target', then this must specify primary target (e.g., :attr:`CodonVariantTable.primary_target`). Variants not from primary target are classified as `non_primary_target_class`. non_primary_target_class : str Classification used for non-primary targets. class_as_categorical : bool Return `variant_class` as a categorical variable with a reasonable ordering. Returns ------- pandas.DataFrame Copy of `df` with column specified by `variant_class_col` as: - 'wildtype': no codon mutations - 'synonymous': only synonymous codon mutations - 'stop': at least one stop-codon mutation - '{n_aa} nonsynonymous' where `n_aa` is number of amino-acid mutations, or is '>{max_aa}' if more than `max_aa`. - potentially `non_primary_target_class`. Example ------- >>> df = pd.DataFrame.from_records( ... [('AAA', '', 0, 0), ... ('AAG', '', 0, 1), ... ('ATA', 'M1* G5K', 2, 3), ... ('GAA', 'G5H', 1, 2), ... ('CTT', 'M1C G5C', 2, 3), ... ('CTT', 'M1A L3T G5C', 3, 3), ... ('AAG', '', 0, 1), ... ], ... 
columns=['barcode', 'aa_substitutions', ... 'n_aa_substitutions', 'n_codon_substitutions'] ... ) >>> df_classify = CodonVariantTable.classifyVariants(df) >>> all(df_classify.columns == ['barcode', 'aa_substitutions', ... 'n_aa_substitutions', ... 'n_codon_substitutions', ... 'variant_class']) True >>> df_classify[['barcode', 'variant_class']] barcode variant_class 0 AAA wildtype 1 AAG synonymous 2 ATA stop 3 GAA 1 nonsynonymous 4 CTT >1 nonsynonymous 5 CTT >1 nonsynonymous 6 AAG synonymous >>> df_syn_as_wt = CodonVariantTable.classifyVariants(df, ... syn_as_wt=True) >>> df_syn_as_wt[['barcode', 'variant_class']] barcode variant_class 0 AAA wildtype 1 AAG wildtype 2 ATA stop 3 GAA 1 nonsynonymous 4 CTT >1 nonsynonymous 5 CTT >1 nonsynonymous 6 AAG wildtype Now show how we need to specify how to handle when multiple targets: >>> df['target'] = ['secondary'] + ['primary'] * 6 >>> (CodonVariantTable.classifyVariants(df) ... [['target', 'barcode', 'variant_class']]) Traceback (most recent call last): ... ValueError: `df` has "target" so give `primary_target` We need to specify how to handle targets: >>> (CodonVariantTable.classifyVariants( ... df, ... primary_target='primary', ... non_primary_target_class='homolog') ... 
[['target', 'barcode', 'variant_class']]) target barcode variant_class 0 secondary AAA homolog 1 primary AAG synonymous 2 primary ATA stop 3 primary GAA 1 nonsynonymous 4 primary CTT >1 nonsynonymous 5 primary CTT >1 nonsynonymous 6 primary AAG synonymous """ req_cols = ["aa_substitutions", "n_aa_substitutions"] if not syn_as_wt: req_cols.append("n_codon_substitutions") if not (set(req_cols) <= set(df.columns)): raise ValueError(f"`df` does not have columns {req_cols}") cats = [ "wildtype", "synonymous", *[f"{n} nonsynonymous" for n in range(1, max_aa)], f">{max_aa - 1} nonsynonymous", "stop", "deletion", ] if syn_as_wt: cats.remove("synonymous") if "target" in set(df.columns): req_cols.append("target") if primary_target is None: raise ValueError('`df` has "target" so give `primary_target`') if primary_target not in set(df["target"]): raise ValueError( f"`primary_target` {primary_target} not in " f"`df` targets:\n{set(df['target'])}" ) cats.append(non_primary_target_class) else: primary_target = None def _classify_func(row): if (primary_target is not None) and (row["target"] != primary_target): return non_primary_target_class elif row["n_aa_substitutions"] == 0: if syn_as_wt: return "wildtype" elif row["n_codon_substitutions"] == 0: return "wildtype" else: return "synonymous" elif "*" in row["aa_substitutions"]: return "stop" elif "-" in row["aa_substitutions"]: return "deletion" elif row["n_aa_substitutions"] < max_aa: return f"{row['n_aa_substitutions']} nonsynonymous" else: return f">{max_aa - 1} nonsynonymous" # to speed up, if variants present multiple times just classify # once and then merge into overall data frame. 
class_df = df[req_cols].drop_duplicates() class_df[variant_class_col] = class_df.apply(_classify_func, axis=1) if class_as_categorical: assert set(class_df[variant_class_col]).issubset(set(cats)) class_df[variant_class_col] = pd.Categorical( class_df[variant_class_col], cats, ordered=True, ).remove_unused_categories() return df.drop(columns=variant_class_col, errors="ignore").merge( class_df, on=req_cols, validate="many_to_one", how="left", )
[docs] @staticmethod def addMergedLibraries(df, *, all_lib="all libraries"): """Add data to `df` for all libraries merged. Parameters ---------- df : pandas.DataFrame Data frame that includes columns named 'library' and 'barcode'. all_lib : str Name given to library that is merge of all other libraries. Returns ------- pandas.DataFrame If `df` only has data for one library, just returns `df`. Otherwise returns copy of `df` with new library with name given by `all_lib` that contains data for all individual libraries with the 'barcode' column giving library name followed by a hyphen and barcode. """ libs = df.library.unique().tolist() if len(libs) <= 1: return df if all_lib in libs: raise ValueError(f"library {all_lib} already exists") df = pd.concat( [ df, df.assign( barcode=(lambda x: x.library.str.cat(x.barcode, sep="-")), library=all_lib, ), ], axis="index", ignore_index=True, sort=False, ).assign( library=lambda x: pd.Categorical( x["library"], libs + [all_lib], ordered=True ) ) return df
    def _getPlotData(
        self,
        libraries,
        samples,
        min_support,
        primary_target_only,
        *,
        sample_rename=None,
    ):
        """Get data to plot from library and sample filters.

        Parameters
        ----------
        primary_target_only : bool
            Only return data for the primary target.
        All other parameters
            Same as for :class:`CodonVariantTable.plotNumMutsHistogram`.

        Returns
        -------
        tuple
            `(df, nlibraries, nsamples)` where:

                - `df`: DataFrame with data to plot.

                - `nlibraries`: number of libraries being plotted.

                - `nsamples`: number of samples being plotted.

        """
        # select rows by sample: None means count each barcoded variant
        # once under a single pseudo-sample
        if samples is None:
            df = self.barcode_variant_df.assign(sample="barcoded variants").assign(
                count=1
            )
        elif samples == "all":
            if self.variant_count_df is None:
                raise ValueError("no samples have been added")
            df = self.variant_count_df
        elif isinstance(samples, list):
            all_samples = set(
                itertools.chain.from_iterable(
                    self.samples(lib) for lib in self.libraries
                )
            )
            if not all_samples.issuperset(set(samples)):
                raise ValueError(f"invalid sample(s) in {samples}")
            if len(samples) != len(set(samples)):
                raise ValueError(f"duplicate samples in {samples}")
            df = self.variant_count_df.query("sample in @samples")
        else:
            raise ValueError(f"invalid `samples` {samples}")

        df = df.query("variant_call_support >= @min_support")

        if not len(df):
            raise ValueError(f"no samples {samples}")
        else:
            nsamples = len(df["sample"].unique())

        # select rows by library, optionally adding a merged pseudo-library
        if libraries == "all":
            df = self.addMergedLibraries(df)
        elif libraries == "all_only":
            df = self.addMergedLibraries(df).query('library == "all libraries"')
        elif isinstance(libraries, list):
            if not set(self.libraries).issuperset(set(libraries)):
                raise ValueError(f"invalid library in {libraries}")
            if len(libraries) != len(set(libraries)):
                raise ValueError(f"duplicate library in {libraries}")
            df = df.query("library in @libraries")
        else:
            raise ValueError(f"invalid `libraries` {libraries}")

        if not len(df):
            raise ValueError(f"no libraries {libraries}")
        else:
            nlibraries = len(df["library"].unique())

        # `_dict_missing_is_key` maps unrenamed samples to themselves
        if sample_rename is None:
            sample_rename_dict = _dict_missing_is_key()
        else:
            if len(sample_rename) != len(set(sample_rename.values())):
                raise ValueError("duplicates in `sample_rename`")
            sample_rename_dict = _dict_missing_is_key(sample_rename)

        # make library/sample ordered categoricals in order of appearance
        # so facets plot in a stable order
        df = df.assign(
            library=lambda x: pd.Categorical(
                x["library"], x["library"].unique(), ordered=True
            ).remove_unused_categories(),
            sample=lambda x: pd.Categorical(
                x["sample"].map(sample_rename_dict),
                x["sample"].map(sample_rename_dict).unique(),
                ordered=True,
            ).remove_unused_categories(),
        )

        if self.primary_target is not None:
            assert "target" in set(df.columns)
            if primary_target_only:
                df = df.query("target == @self.primary_target").drop(columns="target")
        else:
            assert "target" not in set(df.columns)

        return (df, nlibraries, nsamples)
[docs] @classmethod def codonToAAMuts(cls, codon_mut_str, allowgaps=False): """Convert string of codon mutations to amino-acid mutations. Parameters ---------- codon_mut_str : str Codon mutations, delimited by a space and in 1, ... numbering. allowgaps : bool Allow gap codon (``---``) translated to gap amino acid (``--``). Returns ------- str Amino acid mutations in 1, ... numbering. Example ------- >>> CodonVariantTable.codonToAAMuts('ATG1GTG GGA2GGC TGA3AGA') 'M1V *3R' """ aa_muts = {} regex = cls._CODON_SUB_RE_WITHGAP if allowgaps else cls._CODON_SUB_RE_NOGAP for mut in codon_mut_str.upper().split(): m = regex.fullmatch(mut) if not m: raise ValueError(f"invalid mutation {mut} in {codon_mut_str}") r = int(m.group("r")) if r in aa_muts: raise ValueError(f"duplicate codon mutation for {r}") wt_codon = m.group("wt") mut_codon = m.group("mut") if wt_codon == mut_codon: raise ValueError(f"invalid mutation {mut}") wt_aa = CODON_TO_AA[wt_codon] mut_aa = CODON_TO_AA_WITHGAP[mut_codon] if wt_aa != mut_aa: aa_muts[r] = f"{wt_aa}{r}{mut_aa}" return " ".join([mut_str for r, mut_str in sorted(aa_muts.items())])
    def _sortCodonMuts(self, mut_str):
        """Sort space-delimited codon mutations and make uppercase.

        Example
        -------
        >>> geneseq = 'ATGGGATGA'
        >>> with tempfile.NamedTemporaryFile(mode='w') as f:
        ...     _ = f.write('library,barcode,substitutions,'
        ...                 'variant_call_support')
        ...     f.flush()
        ...     variants = CodonVariantTable(
        ...                 barcode_variant_file=f.name,
        ...                 geneseq=geneseq
        ...                 )
        >>> variants._sortCodonMuts('GGA2CGT ATG1GTG')
        'ATG1GTG GGA2CGT'

        """
        muts = {}  # keyed by site so duplicates are detectable
        for mut in mut_str.upper().split():
            m = self._CODON_SUB_RE.fullmatch(mut)
            if not m:
                raise ValueError(f"invalid codon mutation {mut}")
            wt_codon = m.group("wt")
            r = int(m.group("r"))
            mut_codon = m.group("mut")
            if wt_codon == mut_codon:
                raise ValueError(f"invalid codon mutation {mut}")
            if r not in self.sites:
                raise ValueError(f"invalid site in codon mutation {mut}")
            # wildtype in the mutation string must match the gene sequence
            if wt_codon != self.codons[r]:
                raise ValueError(f"invalid wt in codon mutation {mut}")
            if r in muts:
                raise ValueError(f"duplicate mutation at codon {mut}")
            muts[r] = mut
        # sorting dict items orders mutations by site
        return " ".join(mut for r, mut in sorted(muts.items()))

    def _ntToCodonMuts(self, nt_mut_str):
        """Convert string of nucleotide mutations to codon mutations.

        Parameters
        ----------
        nt_mut_str : str
            Nucleotide mutations, delimited by space in 1, ... numbering.

        Returns
        -------
        str
            Codon mutations in 1, 2, ... numbering of codon sites.

        Example
        -------
        >>> geneseq = 'ATGGGATGA'
        >>> with tempfile.NamedTemporaryFile(mode='w') as f:
        ...     _ = f.write('library,barcode,substitutions,'
        ...                 'variant_call_support')
        ...     f.flush()
        ...     variants = CodonVariantTable(
        ...                 barcode_variant_file=f.name,
        ...                 geneseq=geneseq
        ...                 )
        >>> variants._ntToCodonMuts('A1G G4C A6T')
        'ATG1GTG GGA2CGT'
        >>> variants._ntToCodonMuts('G4C A6T A1G')
        'ATG1GTG GGA2CGT'
        >>> variants._ntToCodonMuts('A1G G4C G6T')
        Traceback (most recent call last):
        ...
        ValueError: nucleotide 6 should be A not G in A1G G4C G6T

        """
        # group nucleotide changes by the codon they fall in:
        # codon site -> set of (position within codon, mutant nucleotide)
        mut_codons = collections.defaultdict(set)
        for mut in nt_mut_str.upper().split():
            m = self._NT_SUB_RE.fullmatch(mut)
            if not m:
                raise ValueError(f"invalid mutation {mut}")
            wt_nt = m.group("wt")
            i = int(m.group("r"))
            mut_nt = m.group("mut")
            if wt_nt == mut_nt:
                raise ValueError(f"invalid mutation {mut}")
            if i > len(self.geneseq) or i < 1:
                raise ValueError(f"invalid nucleotide site {i}")
            # wildtype in the mutation string must match the gene sequence
            if self.geneseq[i - 1] != wt_nt:
                raise ValueError(
                    f"nucleotide {i} should be "
                    f"{self.geneseq[i - 1]} not {wt_nt} in "
                    f"{nt_mut_str}"
                )
            # 1-based codon site and 0-based position within that codon
            icodon = (i - 1) // 3 + 1
            i_nt = (i - 1) % 3
            assert self.codons[icodon][i_nt] == wt_nt
            if i_nt in mut_codons[icodon]:
                raise ValueError(f"duplicate mutations {i_nt} in {icodon}")
            mut_codons[icodon].add((i_nt, mut_nt))

        # apply the grouped nucleotide changes to each wildtype codon
        codon_mut_list = []
        for r, r_muts in sorted(mut_codons.items()):
            wt_codon = self.codons[r]
            mut_codon = list(wt_codon)
            for i, mut_nt in r_muts:
                mut_codon[i] = mut_nt
            mut_codon = "".join(mut_codon)
            mut_str = f"{wt_codon}{r}{mut_codon}"
            if mut_codon not in self._CODONS:
                raise ValueError(
                    f"{mut_str=} has {mut_codon=} " f"not in {self._CODONS=}"
                )
            codon_mut_list.append(mut_str)

        return " ".join(codon_mut_list)
[docs] def subs_to_seq(self, subs, subs_type="codon"): """Convert substitutions to full sequence. Parameters ---------- subs : str Space delimited substitutions in 1, ... numbering. subs_type : {'codon', 'aa'} Are substitutions codon or amino acid? Returns ------- str Sequence created by these mutations, either codon or amino acid depending on value of `subs_type`. Example ------- >>> geneseq = 'ATGGGATGA' >>> with tempfile.NamedTemporaryFile(mode='w') as f: ... _ = f.write('library,barcode,substitutions,' ... 'variant_call_support') ... f.flush() ... variants = CodonVariantTable( ... barcode_variant_file=f.name, ... geneseq=geneseq ... ) >>> variants.subs_to_seq('GGA2CGT ATG1GTG') 'GTGCGTTGA' >>> variants.subs_to_seq('*3W M1C', subs_type='aa') 'CGW' """ if subs_type == "codon": submatcher = self._CODON_SUB_RE seqdict = self.codons.copy() elif subs_type == "aa": submatcher = self._AA_SUB_RE seqdict = self.aas.copy() else: raise ValueError(f"invalid `subs_type` of {subs_type}") mutated_sites = set() for sub in subs.split(): m = submatcher.fullmatch(sub) if not m: raise ValueError(f"Invalid substitution {sub}") r = int(m.group("r")) if r in mutated_sites: raise ValueError(f"Multiple substitutions at {r}") mutated_sites.add(r) if seqdict[r] != m.group("wt"): raise ValueError(f"Invalid wildtype in {sub}") seqdict[r] = m.group("mut") return "".join(seqdict.values())
[docs] def add_full_seqs( self, df, *, aa_seq_col="aa_sequence", codon_seq_col="codon_sequence", ): """Add full sequences to data frame, for primary target only. Parameters ---------- df : pandas.DataFrame Data frame that specifies substitutions. Might be the `barcode_variant_df` or `variant_count_df` attributes, or the result of calling :meth:`CodonVariantTable.func_scores`. aa_seq_col : str or None Name of added column with amino-acid sequences. In order to add, `df` must have column named 'aa_substitutions'. codon_seq_col : str or None Name of added column with codon sequences. In order to add, `df` must have column named 'codon_substitutions'. Returns ------- pandas.DataFrame Copy of `df` with columns `aa_seq_col` and/or `codon_seq_col`, and only the primary target retained if there is a 'target' column in `df`. """ if (not aa_seq_col) and (not codon_seq_col): raise ValueError("specify either `aa_seq_col` or `codon_seq_col`") df = df.copy(deep=True) if "target" in set(df.columns): if self.primary_target is None: raise ValueError('`df` has "target" col but no primary target') else: df = df.query("target == @self.primary_target") for coltype, col in [("aa", aa_seq_col), ("codon", codon_seq_col)]: if col: if col in df.columns: raise ValueError( f"`df` already has column {col} " f"specified by `{coltype}_seq_col`" ) subs_col = f"{coltype}_substitutions" if subs_col not in df.columns: raise ValueError( f"cannot specify `{coltype}_seq_col " f"because `df` lacks {subs_col} column" ) df[col] = df[subs_col].apply(self.subs_to_seq, args=(coltype,)) if aa_seq_col and codon_seq_col: if not all( df[codon_seq_col].apply(dms_variants.utils.translate) == df[aa_seq_col] ): raise ValueError("codon seqs != translated amino-acid seqs") return df
def filter_by_subs_observed(
    df,
    min_single_counts,
    min_any_counts,
    *,
    and_vs_or="and",
    subs_col="aa_substitutions",
    n_subs_col="n_aa_substitutions",
):
    """Filter variants by how often their substitutions are observed.

    Filter data frames of the type returned by :class:`CodonVariantTable`
    to get just the variants whose substitutions are observed in some other
    number of variants.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing variants as rows.
    min_single_counts : int
        Only keep variants with substitutions that are observed as single-
        substitution variants in at least this many variants.
    min_any_counts : int
        Only keep variants with substitutions that are observed in at least
        this many variants with any number of substitutions.
    and_vs_or : {'and', 'or'}
        Require variants to pass the `min_single_counts` **and** the
        `min_any_counts` filters, or just require to pass one **or** the
        other.
    subs_col : str
        Column in `df` with the substitutions as space-delimited strings.
    n_subs_col : str
        Column in `df` with the number of substitutions per variant.

    Returns
    -------
    pandas.DataFrame
        A copy of `df` that only contains the variants that pass the
        filters, with the index reset.

    Examples
    --------
    Create a data frame of variants to filter:

    >>> df = pd.DataFrame.from_records([
    ...     ('var1', '', 0),
    ...     ('var2', 'M1A', 1),
    ...     ('var3', 'M1A G2A', 2),
    ...     ('var4', 'M1A G2C', 2),
    ...     ('var5', 'G2A', 1),
    ...     ('var6', 'M1A', 1),
    ...     ('var7', 'M1C', 1),
    ...     ],
    ...     columns=['barcode', 'aa_substitutions', 'n_aa_substitutions'])

    Calling with `min_single_counts=1` and `min_any_counts=1` gets only
    variants with substitutions that are observed in single-substitution
    variants:

    >>> filter_by_subs_observed(df, 1, 1)  # doctest: +NORMALIZE_WHITESPACE
      barcode aa_substitutions  n_aa_substitutions
    0 var1 0
    1 var2 M1A 1
    2 var3 M1A G2A 2
    3 var5 G2A 1
    4 var6 M1A 1
    5 var7 M1C 1

    We can also require substitutions to be seen multiple times as single
    variants:

    >>> filter_by_subs_observed(df, 2, 1)  # doctest: +NORMALIZE_WHITESPACE
      barcode aa_substitutions  n_aa_substitutions
    0 var1 0
    1 var2 M1A 1
    2 var6 M1A 1

    Or that substitutions be seen a specified number of times in both
    single- and multi-substitution contexts:

    >>> filter_by_subs_observed(df, 1, 2)  # doctest: +NORMALIZE_WHITESPACE
      barcode aa_substitutions  n_aa_substitutions
    0 var1 0
    1 var2 M1A 1
    2 var3 M1A G2A 2
    3 var5 G2A 1
    4 var6 M1A 1

    We can also make the requirement **or** rather than **and**:

    >>> filter_by_subs_observed(
    ...     df, 1, 2, and_vs_or='or')  # doctest: +NORMALIZE_WHITESPACE
      barcode aa_substitutions  n_aa_substitutions
    0 var1 0
    1 var2 M1A 1
    2 var3 M1A G2A 2
    3 var5 G2A 1
    4 var6 M1A 1
    5 var7 M1C 1

    Do not filter any variants:

    >>> filter_by_subs_observed(df, 0, 0)  # doctest: +NORMALIZE_WHITESPACE
      barcode aa_substitutions  n_aa_substitutions
    0 var1 0
    1 var2 M1A 1
    2 var3 M1A G2A 2
    3 var4 M1A G2C 2
    4 var5 G2A 1
    5 var6 M1A 1
    6 var7 M1C 1

    """
    df = df.copy()
    for required_col in (subs_col, n_subs_col):
        if required_col not in df.columns:
            raise ValueError(f"`df` lacks column {required_col}")
    if and_vs_or not in {"and", "or"}:
        raise ValueError(f"invalid `and_vs_or` of {and_vs_or}")

    # build one boolean mask per filter: a variant passes if every one of
    # its substitutions is observed often enough in the counted variants
    masks = {}
    for var_type, min_counts, counted_df in (
        ("any", min_any_counts, df),
        ("single", min_single_counts, df.query(f"{n_subs_col} == 1")),
    ):
        helper_col = f"_pass_{var_type}_filter"
        if helper_col in df.columns:
            raise ValueError(f"`df` cannot have column {helper_col}")
        if min_counts == 0:
            # filter disabled: everything passes
            masks[var_type] = pd.Series(True, index=df.index)
            continue
        times_observed = collections.Counter(
            itertools.chain.from_iterable(counted_df[subs_col].str.split())
        )
        passing_subs = {
            sub for sub, count in times_observed.items() if count >= min_counts
        }
        masks[var_type] = df[subs_col].map(
            lambda subs: set(subs.split()) <= passing_subs
        )

    if and_vs_or == "and":
        keep = masks["any"] & masks["single"]
    else:
        keep = masks["any"] | masks["single"]
    return df.loc[keep].reset_index(drop=True)
class _dict_missing_is_key(dict):
    """Dictionary that maps any missing key to the key itself.

    Looking up an absent key returns that key unchanged rather than raising
    ``KeyError``.

    Note
    ----
    See here: https://stackoverflow.com/a/6229253

    """

    def __missing__(self, key):
        # invoked by dict.__getitem__ only when `key` is absent
        return key


if __name__ == "__main__":
    import doctest

    doctest.testmod()