Source code for alignparse.utils

"""
=====
utils
=====

"""


import math
import numbers
import re

import numpy

import pandas as pd  # noqa: F401

import alignparse.consensus


[docs]class InFrameDeletionsToSubs: """Convert in-frame codon-length deletions to substitutions. Also shifts deletions to put them in frame when possible. Deletions that are not in-frame and codon length are left as deletions. Parameters ---------- geneseq : str The sequence of the "wildtype" gene. Attributes ---------- geneseq : str Example -------- First, a case where deletions are already in frame: >>> geneseq = 'ATG GCG TCA GTA CCG CAT CTA'.replace(' ', '') >>> deltosubs = InFrameDeletionsToSubs(geneseq) >>> deltosubs.dels_to_subs('A1C del4to6') 'A1C G4- C5- G6-' >>> deltosubs.dels_to_subs('A1C del4to6 del9to11 del13to15 del19to20') 'A1C G4- C5- G6- G10- T11- A12- C13- C14- G15- del19to20' Now case where deletions need to be shifted to in frame: >>> mut_str = 'A1C del3to5 ins11GGG del14to16 del19to20' >>> deltosubs.dels_to_subs(mut_str) 'A1C G4- C5- G6- ins11GGG C13- C14- G15- del19to20' Now case where deletions cannot be shifted to in frame: >>> geneseq2 = 'ATG ATC TCA ATA CAG GAT CTA'.replace(' ', '') >>> deltosubs2 = InFrameDeletionsToSubs(geneseq2) >>> deltosubs2.dels_to_subs(mut_str) 'A1C del3to5 ins11GGG del14to16 del19to20' You can also just directly test if any given deletion can be shifted up or down in position while retaining the same sequence: >>> deltosubs.shiftable(13, 13, 0) True >>> deltosubs.shiftable(13, 13, 1) True >>> deltosubs.shiftable(13, 13, -1) False """ def __init__(self, geneseq): """See main class docstring.""" if len(geneseq) % 3: raise ValueError(f"{len(geneseq)} not a multiple of 3") self.geneseq = geneseq self._nt_sites = dict(enumerate(geneseq, start=1)) self._geneseq_list = list(geneseq) assert list(self._nt_sites.values()) == self._geneseq_list
[docs] def shiftable(self, start, end, shift): """Can deletion be shifted in sequence? Parameters ---------- start : int Start of deletion in 1, 2, ... numbering. end: int End of deletion in 1, 2, ... numbering (inclusive). shift : int Amount we try to shift deletion. Returns ------- bool Can deletion be shifted? """ if not (1 <= start <= end <= len(self.geneseq)): raise ValueError(f"invalid `start` / `end` of {start} / {end}") orig = "".join(self.geneseq[: start - 1] + self.geneseq[end:]) shifted = "".join( self.geneseq[: start - 1 + shift] + self.geneseq[end + shift :] ) assert len(orig) == len(shifted) == len(self.geneseq) - end + start - 1 return orig == shifted
[docs] def dels_to_subs(self, mut_str): """str: Copy of ``mut_str`` with in-frame deletions as substitutions""" muts = alignparse.consensus.process_mut_str(mut_str) codon_len_deletions = [] for deletion in muts.deletions: m = alignparse.consensus._MUT_REGEX["deletion"].fullmatch(deletion) (start, end) = (int(m.group("start")), int(m.group("end"))) assert end >= start, f"{deletion=}, {start=}, {end=}" if 0 == (((end - start) + 1) % 3): codon_len_deletions.append((start, end, deletion)) if not codon_len_deletions: return mut_str # make sure deletion doesn't overlap with substitution or insertion mutated_sites = set() for sub in muts.substitutions: m = alignparse.consensus._MUT_REGEX["substitution"].fullmatch(sub) mutated_sites.add(int(m.group("start"))) for ins in muts.insertions: m = alignparse.consensus._MUT_REGEX["insertion"].fullmatch(ins) mutated_sites.add(int(m.group("start"))) for start, end, deletion in codon_len_deletions: frame = start % 3 if (start % 3 != 0) else 3 if frame == 1: # already in frame assert (end % 3) == 0 new_subs = " ".join( f"{self._nt_sites[i]}{i}-" for i in range(start, end + 1) ) assert mut_str.count(deletion) == 1 mut_str = mut_str.replace(deletion, new_subs) assert mut_str.count(deletion) == 0 else: shifts = {2: (-1, 2), 3: (-2, 1)}[frame] for shift in shifts: if mutated_sites.intersection( range(start + shift, end + shift + 1) ): continue if self.shiftable(start, end, shift): new_subs = " ".join( f"{self._nt_sites[i]}{i}-" for i in range(start + shift, end + shift + 1) ) assert mut_str.count(deletion) == 1 mut_str = mut_str.replace(deletion, new_subs) assert mut_str.count(deletion) == 0 break return mut_str
[docs]def qvals_to_accuracy(qvals, encoding="numbers"): r"""Convert set of quality scores into average accuracy. Parameters ---------- qvals : numpy.array, number, or str Q-values, for how they are encoded see `encoding`. encoding : {'numbers', 'sanger'} If 'numbers' then `qvals` should be a numpy.array of Q-values or a number giving a single Q-value. If 'sanger', then `qvals` is a string, with the Q-value being the ASCII value minus 33. Returns ------- float or nan The average accuracy if the Q-values. `nan` if `qvals` is empty. Note ---- The probability :math:`p` of an error at a given site is related to the Q-value :math:`Q` by :math:`Q = -10 \log_{10} p`. The accuracy is one minus the average error rate. Example ------- >>> qvals = numpy.array([13, 77, 93]) >>> round(qvals_to_accuracy(qvals), 3) 0.983 >>> round(qvals_to_accuracy(qvals[1 : ]), 3) 1.0 >>> qvals_to_accuracy(numpy.array([])) nan >>> qvals_str = '.n~' >>> round(qvals_to_accuracy(qvals_str, encoding='sanger'), 3) 0.983 >>> round(qvals_to_accuracy(15), 3) 0.968 """ if encoding == "numbers": if isinstance(qvals, numbers.Number): qvals = numpy.array([qvals]) elif isinstance(qvals, list): qvals = numpy.array(qvals) if len(qvals) == 0: return math.nan if encoding == "numbers": pass if encoding == "sanger": qvals = numpy.array([ord(q) - 33 for q in qvals]) elif encoding != "numbers": raise ValueError(f"invalid `encoding`: {encoding}") return (1 - 10 ** (qvals / -10)).sum() / len(qvals)
[docs]def sort_mutations(mut_strs): """Sort mutation string by site, and combine multiple mutation strings. Parameters ---------- mut_strs : str or list A single mutation string or a list of such strings. Returns ------- str A single mutation string with all mutations sorted by site. Example ------- Sort a single mutation string: >>> sort_mutations('ins7GC A5C del2to3') 'del2to3 A5C ins7GC' Sort a list of two mutation strings, including a negative site: >>> sort_mutations(['ins7GC', 'A-5C del2to3']) 'A-5C del2to3 ins7GC' """ if isinstance(mut_strs, str): mut_strs = [mut_strs] decorated_list = [] for mut_str in mut_strs: for mut in mut_str.split(): m = re.fullmatch( r"ins(\-?\d+)[A-Z]+|" r"[A-Z](\-?\d+)[A-Z]|" r"del(\-?\d+)to\-?\d+", mut ) if not m: raise ValueError(f"failed to match {mut} in:\n{mut_str}") site = [site for site in m.groups() if site is not None] assert len(site) == 1 site = int(site[0]) decorated_list.append((site, mut)) return " ".join(mut for _, mut in sorted(decorated_list))
[docs]def merge_dels(s): """Merge consecutive deletions Parameters ---------- s : str A single string of mutations. Returns ------- str A mutation strings where consecutive deletions have been merged, and all mutations are sorted by site. Example ------- Merge consecutive deletions: >>> merge_dels('del12to15 del21to30 del210to300 del16to20 ' ... 'del1702to1909 del1910to1930 G885T G85T') 'del12to30 G85T del210to300 G885T del1702to1930' """ # parse deletions subs, deletions, insertions = alignparse.consensus.process_mut_str(s) # if no deletions are available return current mutation if not deletions: return sort_mutations(s) else: # extract position and from:to deletion list mut_list = [] for deletion in deletions: mut_sites = re.fullmatch(r"del(?P<start>\-?\d+)to(?P<end>\-?\d+)", deletion) if not mut_sites: raise ValueError(f"cannot match deletion {deletion}") mut_list.append( [int(mut_sites.group("start")), int(mut_sites.group("end"))] ) # merge consecutive ranges mut_list.sort() new_ranges = [] left, right = mut_list[0] for del_range in mut_list[1:]: next_left, next_right = del_range if right + 1 < next_left: new_ranges.append([left, right]) left, right = next_left, next_right else: right = max(right, next_right) new_ranges.append([left, right]) # create range-corrected (RC) deletion list deletion_RC = [f"del{left}to{right}" for left, right in new_ranges] # overwrite mutation tuple with new range return sort_mutations([*subs, *deletion_RC, *insertions])
[docs]class MutationRenumber: """Re-number mutations. Arguments ---------- number_mapping : pandas.DataFrame Data frame giving mapping from old to new numbering scheme. old_num_col : str Column in `number_mapping` giving old site number. new_num_col : str Column in `number_mapping` giving new site number. wt_nt_col : str or None Column in `number_mapping` giving wildtype nucleotide at each site, or `None` to not check identity. Is also allowed to be an amino acid. err_suffix : str Append this message to any errors raised about invalid sites or mutation strings. Can be useful for debugging. allow_letter_suffixed_numbers : bool Allow site numbers in `number_mapping` to have lowercase letter suffixes as in ``214a``. Attributes ---------- old_to_new_site : dict Maps old site number to new one. old_to_wt : dict or None Maps old site number to wildtype nucleotide if using `wt_nt_col`. Example -------- >>> number_mapping = pd.DataFrame({'old': [1, 2, 3], ... 'new': [5, 6, 7], ... 'wt_nt': ['A', 'C', 'G']}) >>> renumberer = MutationRenumber(number_mapping=number_mapping, ... old_num_col='old', ... new_num_col='new', ... wt_nt_col='wt_nt') >>> renumberer.old_to_new_site {1: 5, 2: 6, 3: 7} >>> renumberer.old_to_wt {1: 'A', 2: 'C', 3: 'G'} >>> renumberer.renumber_muts('A1C del2to3 ins3GC') 'A5C del6to7 ins7GC' Try to renumber with gaps and stop codons, allowed if flags set: >>> renumberer.renumber_muts("A1F C2- G3*") Traceback (most recent call last): ... ValueError: Cannot match C2- in A1F C2- G3* >>> renumberer.renumber_muts("A1F C2- G3*", ... allow_gaps=True, ... allow_stop=True) 'A5F C6- G7*' Use ``allow_letter_suffixed_numbers``: >>> suffixed_number_mapping = pd.DataFrame({'old': [1, 2, 3], ... 'new': ["5", "6", "6a"], ... 'wt_nt': ['A', 'C', 'G']}) >>> suffixed_renumberer = MutationRenumber(number_mapping=suffixed_number_mapping, ... old_num_col='old', ... new_num_col='new', ... wt_nt_col='wt_nt') Traceback (most recent call last): ... ValueError: `number_mapping` column new not integer >>> suffixed_renumberer = MutationRenumber(number_mapping=suffixed_number_mapping, ... old_num_col='old', ... new_num_col='new', ... wt_nt_col='wt_nt', ... allow_letter_suffixed_numbers=True) >>> suffixed_renumberer.renumber_muts('A1C del2to3 ins3GC') 'A5C del6to6a ins6aGC' """ def __init__( self, number_mapping, old_num_col, new_num_col, wt_nt_col, *, err_suffix="", allow_letter_suffixed_numbers=False, ): """See main class docstring.""" self._err_suffix = err_suffix for col in [old_num_col, new_num_col]: if col not in number_mapping.columns: raise ValueError( f"`number_mapping` lacks column {col}" + self._err_suffix ) if number_mapping[col].dtype != int: if allow_letter_suffixed_numbers: if not all( re.fullmatch(r"\-?\d+[a-z]*", r) for r in number_mapping[col] ): raise ValueError( f"`number_mapping` column {col} not integer or integer " "suffixed by lower-case letter" + self._err_suffix ) else: raise ValueError( f"`number_mapping` column {col} not integer" + self._err_suffix ) self.old_to_new_site = number_mapping.set_index(old_num_col)[ new_num_col ].to_dict() if not ( len(self.old_to_new_site) == len(set(self.old_to_new_site.values())) == len(number_mapping) ): raise ValueError("site numbers not unique" + self._err_suffix) self._old_to_new_site_str = { str(old): str(new) for old, new in self.old_to_new_site.items() } if wt_nt_col: if wt_nt_col not in number_mapping.columns: raise ValueError( f"`number_mapping` lacks column {wt_nt_col}" + self._err_suffix ) if not all( isinstance(nt, str) and len(nt) == 1 for nt in number_mapping[wt_nt_col] ): raise ValueError( f"`number_mapping` column {col} not letters" + self._err_suffix ) self.old_to_wt = number_mapping.set_index(old_num_col)[wt_nt_col].to_dict() self._old_to_wt_str = {str(old): wt for old, wt in self.old_to_wt.items()} else: self.old_to_wt = None
[docs] def renumber_muts(self, mut_str, allow_gaps=False, allow_stop=False): """Get re-numbered mutation string. Parameters ---------- mut_str : str Mutations in format 'A1C del2to3 ins3GG'. allow_gaps : bool Allow gap (``-``) characters allow_stop : bool Allow stop (``*``) characters Returns ------- str A version of `mut_str` where sites have been renumbered. """ new_muts = [] for mut in mut_str.split(): try: # try to match substitutions chars = "A-Z" if allow_gaps: chars += r"\-" if allow_stop: chars += r"\*" m = re.fullmatch( rf"(?P<wt>[{chars}])(?P<site>\-?\d+)(?P<mut>[{chars}])", mut ) if m: site = m.group("site") if self.old_to_wt is not None: if self._old_to_wt_str[site] != m.group("wt"): expected_wt = self._old_to_wt_str[m.group("site")] raise ValueError( f"Mutation {mut} invalid wt, " f"expected {expected_wt}" + self._err_suffix ) new_muts.append( m.group("wt") + self._old_to_new_site_str[site] + m.group("mut") ) continue # try to match insertion m = re.fullmatch(rf"ins(?P<site>\-?\d+)(?P<insertion>[{chars}]+)", mut) if m: new_muts.append( "ins" + self._old_to_new_site_str[m.group("site")] + m.group("insertion") ) continue # try to match deletion m = re.fullmatch(r"del(?P<site1>\-?\d+)to(?P<site2>\-?\d+)", mut) if m: new_muts.append( "del" + self._old_to_new_site_str[m.group("site1")] + "to" + self._old_to_new_site_str[m.group("site2")] ) continue # problem if we made it here, couldn't match anything raise ValueError(f"Cannot match {mut} in {mut_str}" + self._err_suffix) except KeyError: raise ValueError( f"Mutation {mut} site out of numbering range" + self._err_suffix ) return " ".join(new_muts)
if __name__ == "__main__": import doctest doctest.testmod()