"""
===============
minimap2
===============
Runs `minimap2 <https://lh3.github.io/minimap2/>`_
aligner.
"""
import os
import sys
import re
import io
import math
import functools
import subprocess
import tempfile
import collections
import random
import packaging.version
import numpy
import Bio.SeqIO
from dms_tools2 import NTS
import dms_tools2.pacbio
import dms_tools2.seqnumbering
#: `options` argument to :class:`Mapper` that works well
#: for codon-mutant libraries such as those created for
#: deep mutational scanning. Indels are highly penalized
#: as they are not expected, and settings are used that
#: effectively call strings of consecutive nucleotide
#: mutations as expected during codon mutagenesis. The
#: queries are assumed to be in the same orientation as
#: the target.
OPTIONS_CODON_DMS = ['--for-only',
'-A2',
'-B4',
'-O12',
'-E2',
'--secondary=no',
'--end-bonus=13',
]
#: `options` argument to :class:`Mapper` that works well
#: for libraries that contain viral genes expected to
#: potentially have both point mutations (at nucleotide, not
#: codon level) and some longer deletions.
#: The queries are assumed to be in the same orientation
#: as the target. These options resemble those suggested for
#: `PacBio IsoSeq <https://github.com/lh3/minimap2/blob/master/cookbook.md#map-iso-seq>`_
#: but use `-C0` and `-un` to avoid splice-site preference as
#: we want to trick the aligner into thinking long deletions are
#: spliced introns.
OPTIONS_VIRUS_W_DEL = [
'-x','splice',
'-un',
'-C0',
'--splice-flank=no',
'--mask-level=1',
'--secondary=no',
'--for-only',
'--end-seed-pen=2',
'--end-bonus=1',
]
# namedtuple to hold alignments
Alignment = collections.namedtuple('Alignment',
['target', 'r_st', 'r_en', 'r_len', 'q_len', 'q_st',
'q_en', 'strand', 'cigar_str', 'additional',
'score'])
Alignment.__doc__ = "Alignment of a query to a target."
Alignment.target.__doc__ = "Name of target to which query was aligned."
Alignment.r_st.__doc__ = "Alignment start in target (0 based)."
Alignment.r_en.__doc__ = "Alignment end in target (0 based)."
Alignment.r_len.__doc__ = "Total length of target prior to any clipping."
Alignment.q_st.__doc__ = "Alignment start in query (0 based)."
Alignment.q_en.__doc__ = "Alignment end in query (0 based)."
Alignment.q_len.__doc__ = "Total length of query prior to any clipping."
Alignment.strand.__doc__ = "1 if aligns in forward polarity, -1 if in reverse."
Alignment.cigar_str.__doc__ = "CIGAR in `PAF long format <https://github.com/lh3/minimap2#cs>`_"
Alignment.additional.__doc__ = ("List of additional :class:`Alignment` "
"objects, useful for multiple alignments.")
Alignment.score.__doc__ = 'Alignment score.'
[docs]def checkAlignment(a, target, query):
"""Checks alignment is valid given target and query.
Arguments:
`a` (:class:`Alignment`)
The alignment.
`target` (str)
The target to which `query` is aligned.
`query` (str)
The query aligned to `target`.
Returns:
`True` if `a` is a valid alignment of `query`
to `target`, `False` otherwise. Being valid does
not mean an alignment is good, just that the
start / ends and CIGAR in `a` are valid.
>>> target = 'ATGCAT'
>>> query = 'TACA'
>>> a_valid = Alignment(target='target', r_st=1, r_en=5,
... r_len=6, q_st=0, q_en=4, q_len=4, strand=1,
... cigar_str='=T*ga=CA', additional=[], score=-1)
>>> checkAlignment(a_valid, target, query)
True
>>> a_invalid = a_valid._replace(r_st=0, r_en=4)
>>> checkAlignment(a_invalid, target, query)
False
>>> target = 'ATGCAT'
>>> a_valid2 = Alignment(target='target', r_st=0, r_en=6,
... r_len=6, q_st=0, q_en=6, q_len=6, strand=1,
... cigar_str='=A+tgc-tgc=AT', additional=[], score=-1)
>>> checkAlignment(a_valid2, target, target)
True
"""
assert a.strand == 1, "not implemented for - strand"
(cigar_query, cigar_target) = cigarToQueryAndTarget(a.cigar_str)
if (
(a.q_len < a.q_en) or
(a.r_len < a.r_en) or
(a.r_st >= a.r_en) or
(a.q_st >= a.q_en) or
(a.q_len != len(query)) or
(a.r_len != len(target)) or
(query[a.q_st : a.q_en] != cigar_query) or
(target[a.r_st : a.r_en] != cigar_target)
):
return False
else:
return True
[docs]class Mutations:
"""Class to hold mutations.
Holds three types of mutations: substitutions (point mutations),
insertions, and deletions.
The numbering scheme used to define the mutations (e.g.,
0-based, 1-based) is determined upstream when deciding how to
number the mutations passed at initialization.
When initializing, set Q-values to `math.nan` if they are
not known. The Q-values are used to calculate accuracy
of mutations via :meth:`dms_tools2.pacbio.qvalsToAccuracy`.
Likewise, pass `math.nan` for homopolymer lengths if they
are not known.
Args:
`substitution_tuples` (list)
Substitutions `(i, mut_str, q)` where
`i` is the site number, `mut_str` is a string
describing substitution, and `q` is the Q-value.
`insertion_tuples` (list)
Insertions `(i, ins_len, mut_str, qs, hplen)` where `i`
is site **after** insertion, `inslen` is insertion length,
`mut_str` is string describing insertion, `qs` is
numpy array of Q-values, and `hplen` is length of homopolymer
in which insertion occurs.
`deletion_tuples` (list)
Deletions `(istart, iend, mut_str, q, hplen)` where
`istart` is first site of deletion, `iend` is
last site of deletion (so a single nucleotide
deletion has `istart == iend`), `mut_str` is string
describing deletion , `q` is the Q-value of the
site immediately **after** the deletion in accordance with
`PacBio CCS <https://github.com/PacificBiosciences/unanimity/blob/develop/doc/PBCCS.md#interpretting-qual-values>`_
specification, and `hplen` is length of homopolymer
in which deletion occurs.
`acc_not_q` (bool)
If `True`, then the tuples used to pass the mutations
should give the accuracy rather than the Q-values.
You can also create objects using :class:`Mutations.from_str`.
After initialization, use the methods described below to get
information about the mutations. All of the methods that
return lists of mutations do them ordered by site (first to last).
We keep track of the length of the homopolymers in which
indels are found because the main error mode of PacBio
sequencing is indels in homopolymers.
Here is an example. Note that Q-value of 20 indicates an accuracy
of 0.99, and a Q-value of 30 indicates an accuracy of 0.999:
>>> muts = Mutations(
... substitution_tuples=[(1, 'A1T', math.nan),
... (15, 'C15A', 30), (13, 'G13A', 20)],
... insertion_tuples=[(5, 2, 'ins5len2', [20, 30], 1)],
... deletion_tuples=[(8, 10, 'del8to10', 20, 2)])
>>> muts.substitutions()
['A1T', 'G13A', 'C15A']
>>> muts.substitutions(returnval='site')
[1, 13, 15]
>>> muts.insertions()
['ins5len2']
>>> muts.insertions(returnval='site')
[5]
>>> muts.deletions()
['del8to10']
>>> muts.deletions(returnval='site')
[8]
Now with some filtering on accuracy:
>>> muts.substitutions(min_acc=0.99)
['G13A', 'C15A']
>>> muts.substitutions(min_acc=0.995)
['C15A']
>>> muts.insertions(min_acc=0.991)
['ins5len2']
>>> muts.insertions(min_acc=0.999)
[]
>>> muts.deletions(min_acc=0.95)
['del8to10']
>>> muts.deletions(min_acc=0.999)
[]
Now get lengths of insertions / deletions:
>>> muts.deletions(returnval='length')
[3]
>>> muts.insertions(returnval='length')
[2]
Get homopolymer lengths:
>>> muts.deletions(returnval='homopolymer_length')
[2]
>>> muts.insertions(returnval='homopolymer_length')
[1]
Show how we can get string representations with
`repr` and `str`:
>>> str(muts) == repr(muts)
True
>>> str(muts)
'A1T; G13A (accuracy = 0.990000); C15A (accuracy = 0.999000); ins5len2 (accuracy = 0.994500, homopolymer length = 1); del8to10 (accuracy = 0.990000, homopolymer length = 2)'
Note that when Q-values or homopolymer lengths are not provided,
then the aren't printed in the `str` representation:
>>> str(Mutations(substitution_tuples=[], insertion_tuples=[],
... deletion_tuples=[(8, 10, 'del8to10', math.nan, math.nan),
... (12, 14, 'del12to14', math.nan, 4)]))
'del8to10; del12to14 (homopolymer length = 4)'
Also show what these representations look like when there
are no mutations:
>>> str(Mutations(substitution_tuples=[], insertion_tuples=[],
... deletion_tuples=[]))
'no mutations'
"""
def __init__(self, *, substitution_tuples, insertion_tuples,
deletion_tuples, acc_not_q=False):
"""See main class doc string."""
self._substitution_tuples = []
for i, mut_str, q in sorted(substitution_tuples):
if acc_not_q:
acc = q
else:
acc = dms_tools2.pacbio.qvalsToAccuracy(q)
self._substitution_tuples.append((i, mut_str, acc))
self._insertion_tuples = []
for i, ins_len, mut_str, qs, hplen in sorted(insertion_tuples):
if acc_not_q:
acc = qs
else:
acc = dms_tools2.pacbio.qvalsToAccuracy(qs)
if hplen is None or math.isnan(hplen):
hplen = math.nan
self._insertion_tuples.append((i, ins_len, mut_str, acc, hplen))
self._deletion_tuples = []
for istart, iend, mut_str, q, hplen in sorted(deletion_tuples):
if acc_not_q:
acc = q
else:
acc = dms_tools2.pacbio.qvalsToAccuracy(q)
if hplen is None or math.isnan(hplen):
hplen = math.nan
self._deletion_tuples.append((istart, iend, mut_str, acc, hplen))
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __repr__(self):
s = []
for i, mut_str, acc in self._substitution_tuples:
if math.isnan(acc):
s.append(mut_str)
else:
s.append(f'{mut_str} (accuracy = {acc:.6f})')
for i, ins_len, mut_str, acc, hplen in self._insertion_tuples:
i_str = []
if not math.isnan(acc):
i_str.append(f'accuracy = {acc:.6f}')
if not math.isnan(hplen):
i_str.append(f'homopolymer length = {hplen}')
if i_str:
i_str = ', '.join(i_str)
s.append(f'{mut_str} ({i_str})')
else:
s.append(mut_str)
for istart, iend, mut_str, acc, hplen in self._deletion_tuples:
i_str = []
if not math.isnan(acc):
i_str.append(f'accuracy = {acc:.6f}')
if not math.isnan(hplen):
i_str.append(f'homopolymer length = {hplen}')
if i_str:
i_str = ', '.join(i_str)
s.append(f'{mut_str} ({i_str})')
else:
s.append(mut_str)
if len(s) == 0:
s.append('no mutations')
return '; '.join(s)
[docs] @classmethod
def from_str(cls, mut_str):
"""Create :class:`Mutations` from str representation.
Args:
`mut_str` (str)
String representation as returned by `str(mut)`
where `mut` is a :class:`Mutations` object.
See the examples below for more details.
Returns:
:class:`Mutations` object.
Examples of creating :class:`Mutations.from_str`.
First, a single substitution without an accuracy:
>>> m = Mutations.from_str('A1T')
>>> str(m)
'A1T'
>>> str(m) == str(Mutations(substitution_tuples=[(1, 'A1T', math.nan)],
... deletion_tuples=[], insertion_tuples=[]))
True
And now substitutions with Q-values:
>>> m2 = Mutations.from_str('A1T (accuracy = 0.99); G3A (accuracy = 0.999)')
>>> m2 == Mutations.from_str(str(m2))
True
Now including deletions:
>>> m3 = Mutations.from_str(
... 'del1to3 (accuracy = 0.99); '
... 'A6T (accuracy = 0.995); '
... 'del8to9 (accuracy = 0.999, homopolymer length = 2)')
>>> str(m3)
'A6T (accuracy = 0.995000); del1to3 (accuracy = 0.990000); del8to9 (accuracy = 0.999000, homopolymer length = 2)'
An insertion:
>>> m4 = Mutations.from_str('ins5len2 (accuracy = 0.99, homopolymer length = 3)')
>>> str(m4)
'ins5len2 (accuracy = 0.990000, homopolymer length = 3)'
Example of a null mutation:
>>> m_null = Mutations.from_str('no mutations')
>>> m_null2 = Mutations.from_str('')
>>> m_null == m_null2
True
>>> str(m_null) == str(m_null2)
True
>>> str(m_null)
'no mutations'
"""
sub_tups = []
ins_tups = []
del_tups = []
if (not mut_str) or mut_str.isspace() or mut_str == 'no mutations':
pass
else:
sub_match = re.compile(
r'^(?P<wt>[ACGT])(?P<i>\d+)(?P<mut>[ACGT])'
r'(?: \(accuracy = (?P<acc>\d+\.{0,1}\d*)\)){0,1}$')
del_match = re.compile(
r'^del(?P<istart>\d+)to(?P<iend>\d+)'
r'(?: \((?:accuracy = (?P<acc>\d+\.{0,1}\d*)){0,1}(?:\, ){0,1}'
r'(?:homopolymer length = (?P<hplen>\d+)){0,1}\)){0,1}$')
ins_match = re.compile(
r'ins(?P<i>\d+)len(?P<ins_len>\d+)'
r'(?: \((?:accuracy = (?P<acc>\d+\.{0,1}\d*)){0,1}(?:\, ){0,1}'
r'(?:homopolymer length = (?P<hplen>\d+)){0,1}\)){0,1}$')
for s in map(str.strip, mut_str.split(';')):
m = sub_match.match(s)
if m:
acc = m.group('acc')
if acc is None:
acc = math.nan
else:
acc = float(acc)
sub_tups.append((
int(m.group('i')),
m.group('wt') + m.group('i') + m.group('mut'),
acc
))
continue
m = del_match.match(s)
if m:
acc = m.group('acc')
if acc is None:
acc = math.nan
else:
acc = float(acc)
hplen = m.group('hplen')
if hplen is None:
hplen = math.nan
else:
hplen = int(hplen)
del_tups.append((
int(m.group('istart')),
int(m.group('iend')),
'del' + m.group('istart') + 'to' + m.group('iend'),
acc,
hplen
))
continue
m = ins_match.match(s)
if m:
acc = m.group('acc')
if acc is None:
acc = math.nan
else:
acc = float(acc)
hplen = m.group('hplen')
if hplen is None:
hplen = math.nan
else:
hplen = int(hplen)
ins_tups.append((
int(m.group('i')),
int(m.group('ins_len')),
'ins' + m.group('i') + 'len' + m.group('ins_len'),
acc,
hplen
))
continue
raise ValueError(f"could not match {s} in {mut_str}")
raise ValueError(f"could not match {s} in {mut_str}")
return cls(substitution_tuples=sub_tups,
insertion_tuples=ins_tups,
deletion_tuples=del_tups,
acc_not_q=True)
[docs] def substitutions(self, *, returnval='mutation', min_acc=None,
min_acc_filter_nan=True):
"""List of substitutions or associated values.
Args:
`min_acc` (float or `None`)
Only include substitutions with >= this accuracy.
`min_acc_filter_nan` (bool)
If using `min_acc` filter, do we also remove
mutations with an accuracy that is `math.nan`?
`returnval` (str)
Type of value to return in list:
- "mutation": Strings giving mutations, where
"A1T" means site 1 is mutated from A to T.
- "site": Sites of substitutions.
- "accuracy": Numbers giving accuracy of each
mutation.
Returns:
List of mutations or other value specified by `returnval`.
"""
if min_acc is None:
subtups = self._substitution_tuples
else:
accs = [tup[2] for tup in self._substitution_tuples]
subtups = [tup for tup, a in zip(self._substitution_tuples, accs) if
(a >= min_acc) or ((not min_acc_filter_nan) and math.isnan(a))]
if returnval == 'mutation':
return [tup[1] for tup in subtups]
elif returnval == 'site':
return [tup[0] for tup in subtups]
elif returnval == 'accuracy':
return [tup[2] for tup in subtups]
else:
raise ValueError("invalid `returnval` {0}".format(returnval))
[docs] def insertions(self, *, returnval='mutation', min_acc=None,
min_acc_filter_nan=True):
"""List of insertions.
Args:
`min_acc` (float or `None`)
Only include insertions with >= this accuracy.
`min_acc_filter_nan` (bool)
If using `min_acc` filter, do we also remove
mutations with an accuracy that is `math.nan`?
`returnval` (str)
Type of value to return in list:
- "mutation": Strings giving mutations, where
"ins10len20" means insertion of length 20
immediately before site 10.
- "site": Sites of insertions.
- "length": Integers giving insertion lengths.
- "accuracy": Numbers giving accuracy of each
mutation. Accuracy of insertion is averaged
over its length.
- "homopolymer_length": Integers giving length
of homopolymer in which insertion is found.
Returns:
List of mutations or other value specified by `returnval`.
"""
if min_acc is None:
instups = self._insertion_tuples
else:
accs = [tup[3] for tup in self._insertion_tuples]
instups = [tup for tup, a in zip(self._insertion_tuples, accs) if
(a >= min_acc) or ((not min_acc_filter_nan) and math.isnan(a))]
if returnval == 'mutation':
return [tup[2] for tup in instups]
elif returnval == 'site':
return [tup[0] for tup in instups]
elif returnval == 'length':
return [tup[1] for tup in instups]
elif returnval == 'accuracy':
return [tup[3] for tup in instups]
elif returnval == 'homopolymer_length':
return [tup[4] for tup in instups]
else:
raise ValueError("invalid `returnval` {0}".format(returnval))
[docs] def deletions(self, *, returnval='mutation', min_acc=None,
min_acc_filter_nan=True):
"""List of deletions.
Args:
`min_acc` (float or `None`)
Only include deletions with >= this accuracy.
`min_acc_filter_nan` (bool)
If using `min_acc` filter, do we also remove
mutations with an accuracy that is `math.nan`?
`returnval` (str)
Type of value to return in list:
- "mutation": Strings giving mutations, where
"del12to13" means deletion of nucleotides 12
to 13, inclusive.
- "site": Sites of where deletions start.
- "length": Integers giving deletion lengths.
- "accuracy": Numbers giving accuracy of each
mutation. Accuracy is for first nucleotide **after**
deletion.
- "homopolymer_length": Integers giving length
of homopolymer in which deletion is found.
Returns:
List of mutations or other value specified by `returnval`.
"""
if min_acc is None:
deltups = self._deletion_tuples
else:
accs = [tup[3] for tup in self._deletion_tuples]
deltups = [tup for tup, a in zip(self._deletion_tuples, accs) if
(a >= min_acc) or ((not min_acc_filter_nan) and math.isnan(a))]
if returnval == 'mutation':
return [tup[2] for tup in deltups]
elif returnval == 'site':
return [tup[0] for tup in deltups]
elif returnval == 'length':
return [tup[1] - tup[0] + 1 for tup in deltups]
elif returnval == 'accuracy':
return [tup[3] for tup in deltups]
elif returnval == 'homopolymer_length':
return [tup[4] for tup in deltups]
else:
raise ValueError("invalid `returnval` {0}".format(returnval))
[docs]class MutationCaller:
"""Call :class:`Mutations` from :class:`Alignment`.
Attributes:
`mapper` (:class:`Mapper`)
The :class:`Mapper` used to create the alignments.
`transcriptconverter` (:class:`dms_tools2.seqnumbering.TranscriptConverter` or `None`)
Use if the alignments made by `mapper` are to transcripts
(or partial transcripts) and you want to convert the numbering
of mutations to 1, 2, ... numbering of the chromosomes
containing these transcripts, plus indicate amino-acid
substitutions in any CDSs. The target names in `mapper` should
match the mRNA names in `transcriptconverter`. If you
use a `transcriptconverter`, `targetindex` must be 1.
`targetindex` (int)
Number assigned to first nucleotide of target when
numbering mutations. A value of 1 means that the first
nucleotide of the target is position 1.
`target_clip` (int)
Ignore any mutations that occur within this many
nucleotides of the termini of target. If an indel
includes any nucleotides that are not ignored,
then the full indel is reported.
`query_softclip` (int)
Ignore any mutations that occur within this many
nucleotides of the termini of the query **and**
are soft clipped in the alignment. If an indel
includes any nucleotides not ignored, then the full
indel is reported.
Here is an example. First, create an :class:`Alignment`
that corresponds to the following::
target: --ATGCATGAAT--CGAAT
query: cgATGaAcG--TatCt---
>>> with tempfile.NamedTemporaryFile(mode='w') as targetfile:
... _ = targetfile.write('>target\\nATGCATGGATCGAAT')
... targetfile.flush()
... mapper = Mapper(targetfile.name, OPTIONS_CODON_DMS)
>>> a = Alignment(q_st=2, q_en=14, q_len=14, strand=1,
... r_st=0, r_en=12, r_len=15, score=16, target='target',
... cigar_str='=ATG*ca=A*tc=G-aa=T+at=C*gt', additional=[])
Also create some Q-values. Just to make things simple for this
example, we use unrealistic Q-values. For aligned sites they
are equal to the site number in the target; for un-aligned sites
they are 50.
>>> qvals = numpy.array([50, 50, 1, 2, 3, 4, 5, 6, 7, 10, 50, 50, 11, 12])
Now call mutations using :class:`MutationCaller.call`:
>>> mutcaller = MutationCaller(mapper)
>>> muts = mutcaller.call(a, qvals)
>>> muts.substitutions()
['C4A', 'T6C', 'G12T']
>>> muts.substitutions(returnval='site')
[4, 6, 12]
>>> muts.deletions()
['del8to9', 'del13to15']
>>> muts.deletions(returnval='site')
[8, 13]
>>> muts.insertions()
['ins1len2', 'ins11len2']
>>> muts.insertions(returnval='site')
[1, 11]
Check that homo-polymer lengths are correct:
>>> muts.deletions(returnval='homopolymer_length')
[2, 2]
>>> muts.insertions(returnval='homopolymer_length')
[1, 1]
Check that Q-values are also correct:
>>> numpy.allclose(muts.substitutions(returnval='accuracy'),
... list(map(dms_tools2.pacbio.qvalsToAccuracy, [4, 6, 12])))
True
>>> numpy.allclose(muts.insertions(returnval='accuracy'), list(
... map(dms_tools2.pacbio.qvalsToAccuracy, [[50, 50], [50, 50]])))
True
>>> numpy.allclose(muts.deletions(returnval='accuracy'),
... [dms_tools2.pacbio.qvalsToAccuracy(10), math.nan], equal_nan=True)
True
Illustrate `targetindex` by re-calling with 0-based idexing:
>>> mutcaller_index0 = MutationCaller(mapper, targetindex=0)
>>> muts_index0 = mutcaller_index0.call(a, qvals)
>>> muts_index0.substitutions()
['C3A', 'T5C', 'G11T']
>>> muts_index0.deletions()
['del7to8', 'del12to14']
>>> muts_index0.insertions()
['ins0len2', 'ins10len2']
Use `target_clip` to ignore mutations near target termini:
>>> mutcaller_targetclip2 = MutationCaller(mapper, target_clip=2)
>>> muts_targetclip2 = mutcaller_targetclip2.call(a, qvals)
>>> muts_targetclip2.substitutions()
['C4A', 'T6C', 'G12T']
>>> muts_targetclip2.deletions()
['del8to9', 'del13to15']
>>> muts_targetclip2.insertions()
['ins11len2']
>>> mutcaller_targetclip4 = MutationCaller(mapper, target_clip=4)
>>> muts_targetclip4 = mutcaller_targetclip4.call(a, qvals)
>>> muts_targetclip4.substitutions()
['T6C']
>>> muts_targetclip4.deletions()
['del8to9']
>>> muts_targetclip4.insertions()
['ins11len2']
Use `query_softclip` to ignore clipped regions in query:
>>> mutcaller_querysoftclip = MutationCaller(mapper, query_softclip=3)
>>> muts_querysoftclip = mutcaller_querysoftclip.call(a, qvals)
>>> muts_querysoftclip.substitutions()
['C4A', 'T6C', 'G12T']
>>> muts_querysoftclip.deletions()
['del8to9', 'del13to15']
>>> muts_querysoftclip.insertions()
['ins11len2']
Now use `transcriptconverter` to get mutations numbered in
chromosome coordinates along with amino-acid substitutions.
First, initialize a :class:`dms_tools2.seqnumbering.TranscriptConverter`:
>>> with tempfile.NamedTemporaryFile(mode='r+') as genbankfile:
... _ = genbankfile.write('''
... LOCUS chromosome 17 bp DNA UNK 01-JAN-1980
... FEATURES Location/Qualifiers
... mRNA 3..17
... /label="target"
... CDS 3..14
... /label="prot"
... ORIGIN
... 1 CAATGCATGG ATCGAAT
... //
... ''')
... genbankfile.flush()
... _ = genbankfile.seek(0)
... transcriptconverter = dms_tools2.seqnumbering.TranscriptConverter(genbankfile)
Now create a :class:`MutationCaller` that uses `transcriptconverter`:
>>> mutcaller_tc = MutationCaller(mapper, transcriptconverter=transcriptconverter)
Use it to call the mutations in chromosome coordinates. Note that
the amino-acid substitutions that result from a nucleotide point
substitution are also indicated in the results:
>>> muts_tc = mutcaller_tc.call(a, qvals)
>>> muts_tc.substitutions()
['chromosome-C6A_prot-His2Asn', 'chromosome-T8C', 'chromosome-G14T']
>>> muts_tc.substitutions(returnval='site')
[6, 8, 14]
>>> muts_tc.deletions()
['chromosome-del10to11', 'chromosome-del15to17']
>>> muts_tc.deletions(returnval='site')
[10, 15]
>>> muts_tc.insertions()
['chromosome-ins3len2', 'chromosome-ins13len2']
>>> muts_tc.insertions(returnval='site')
[3, 13]
"""
def __init__(self, mapper, *, transcriptconverter=None,
targetindex=1, target_clip=0, query_softclip=0):
"""See main class docstring."""
self.mapper = mapper
self.transcriptconverter = transcriptconverter
self.targetindex = targetindex
if (self.transcriptconverter is not None) and self.targetindex != 1:
raise ValueError("If using `transcriptconverter`, then "
"`targetindex` must be 1")
if (not isinstance(target_clip, int)) or (
target_clip < 0):
raise ValueError("`target_clip` not int >= 0")
self.target_clip = target_clip
if (not isinstance(query_softclip, int)) or (
query_softclip < 0):
raise ValueError("`query_softclip` not int >= 0")
self.query_softclip = query_softclip
[docs] def call(self, a, qvals=None):
"""Call mutations in alignment.
Args:
`a` (:class:`Alignment`)
Call mutations in this alignment.
`qvals` (`None` or numpy array)
Array of Q-values for the **entire** query used
to build alignment, not just aligned region.
Returns:
A :class:`Mutations` object holding the mutations.
If you are **not** using `transcriptconverter`,
mutations are named like this:
- "A60T" for substitution (at site 60)
- "ins5len10" for insertion (starts at 5, length 10)
- "del12to14" for deletion (12, 13, and 14 deleted)
where the number refers to the alignment target in
`mapper`.
If you are using `transcriptconverter`, then mutations are
named like this:
- "fluNS-A61T_fluNS1-Asp12Val" indicating
substitution at site 61 in the "fluNS" chromosome
causing amino-acid substitution Asp12Val in
the "fluNS1" CDS. If there is no amino-acid
substitution, the name will just be "fluNS-A61T".
- "fluNS-ins4len10" for insertion (again, now
in coordinates of the chromosome, which is "fluNS".
- "fluNS-del11to13" for deletion.
"""
def _get_qval(i):
"""Q-value for site aligning to target `i`."""
if qvals is None:
return math.nan
else:
j = iTargetToQuery(a, i - self.targetindex)
if j is None:
return math.nan
else:
assert 0 <= j < len(qvals)
return qvals[j]
substitution_tuples = []
deletion_tuples = []
insertion_tuples = []
# deletions / insertions before alignment
if a.r_st > 0:
istart = self.targetindex
iend = self.targetindex + a.r_st
deletion_tuples.append((istart,
iend,
'del{0}to{1}'.format(istart, iend),
_get_qval(self.targetindex + a.r_st)))
if a.q_st > self.query_softclip:
if qvals is None:
i_qvals = None
else:
i_qvals = [qvals[j] for j in range(a.q_st)]
i = self.targetindex
ins_len = a.q_st
insertion_tuples.append((i, ins_len,
'ins{0}len{1}'.format(i, ins_len), i_qvals))
# mutations in alignment
itarget = a.r_st + self.targetindex
cigar = a.cigar_str
while cigar:
m = _CIGAR_GROUP_MATCH.match(cigar)
assert m and m.start() == 0
if m.group()[0] == '=':
n = len(m.group()) - 1
itarget += n
elif m.group()[0] == '*':
assert len(m.group()) == 3
substitution_tuples.append((itarget,
'{0}{1}{2}'.format(m.group()[1].upper(),
itarget, m.group()[2].upper()),
_get_qval(itarget)))
itarget += 1
elif m.group()[0] == '-':
n = len(m.group()) - 1
istart = itarget
iend = itarget + n - 1
deletion_tuples.append((istart, iend,
'del{0}to{1}'.format(istart, iend),
_get_qval(itarget + n)))
itarget += n
elif m.group()[0] == '+':
n = len(m.group()) - 1
if qvals is None:
i_qvals = None
else:
i = iTargetToQuery(a, itarget - self.targetindex)
if i is None:
i_qvals = None
else:
i_qvals = [qvals[i - 1 - j] for j in range(n)]
insertion_tuples.append((itarget, n,
'ins{0}len{1}'.format(itarget, n), i_qvals))
elif m.group()[0] == '~':
raise ValueError("Cannot handle intron operations")
else:
raise RuntimeError("should never get here")
cigar = cigar[m.end() : ]
assert cigar == ''
assert itarget - self.targetindex == a.r_en, (
"itarget = {0}\nself.targetindex = {1}\n"
"a.r_en = {2}\na = {3}".format(itarget,
self.targetindex, a.r_en, a))
# deletions / insertions after alignment
if a.r_en < a.r_len:
istart = self.targetindex + a.r_en
iend = self.targetindex + a. r_len - 1
deletion_tuples.append((
istart,
iend,
'del{0}to{1}'.format(istart, iend),
math.nan))
if a.q_en < a.q_len - self.query_softclip:
n = a.q_len - a.q_en
if qvals is None:
i_qvals = None
else:
i_qvals = [qvals[j] for j in range(a.q_en, a.q_len)]
i = self.targetindex + a.r_en
insertion_tuples.append((
i, n, 'ins{0}len{1}'.format(i, n), i_qvals))
# filter away mutations too near target termini
i_first = self.targetindex + self.target_clip
i_last = a.r_len + self.targetindex - self.target_clip
substitution_tuples = [tup for tup in substitution_tuples
if not (tup[0] < i_first or tup[0] >= i_last)]
insertion_tuples = [tup for tup in insertion_tuples
if not (tup[0] < i_first or tup[0] >= i_last)]
deletion_tuples = [tup for tup in deletion_tuples
if not (tup[1] < i_first or tup[0] >= i_last)]
# add homopolymer lengths for indels
def _hpLen(j, targetseq):
"""Homopolymer length."""
j -= self.targetindex
nt = targetseq[j]
mstart = re.compile('^{0}+'.format(nt))
mend = re.compile('{0}+$'.format(nt))
return (len(mstart.search(targetseq[j : ]).group(0)) +
len(mend.search(targetseq[ : j + 1]).group(0)) - 1)
targetseq = self.mapper.targetseqs[a.target]
insertion_tuples = [(i, ins_len, mut_str, q, _hpLen(i, targetseq))
for i, ins_len, mut_str, q in insertion_tuples]
deletion_tuples = [(istart, iend, mut_str, q, _hpLen(istart, targetseq))
for istart, iend, mut_str, q in deletion_tuples]
if self.transcriptconverter is not None:
chrom = self.transcriptconverter.mRNA_chromosome[a.target]
new_substitution_tuples = []
for i, mut_str, q in substitution_tuples:
i_chrom = self.transcriptconverter.i_mRNAtoChromosome(
a.target, i, mRNAfragment=targetseq)
m = re.match(r'^(?P<wt_nt>[{0}])(?P<site>\d+)(?P<mut_nt>[{0}])$'
.format(''.join(NTS)), mut_str)
assert m, "can't match {0}".format(mut_str)
assert i == int(m.group('site'))
wt_nt = m.group('wt_nt')
mut_nt = m.group('mut_nt')
if wt_nt != self.transcriptconverter.ntIdentity(chrom, i_chrom):
raise ValueError("wrong wildtype in `transcriptconverter`")
aasubs = self.transcriptconverter.aaSubstitutions(chrom,
(wt_nt, i_chrom, mut_nt))
if aasubs:
aasubs = '_' + aasubs
new_substitution_tuples.append((
i_chrom,
'{0}-{1}{2}{3}{4}'.format(
chrom, wt_nt, i_chrom, mut_nt, aasubs),
q))
substitution_tuples = new_substitution_tuples
new_insertion_tuples = []
for i, ins_len, mut_str, q, hplen in insertion_tuples:
i_chrom = self.transcriptconverter.i_mRNAtoChromosome(
a.target, i, mRNAfragment=targetseq)
m = re.match(r'^ins(?P<i>\d+)len(?P<len>\d+)$',
mut_str)
assert m, "can't match {0}".format(mut_str)
assert i == int(m.group('i'))
new_insertion_tuples.append((
i_chrom,
ins_len,
'{0}-ins{1}len{2}'.format(chrom, i_chrom, m.group('len')),
q,
hplen))
insertion_tuples = new_insertion_tuples
new_deletion_tuples = []
for istart, iend, mut_str, q, hplen in deletion_tuples:
istart_chrom = self.transcriptconverter.i_mRNAtoChromosome(
a.target, istart, mRNAfragment=targetseq)
iend_chrom = self.transcriptconverter.i_mRNAtoChromosome(
a.target, iend, mRNAfragment=targetseq)
new_deletion_tuples.append((
istart_chrom,
iend_chrom,
'{0}-del{1}to{2}'.format(chrom, istart_chrom, iend_chrom),
q,
hplen))
deletion_tuples = new_deletion_tuples
return Mutations(substitution_tuples=substitution_tuples,
insertion_tuples=insertion_tuples,
deletion_tuples=deletion_tuples)
[docs]class MutationConsensus:
"""Takes consensus of several :class:`Mutations`.
Designed for when you have called :class:`Mutations`
for several sequences thought to represent the same template.
The method make some use of statistical information in the
accuracies computed from quality scores, but is heuristic.
This is because it is designed for cases where causes other than
simple sequencing error (e.g., mis-assigned sequences, true mix
of templates, errors during library preparation) may be present.
Initialize a :class:`MutationConsensus` object with arguments
below, which become attributes of the object. Then call consensus
mutations with :class:`MutationConsensus.callConsensus` method.
Args:
`n_mut` (int)
At least this many sequences must have mutation to
call it in consensus.
`min_acc` (float)
Completely ignore any mutations with accuracy lower
than this. Should be a threshold that just gets rid
of really low-accuracy ones. Is **not** applied to
mutations with an accuracy of NaN (`math.nan`).
`min_sub_frac` (float)
More than this fraction of sequences must have
substitution (point mutation) to call it as present.
`min_indel_frac` (float)
More than this fraction of sequences must have an
indel to call it as present. By default this is more
stringent than `min_sub_frac` because indel sequencing
errors are more common and deletions can be enriched
during PCR.
`group_indel_frac` (float)
If other overlap by >= this fraction of their
total net length with the most common indel,
group them together as the most common indel.
Designed to handle this case where alignment issues
slightly change called boundaries of long indels.
`homopolymer_calling` (dict)
More stringent calling of **single-nucleotide**
indels in hompolymers, which are defined as runs of
>= 3 consecutive nucleotides. Rationale
is that main mode of PacBio sequencing errors is
short indels in homopolymers. Can contain entries
keyed by "n_mut" and "min_indel_frac".
Then for single-nucleotide indels in >= 3 nt homopolymers,
increases calling stringency to these new parameters.
If no entry in dict, just uses main values set for
other mutations, so supply empty dict if you want to
no homopolymer correction.
Mutations are called from the list of :class:`Mutations`
passed to :class:`MutationConsensus.callConsensus` as follows:
1. Ignore any mutations with accuracy less then `min_acc`,
but do **not** eliminate mutations with accuracy that
is `math.nan`.
2. If there are less then `n_mut` :class:`Mutations` passed,
and they have no mutations, call as wildtype.
3. If there are less than `n_mut` :class:`Mutations`
passed and some have mutations, call as ambiguous.
4. If there are at least `n_mut` :class:`Mutations`
passed but less than `n_mut` of them have a mutation,
call as wildtype.
5. If there are at least `n_mut` :class:`Mutations`
that contain a specific mutation
**and** the fraction of :class:`Mutations` that
have this mutation is >= `min_sub_frac` (for point
mutations) or `min_indel_frac` (for indels), then call
the sequence as having the mutation.
6. Otherwise call as wildtype.
Here is an example.
First, define some :class:`Mutations`:
>>> m_wt = Mutations(
... substitution_tuples=[],
... insertion_tuples=[],
... deletion_tuples=[])
>>> m_A1G_high_acc = Mutations(
... substitution_tuples=[(1, 'A1G', 30)],
... insertion_tuples=[],
... deletion_tuples=[])
>>> m_A1G_low_acc = Mutations(
... substitution_tuples=[(1, 'A1G', 9)],
... insertion_tuples=[],
... deletion_tuples=[])
Initialize a :class:`MutationConsensus` with default args:
>>> mutcons = MutationConsensus()
A single :class:`Mutations` with no mutation is called wildtype,
and so returns an empty string:
>>> mutcons.callConsensus([m_wt], 'substitutions')
''
>>> mutcons.callConsensus([m_wt], 'insertions')
''
>>> mutcons.callConsensus([m_wt], 'deletions')
''
Same for two with no mutations:
>>> mutcons.callConsensus([m_wt, m_wt], 'substitutions')
''
A single :class:`Mutations` with mutations is called as ambiguous:
>>> mutcons.callConsensus([m_A1G_high_acc], 'substitutions')
'unknown'
One with mutations and one without is called wildtype:
>>> mutcons.callConsensus([m_A1G_high_acc, m_wt], 'substitutions')
''
Two high-quality mutation calls are give a consensus mutation:
>>> mutcons.callConsensus([m_A1G_high_acc, m_A1G_high_acc], 'substitutions')
'A1G'
But it two low-quality mutation calls do not:
>>> mutcons.callConsensus([m_A1G_low_acc, m_A1G_low_acc], 'substitutions')
''
Having two mutation calls and one wildtype call gives mutation:
>>> mutcons.callConsensus([m_A1G_high_acc, m_A1G_high_acc, m_wt],
... 'substitutions')
'A1G'
Unless the wildtype is in sufficient excess:
>>> mutcons.callConsensus([m_A1G_high_acc] * 2 + [m_wt] * 8,
... 'substitutions')
''
Example with two substitutions:
>>> m_A1G_T2A_high_acc = Mutations(
... substitution_tuples=[(1, 'A1G', 30), (2, 'T2A', 30)],
... insertion_tuples=[],
... deletion_tuples=[])
>>> mutcons.callConsensus([m_A1G_T2A_high_acc, m_A1G_T2A_high_acc],
... 'substitutions')
'A1G T2A'
Example where one sequence has two mutations (enough to be called)
and the other only has one (not enough to be called):
>>> mutcons.callConsensus([m_A1G_T2A_high_acc, m_A1G_high_acc],
... 'substitutions')
'A1G'
Use the `include_stats` option to get more detailed output:
>>> mutcons.callConsensus([m_A1G_T2A_high_acc, m_A1G_T2A_high_acc,
... m_A1G_high_acc, m_wt], 'substitutions', include_stats=True)
'A1G_(3/4) T2A_(2/4)'
Group sufficiently overlapping indels:
>>> m_shortdel = Mutations(
... substitution_tuples=[],
... insertion_tuples=[],
... deletion_tuples=[(8, 9, 'del8to9', math.nan, 1)])
>>> m_longdel = Mutations(
... substitution_tuples=[],
... insertion_tuples=[],
... deletion_tuples=[(8, 18, 'del8to18', math.nan, 1)])
>>> mutcons.callConsensus([m_shortdel] * 2, 'deletions')
'del8to9'
>>> mutcons.callConsensus([m_longdel] * 2, 'deletions')
'del8to18'
>>> m_overlaplongdel = Mutations(
... substitution_tuples=[],
... insertion_tuples=[],
... deletion_tuples=[(9, 17, 'del9to17', math.nan, 1)])
>>> mutcons.callConsensus([m_longdel, m_shortdel], 'deletions')
''
>>> mutcons.callConsensus([m_wt, m_longdel] + [m_overlaplongdel] * 4,
... 'deletions')
'del9to17'
Demonstrate `min_acc` filter:
>>> mutcons_no_min_acc = MutationConsensus(min_acc=0)
>>> mutcons.callConsensus([m_wt, m_A1G_low_acc] * 6, 'substitutions')
''
>>> mutcons_no_min_acc.callConsensus([m_wt, m_A1G_low_acc] * 6, 'substitutions')
'A1G'
Demonstrate `homopolymer_calling`:
>>> m_homopolymer_indel = Mutations(
... substitution_tuples=[],
... insertion_tuples=[(13, 1, 'ins13len1', math.nan, 3)],
... deletion_tuples=[(9, 10, 'del9to10', math.nan, 3),
... (5, 5, 'del5to5', math.nan, 3)])
>>> mutcons.callConsensus([m_homopolymer_indel] * 2, 'insertions')
''
>>> mutcons.callConsensus([m_homopolymer_indel] * 2, 'deletions')
'del9to10'
>>> mutcons.callConsensus([m_homopolymer_indel] * 3, 'insertions')
'ins13len1'
>>> mutcons.callConsensus([m_homopolymer_indel] * 3, 'deletions')
'del5to5 del9to10'
>>> mutcons.callConsensus([m_homopolymer_indel] * 3 + [m_wt] * 4, 'deletions')
''
>>> mutcons.callConsensus([m_homopolymer_indel] * 3 + [m_wt] * 2, 'deletions')
'del5to5 del9to10'
"""
def __init__(self, *, n_mut=2, min_acc=0.999, min_sub_frac=0.3,
min_indel_frac=0.5, group_indel_frac=0.9,
homopolymer_calling={'n_mut':3},
):
"""See main class doc string."""
self.n_mut = n_mut
self.min_acc = min_acc
self.min_sub_frac = min_sub_frac
self.min_indel_frac = min_indel_frac
homopolymer_params = {'n_mut', 'min_indel_frac'}
if not (set(homopolymer_calling.keys()) <= homopolymer_params):
raise ValueError("invalid entries in `homopolymer_calling`")
for p in homopolymer_params:
val_all = getattr(self, p)
p_homopolymer = 'homopolymer_' + p
if p in homopolymer_calling:
val_homopolymer = homopolymer_calling[p]
if val_homopolymer < val_all:
raise ValueError("`homopolymer_calling` sets less "
"stringent value for {0}, which makes no "
"sense.".format(p))
setattr(self, p_homopolymer, val_homopolymer)
else:
setattr(self, p_homopolymer, val_all)
self.group_indel_frac = group_indel_frac
[docs] def callConsensus(self, mutationlist, mutation_type, *,
include_stats=False):
"""Calls consensus from :class:`Mutations`.
The calling is done using the criteria described in the
main docs for :class:`MutationConsensus`.
Args:
`mutationlist` (list)
List of one or more :class:`Mutations` from
which we call consensus.
`mutation_type` (str)
Type of mutation to call. Should be one of
'substitutions', 'insertions', or 'deletions'.
`include_stats` (bool)
Include statistics on number of CCSs that have
mutation for each called mutation.
Returns:
A str giving the result. If consensus of mutations
cannot be called, returns the string "unknown".
Otherwise returns empty string if no consensus mutations,
or a string giving the mutations separated by spaces
the same way they are returned by the methods of
:class:`Mutations`. Mutations are sorted first by
number of times observed, and then alphabetically.
"""
nseqs = len(mutationlist)
if nseqs < 1:
raise ValueError("empty `mutationlist`")
# function to get mutations of this type
try:
func = {'insertions':Mutations.insertions,
'substitutions':Mutations.substitutions,
'deletions':Mutations.deletions}[mutation_type]
except KeyError:
raise ValueError("invalid `mutation_type` {0}".format(
mutation_type))
# mutations
muts = [func(m, min_acc=self.min_acc, min_acc_filter_nan=False)
for m in mutationlist]
assert len(muts) == nseqs
flatmuts = numpy.array([m for ml in muts for m in ml])
if len(flatmuts) == 0:
return '' # all sequences are wildtype
if nseqs < self.n_mut:
return 'unknown' # not enough sequences to call mutations
# is mutation a short indel in homopolymer?
homopolymer_indel = collections.defaultdict(bool)
if mutation_type in {'insertions', 'deletions'}:
lengths = [func(m, returnval='length', min_acc=self.min_acc,
min_acc_filter_nan=False) for m in mutationlist]
flatlengths = numpy.array([l for ll in lengths for l in ll])
assert len(flatlengths) == len(flatmuts)
homopolymerlengths = [func(m, returnval='homopolymer_length',
min_acc=self.min_acc,
min_acc_filter_nan=False) for m in mutationlist]
flathomopolymerlengths = numpy.array([l for ll in
homopolymerlengths for l in ll])
assert len(flathomopolymerlengths) == len(flatmuts)
starts = [func(m, returnval='site', min_acc=self.min_acc,
min_acc_filter_nan=False) for m in mutationlist]
flatstarts = numpy.array([s for sl in starts for s in sl])
# get indels in hompolymers of length >= 3
for m in set(flatmuts[(flatlengths == 1) &
(flathomopolymerlengths >= 3)]):
homopolymer_indel[m] = True
# group sufficiently overlapping indels
indelcounts = collections.Counter(flatmuts)
max_n = indelcounts.most_common()[0][1]
top_indels = [m for m, n in indelcounts.items() if n == max_n]
if len(top_indels) == 1:
top_indel = top_indels[0]
else:
# several top indels, take longest
top_lengths = [flatlengths[flatmuts == top_indel][0] for
top_indel in top_indels]
top_indel = sorted(zip(top_lengths, top_indels))[-1][1]
overlapping_indels = []
top_length = flatlengths[flatmuts == top_indel][0]
top_start = flatstarts[flatmuts == top_indel][0]
for m in set(flatmuts):
length = flatlengths[flatmuts == m][0]
start = flatstarts[flatmuts == m][0]
tot_len = (max(top_start + top_length, start + length)
- min(top_start, start))
overlap_len = (min(top_start + top_length, start + length)
- max(top_start, start))
if overlap_len / tot_len >= self.group_indel_frac:
overlapping_indels.append(m)
flatmuts = numpy.array([top_indel if m in overlapping_indels
else m for m in flatmuts])
# get counts of all mutations with adequate counts
mutcounts = {m:n for m, n in collections.Counter(flatmuts)
.items() if (not homopolymer_indel[m] and n >= self.n_mut)
or n >= self.homopolymer_n_mut}
if not mutcounts:
return '' # no mutations with enough counts, call wildtype
mutlist = []
if mutation_type == 'substitutions':
min_mut_frac = self.min_sub_frac
homopolymer_min_mut_frac = self.min_sub_frac
elif mutation_type in {'insertions', 'deletions'}:
min_mut_frac = self.min_indel_frac
homopolymer_min_mut_frac = self.homopolymer_min_indel_frac
else:
raise ValueError("invalid `mutation_type` {0}".format(mutation_type))
for m, n in sorted(sorted(mutcounts.items()),
key=lambda tup: tup[1], reverse=True):
f = n / nseqs
if ((not homopolymer_indel[m] and f >= min_mut_frac)
or f >= homopolymer_min_mut_frac):
mstring = m
else:
mstring = None # consider wildtype
if mstring:
if include_stats:
mstring = '{0}_({1}/{2})'.format(mstring, n, nseqs)
mutlist.append(mstring)
return ' '.join(mutlist)
[docs]class Mapper:
"""Class to run ``minimap2`` and get results.
Args:
`targetfile` (str)
FASTA file with target (reference) to which we align
reads.
`options` (list)
Command line options to ``minimap2``. For
recommended options, for different situations, see:
- :data:`OPTIONS_CODON_DMS`
- :data:`OPTIONS_VIRUS_W_DEL`
`prog` (str or `None`)
Path to ``minimap2`` executable. Class is only
tested against version >=2.11 but may work with
other versions too.
`target_isoforms` (dict)
Sometimes targets might be be isoforms of each
other. You can specify that with this dict, which
is keyed by target names and has values that are sets
of other targets. If targets `M1` and `M2` are isoforms,
set `target_isoforms={'M1':['M2'], 'M2':['M1']}`.
This argument is just used to set the `target_isoforms`
attribute, but isn't used during alignment.
Attributes:
`targetfile` (str)
Target (reference) file set at initialization.
`targetseqs` (OrderedDict)
Sequences in `targetfile`. Keys are sequence
names, values are sequences as strings. In
same order as listing in `targetfile`.
`prog` (str)
Path to ``minimap2`` set at initialization.
`options` (list)
Options to ``minimap2`` set at initialization.
`version` (str)
Version of ``minimap2``.
`target_isoforms` (dict)
Isoforms for each target. This is the value set
by `target_isoforms` at initialization plus
ensuring that each target is listed as an isoform
of itself.
Here is an example where we align a few reads to two target
sequences.
First, we generate a few target sequences:
>>> targetlen = 200
>>> random.seed(1)
>>> targets = collections.OrderedDict()
>>> for i in [1, 2]:
... targets['target{0}'.format(i)] = ''.join(random.choice(NTS)
... for _ in range(targetlen))
Now we generate some queries. One is a random sequence that should not
align, and the other two are substrings of the targets into which we
have introduced a single mutation or indel. The names of the queries
give their target, start in query, end in query, cigar string:
>>> queries = {'randseq':''.join(random.choice(NTS) for _ in range(180))}
>>> for qstart, qend, mut in [(0, 183, 'mut53'), (36, 194, 'del140')]:
... target = random.choice(list(targets.keys()))
... qseq = targets[target][qstart : qend]
... mutsite = int(mut[3 : ])
... if 'mut' in mut:
... wt = qseq[mutsite]
... mut = random.choice([nt for nt in NTS if nt != wt])
... cigar = ('=' + qseq[ : mutsite] + '*' + wt.lower() +
... mut.lower() + '=' + qseq[mutsite + 1 : ])
... qseq = qseq[ : mutsite] + mut + qseq[mutsite + 1 : ]
... elif 'del' in mut:
... cigar = ('=' + qseq[ : mutsite] + '-' +
... qseq[mutsite].lower() + '=' + qseq[mutsite + 1 : ])
... qseq = qseq[ : mutsite] + qseq[mutsite + 1 : ]
... queryname = '_'.join(map(str, [target, qstart, qend, cigar]))
... queries[queryname] = qseq
Now map the queries to the targets:
>>> TempFile = functools.partial(tempfile.NamedTemporaryFile, mode='w')
>>> with TempFile() as targetfile, TempFile() as queryfile:
... _ = targetfile.write('\\n'.join('>{0}\\n{1}'.format(*tup)
... for tup in targets.items()))
... targetfile.flush()
... _ = queryfile.write('\\n'.join('>{0}\\n{1}'.format(*tup)
... for tup in queries.items()))
... queryfile.flush()
... mapper = Mapper(targetfile.name, OPTIONS_CODON_DMS)
... mapper2 = Mapper(targetfile.name, OPTIONS_CODON_DMS,
... target_isoforms={'target1':{'target2'},
... 'target2':{'target1'}})
... alignments = mapper.map(queryfile.name)
>>> mapper.targetseqs == targets
True
Now make sure we find the expected alignments:
>>> set(alignments.keys()) == set(q for q in queries if q != 'randseq')
True
>>> matched = []
>>> for (query, a) in alignments.items():
... expected = query.split('_')
... matched.append(a.target == expected[0])
... matched.append([a.r_st, a.r_en] == list(map(int, expected[1 : 3])))
... matched.append([a.q_st, a.q_en] == [0, len(queries[query])])
... matched.append(a.cigar_str == expected[3])
... matched.append(a.strand == 1)
>>> all(matched)
True
Test out the `target_isoform` argument:
>>> mapper.target_isoforms == {'target1':{'target1'}, 'target2':{'target2'}}
True
>>> mapper2.target_isoforms == {'target1':{'target1', 'target2'},
... 'target2':{'target1', 'target2'}}
True
"""
def __init__(self, targetfile, options, *, prog='minimap2',
target_isoforms={}):
"""See main :class:`Mapper` doc string."""
if prog is None:
# use default ``minimap2`` installed as package data
prog = os.path.join(os.path.dirname(__file__),
'minimap2_prog')
try:
version = subprocess.check_output([prog, '--version'])
except:
raise ValueError("Can't execute `prog` {0}".format(prog))
self.version = version.strip().decode('utf-8')
min_version = packaging.version.parse('2.11')
if packaging.version.parse(self.version) < min_version:
raise ValueError("You have `minimap2` version {0}, but "
"need at least {1}".format(self.version, min_version))
self.prog = prog
self.options = options
assert os.path.isfile(targetfile), \
"no `targetfile` {0}".format(targetfile)
self.targetfile = targetfile
self.targetseqs = collections.OrderedDict([(seq.name, str(seq.seq))
for seq in Bio.SeqIO.parse(self.targetfile, 'fasta')])
targetnames = set(self.targetseqs.keys())
in_target_isoforms = set(target_isoforms.keys()).union(
set(t for tl in target_isoforms.values() for t in tl))
if in_target_isoforms - targetnames:
raise ValueError("`target_isoforms` contains following "
"targets not in `targetfile`: {0}".format(
in_target_isoforms - targetnames))
self.target_isoforms = {}
for target in targetnames:
self.target_isoforms[target] = {target}
if target in target_isoforms:
addtl_targets = target_isoforms[target]
if isinstance(addtl_targets, list):
addtl_targets = set(addtl_targets)
self.target_isoforms[target].update(addtl_targets)
[docs] def map(self, queryfile, *, outfile=None, introns_to_gaps=True,
shift_indels=True, check_alignments=True):
"""Map query sequences to target.
Aligns query sequences to targets. Adds ``--c --cs=long``
arguments to `options` to get a long CIGAR string, and
returns results as a dictionary, and optionally writes them
to a PAF file.
This is **not** a memory-efficient implementation as
a lot is read into memory. So if you have very large
queries or targets, that may pose a problem.
Args:
`queryfile` (str)
FASTA file with query sequences to align.
Headers should be unique.
`outfile` (`None` or str)
Name of output file containing alignment
results in PAF format if a str is provided.
Provide `None` if you don't want to create
a permanent alignment file.
`introns_to_gaps` (bool)
If there are introns in the alignment CIGARs, convert
to gaps by running through :meth:`intronsToGaps`.
`shift_indels` (bool)
Pass alignments through :meth:`shiftIndels`.
Only applies to alignments returned in dict,
does not affect any results in `outfile`.
`check_alignments` (bool)
Run all alignments through :meth:`checkAlignment`
before returning, and raise error if any are invalid.
This is a good debugging check, but costs time.
Returns:
A dict where keys are the name of each query in
`queryfile` for which there is an alignment (there
are no keys for queries that do not align). If there
is a single alignment for a query, the value is that
:class:`Alignment` object. There can be multiple primary
alignments (`see here <https://github.com/lh3/minimap2/issues/113>`_).
If there are multiple alignments, the value is the
:class:`Alignment` with the highest score, and the remaining
alignments are listed in the :class:`Alignment.additional`
attribute of that "best" alignment.
"""
assert os.path.isfile(queryfile), "no `queryfile` {0}".format(queryfile)
assert '-a' not in self.options, \
"output should be PAF format, not SAM"
for arg in ['-c', '--cs=long']:
if arg not in self.options:
self.options.append(arg)
if outfile is None:
fout = tempfile.TemporaryFile('w+')
else:
fout = open(outfile, 'w+')
stderr = tempfile.TemporaryFile()
try:
_ = subprocess.check_call(
[self.prog] + self.options + [self.targetfile, queryfile],
stdout=fout, stderr=stderr)
fout.seek(0)
dlist = collections.defaultdict(list)
for query, alignment in parsePAF(fout,
self.targetseqs, introns_to_gaps):
dlist[query].append(alignment)
except:
stderr.seek(0)
sys.stderr.write('\n{0}\n'.format(stderr.read()))
raise
finally:
fout.close()
stderr.close()
d = {}
for query in list(dlist.keys()):
if len(dlist[query]) == 1:
d[query] = dlist[query][0]
else:
assert len(dlist[query]) > 1
sorted_alignments = [tup[1] for tup in sorted(
[(a.score, a) for a in dlist[query]],
reverse=True)]
d[query] = sorted_alignments[0]._replace(
additional=sorted_alignments[1 : ])
del dlist[query]
if shift_indels:
for query in list(d.keys()):
new_cigar_str = shiftIndels(d[query].cigar_str)
if new_cigar_str != d[query].cigar_str:
d[query] = d[query]._replace(cigar_str=new_cigar_str)
if check_alignments:
queryseqs = {seq.name:str(seq.seq) for seq in
Bio.SeqIO.parse(queryfile, 'fasta')}
for query, a in d.items():
if not checkAlignment(a, self.targetseqs[a.target],
queryseqs[query]):
raise ValueError("Invalid alignment for {0}.\n"
"alignment = {1}\ntarget = {2}\nquery = {3}"
.format(query, a, self.targetseqs[a.target],
queryseqs[query]))
return d
[docs]class TargetVariants:
"""After alignment, assign to one of several target variants.
Useful if you aligned against one set of targets, but
in reality the queries could align to several different
**point mutant** variants of that target. Use this
class to take the alignments and see if they instead exactly
match a variant of the target.
Initialize to specify the target variants, then classify
using :class:`TargetVariants.call`.
Args:
`variantfiles` (dict)
Specifies FASTA files giving the target variants.
Each file must have the same target names as in
`mapper.targetfile`. These are the variants to
which we compare the queries, and they must be
point mutants (same length) as the targets for
`mapper`. Currently only works when there are
two sets of variants.
`mapper` (:class:`Mapper`)
The mapper used to make the alignments. Used to check
that the sequences specified in `variantfiles` are
proper point mutant variants of the alignment targets.
`variantsites_min_acc` (float or `None`)
Minimum required accuracy (computed from Q-values)
required for calling by :class:`TargetVariants.call`.
Attributes:
`variantfiles` (dict)
See above.
`mapper` (:class:`Mapper`)
See above.
`variantsites_min_acc` (float or `None`)
See above.
`variantnames` (list)
Alphabeticized list of the variant names that key
`variantfiles`.
`targetnames` (list)
Alphabeticized list of the target names.
`variantseqs` (dict)
The actual sequences in `variantfiles`. Keyed
by each variant in `variantnames`, then keyed
by each name in `targetnames`, and values
are the sequences for that variant of that target.
`variablesites` (dict)
Keyed by target names, each value is a list of
all sites that differ between target variants,
sorted and in 0, 1, ... indexing.
`sitevariants` (dict)
`sitevariants[target][variant]` is a list that gives
the identity of `variant` for target ` at each site
in `variablesites[target]`.
Here is a short example.
First, create two target seqs, and two variants that each
differ at two positions:
>>> target1_wt = 'ATGCATGAA'
>>> target1_var = 'ATCCATGTA'
>>> target2_wt = 'GATACCCGG'
>>> target2_var = 'GCTACCCCG'
Now write these to two targetfiles, initialize :class:`Mapper`
with the wildtype target sets, initialize :class:`TargetVariants`
with wildtype and variant target sets:
>>> TempFile = functools.partial(tempfile.NamedTemporaryFile, mode='w')
>>> with TempFile() as wtfile, TempFile() as varfile:
... _ = wtfile.write('>target1\\n{0}\\n>target2\\n{1}'.format(
... target1_wt, target2_wt))
... wtfile.flush()
... _ = varfile.write('>target1\\n{0}\\n>target2\\n{1}'.format(
... target1_var, target2_var))
... varfile.flush()
... mapper = Mapper(wtfile.name, OPTIONS_CODON_DMS)
... targetvars = TargetVariants(
... {'wildtype':wtfile.name, 'variant':varfile.name},
... mapper, variantsites_min_acc=0.99)
>>> targetvars.variantnames
['variant', 'wildtype']
>>> targetvars.targetnames
['target1', 'target2']
>>> sorted(targetvars.variablesites.items())
[('target1', [2, 7]), ('target2', [1, 7])]
>>> targetvars.sitevariants == {
... 'target1':{'wildtype':['G', 'A'], 'variant':['C', 'T']},
... 'target2':{'wildtype':['A', 'G'], 'variant':['C', 'C']}}
True
Now test on some alignments. First, one that matches the
wildtype of target2:
>>> a_wildtype = Alignment(q_st=0, q_en=8, q_len=8, strand=1,
... r_st=1, r_en=9, r_len=9, score=16, target='target2',
... additional=[], cigar_str='=ATACCCGG')
>>> (variant, a_new) = targetvars.call(a_wildtype)
>>> variant
'wildtype'
>>> a_wildtype == a_new
True
Now one that matches the variant of target2:
>>> a_variant = a_wildtype._replace(cigar_str='*ac=TACCC*gc=G')
>>> (variant, a_new) = targetvars.call(a_variant)
>>> variant
'variant'
>>> a_variant == a_new
False
>>> a_new.cigar_str
'=CTACCCCG'
Now one that matches the variant of target2, but also has another
mutation:
>>> a_variant = a_wildtype._replace(cigar_str='*ac=TA*ca=CC*gc=G')
>>> (variant, a_new) = targetvars.call(a_variant)
>>> variant
'variant'
>>> a_variant == a_new
False
>>> a_new.cigar_str
'=CTA*ca=CCCG'
Now one that is mixed (doesn't match either wildtype or variant):
Now one that is mixed (doesn't match either wildtype or variant):
>>> a_mixed = a_wildtype._replace(cigar_str='=ATACCC*gc=G')
>>> (variant, a_new) = targetvars.call(a_mixed)
>>> variant
'mixed'
>>> a_mixed == a_new
True
Now an alignment that only spans some variable sites:
>>> a_var_partial = Alignment(q_st=0, q_en=7, q_len=7, strand=1,
... r_st=2, r_en=9, r_len=9, score=14, target='target2',
... additional=[], cigar_str='=TACCC*gc=G')
>>> (variant, a_new) = targetvars.call(a_var_partial)
>>> variant
'partial variant'
>>> a_var_partial == a_new
False
>>> a_new.cigar_str
'=TACCCCG'
Now alignments that do and do not pass the accuracy
threshold:
>>> a_qvals = Alignment(q_st=1, q_en=9, q_len=9, strand=1,
... r_st=1, r_en=9, r_len=9, score=16, target='target2',
... additional=[], cigar_str='=ATACCCGG')
>>> qvals_high = numpy.array([30] * 9)
>>> (variant, a_new) = targetvars.call(a_qvals, qvals_high)
>>> variant
'wildtype'
>>> qvals_low = numpy.array([30, 10] + [30] * 7)
>>> (variant, a_new) = targetvars.call(a_qvals, qvals_low)
>>> variant
'low accuracy'
Now an alignment that does not span any variable sites:
>>> a_unknown = Alignment(q_st=0, q_en=5, q_len=5, strand=1,
... r_st=2, r_en=7, r_len=9, score=12, target='target2',
... additional=[], cigar_str='=TACCC')
>>> (variant, a_new) = targetvars.call(a_unknown)
>>> a_unknown == a_new
True
>>> variant
'unknown'
"""
def __init__(self, variantfiles, mapper, *,
variantsites_min_acc=None):
"""See main class doc string."""
if len(variantfiles) != 2:
raise ValueError("Currently only works for two sets of "
"variants in `variantfiles`.")
self.variantnames = sorted(variantfiles.keys())
self.variantfiles = variantfiles
self.mapper = mapper
self.targetnames = sorted(self.mapper.targetseqs.keys())
if not ((variantsites_min_acc is None) or (0 <
variantsites_min_acc < 1)):
raise ValueError("`variantsites_min_acc` must be `None` "
"or between 0 and 1")
self.variantsites_min_acc = variantsites_min_acc
self.variantseqs = {}
for variant, variantfile in self.variantfiles.items():
self.variantseqs[variant] = {s.name:str(s.seq) for s in
Bio.SeqIO.parse(variantfile, 'fasta')}
if set(self.targetnames) != set(
self.variantseqs[variant].keys()):
raise ValueError("The file for variant {0} does not "
"have the expected targets.\nExpected: {1}\n"
"Actual: {2}".format(variant,
set(self.targetnames),
set(self.variantseqs[variant].keys())))
for targetname in self.targetnames:
if (len(self.mapper.targetseqs[targetname]) !=
len(self.variantseqs[variant][targetname])):
raise ValueError("variant {0} of target {1} is "
"not same length as target in `mapper`. "
"Can't handle variants that differ by "
"more than point mutations.".format(
variant, targetname))
self.variablesites = {}
self.sitevariants = {}
assert len(self.variantnames) == 2
for target in self.targetnames:
self.variablesites[target] = [i for i, (x, y) in enumerate(
zip(self.variantseqs[self.variantnames[0]][target],
self.variantseqs[self.variantnames[1]][target]))
if x != y]
self.sitevariants[target] = {''.join(seqs[target][i]
for i in self.variablesites[target]):name
for name, seqs in self.variantseqs.items()}
self.sitevariants[target] = {variant:[seqs[target][i]
for i in self.variablesites[target]] for
variant, seqs in self.variantseqs.items()}
[docs] def call(self, a, qvals=None):
"""Call target variant for an alignment.
Args:
`a` (:class:`Alignment`)
The alignment (built with `mapper`) for which
we want to call the target variant.
`qvals` (`None` or numpy array)
Array of all Q-values for **entire** query used to
build alignment, not just aligned region. If not `None`
**and** `variantsites_min_acc` attribute is not `None`,
then an accuracy requirement is imposed.
Returns:
The 2-tuple `(variant, new_a)`. Possible values are:
- If `a` does not cover any of the variable sites,
then `variant` is "unknown" and `new_a` is
just `a`.
- If any of the variable sites in `a` don't meet
the accuracy threshold of `variantsites_min_acc`, then
`variant` is "low accuracy" and `new_a` is just `a`.
- If all of the variable sites present in `a` don't
exactly match one of the variants, then `variant`
is "mixed" and `new_a` is just `a`.
- If `a` exactly matches one of the target variants
at all variable sites, then `variant` is a
variant in :class:`TargetVariant.variantnames` and
`new_a` is a version of `a` in which any mismatches
relative to this target variant have been removed
from the :class:`Alignment.cigar_str` attribute.
- If `a` only covers some of the variable sites but all
of these match one of the target variants, then
`variant` is "partial <variant>" where <variant>
is a variant in :class:`TargetVariant.variantnames`,
and `new_a` is a version of `a` in which any mismatches
relative to this target variant havea been removed
from the :class:`Alignment.cigar_str` attribute.
"""
if a.strand != 1:
raise ValueError("Currently only implemented for + strand")
try:
sites = self.variablesites[a.target]
except KeyError:
raise ValueError("alignment has unrecognized target {0}"
.format(a.target))
querysites_w_None = [iTargetToQuery(a, i) for i in sites]
querysites = [i for i in querysites_w_None if i is not None]
if not querysites:
# no variable sites covered, so can't call variant
return ('unknown', a)
if qvals is not None and self.variantsites_min_acc:
if a.q_len != len(qvals):
raise ValueError("invalid length of `qvals`")
if any(dms_tools2.pacbio.qvalsToAccuracy(q) <
self.variantsites_min_acc for q in qvals[querysites]):
return ('low accuracy', a)
query = cigarToQueryAndTarget(a.cigar_str)[0]
query_idents = [query[i - a.q_st] for i in querysites]
for v, vsites_all in self.sitevariants[a.target].items():
assert len(vsites_all) == len(querysites_w_None) > 0
vsites = [nt for nt, i in zip(vsites_all, querysites_w_None)
if i is not None]
if query_idents == vsites:
if len(vsites_all) == len(querysites):
variant = v
else:
variant = 'partial ' + v
break
else:
assert a.target in self.sitevariants
# does not match any of the target variants
return ("mixed", a)
if (self.mapper.targetseqs[a.target] !=
self.variantseqs[v][a.target]):
assert len(sites) == len(querysites_w_None)
targetsites = [i - a.r_st for i, j in
zip(sites, querysites_w_None) if j is not None]
assert len(targetsites) == len(query_idents)
a_new = a._replace(cigar_str=removeCIGARmutations(a.cigar_str,
dict(zip(targetsites, query_idents))))
return (variant, a_new)
else:
return (variant, a)
#: match indels for :meth:`shiftIndels`
_INDELMATCH = re.compile(r'(?P<lead>=[A-Z]+)'
r'(?P<indeltype>[\-\+])'
r'(?P<indel>[a-z]+)'
r'(?P<trail>=[A-Z]+)')
[docs]def shiftIndels(cigar):
"""Shifts indels to consistent position.
In some cases it is ambiguous where to place insertions /
deletions in the CIGAR string. This function moves them
to a consistent location (as far forward as possible).
Args:
`cigar` (str)
PAF long CIGAR string, format is
`detailed here <https://github.com/lh3/minimap2#cs>`_.
Returns:
A version of `cigar` with indels shifted as far
forward as possible.
>>> shiftIndels('=AAC-atagcc=GGG-ac=T')
'=AA-catagc=CGGG-ac=T'
>>> shiftIndels('=AAC-atagac=GGG-acg=AT')
'=A-acatag=ACGG-gac=GAT'
>>> shiftIndels('=TCC+c=TCAGA+aga=CT')
'=T+c=CCTC+aga=AGACT'
"""
i = 0
m = _INDELMATCH.search(cigar[i : ])
while m:
n = 0
indel = m.group('indel').upper()
while m.group('lead')[-n - 1 : ] == indel[-n - 1 : ]:
n += 1
if n > 0:
if n == len(m.group('lead')) - 1:
lead = '' # removed entire lead
else:
lead = m.group('lead')[ : -n]
shiftseq = m.group('lead')[-n : ] # sequence to shift
cigar = ''.join([
cigar[ : i + m.start('lead')], # sequence before match
lead, # remaining portion of lead
m.group('indeltype'),
shiftseq.lower(), m.group('indel')[ : -n], # new indel
'=', shiftseq, m.group('trail')[1 : ], # trail after indel
cigar[i + m.end('trail') : ] # sequence after match
])
else:
i += m.start('trail')
m = _INDELMATCH.search(cigar[i : ])
return cigar
[docs]def trimCigar(side, cigar):
"""Trims a nucleotide from CIGAR string.
Currently just trims one site.
Args:
`side` (str)
"start" trim from start, "end" to trim from end.
`cigar` (str)
PAF long CIGAR string, format is
`detailed here <https://github.com/lh3/minimap2#cs>`_.
Returns:
A version of `cigar` with a single site trimmed
from start or end.
>>> trimCigar('start', '=ATG')
'=TG'
>>> trimCigar('end', '=ATG')
'=AT'
>>> trimCigar('start', '*ac=TG')
'=TG'
>>> trimCigar('end', '=AT*ag')
'=AT'
>>> trimCigar('start', '-aac=TG')
'-ac=TG'
>>> trimCigar('end', '=TG+aac')
'=TG+aa'
"""
if side == 'start':
if re.match('=[A-Z]{2}', cigar):
return '=' + cigar[2 : ]
elif re.match(r'=[A-Z][\*\-\+]', cigar):
return cigar[2 : ]
elif re.match(r'\*[a-z]{2}', cigar):
return cigar[3 : ]
elif re.match(r'[\-\+][a-z]{2}', cigar):
return cigar[0] + cigar[2 : ]
elif re.match(r'[\-\+][a-z][^a-z]'):
return cigar[2 : ]
else:
raise ValueError("Cannot match start of {0}".format(cigar))
elif side == 'end':
if re.search('[A-Z]{2}$', cigar):
return cigar[ : -1]
elif re.search('=[A-Z]$', cigar):
return cigar[ : -2]
elif re.search(r'\*[a-z]{2}$', cigar):
return cigar[ : -3]
elif re.search(r'[\-\+][a-z]$', cigar):
return cigar[ : -2]
elif re.search('[a-z]{2}$', cigar):
return cigar[ : -1]
else:
raise ValueError("Cannot match end of {0}".format(cigar))
else:
raise ValueError("`side` must be 'start' or 'end', got {0}"
.format(side))
[docs]def parsePAF(paf_file, targets=None, introns_to_gaps=False):
"""Parse ``*.paf`` file as created by ``minimap2``.
`paf_file` is assumed to be created with
the ``minimap2`` options ``-c --cs=long``, which
creates long `cs` tags with the CIGAR string.
PAF format is `described here <https://github.com/lh3/miniasm/blob/master/PAF.md>`_.
PAF long CIGAR string format from ``minimap2`` is
`detailed here <https://github.com/lh3/minimap2#cs>`_.
Args:
`paf_file` (str or iterator)
If str, should be name of ``*.paf`` file.
Otherwise should be an iterator that returns
lines as would be read from a PAF file.
`targets` (dict or `None`)
If not `None`, is dict keyed by target names,
with values target sequences. You need to provide
this if ``minimap2`` is run with a ``--splice``
option and `introns_to_gaps` is `True`.
`introns_to_gap` (bool)
Pass CIGAR strings through :meth:`intronsToGaps`.
Requires you to provide `targets`.
Returns:
A generator that yields query / alignments on each line in
`paf_file`. Returned as the 2-tuple `(query_name, a)`,
where `query_name` is a str giving the name of the query
sequence, and `a` is an :class:`Alignment`.
Here is a short example:
>>> paf_file = io.StringIO('\\t'.join([
... 'myquery', '10', '0', '10', '+', 'mytarget',
... '20', '5', '15', '9', '10', '60',
... 'cs:Z:=ATG*ga=GAACAT', 'AS:i:7']))
>>> alignments = [tup for tup in parsePAF(paf_file)]
>>> len(alignments)
1
>>> (queryname, alignment) = alignments[0]
>>> queryname
'myquery'
>>> alignment.target
'mytarget'
>>> (alignment.r_st, alignment.r_en)
(5, 15)
>>> (alignment.q_st, alignment.q_en)
(0, 10)
>>> alignment.strand
1
>>> alignment.cigar_str
'=ATG*ga=GAACAT'
>>> alignment.q_len
10
>>> alignment.score
7
Now an example of using `targets` and `introns_to_gaps`.
You can see that this option converts the ``~gg5ac``
to ``-ggaac`` in the `cigar_str` attribute:
>>> targets = {'mytarget':'ATGGGAACAT'}
>>> paf_file = io.StringIO('\\t'.join([
... 'myquery', '9', '0', '9', '+', 'mytarget',
... '10', '1', '10', '?', '4', '60',
... 'cs:Z:=TG~gg5ac=AT', 'AS:i:2']))
>>> a_keep_introns = [tup for tup in parsePAF(paf_file)][0][1]
>>> _ = paf_file.seek(0)
>>> a_introns_to_gaps = [tup for tup in parsePAF(paf_file,
... targets=targets, introns_to_gaps=True)][0][1]
>>> a_keep_introns.cigar_str
'=TG~gg5ac=AT'
>>> a_introns_to_gaps.cigar_str
'=TG-ggaac=AT'
"""
if introns_to_gaps and (not targets or not isinstance(targets, dict)):
raise ValueError("specify `target` if `introns_to_gaps`:\n{0}"
.format(targets))
cigar_m = re.compile(
'cs:Z:(?P<cigar_str>('
r'\*[a-z]{2}|' # matches mutations
r'=[A-Z]+|' # matches identities
r'[\+\-][a-z]+|' # matches indels
r'\~[a-z]{2}\d+[a-z]{2}' # matches introns
r')+)(?:\s+|$)')
score_m = re.compile(r'AS:i:(?P<score>[\-\+]?\d+)(?:\s+|$)')
close_paf_file = False
if isinstance(paf_file, str):
assert os.path.isfile(paf_file), "no `paf_file` {0}".format(
paf_file)
paf_file = open(paf_file, 'r')
close_paf_file = True
elif not isinstance(paf_file, collections.Iterable):
raise ValueError("`paf_file` must be file name or iterable")
for line in paf_file:
entries = line.split('\t', maxsplit=12)
try:
cigar_str = cigar_m.search(entries[12]).group('cigar_str')
except:
raise ValueError("Cannot match CIGAR:\n{0}".format(entries[12]))
try:
score = int(score_m.search(entries[12]).group('score'))
except:
raise ValueError("Cannot match score:\n{0}".format(entries[12]))
query_name = entries[0]
target = entries[5]
r_st = int(entries[7])
r_en = int(entries[8])
if introns_to_gaps:
try:
targetseq = targets[target]
except KeyError:
raise KeyError("No target {0} in targets".format(target))
cigar_str = intronsToGaps(cigar_str, targetseq[r_st : r_en])
a = Alignment(target=target,
r_st=r_st,
r_en=r_en,
r_len=int(entries[6]),
q_st=int(entries[2]),
q_en=int(entries[3]),
q_len=int(entries[1]),
strand={'+':1, '-':-1}[entries[4]],
cigar_str=cigar_str,
additional=[],
score=score)
yield (query_name, a)
if close_paf_file:
paf_file.close()
#: matches an exact match group in long format CIGAR
_EXACT_MATCH = re.compile('=[A-Z]+')
[docs]def numExactMatches(cigar):
"""Number exactly matched nucleotides in long CIGAR.
>>> numExactMatches('=ATG-aca=A*gc+ac=TAC')
7
"""
n = 0
for m in _EXACT_MATCH.finditer(cigar):
n += len(m.group()) - 1
return n
_MUTATION_MATCH = re.compile(r'\*[a-z]{2}')
[docs]def numAligned(cigar):
"""Gets number of aligned nucleotides from PAF long CIGAR.
Args:
`cigar` (str)
CIGAR str.
Returns:
The number of aligned nucleotides in the cigar, where a
nucleotide is considered aligned if it is either a
match or a point mutation, but not if it is an indel.
Example: the CIGAR below has 3 matches, a deletion, 5 matches,
2 mutations, 2 matches, an insertion, 3 matches, 1 mutation, and
2 matches. So this counts as 3 + 5 + 2 + 2 + 3 + 1 + 2 = 18
aligned nucleotides:
>>> numAligned('=ACT-gata=AGTCA*ta*ga=TA+tta=GCA*ca=GT')
18
"""
return (numExactMatches(cigar) +
len(list(_MUTATION_MATCH.finditer(cigar))))
#: matches individual group in long format CIGAR
_CIGAR_GROUP_MATCH = re.compile(r'=[A-Z]+|' # exact matches
r'\*[a-z]{2}|' # mutation
r'[\-\+][a-z]+|' # indel
r'\~[a-z]{2}\d+[a-z]{2}' # intron
)
[docs]def intronsToGaps(cigar, target):
"""Converts introns to gaps in CIGAR string.
If you run ``minimap2``, it reports introns differently
than gaps in the target. This function converts
the intron notation to gaps. This is useful if you are
using the introns as an ad-hoc way to identify
long gaps.
Args:
`cigar` (str)
PAF long CIGAR string, format is
`detailed here <https://github.com/lh3/minimap2#cs>`_.
`target` (str)
The exact portion of the target aligned to the query
in `cigar`.
>>> target = 'ATGGAACTAGCATCTAG'
>>> cigar = '=A+ca=TG-g=A*ag=CT~ag5at=CTAG'
>>> intronsToGaps(cigar, target)
'=A+ca=TG-g=A*ag=CT-agcat=CTAG'
"""
newcigar = []
i = 0 # index in target
while cigar:
m = _CIGAR_GROUP_MATCH.match(cigar)
assert m, "can't match CIGAR:\n{0}".format(cigar)
assert m.start() == 0
if m.group()[0] == '=':
newcigar.append(m.group())
i += m.end() - 1
elif m.group()[0] == '*':
newcigar.append(m.group())
i += 1
elif m.group()[0] == '-':
newcigar.append(m.group())
i += m.end() - 1
elif m.group()[0] == '+':
newcigar.append(m.group())
elif m.group()[0] == '~':
intronlen = int(m.group()[3 : -2])
newcigar += ['-', target[i : i + intronlen].lower()]
assert m.group()[1 : 3].upper() == target[i : i + 2], \
"target = {0}\ncigar = {1}".format(target, cigar)
i += intronlen
assert m.group()[-2 : ].upper() == target[i - 2 : i]
else:
raise RuntimeError('should never get here')
cigar = cigar[m.end() : ]
return ''.join(newcigar)
[docs]def cigarToQueryAndTarget(cigar):
"""Returns `(query, target)` specified by PAF long CIGAR.
Cannot handle CIGAR strings with intron operations.
To check those, you first need to run through
:meth:`intronsToGaps`.
Args:
`cigar` (str)
CIGAR string.
Returns:
The 2-tuple `(query, target)`, where each is
a string giving the encoded query and target.
>>> cigarToQueryAndTarget('=AT*ac=G+at=AG-ac=T')
('ATCGATAGT', 'ATAGAGACT')
>>> cigarToQueryAndTarget('=A+tgc-tgc=AT')
('ATGCAT', 'ATGCAT')
"""
assert isinstance(cigar, str)
query = []
target = []
while cigar:
m = _CIGAR_GROUP_MATCH.match(cigar)
assert m, "can't match CIGAR:\n{0}".format(cigar)
assert m.start() == 0
if m.group()[0] == '=':
query.append(m.group()[1 : ])
target.append(m.group()[1 : ])
elif m.group()[0] == '*':
query.append(m.group()[2].upper())
target.append(m.group()[1].upper())
elif m.group()[0] == '-':
target.append(m.group()[1 : ].upper())
elif m.group()[0] == '+':
query.append(m.group()[1 : ].upper())
elif m.group()[0] == '~':
raise ValueError("Cannot handle intron operations, but."
"string has one:\n{0}".format(m.group()))
else:
raise RuntimeError('should never get here')
cigar = cigar[m.end() : ]
return (''.join(query), ''.join(target))
[docs]def mutateSeq(wtseq, mutations, insertions, deletions):
"""Mutates sequence and gets CIGAR.
Primarily useful for simulations.
In the mutation specifications below, 0-based numbering
is used. Sequence characters are upper case nucleotides.
Operations are applied in the order: mutations, insertions,
deletions. So a deletion can overwrite a mutation or insertion.
The entire insertion counts as just one added site when indexing
the deletions. You will get an error if deletions and insertions
overlap.
Arguments:
`wtseq` (str)
The wildtype sequence.
`mutations` (list)
List of point mutations in form `(i, mut)` where
`i` is site and `mut` is mutant amino acid (can be
same as wildtype).
`deletions` (list)
List of deletion locations in form `(istart, iend)`.
`insertions` (list)
List of insertions in form `(i, seqtoinsert)`.
Returns:
The 2-tuple `(mutantseq, cigar)` where `cigar` is the CIGAR
in `PAF long format <https://github.com/lh3/minimap2#cs>`_.
Here is an example:
>>> wtseq = 'ATGGAATGA'
>>> (mutantseq, cigar) = mutateSeq(wtseq, [], [], [])
>>> mutantseq == wtseq
True
>>> cigar == '=' + wtseq
True
>>> (mutantseq, cigar) = mutateSeq(wtseq,
... [(0, 'C'), (1, 'T'), (3, 'A')],
... [(8, 'TAC')], [(5, 2)])
>>> mutantseq
'CTGAAGTACA'
>>> cigar
'*ac=TG*ga=A-at=G+tac=A'
"""
assert re.match('^[{0}]+$'.format(''.join(NTS)), wtseq), \
"`wtseq` not all upper case nucleotides."
n = len(wtseq)
mutantseq = list(wtseq)
cigar = mutantseq.copy()
for i, mut in mutations:
assert 0 <= i < n
if mut.upper() != wtseq[i]:
mutantseq[i] = mut.upper()
cigar[i] = '*' + wtseq[i].lower() + mut.lower()
# traverse indels from back so index is maintained
for i, seqtoinsert in sorted(insertions, reverse=True):
mutantseq.insert(i, seqtoinsert.upper())
cigar.insert(i, '+' + seqtoinsert.lower())
for i, del_len in sorted(deletions, reverse=True):
delseq = []
for j in range(i, i + del_len):
if mutantseq[j] in NTS:
delseq.append(mutantseq[j])
elif mutantseq[j][0] == '*':
delseq.append(mutantseq[j][1])
else:
raise ValueError("overlapping insertions and deletions")
for _ in range(del_len):
mutantseq.pop(i)
cigar.pop(i)
cigar.insert(i, '-' + ''.join(delseq).lower())
# add equal signs to cigar
for i, op in list(enumerate(cigar)):
if (op in NTS) and (i == 0 or cigar[i - 1][-1] not in NTS):
cigar[i] = '=' + op
mutantseq = ''.join(mutantseq)
cigar = ''.join(cigar)
assert mutantseq == cigarToQueryAndTarget(cigar)[0]
return (mutantseq, cigar)
[docs]def removeCIGARmutations(cigar, muts_to_remove):
"""Removes point mutations from CIGAR string.
Args:
`cigar` (str)
Long format CIGAR string.
`muts_to_remove` (dict)
Dict keyed by site number in 0-based numbering of
target starting at first target position in `cigar`,
values are nucleotides that we want to make the new
target identity for the CIGAR at that site. All
of these nucleotides must be the wildtype in the
current CIGAR mutation.
Returns:
New CIGAR string expected if `cigar` was actually to the
target where the wildtype identity is what is given by
the mutation.
>>> cigar = '=AT-gca=T*at=G+ca*ga=T*at'
>>> muts_to_remove = {6:'T', 8:'A'}
>>> removeCIGARmutations(cigar, muts_to_remove)
'=AT-gca=TTG+ca=AT*at'
"""
new_nts = {i:nt.upper() for i, nt in muts_to_remove.items()}
i_target = 0
newcigar = []
prevgroupmatch = False
while cigar:
m = _CIGAR_GROUP_MATCH.match(cigar)
assert m and m.start() == 0
if m.group()[0] == '=':
n = len(m.group()) - 1
i_target += n
if prevgroupmatch:
newcigar.append(m.group()[1 : ])
else:
newcigar.append(m.group())
prevgroupmatch = True
elif m.group()[0] == '*':
if i_target in new_nts:
query_nt = m.group()[2]
if query_nt.upper() != new_nts[i_target]:
raise ValueError('not removing mutation')
if prevgroupmatch:
newcigar.append(new_nts[i_target])
else:
newcigar.append('=' + new_nts[i_target])
prevgroupmatch = True
del new_nts[i_target]
else:
newcigar.append(m.group())
prevgroupmatch = False
i_target += 1
elif m.group()[0] == '-':
n = len(m.group()) - 1
i_target += n
newcigar.append(m.group())
prevgroupmatch = False
elif m.group()[0] == '+':
newcigar.append(m.group())
prevgroupmatch = False
elif m.group()[0] == '~':
raise ValueError("Cannot handle intron operations")
else:
raise RuntimeError("should never get here")
cigar = cigar[m.end() : ]
assert cigar == ''
if new_nts:
raise ValueError("failed to find all mutations to remove")
return ''.join(newcigar)
[docs]def iTargetToQuery(a, i):
"""Gets index in query aligned to target index.
Args:
`a` (:class:`Alignment`)
The alignment.
`i` (int)
Index in target in 0-based numbering.
Returns:
Index in query that aligns to site `i` in target,
or `None` if there is not an alignment at that site.
>>> a = Alignment(target='target', r_st=1, r_en=9, r_len=9,
... q_st=3, q_en=10, q_len=7, strand=1,
... cigar_str='=T*ga=CA-ga=T+c=T',
... additional=[], score=-1)
>>> iTargetToQuery(a, 0) is None
True
>>> iTargetToQuery(a, 1)
3
>>> iTargetToQuery(a, 4)
6
>>> iTargetToQuery(a, 6) is None
True
>>> iTargetToQuery(a, 7)
7
>>> iTargetToQuery(a, 8)
9
>>> iTargetToQuery(a, 9) is None
True
"""
if i < a.r_st or i >= a.r_en:
return None
i_query = a.q_st
i_target = a.r_st
cigar = a.cigar_str
while cigar:
m = _CIGAR_GROUP_MATCH.match(cigar)
assert m and m.start() == 0
if m.group()[0] == '=':
n = len(m.group()) - 1
if i < i_target + n:
return i - (i_target - i_query)
i_target += n
i_query += n
elif m.group()[0] == '*':
if i < i_target + 1:
return i - (i_target - i_query)
i_target += 1
i_query += 1
elif m.group()[0] == '-':
n = len(m.group()) - 1
if i < i_target + n:
return None
i_target += n
elif m.group()[0] == '+':
n = len(m.group()) - 1
i_query += n
elif m.group()[0] == '~':
raise ValueError("Cannot handle intron operations")
else:
raise RuntimeError("should never get here")
cigar = cigar[m.end() : ]
raise RuntimeError("should not get here\ni={0}\na={1}".format(i, a))
if __name__ == '__main__':
import doctest
doctest.testmod()